# -------------------------------------------------------------------------------- """ Définitions communes à plusieurs vues """ # -------------------------------------------------------------------------------- from django.db.models import Q from rest_framework.exceptions import APIException from rest_framework.permissions import SAFE_METHODS, BasePermission from ..models import Administre, Decision, Poste, SpecifiqueChoices, Administres_Pams, Postes_Pams from ..utils.decorators import (class_logger, decorate_functions, execution_time, query_count) from ..utils.permissions import (Profiles, KEY_READ, KEY_WRITE, get_profile_summary, get_adm_filter_by_lvl4_codes_fil, get_adm_filter_by_lvl4_codes_future_pcp, get_adm_filter_by_lvl4_codes_pcp, get_lvl4_org_codes_by_any_code, get_poste_filter_by_lvl4_codes_fil, get_poste_filter_by_lvl4_codes_pcp, get_queryset_org_by_user, is_truthy) from ..utils.view_predicates import viewset_functions # décorateur pour tracer le temps d'exécution d'une vue execution_time_viewset = decorate_functions( lambda cls: execution_time(warn_after=5000, logger_name=cls), viewset_functions, factory=True ) # décorateur pour tracer le nombre de requêtes d'une vue query_count_viewset = decorate_functions( lambda cls: query_count(warn_after=20, logger_name=cls), viewset_functions, factory=True ) def api_exception(status_code: int, message: str = None) -> APIException: """ Crée une APIException avec un statut personnalisé. note : inutile d'appeler cette fonction pour un code 500 car APIException(message) suffit :param status_code: code de statut HTTP :type status_code: int :param message: message de l'exception :type message: str, optional :return: exception :rtype: class:`APIException` """ return type('MyException', (APIException,), {'status_code': status_code, 'default_detail': message}) @class_logger class GestionnairePermission(BasePermission): """ Classe de gestion des droits de lecture et d'écriture d'un gestionnaire """ message = "Le gestionnaire n'a pas les droits de lecture ou d'écriture" def is_in_list_ok(self, list, list_ok): # A modif en utilisant set """ Cette fonction vérifie que tous les objets de la liste list soient dans la liste list_ok :param list: liste 1 :type list: list :param list: liste 2 :type list: list :return: booléen indiquant si les objets de la liste 1 sont dans la liste 2 :rtype: bool """ for obj in list: if obj not in list_ok: return False return True def verif_droits_adm(self, pk, codes_lvl4, is_fil, is_pcp, is_bvt): """ Cette fonction vérifie si le gestionnaire a bien le droit de modifier (mettre à jour) les attributs du ou des administres d'identifiant pk. :param pk: id sap du ou des administrés édités :type org_code: int ou list :param codes_lvl4: codes de niveau 4 des référentiels organiques liés à l'utilisateur :type codes_lvl4: Tuple[str] :param is_fil: profil filiere :type is_fil: bool :param is_pcp: profil pcp :type is_pcp: bool :param is_bvt: profil bvt :type is_bvt: bool :return: booléen pour indiqué si l'utilisateur a les droits ou non sur l'édition du ou des administrés concernés :rtype: bool """ # On récupère le ou les administrés dont l'id match avec pk (liste ou non) qs = Administre.objects.filter(a_id_sap__in=pk) if isinstance(pk, list) else Administre.objects.filter(a_id_sap=pk) list_a = list(qs.values_list('pk', flat=True)) # On récupère le ou les administrés autorisés adm_filter = None if is_fil: adm_filter = get_adm_filter_by_lvl4_codes_fil(codes_lvl4) if is_pcp: pcp_filter = get_adm_filter_by_lvl4_codes_pcp(codes_lvl4) | get_adm_filter_by_lvl4_codes_future_pcp(codes_lvl4) adm_filter = adm_filter | pcp_filter if adm_filter else pcp_filter if is_bvt: bvt_filter = Q(**{f'{Administres_Pams.Cols.REL_ADMINISTRE}__{Administre.Cols.REL_SOUS_VIVIER}': 'BVT'}) adm_filter = adm_filter | bvt_filter if adm_filter else bvt_filter list_a_ok = Administres_Pams.objects.filter(adm_filter).values_list('administre_id', flat=True) perm = self.is_in_list_ok(list_a, list_a_ok) msg = 'Administre rights verification ok\n' if perm else 'Administre rights verification not ok\n' self.logger.debug(msg) return perm def verif_droits_poste(self, pk, codes_lvl4, is_fil, is_pcp, is_bvt, is_itd): """ Cette fonction vérifie si le gestionnaire a bien le droit de modifier (mettre à jour) les attributs du ou des postes d'identifiant pk. :param pk: id sap du ou des administrés édités :type org_code: int ou list :param codes_lvl4: codes de niveau 4 des référentiels organiques liés à l'utilisateur :type codes_lvl4: Tuple[str] :param is_fil: profil filiere :type is_fil: bool :param is_pcp: profil pcp :type is_pcp: bool :param is_bvt: profil bvt :type is_bvt: bool :param is_itd: profil itd :type is_itd: bool :return: booléen pour indiqué si l'utilisateur a les droits ou non sur l'édition du ou des postes concernés :rtype: bool """ # On récupère le ou les postes dont l'id match avec pk (liste ou non) qs = Poste.objects.filter(p_id__in=pk) if isinstance(pk, list) else Poste.objects.filter(p_id=pk) list_p = list(qs.values_list('pk', flat=True)) # On créer le filtre avec le ou les postes autorisés poste_filter = None if is_fil: poste_filter = get_poste_filter_by_lvl4_codes_fil(codes_lvl4) if is_pcp: pcp_filter = get_poste_filter_by_lvl4_codes_pcp(codes_lvl4) poste_filter = poste_filter | pcp_filter if poste_filter else pcp_filter if is_itd: itd_filter = Q(**{f'{Postes_Pams.Cols.REL_POSTE}__p_specifique' : SpecifiqueChoices.ITD}) poste_filter = poste_filter | itd_filter if poste_filter else itd_filter if is_bvt: bvt_filter = Q(**{f'{Postes_Pams.Cols.REL_POSTE}__{Poste.Cols.M2M_SOUS_VIVIERS}': 'BVT'}) poste_filter = poste_filter | bvt_filter if poste_filter else bvt_filter list_p_ok = Postes_Pams.objects.filter(poste_filter).values_list('poste_id', flat=True) perm = self.is_in_list_ok(list_p, list_p_ok) msg = 'Poste rights verification ok\n' if perm else 'Poste rights verification not ok\n' self.logger.debug(msg) return perm def commun_verif(self, request, view, obj=None): """ Partie commune de la vérification des droits de lecture aux fonctions has_permission et has_object_permission """ P = Profiles user = request.user if not user or not user.is_authenticated: self.logger.debug('No user connected\n') return False is_super = user.is_superuser if is_super and str(view) in ['ReportingView']: self.logger.debug('Rights verification ok : the user is a superuser\n') return True summary = get_profile_summary(user) profiles = summary.profiles r_profiles = profiles.get(KEY_READ, ()) w_profiles = profiles.get(KEY_WRITE, ()) if not profiles: self.logger.debug('The user as no profile\n') return False if request.method in SAFE_METHODS: resp = True if (r_profiles and r_profiles!=(P.SUPER,)) else False if not resp: msg = 'The user does not have reading rights\n' self.logger.debug(msg) return resp # La gestion de droits des administrateurs est faite via IsAdminUser pour les api d'alimentation is_fil = P.FILIERE in w_profiles is_pcp = P.PCP in w_profiles is_bvt = P.BVT in w_profiles is_itd = P.ITD in w_profiles is_hme = P.HME in w_profiles codes_lvl4 = get_lvl4_org_codes_by_any_code(summary.org_code) if obj: # Modification d'une seule instance if not w_profiles or w_profiles==(P.SUPER,): self.logger.debug('The user does not have writing rights\n') return False if isinstance(obj, Administre): pk = obj.pk return self.verif_droits_adm(pk, codes_lvl4, is_fil, is_pcp, is_bvt) elif isinstance(obj, Poste): pk = obj.pk return self.verif_droits_poste(pk, codes_lvl4, is_fil, is_pcp, is_bvt, is_itd) elif isinstance(obj, Decision): a_pk = obj.administre.pk p_pk = obj.poste.pk return self.verif_droits_poste(p_pk, codes_lvl4, is_fil, is_pcp, is_bvt, is_itd) or \ self.verif_droits_adm(a_pk, codes_lvl4, is_fil, is_pcp, is_bvt) else: # Vérification globale ou modification multiple self.logger.debug('---- Writing profiles ----') self.logger.debug('Expert HME : %s', is_hme) self.logger.debug('Pameur BMOB : %s', is_pcp) self.logger.debug('Gestionnaire BVT : %s', is_bvt) self.logger.debug('Gestionnaire ITD : %s', is_itd) self.logger.debug('Gestionnaire BGCAT : %s', is_fil) self.logger.debug('--------------------------') if isinstance(request.data, list): if not w_profiles or w_profiles==(P.SUPER,): self.logger.debug('The user does not have writing rights\n') return False if 'a_id_sap' in request.data[0].keys(): pk = [adm['a_id_sap'] for adm in request.data] return self.verif_droits_adm(pk, codes_lvl4, is_fil, is_pcp, is_bvt) elif 'p_id' in request.data[0].keys(): pk = [p['p_id'] for p in request.data] return self.verif_droits_poste(pk, codes_lvl4, is_fil, is_pcp, is_bvt, is_itd) self.logger.debug('Global rights verification ok\n') return True # On autorise une permission globale pour ensuite permettre des vérifications au niveau des instances avec has_object_permission self.logger.debug('User rights verification failed\n') return False def has_permission(self, request, view): if request.method != 'GET': self.logger.debug('------------------------- Permissions verification ---------------------') self.logger.debug('Request method : %s', request.method) self.logger.debug('Request data : %s', request.data) self.logger.debug('View : %s', view) return self.commun_verif(request=request, view=view) def has_object_permission(self, request, view, obj): self.logger.debug('--------------------- Object permissions verification ------------------') self.logger.debug('Request method : %s', request.method) self.logger.debug('Request data : %s', request.data) self.logger.debug('View : %s', view) self.logger.debug('Object : %s', obj) return self.commun_verif(request=request, view=view, obj=obj)