import functools import inspect import logging import os import time from enum import Enum, auto from logging import Logger, LoggerAdapter from types import (BuiltinFunctionType, BuiltinMethodType, ClassMethodDescriptorType, FunctionType, MethodDescriptorType, MethodType) from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union from django.db import connections from .logging import TAG_DATA_FEED, TAG_PERF, get_logger logger = get_logger(__name__) # attribut de classe : logger CLASS_ATTR_LOGGER = 'logger' # clé du dictionnaire contexte : désactivé CTX_KEY_DISABLED = 'disabled' # clé du dictionnaire contexte : logger CTX_KEY_LOGGER = 'logger' # attribut de décorateur : contexte DECORATOR_ATTR_CTX = 'decorator_ctx' # attribut de décorateur : clé DECORATOR_ATTR_KEY = 'decorator_key' # attribut de décorateur : cible (classe, fonction) DECORATOR_ATTR_TARGET = 'decorator_target' # types de fonctions FUNCTION_TYPES = ( BuiltinFunctionType, BuiltinMethodType, ClassMethodDescriptorType, FunctionType, MethodDescriptorType, MethodType, ) def _get_query_count() -> int: """ renvoie le nombre total de requêtes (il peut y avoir plusieurs connexions) """ return sum(len(c.queries) for c in connections.all()) class OnConflict(Enum): """ choix de gestion de conflit quand le même décorateur est déjà utilisé """ # laisse l'ancien décorateur SKIP = auto() # remplace l'ancien décorateur REPLACE = auto() # ajoute à l'ancien décorateur STACK = auto() # amélioration possible : et s'il y a plusieurs décorateurs de même clé ? def _find_former_decorator(key: str, target: Callable) -> Tuple[Optional[Callable], Optional[Callable]]: """ Trouve le décorateur de même clé si la cible est déjà décorée. Un décorateur désactivé est ignoré. note : ne fonctionne pas correctement s'il existe un décorateur intermédiaire sans attribut DECORATOR_ATTR_TARGET :param key: clé de décorateur :type key: str :param target: fonction potentiellement décorée :type target: Callable :return: ancien décorateur + ancienne cible :rtype: Tuple[Callable, Callable] """ current = target decorator_key = getattr(current, DECORATOR_ATTR_KEY, None) decorator_target = getattr(current, DECORATOR_ATTR_TARGET, None) while decorator_key and decorator_key != key and decorator_target: current = decorator_target decorator_key = getattr(current, DECORATOR_ATTR_KEY, None) decorator_target = getattr(current, DECORATOR_ATTR_TARGET, None) # les décorateurs désactivés ne comptent pas if decorator_key == key and not getattr(current, DECORATOR_ATTR_CTX, {}).get(CTX_KEY_DISABLED, False): return current, decorator_target return None, None def _create_decorator_from_action( action: Callable, decorator_key: str, on_conflict: OnConflict = OnConflict.SKIP): """ Fonction d'ordre supérieur qui construit un décorateur de fonction/méthode à partir des paramètres. Ce décorateur exécute l'action fournie. Un contexte (dictionnaire) est passé à l'appel. Il contient : - l'état désactivé : clé CTX_KEY_DISABLED optionnelle :param action: action à exécuter :type action: Callable[[dict, Callable[P, R], P.args, P.kwargs], R] :param decorator_key: clé correspondant au décorateur, permet de gérer les conflits :type decorator_key: str, optional :param on_conflict: stratégie de gestion de conflit quand le décorateur est déjà utilisé, defaults to OnConflict.SKIP :type on_conflict: class:`OnConflict` """ def inner(func): if on_conflict is not OnConflict.STACK: former_decorator, former_target = _find_former_decorator(decorator_key, func) if former_decorator: if on_conflict is OnConflict.SKIP: # pas de nouveau décorateur return func if on_conflict is OnConflict.REPLACE: if former_decorator is func and former_target: # ancien décorateur déjà fourni : on le remplace vraiment func = former_target else: # ancien décorateur déjà décoré : on le désactive former_ctx = getattr(former_decorator, DECORATOR_ATTR_CTX, None) if former_ctx is None: former_ctx = {} getattr(former_decorator, DECORATOR_ATTR_CTX, former_ctx) former_ctx.update({CTX_KEY_DISABLED: True}) ctx = {} @functools.wraps(func) def wrapper(*args, **kwargs): if ctx.get(CTX_KEY_DISABLED, False): # désactivé : simple appel de la fonction d'origine return func(*args, **kwargs) return action(ctx, func, *args, **kwargs) setattr(wrapper, DECORATOR_ATTR_CTX, ctx) setattr(wrapper, DECORATOR_ATTR_KEY, decorator_key) setattr(wrapper, DECORATOR_ATTR_TARGET, func) return wrapper return inner def execution_time( level: int = logging.DEBUG, logger_name: Any = None, logger_factory: Callable[[Any], Union[Logger, LoggerAdapter]] = None, on_conflict: OnConflict = OnConflict.SKIP, warn_after: int = None): """ Décorateur de fonction pour tracer le temps d'exécution (au niveau avec un logger de performances). Pour une classe, le décorateur s'applique aux fonctions/méthodes qui passent le filtre. :param level: niveau de base du logger, defaults to logging.DEBUG :type level: int :param logger_name: valeur pour déterminer le nom du logger, ignorée si 'logger_factory' est renseigné, defaults to None :type logger_name: str, optional :param logger_factory: fonction de création du logger à partir de la fonction annotée, annule 'logger_name', defaults to None. :type logger_factory: Callable[[Any], Union[Logger, LoggerAdapter]], optional :param on_conflict: stratégie de gestion de conflit quand le décorateur est déjà utilisé, defaults to OnConflict.SKIP :type on_conflict: class:`OnConflict` :param warn_after: durée (en ms) au-delà de laquelle on souhaite un log d'avertissement, defaults to None :type warn_after: int, optional :return: résultat de la fonction d'origine """ def action(ctx: Dict, func, *args, **kwargs): if CTX_KEY_LOGGER not in ctx: # initialisation du logger, pas toujours possible avant le premier appel ctx.update({CTX_KEY_LOGGER: logger_factory(func) if logger_factory else get_logger(logger_name or func, TAG_PERF)}) temps_debut = time.time() try: return func(*args, **kwargs) finally: try: temps_ecoule = round((time.time() - temps_debut) * 1000) log_level = logging.WARNING if isinstance(warn_after, int) and temps_ecoule > warn_after else level ctx[CTX_KEY_LOGGER].log(log_level, "'%s' exécuté en %d ms", func.__name__, temps_ecoule) except Exception: logger.exception('impossible de tracer la durée') return _create_decorator_from_action(action, inspect.currentframe().f_code.co_name, on_conflict=on_conflict) def query_count( level: int = logging.DEBUG, logger_name: Any = None, logger_factory: Callable[[Any], Union[Logger, LoggerAdapter]] = None, on_conflict: OnConflict = OnConflict.SKIP, warn_after: int = None): """ Décorateur de fonction pour tracer le nombre de requêtes (au niveau avec un logger de performances). Pour une classe, le décorateur s'applique aux fonctions/méthodes qui passent le filtre. :param level: niveau de base du logger, defaults to logging.DEBUG :type level: int :param logger_name: valeur pour déterminer le nom du logger, ignorée si 'logger_factory' est renseigné, defaults to None :type logger_name: str, optional :param logger_factory: fonction de création du logger à partir de la fonction annotée, annule 'logger_name', defaults to None. :type logger_factory: Callable[[Any], Union[Logger, LoggerAdapter]], optional :param on_conflict: stratégie de gestion de conflit quand le décorateur est déjà utilisé, defaults to OnConflict.SKIP :type on_conflict: class:`OnConflict` :param warn_after: nombre de requêtes au-delà duquel on souhaite un log d'avertissement, defaults to None :type warn_after: int, optional :return: résultat de la fonction d'origine """ def action(ctx: Dict, func, *args, **kwargs): if CTX_KEY_LOGGER not in ctx: # initialisation du logger, pas toujours possible avant le premier appel ctx.update({CTX_KEY_LOGGER: logger_factory(func) if logger_factory else get_logger(logger_name or func, TAG_PERF)}) nb_debut = _get_query_count() try: return func(*args, **kwargs) finally: try: nb_requetes = _get_query_count() - nb_debut log_level = logging.WARNING if isinstance(warn_after, int) and nb_requetes > warn_after else level ctx[CTX_KEY_LOGGER].log(log_level, "'%s' a exécuté %d requête(s)", func.__name__, nb_requetes) except Exception: logger.exception('impossible de tracer le nombre de requêtes') return _create_decorator_from_action(action, inspect.currentframe().f_code.co_name, on_conflict=on_conflict) def class_logger(cls): """ Décorateur de classe pour stocker un logger standard dans l'attribut 'logger' (aucune gestion de conflit) """ if not inspect.isclass(cls): return cls @functools.wraps(cls, updated=()) class Wrapper(cls): decorated = 1 setattr(Wrapper, CLASS_ATTR_LOGGER, get_logger(Wrapper)) return Wrapper def decorate_functions( decorator: Callable, func_filter: Callable[[Callable], bool], factory: bool = False): """ Décorateur de classe qui applique le décorateur donné aux fonctions/méthodes (attributs d'un type de FUNCTION_TYPES) qui passent le filtre. :param decorator: décorateur à appliquer :type decorator: Callable :param func_filter: filtre permettant de sélectionner les fonctions/méthodes à décorer :type func_filter: Callable[[Callable], bool] :param factory: True indique que le décorateur est une méthode "factory" à exécuter avec la classe, defaults to False :type factory: bool """ def decorate(cls): if not inspect.isclass(cls): return cls @functools.wraps(cls, updated=()) class Wrapper(cls): decorated = 1 _decorator = decorator(Wrapper) if factory else decorator for attr_name in dir(Wrapper): # __new__ et __init__ sont complexes à décorer, à gérer au cas par cas if attr_name != '__new__' and attr_name != '__init__': value = getattr(Wrapper, attr_name) if isinstance(value, FUNCTION_TYPES) and func_filter(value): setattr(Wrapper, attr_name, _decorator(value)) return Wrapper return decorate