127 lines
4.2 KiB
Python
127 lines
4.2 KiB
Python
import inspect
|
|
import sys
|
|
import types
|
|
from logging import Filter, Logger, LoggerAdapter, getLogger
|
|
from typing import Any, List, Optional, Set, Tuple, Union
|
|
|
|
from .functions import find_class
|
|
|
|
# nom du module de fonction built-in
|
|
BUILTIN_MODULE = 'builtins'
|
|
|
|
# tag pour l'alimentation
|
|
TAG_DATA_FEED = 'alimentation'
|
|
|
|
# tag pour la performance
|
|
TAG_PERF = 'performance'
|
|
|
|
# clé du dictionnaire 'kwargs' (standard pour 'logging')
|
|
KEY_EXTRA = 'extra'
|
|
|
|
# clé du dictionnaire 'extra' (le type attendu est une liste ou un tuple)
|
|
KEY_EXTRA_TAGS = 'tags'
|
|
|
|
# cache des adaptateurs
|
|
cache_adapters = {}
|
|
|
|
|
|
def __get_full_name(module_name: Optional[str] = None, cls_name: Optional[str] = None) -> Optional[str]:
|
|
""" renvoie le nom complet à partir du nom de module et du nom de classe """
|
|
|
|
if module_name or cls_name:
|
|
return f'{module_name}.{cls_name}' if module_name and cls_name else cls_name or module_name
|
|
return None
|
|
|
|
|
|
def get_logger_name(value: Any = None) -> Optional[str]:
|
|
"""
|
|
Renvoie un nom de logger pour la valeur donnée :
|
|
- None ou chaîne vide : None
|
|
- chaîne non vide : la valeur initiale
|
|
- un objet sans module ou de module BUILTIN_MODULE : None (utiliser une chaîne pour forcer le nom)
|
|
- classe, instance de classe, fonction ou méthode : <module>.<nom qualifié de la classe> si possible, sinon <module>
|
|
|
|
:param valeur: valeur initiale
|
|
:type valeur: Any, optional
|
|
|
|
:return: nom de logger
|
|
:rtype: str
|
|
"""
|
|
if value is None or isinstance(value, str):
|
|
return value or None
|
|
|
|
module = inspect.getmodule(value)
|
|
module_name = module.__name__ if module else None
|
|
if not module_name or module_name == BUILTIN_MODULE:
|
|
return None
|
|
|
|
if inspect.isclass(value):
|
|
return __get_full_name(module_name, value.__qualname__)
|
|
|
|
if isinstance(value, (
|
|
types.BuiltinFunctionType,
|
|
types.BuiltinMethodType,
|
|
types.ClassMethodDescriptorType,
|
|
types.FunctionType,
|
|
types.MethodDescriptorType,
|
|
types.MethodType,
|
|
)):
|
|
cls = find_class(value)
|
|
return __get_full_name(module_name, cls.__qualname__ if cls else None)
|
|
|
|
return __get_full_name(module_name, value.__class__.__qualname__)
|
|
|
|
|
|
def get_logger(value: Any = None, tags: Union[str, List[str], Set[str], Tuple[str]] = None) -> Union[Logger, LoggerAdapter]:
|
|
"""
|
|
Renvoie un logger standard avec un nom obtenu par 'get_logger_name'
|
|
|
|
:param valeur: valeur initiale
|
|
:type valeur: Any, optional
|
|
|
|
:param tags: tag(s) à ajouter dans le contexte de log
|
|
:type tags: Union[str, List[str], Set[str], Tuple[str]], optional
|
|
|
|
:raises TypeError: type inattendu
|
|
|
|
:return: logger
|
|
:rtype: class:`logging.Logger` ou class:`logging.LoggerAdapter`
|
|
"""
|
|
logger_name = get_logger_name(value)
|
|
if not tags:
|
|
return getLogger(logger_name)
|
|
|
|
adapters = cache_adapters.setdefault(tags if isinstance(tags, str) else frozenset(tags), {})
|
|
return adapters.get(logger_name) or adapters.setdefault(logger_name, TagLoggerAdapter(getLogger(logger_name), {KEY_EXTRA_TAGS: tags}))
|
|
|
|
|
|
class TagLoggerAdapter(LoggerAdapter):
|
|
"""
|
|
Adaptateur qui ajout si nécessaire des tags.
|
|
"""
|
|
|
|
def __init__(self, logger, extra):
|
|
super().__init__(logger, extra)
|
|
if not isinstance(self.extra, dict):
|
|
self.extra = {}
|
|
tags = self.extra.setdefault(KEY_EXTRA_TAGS, frozenset())
|
|
if not isinstance(tags, frozenset):
|
|
self.extra.update({KEY_EXTRA_TAGS: frozenset(tags) if isinstance(tags, (list, set, tuple)) else frozenset([tags])})
|
|
|
|
def process(self, msg, kwargs):
|
|
""" ajoute les tags au contexte si nécessaire """
|
|
tags = self.extra.get(KEY_EXTRA_TAGS)
|
|
if tags:
|
|
kw_extra = kwargs.setdefault(KEY_EXTRA, {})
|
|
kw_tags = kw_extra.setdefault(KEY_EXTRA_TAGS, set())
|
|
if not isinstance(kw_tags, (list, tuple, set)):
|
|
kw_tags = set(kw_tags)
|
|
kw_extra.update({KEY_EXTRA_TAGS: kw_tags})
|
|
for tag in tags:
|
|
if isinstance(kw_tags, set):
|
|
kw_tags.add(tag)
|
|
elif tag not in kw_tags:
|
|
kw_tags = set(*kw_tags, tag)
|
|
kw_extra.update({KEY_EXTRA_TAGS: kw_tags})
|
|
return msg, kwargs
|