from enum import Enum from typing import Any, Dict, List, Optional, Tuple, Union from django.db.models import Exists, OuterRef, Q, QuerySet from ..models import (Administre, CustomUser, Decision, FormationEmploi, Poste, RefOrg, RefSvFil, Administres_Pams, Postes_Pams) from ..utils.attributes import safe_rgetattr from ..utils.logging import get_logger logger = get_logger(__name__) # clé d'un élément de "get_profiles_by_adm" : pour la lecture KEY_READ = 'read' # clé d'un élément de "get_profiles_by_adm" : pour l'écriture KEY_WRITE = 'write' class Profiles(str, Enum): """ profils applicatifs """ # super utilisateur appelé administrateur SUPER = 'SUPER' # gestionnaire de filière appelé gestionnaire BGCAT FILIERE = 'FILIERE' # gestionnaire BVT BVT = 'BVT' # gestionnaire inter-domaine ITD = 'ITD' # gestionnaire PCP (sans détail actuel/futur) appelé pameur BMOB PCP = 'PCP' # gestionnaire PCP actuel (ou gagnant), disponible quand l'administré change de FE PCP_ACTUEL = 'PCP_ACTUEL' # gestionnaire PCP futur (ou perdant), disponible quand l'administré change de FE PCP_FUTUR = 'PCP_FUTUR' # expert HME HME = 'HME' def __repr__(self): return "%s.%s" % (self.__class__.__name__, self._name_) class ProfileSummary: """ résumé des profils de l'utilisateur il s'agit d'une vision simplifiée, sans le détail des sous-viviers ou des groupes FE """ """ Méthode d'initialisation :param org_code: code du référentiel organique lié à l'utilisateur, défaut : None :type org_code: str, optional :param profiles: profils de l'utilisateur, dictionnaire {: , : } } :type profiles: Dict[str, Tuple[Profiles]], optional """ def __init__(self, org_code: str = None, profiles: Dict[str, Tuple[Profiles]] = {}): self.org_code = org_code self.profiles = profiles def is_truthy(val: Any) -> bool: """indique si une valeur des référentiels est considérée comme vraie""" return val and val != 'nan' def get_queryset_org_by_user(user: CustomUser) -> QuerySet: """ Crée un QuerySet pour récupérer le référentiel organique lié à l'utilisateur. :param user: utilisateur :type user: class:`CustomUser` :return: QuerySet pour récupérer le référentiel :rtype: class:`QuerySet` """ return RefOrg.objects.filter(ref_gest__isnull=False, ref_gest__pk=user.id) def get_queryset_org_by_any_code(org_code: str) -> QuerySet: """ Crée un QuerySet pour récupérer les référentiel organiques liés au code donné. :param org_code: code de référentiel (de niveau 1 à 4) :type org_code: str :return: QuerySet pour récupérer les référentiels :rtype: class:`QuerySet` """ return RefOrg.objects.filter(Q(ref_org_code_niv_org1=org_code) | Q(ref_org_code_niv_org2=org_code) | Q(ref_org_code_niv_org3=org_code) | Q(ref_org_code_niv_org4=org_code)) def get_lvl4_org_codes_by_any_code(org_code: str) -> Tuple[str]: """ Retourne les codes de niveau 4 des référentiels organiques liés au code donné. :param org_code: code de référentiel (de niveau 1 à 4) :type org_code: str :return: codes de niveau 4 :rtype: Tuple[str] """ return tuple(code for code in get_queryset_org_by_any_code(org_code).values_list('ref_org_code_niv_org4', flat=True).distinct() if is_truthy(code)) def get_adm_filter_by_lvl4_codes_fil(org_codes: Union[List[str], Tuple[str]]) -> Q: """ Crée un filtre pour récupérer les administrés liés à un gestionnaire de filière à partir de codes de référentiels de niveau 4. :param org_codes: codes de référentiels de niveau 4 :type org_code: str :return: filtre pour récupérer les administrés :rtype: class:`Q` """ Cols = Administre.Cols Cols_Pams = Administres_Pams.Cols return Q(Exists( RefSvFil.objects .filter(ref_sv_fil_code__in=org_codes) .filter(ref_sv_fil_dom=OuterRef(f'{Cols_Pams.REL_ADMINISTRE}__{Cols.REL_DOMAINE}'), ref_sv_fil_fil=OuterRef(f'{Cols_Pams.REL_ADMINISTRE}__{Cols.REL_FILIERE}'), ref_sv_fil_cat=OuterRef(f'{Cols_Pams.REL_ADMINISTRE}__{Cols.CATEGORIE}')) )) def get_adm_filter_by_lvl4_codes_pcp(org_codes: Union[List[str], Tuple[str]]) -> Q: """ Crée un filtre pour récupérer les administrés liés à un gestionnaire PCP à partir de codes de référentiels de niveau 4. :param org_codes: codes de référentiels de niveau 4 :type org_code: str :return: filtre pour récupérer les administrés :rtype: class:`Q` """ Cols = Administre.Cols Cols_Pams = Administres_Pams.Cols cat_filter = Q(**{f'{Cols_Pams.REL_ADMINISTRE}__{Cols.CATEGORIE}': 'MDR'}) sub_qs = FormationEmploi.objects.filter(fe_code=OuterRef(f'{Cols_Pams.REL_ADMINISTRE}__{Cols.REL_FORMATION_EMPLOI}')) return ( (~cat_filter & Exists(sub_qs.filter(fe_code_niv_org4__in=org_codes))) | (cat_filter & Exists(sub_qs.filter(fe_code_niv_org4_mdr__in=org_codes))) ) def get_adm_filter_by_lvl4_codes_future_pcp(org_codes: Union[List[str], Tuple[str]]) -> Q: """ Crée un filtre pour récupérer les administrés liés à un futur gestionnaire PCP à partir de codes de référentiels de niveau 4. :param org_codes: codes de référentiels de niveau 4 :type org_code: str :return: filtre pour récupérer les administrés :rtype: class:`Q` """ Cols = Administres_Pams.Cols cat_filter = Q(**{f'{Cols.REL_ADMINISTRE}__{Administre.Cols.CATEGORIE}': 'MDR'}) fe_code_path = f'{Cols.REL_DECISION}__{Decision.Cols.REL_POSTE}__{Poste.Cols.REL_FORMATION_EMPLOI}' sub_qs = FormationEmploi.objects.filter(fe_code=OuterRef(fe_code_path)) return (Q(**{f'{fe_code_path}__isnull': False}) & ( (~cat_filter & Exists(sub_qs.filter(fe_code_niv_org4__in=org_codes))) | (cat_filter & Exists(sub_qs.filter(fe_code_niv_org4_mdr__in=org_codes))) )) def get_poste_filter_by_lvl4_codes_fil(org_codes: Union[List[str], Tuple[str]]) -> Q: """ Crée un filtre pour récupérer les postes liés à un gestionnaire fil à partir de codes de référentiels de niveau 4. :param org_codes: codes de référentiels de niveau 4 :type org_code: str :return: filtre pour récupérer les postes :rtype: class:`Q` """ Cols = Poste.Cols Cols_Pam = Postes_Pams.Cols return Q(Exists( RefSvFil.objects .filter(ref_sv_fil_code__in=org_codes) .filter(sous_vivier_id=OuterRef(f'{Cols_Pam.REL_POSTE}__{Cols.M2M_SOUS_VIVIERS}')) )) def get_poste_filter_by_lvl4_codes_pcp(org_codes: Union[List[str], Tuple[str]]) -> Q: """ Crée un filtre pour récupérer les postes liés à un gestionnaire PCP à partir de codes de référentiels de niveau 4. :param org_codes: codes de référentiels de niveau 4 :type org_code: str :return: filtre pour récupérer les postes :rtype: class:`Q` """ Cols = Poste.Cols Cols_Pam = Postes_Pams.Cols cat_filter = Q(**{f'{Cols_Pam.REL_POSTE}__{Cols.CATEGORIE}': 'MDR'}) sub_qs = FormationEmploi.objects.filter(fe_code=OuterRef(f'{Cols_Pam.REL_POSTE}__{Cols.REL_FORMATION_EMPLOI}')) return ( (~cat_filter & Exists(sub_qs.filter(fe_code_niv_org4__in=org_codes))) | (cat_filter & Exists(sub_qs.filter(fe_code_niv_org4_mdr__in=org_codes))) ) def get_profile_summary(user: Optional[CustomUser]) -> ProfileSummary: """ Renvoie le résumé des profils de l'utilisateur. :param user: utilisateur :type user: class:`CustomUser` :return: résumé des profils :rtype: class:`ProfileSummary` """ if not user or not user.is_authenticated: return ProfileSummary() org = (get_queryset_org_by_user(user) .only('pk', 'ref_org_droit_lect', 'ref_org_droit_ecr', 'ref_org_ref_fe', 'ref_org_ref_sv_fil', 'ref_org_expert_hme', 'ref_org_bvt', 'ref_org_itd') .first()) is_super = user.is_superuser is_read = org and is_truthy(org.ref_org_droit_lect) is_write = org and is_truthy(org.ref_org_droit_ecr) is_fil = org and is_truthy(org.ref_org_ref_sv_fil) is_hme = org and is_truthy(org.ref_org_expert_hme) is_bvt = org and is_truthy(org.ref_org_bvt) is_itd = org and is_truthy(org.ref_org_itd) is_pcp = org and is_truthy(org.ref_org_ref_fe) # logger.debug('user %s => superuser: %s, READ: %s, WRITE: %s, FIL: %s, PCP: %s, BVT: %s, ITD: %s, HME: %s', user.pk, is_super, is_read, is_write, is_fil, is_pcp, is_bvt, is_itd, is_hme) profiles = [] if is_super: profiles.append(Profiles.SUPER) if is_fil: profiles.append(Profiles.FILIERE) if is_pcp: profiles.append(Profiles.PCP) if is_bvt: profiles.append(Profiles.BVT) if is_itd: profiles.append(Profiles.ITD) if is_hme: profiles.append(Profiles.HME) return ProfileSummary(org_code=org.pk if org else None, profiles={ KEY_READ: tuple(profiles) if is_read else (Profiles.SUPER,) if is_super else (), KEY_WRITE: tuple(profiles) if is_write else (Profiles.SUPER,) if is_super else(), }) def _group_adm_by_profile(org_code: Optional[str], administres: Tuple[Administres_Pams], profiles: Tuple[Profiles]) -> Dict[Profiles, Tuple[int]]: """ Groupe les administrés liés à certains profils, uniquement parmi les administrés en paramètre. Il ne s'agit pas d'un partitionnement car un même administré peut être lié à plusieurs profils. :param org_code: code de référentiel (de niveau 1 à 4) :type org_code: str :param administres: administrés à grouper :type administres: Tuple[Administres_Pams] :param profiles: profils utilisés :type profiles: Tuple[Profiles] :return: dictionnaire {: } :rtype: Dict[Profiles, Tuple[int]] """ A = Administre.Cols AP = Administres_Pams.Cols D = Decision.Cols P = Profiles adm_ids_fil = () adm_ids_bvt = () adm_ids_pcp = [] adm_ids_current_pcp = [] adm_ids_future_pcp = () if org_code: is_fil = P.FILIERE in profiles is_bvt = P.BVT in profiles is_pcp = P.PCP in profiles if is_fil or is_bvt or is_pcp: adm_by_id = {adm.pk: adm for adm in administres} qs = Administres_Pams.objects if is_bvt: adm_ids_bvt = tuple( qs.filter(pk__in=adm_by_id.keys()) .filter(Q(**{f'{AP.REL_ADMINISTRE}__{A.REL_SOUS_VIVIER}': 'BVT'})).values_list('pk', flat=True)) if is_fil or is_pcp: codes_lvl4 = get_lvl4_org_codes_by_any_code(org_code) if codes_lvl4: if is_fil: adm_ids_fil = tuple( qs.filter(pk__in=adm_by_id.keys()) .filter(get_adm_filter_by_lvl4_codes_fil(codes_lvl4)).values_list('pk', flat=True)) if is_pcp: raw_adm_ids_current_pcp = set( qs.filter(pk__in=adm_by_id.keys()) .filter(get_adm_filter_by_lvl4_codes_pcp(codes_lvl4)).values_list('pk', flat=True)) for adm_id in raw_adm_ids_current_pcp: adm = adm_by_id.get(adm_id) fe_avant = safe_rgetattr(adm, f'{AP.REL_ADMINISTRE}.{A.REL_FORMATION_EMPLOI}_id') fe_apres = safe_rgetattr(adm, f'{AP.REL_DECISION}.{D.REL_POSTE}.{Poste.Cols.REL_FORMATION_EMPLOI}_id') if fe_apres is None or fe_avant == fe_apres: # pas de décision ou même FE : juste PCP adm_ids_pcp.append(adm_id) else: # sinon PCP_ACTUEL adm_ids_current_pcp.append(adm_id) adm_ids_future_pcp = tuple( qs.filter(pk__in=[adm_id for adm_id in adm_by_id.keys() if adm_id not in adm_ids_pcp]) .filter(get_adm_filter_by_lvl4_codes_future_pcp(codes_lvl4)).values_list('pk', flat=True)) return { P.FILIERE: adm_ids_fil, P.BVT: adm_ids_bvt, P.PCP: tuple(adm_ids_pcp), P.PCP_ACTUEL: tuple(adm_ids_current_pcp), P.PCP_FUTUR: tuple(adm_ids_future_pcp) } def _get_profiles_for_adm(adm_id: int, global_profiles: Tuple[Profiles], adm_ids_by_profile: Dict[Profiles, Tuple[int]]) -> Tuple[Profiles]: """ Renvoie les profils qui concernent l'administré. Ils sont différents des profils globaux car un utilisateur qui a le profil PCP n'est pas forcément PCP pour tous les administrés, il peut être PCP_ACTUEL et/ou PCP_FUTUR en fonction de l'administré. note : Profiles.SUPER est ignoré car il n'a pas de sens dans ce contexte :param adm_id: ID de l'administré :type adm_id: int :param global_profiles: profils globaux, ceux qu'on trouve dans un class:`ProfileSummary`. :type global_profiles: Tuple[Administre] :param adm_ids_by_profile: IDs d'administrés groupés par profil, dictionnaire {: } :type adm_ids_by_profile: Dict[Profiles, Tuple[int]] :return: profils qui concernent l'administré :rtype: Tuple[Profiles] """ P = Profiles profiles = set() if P.HME in global_profiles: profiles.add(P.HME) if P.FILIERE in global_profiles and adm_id in adm_ids_by_profile.get(P.FILIERE): profiles.add(P.FILIERE) if P.BVT in global_profiles and adm_id in adm_ids_by_profile.get(P.BVT): profiles.add(P.BVT) if P.PCP in global_profiles: if adm_id in adm_ids_by_profile.get(P.PCP): profiles.add(P.PCP) if adm_id in adm_ids_by_profile.get(P.PCP_ACTUEL): profiles.add(P.PCP_ACTUEL) if adm_id in adm_ids_by_profile.get(P.PCP_FUTUR): profiles.add(P.PCP_FUTUR) return tuple(profiles) def get_profiles_by_adm(user: Optional[CustomUser], administre: Administres_Pams, *args: Administres_Pams) -> Dict[int, Dict[str, Tuple[Profiles]]]: """ Renvoie un dictionnaire dont les clés sont les ID SAP des administrés donnés et les valeurs sont les profils de l'utilisateur. :param user: utilisateur :type user: class:`CustomUser` :param administre: administré :type administre: class:`Administres_Pams` :param args: administrés supplémentaires :type args: class:`Administres_Pams` (multiple) :return: dictionnaire de dictionnaires {: {: , : } } :rtype: Dict[int, Dict[str, Tuple[Profiles]]] """ values = (administre, *args) if not values: return {} summary = get_profile_summary(user) r_profiles = summary.profiles.get(KEY_READ, ()) w_profiles = summary.profiles.get(KEY_WRITE, ()) if not r_profiles and not w_profiles: return {adm.administre.pk: {KEY_READ: (), KEY_WRITE: ()} for adm in values} same_profiles = sorted(set(r_profiles)) == sorted(set(w_profiles)) org_code = summary.org_code r_adm_ids_by_profile = _group_adm_by_profile(org_code, values, r_profiles) w_adm_ids_by_profile = _group_adm_by_profile(org_code, values, w_profiles) if not same_profiles else r_adm_ids_by_profile result = {} for adm in values: adm_id = adm.pk r_profiles_adm = _get_profiles_for_adm(adm_id, r_profiles, r_adm_ids_by_profile) w_profiles_adm = _get_profiles_for_adm(adm_id, w_profiles, w_adm_ids_by_profile) if not same_profiles else r_profiles_adm result.setdefault(adm_id, { KEY_READ: r_profiles_adm, KEY_WRITE: w_profiles_adm }) return result