405 lines
16 KiB
Python
405 lines
16 KiB
Python
from enum import Enum
|
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
|
|
|
from django.db.models import Exists, OuterRef, Q, QuerySet
|
|
|
|
|
|
from ..models import (Administre, CustomUser, Decision, FormationEmploi, Poste,
|
|
RefOrg, RefSvFil, Administres_Pams, Postes_Pams)
|
|
from ..utils.attributes import safe_rgetattr
|
|
from ..utils.logging import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
# clé d'un élément de "get_profiles_by_adm" : pour la lecture
|
|
KEY_READ = 'read'
|
|
|
|
# clé d'un élément de "get_profiles_by_adm" : pour l'écriture
|
|
KEY_WRITE = 'write'
|
|
|
|
|
|
class Profiles(str, Enum):
|
|
""" profils applicatifs """
|
|
|
|
# super utilisateur appelé administrateur
|
|
SUPER = 'SUPER'
|
|
|
|
# gestionnaire de filière appelé gestionnaire BGCAT
|
|
FILIERE = 'FILIERE'
|
|
|
|
# gestionnaire BVT
|
|
BVT = 'BVT'
|
|
|
|
# gestionnaire inter-domaine
|
|
ITD = 'ITD'
|
|
|
|
# gestionnaire PCP (sans détail actuel/futur) appelé pameur BMOB
|
|
PCP = 'PCP'
|
|
|
|
# gestionnaire PCP actuel (ou gagnant), disponible quand l'administré change de FE
|
|
PCP_ACTUEL = 'PCP_ACTUEL'
|
|
|
|
# gestionnaire PCP futur (ou perdant), disponible quand l'administré change de FE
|
|
PCP_FUTUR = 'PCP_FUTUR'
|
|
|
|
# expert HME
|
|
HME = 'HME'
|
|
|
|
def __repr__(self):
|
|
return "%s.%s" % (self.__class__.__name__, self._name_)
|
|
|
|
|
|
class ProfileSummary:
|
|
"""
|
|
résumé des profils de l'utilisateur
|
|
il s'agit d'une vision simplifiée, sans le détail des sous-viviers ou des groupes FE
|
|
"""
|
|
|
|
"""
|
|
Méthode d'initialisation
|
|
|
|
:param org_code: code du référentiel organique lié à l'utilisateur, défaut : None
|
|
:type org_code: str, optional
|
|
|
|
:param profiles: profils de l'utilisateur, dictionnaire {<KEY_READ>: <profils>, <KEY_WRITE>: <profils>} }
|
|
:type profiles: Dict[str, Tuple[Profiles]], optional
|
|
"""
|
|
def __init__(self, org_code: str = None, profiles: Dict[str, Tuple[Profiles]] = {}):
|
|
self.org_code = org_code
|
|
self.profiles = profiles
|
|
|
|
|
|
def is_truthy(val: Any) -> bool:
|
|
"""indique si une valeur des référentiels est considérée comme vraie"""
|
|
return val and val != 'nan'
|
|
|
|
def get_queryset_org_by_user(user: CustomUser) -> QuerySet:
|
|
"""
|
|
Crée un QuerySet pour récupérer le référentiel organique lié à l'utilisateur.
|
|
|
|
:param user: utilisateur
|
|
:type user: class:`CustomUser`
|
|
|
|
:return: QuerySet pour récupérer le référentiel
|
|
:rtype: class:`QuerySet`
|
|
"""
|
|
return RefOrg.objects.filter(ref_gest__isnull=False, ref_gest__pk=user.id)
|
|
|
|
def get_queryset_org_by_any_code(org_code: str) -> QuerySet:
|
|
"""
|
|
Crée un QuerySet pour récupérer les référentiel organiques liés au code donné.
|
|
|
|
:param org_code: code de référentiel (de niveau 1 à 4)
|
|
:type org_code: str
|
|
|
|
:return: QuerySet pour récupérer les référentiels
|
|
:rtype: class:`QuerySet`
|
|
"""
|
|
return RefOrg.objects.filter(Q(ref_org_code_niv_org1=org_code)
|
|
| Q(ref_org_code_niv_org2=org_code)
|
|
| Q(ref_org_code_niv_org3=org_code)
|
|
| Q(ref_org_code_niv_org4=org_code))
|
|
|
|
def get_lvl4_org_codes_by_any_code(org_code: str) -> Tuple[str]:
|
|
"""
|
|
Retourne les codes de niveau 4 des référentiels organiques liés au code donné.
|
|
|
|
:param org_code: code de référentiel (de niveau 1 à 4)
|
|
:type org_code: str
|
|
|
|
:return: codes de niveau 4
|
|
:rtype: Tuple[str]
|
|
"""
|
|
return tuple(code for code in get_queryset_org_by_any_code(org_code).values_list('ref_org_code_niv_org4', flat=True).distinct() if is_truthy(code))
|
|
|
|
def get_adm_filter_by_lvl4_codes_fil(org_codes: Union[List[str], Tuple[str]]) -> Q:
|
|
"""
|
|
Crée un filtre pour récupérer les administrés liés à un gestionnaire de filière à partir de codes de référentiels de niveau 4.
|
|
|
|
:param org_codes: codes de référentiels de niveau 4
|
|
:type org_code: str
|
|
|
|
:return: filtre pour récupérer les administrés
|
|
:rtype: class:`Q`
|
|
"""
|
|
Cols = Administre.Cols
|
|
Cols_Pams = Administres_Pams.Cols
|
|
return Q(Exists(
|
|
RefSvFil.objects
|
|
.filter(ref_sv_fil_code__in=org_codes)
|
|
.filter(ref_sv_fil_dom=OuterRef(f'{Cols_Pams.REL_ADMINISTRE}__{Cols.REL_DOMAINE}'),
|
|
ref_sv_fil_fil=OuterRef(f'{Cols_Pams.REL_ADMINISTRE}__{Cols.REL_FILIERE}'),
|
|
ref_sv_fil_cat=OuterRef(f'{Cols_Pams.REL_ADMINISTRE}__{Cols.CATEGORIE}'))
|
|
))
|
|
|
|
def get_adm_filter_by_lvl4_codes_pcp(org_codes: Union[List[str], Tuple[str]]) -> Q:
|
|
"""
|
|
Crée un filtre pour récupérer les administrés liés à un gestionnaire PCP à partir de codes de référentiels de niveau 4.
|
|
|
|
:param org_codes: codes de référentiels de niveau 4
|
|
:type org_code: str
|
|
|
|
:return: filtre pour récupérer les administrés
|
|
:rtype: class:`Q`
|
|
"""
|
|
Cols = Administre.Cols
|
|
Cols_Pams = Administres_Pams.Cols
|
|
cat_filter = Q(**{f'{Cols_Pams.REL_ADMINISTRE}__{Cols.CATEGORIE}': 'MDR'})
|
|
sub_qs = FormationEmploi.objects.filter(fe_code=OuterRef(f'{Cols_Pams.REL_ADMINISTRE}__{Cols.REL_FORMATION_EMPLOI}'))
|
|
return (
|
|
(~cat_filter & Exists(sub_qs.filter(fe_code_niv_org4__in=org_codes)))
|
|
| (cat_filter & Exists(sub_qs.filter(fe_code_niv_org4_mdr__in=org_codes)))
|
|
)
|
|
|
|
def get_adm_filter_by_lvl4_codes_future_pcp(org_codes: Union[List[str], Tuple[str]]) -> Q:
|
|
"""
|
|
Crée un filtre pour récupérer les administrés liés à un futur gestionnaire PCP à partir de codes de référentiels de niveau 4.
|
|
|
|
:param org_codes: codes de référentiels de niveau 4
|
|
:type org_code: str
|
|
|
|
:return: filtre pour récupérer les administrés
|
|
:rtype: class:`Q`
|
|
"""
|
|
Cols = Administres_Pams.Cols
|
|
cat_filter = Q(**{f'{Cols.REL_ADMINISTRE}__{Administre.Cols.CATEGORIE}': 'MDR'})
|
|
fe_code_path = f'{Cols.REL_DECISION}__{Decision.Cols.REL_POSTE}__{Poste.Cols.REL_FORMATION_EMPLOI}'
|
|
sub_qs = FormationEmploi.objects.filter(fe_code=OuterRef(fe_code_path))
|
|
return (Q(**{f'{fe_code_path}__isnull': False}) & (
|
|
(~cat_filter & Exists(sub_qs.filter(fe_code_niv_org4__in=org_codes)))
|
|
| (cat_filter & Exists(sub_qs.filter(fe_code_niv_org4_mdr__in=org_codes)))
|
|
))
|
|
|
|
def get_poste_filter_by_lvl4_codes_fil(org_codes: Union[List[str], Tuple[str]]) -> Q:
|
|
"""
|
|
Crée un filtre pour récupérer les postes liés à un gestionnaire fil à partir de codes de référentiels de niveau 4.
|
|
|
|
:param org_codes: codes de référentiels de niveau 4
|
|
:type org_code: str
|
|
|
|
:return: filtre pour récupérer les postes
|
|
:rtype: class:`Q`
|
|
"""
|
|
Cols = Poste.Cols
|
|
Cols_Pam = Postes_Pams.Cols
|
|
return Q(Exists(
|
|
RefSvFil.objects
|
|
.filter(ref_sv_fil_code__in=org_codes)
|
|
.filter(sous_vivier_id=OuterRef(f'{Cols_Pam.REL_POSTE}__{Cols.M2M_SOUS_VIVIERS}'))
|
|
))
|
|
|
|
def get_poste_filter_by_lvl4_codes_pcp(org_codes: Union[List[str], Tuple[str]]) -> Q:
|
|
"""
|
|
Crée un filtre pour récupérer les postes liés à un gestionnaire PCP à partir de codes de référentiels de niveau 4.
|
|
|
|
:param org_codes: codes de référentiels de niveau 4
|
|
:type org_code: str
|
|
|
|
:return: filtre pour récupérer les postes
|
|
:rtype: class:`Q`
|
|
"""
|
|
Cols = Poste.Cols
|
|
Cols_Pam = Postes_Pams.Cols
|
|
cat_filter = Q(**{f'{Cols_Pam.REL_POSTE}__{Cols.CATEGORIE}': 'MDR'})
|
|
sub_qs = FormationEmploi.objects.filter(fe_code=OuterRef(f'{Cols_Pam.REL_POSTE}__{Cols.REL_FORMATION_EMPLOI}'))
|
|
return (
|
|
(~cat_filter & Exists(sub_qs.filter(fe_code_niv_org4__in=org_codes)))
|
|
| (cat_filter & Exists(sub_qs.filter(fe_code_niv_org4_mdr__in=org_codes)))
|
|
)
|
|
|
|
def get_profile_summary(user: Optional[CustomUser]) -> ProfileSummary:
|
|
"""
|
|
Renvoie le résumé des profils de l'utilisateur.
|
|
|
|
:param user: utilisateur
|
|
:type user: class:`CustomUser`
|
|
|
|
:return: résumé des profils
|
|
:rtype: class:`ProfileSummary`
|
|
"""
|
|
if not user or not user.is_authenticated:
|
|
return ProfileSummary()
|
|
|
|
org = (get_queryset_org_by_user(user)
|
|
.only('pk', 'ref_org_droit_lect', 'ref_org_droit_ecr', 'ref_org_ref_fe', 'ref_org_ref_sv_fil', 'ref_org_expert_hme', 'ref_org_bvt', 'ref_org_itd')
|
|
.first())
|
|
|
|
is_super = user.is_superuser
|
|
is_read = org and is_truthy(org.ref_org_droit_lect)
|
|
is_write = org and is_truthy(org.ref_org_droit_ecr)
|
|
is_fil = org and is_truthy(org.ref_org_ref_sv_fil)
|
|
is_hme = org and is_truthy(org.ref_org_expert_hme)
|
|
is_bvt = org and is_truthy(org.ref_org_bvt)
|
|
is_itd = org and is_truthy(org.ref_org_itd)
|
|
is_pcp = org and is_truthy(org.ref_org_ref_fe)
|
|
# logger.debug('user %s => superuser: %s, READ: %s, WRITE: %s, FIL: %s, PCP: %s, BVT: %s, ITD: %s, HME: %s', user.pk, is_super, is_read, is_write, is_fil, is_pcp, is_bvt, is_itd, is_hme)
|
|
|
|
profiles = []
|
|
if is_super:
|
|
profiles.append(Profiles.SUPER)
|
|
if is_fil:
|
|
profiles.append(Profiles.FILIERE)
|
|
if is_pcp:
|
|
profiles.append(Profiles.PCP)
|
|
if is_bvt:
|
|
profiles.append(Profiles.BVT)
|
|
if is_itd:
|
|
profiles.append(Profiles.ITD)
|
|
if is_hme:
|
|
profiles.append(Profiles.HME)
|
|
|
|
|
|
return ProfileSummary(org_code=org.pk if org else None, profiles={
|
|
KEY_READ: tuple(profiles) if is_read else (Profiles.SUPER,) if is_super else (),
|
|
KEY_WRITE: tuple(profiles) if is_write else (Profiles.SUPER,) if is_super else(),
|
|
})
|
|
|
|
def _group_adm_by_profile(org_code: Optional[str], administres: Tuple[Administres_Pams], profiles: Tuple[Profiles]) -> Dict[Profiles, Tuple[int]]:
|
|
"""
|
|
Groupe les administrés liés à certains profils, uniquement parmi les administrés en paramètre.
|
|
Il ne s'agit pas d'un partitionnement car un même administré peut être lié à plusieurs profils.
|
|
|
|
:param org_code: code de référentiel (de niveau 1 à 4)
|
|
:type org_code: str
|
|
|
|
:param administres: administrés à grouper
|
|
:type administres: Tuple[Administres_Pams]
|
|
|
|
:param profiles: profils utilisés
|
|
:type profiles: Tuple[Profiles]
|
|
|
|
:return: dictionnaire {<profil>: <ID d'administrés>}
|
|
:rtype: Dict[Profiles, Tuple[int]]
|
|
"""
|
|
A = Administre.Cols
|
|
AP = Administres_Pams.Cols
|
|
D = Decision.Cols
|
|
P = Profiles
|
|
|
|
adm_ids_fil = ()
|
|
adm_ids_bvt = ()
|
|
adm_ids_pcp = []
|
|
adm_ids_current_pcp = []
|
|
adm_ids_future_pcp = ()
|
|
|
|
if org_code:
|
|
is_fil = P.FILIERE in profiles
|
|
is_bvt = P.BVT in profiles
|
|
is_pcp = P.PCP in profiles
|
|
if is_fil or is_bvt or is_pcp:
|
|
adm_by_id = {adm.pk: adm for adm in administres}
|
|
qs = Administres_Pams.objects
|
|
if is_bvt:
|
|
adm_ids_bvt = tuple(
|
|
qs.filter(pk__in=adm_by_id.keys())
|
|
.filter(Q(**{f'{AP.REL_ADMINISTRE}__{A.REL_SOUS_VIVIER}': 'BVT'})).values_list('pk', flat=True))
|
|
if is_fil or is_pcp:
|
|
codes_lvl4 = get_lvl4_org_codes_by_any_code(org_code)
|
|
if codes_lvl4:
|
|
if is_fil:
|
|
adm_ids_fil = tuple(
|
|
qs.filter(pk__in=adm_by_id.keys())
|
|
.filter(get_adm_filter_by_lvl4_codes_fil(codes_lvl4)).values_list('pk', flat=True))
|
|
if is_pcp:
|
|
raw_adm_ids_current_pcp = set(
|
|
qs.filter(pk__in=adm_by_id.keys())
|
|
.filter(get_adm_filter_by_lvl4_codes_pcp(codes_lvl4)).values_list('pk', flat=True))
|
|
for adm_id in raw_adm_ids_current_pcp:
|
|
adm = adm_by_id.get(adm_id)
|
|
fe_avant = safe_rgetattr(adm, f'{AP.REL_ADMINISTRE}.{A.REL_FORMATION_EMPLOI}_id')
|
|
fe_apres = safe_rgetattr(adm, f'{AP.REL_DECISION}.{D.REL_POSTE}.{Poste.Cols.REL_FORMATION_EMPLOI}_id')
|
|
if fe_apres is None or fe_avant == fe_apres:
|
|
# pas de décision ou même FE : juste PCP
|
|
adm_ids_pcp.append(adm_id)
|
|
else:
|
|
# sinon PCP_ACTUEL
|
|
adm_ids_current_pcp.append(adm_id)
|
|
adm_ids_future_pcp = tuple(
|
|
qs.filter(pk__in=[adm_id for adm_id in adm_by_id.keys() if adm_id not in adm_ids_pcp])
|
|
.filter(get_adm_filter_by_lvl4_codes_future_pcp(codes_lvl4)).values_list('pk', flat=True))
|
|
return {
|
|
P.FILIERE: adm_ids_fil,
|
|
P.BVT: adm_ids_bvt,
|
|
P.PCP: tuple(adm_ids_pcp),
|
|
P.PCP_ACTUEL: tuple(adm_ids_current_pcp),
|
|
P.PCP_FUTUR: tuple(adm_ids_future_pcp)
|
|
}
|
|
|
|
def _get_profiles_for_adm(adm_id: int, global_profiles: Tuple[Profiles], adm_ids_by_profile: Dict[Profiles, Tuple[int]]) -> Tuple[Profiles]:
|
|
"""
|
|
Renvoie les profils qui concernent l'administré. Ils sont différents des profils globaux car un utilisateur qui a le profil PCP
|
|
n'est pas forcément PCP pour tous les administrés, il peut être PCP_ACTUEL et/ou PCP_FUTUR en fonction de l'administré.
|
|
note : Profiles.SUPER est ignoré car il n'a pas de sens dans ce contexte
|
|
|
|
:param adm_id: ID de l'administré
|
|
:type adm_id: int
|
|
|
|
:param global_profiles: profils globaux, ceux qu'on trouve dans un class:`ProfileSummary`.
|
|
:type global_profiles: Tuple[Administre]
|
|
|
|
:param adm_ids_by_profile: IDs d'administrés groupés par profil, dictionnaire {<profil>: <ID d'administrés>}
|
|
:type adm_ids_by_profile: Dict[Profiles, Tuple[int]]
|
|
|
|
:return: profils qui concernent l'administré
|
|
:rtype: Tuple[Profiles]
|
|
"""
|
|
P = Profiles
|
|
profiles = set()
|
|
if P.HME in global_profiles:
|
|
profiles.add(P.HME)
|
|
if P.FILIERE in global_profiles and adm_id in adm_ids_by_profile.get(P.FILIERE):
|
|
profiles.add(P.FILIERE)
|
|
if P.BVT in global_profiles and adm_id in adm_ids_by_profile.get(P.BVT):
|
|
profiles.add(P.BVT)
|
|
if P.PCP in global_profiles:
|
|
if adm_id in adm_ids_by_profile.get(P.PCP):
|
|
profiles.add(P.PCP)
|
|
if adm_id in adm_ids_by_profile.get(P.PCP_ACTUEL):
|
|
profiles.add(P.PCP_ACTUEL)
|
|
if adm_id in adm_ids_by_profile.get(P.PCP_FUTUR):
|
|
profiles.add(P.PCP_FUTUR)
|
|
return tuple(profiles)
|
|
|
|
def get_profiles_by_adm(user: Optional[CustomUser], administre: Administres_Pams, *args: Administres_Pams) -> Dict[int, Dict[str, Tuple[Profiles]]]:
|
|
"""
|
|
Renvoie un dictionnaire dont les clés sont les ID SAP des administrés donnés et les valeurs sont les profils de l'utilisateur.
|
|
|
|
:param user: utilisateur
|
|
:type user: class:`CustomUser`
|
|
|
|
:param administre: administré
|
|
:type administre: class:`Administres_Pams`
|
|
|
|
:param args: administrés supplémentaires
|
|
:type args: class:`Administres_Pams` (multiple)
|
|
|
|
:return: dictionnaire de dictionnaires {<ID SAP>: {<KEY_READ>: <profils>, <KEY_WRITE>: <profils>} }
|
|
:rtype: Dict[int, Dict[str, Tuple[Profiles]]]
|
|
"""
|
|
values = (administre, *args)
|
|
if not values:
|
|
return {}
|
|
|
|
summary = get_profile_summary(user)
|
|
r_profiles = summary.profiles.get(KEY_READ, ())
|
|
w_profiles = summary.profiles.get(KEY_WRITE, ())
|
|
|
|
if not r_profiles and not w_profiles:
|
|
return {adm.administre.pk: {KEY_READ: (), KEY_WRITE: ()} for adm in values}
|
|
|
|
same_profiles = sorted(set(r_profiles)) == sorted(set(w_profiles))
|
|
org_code = summary.org_code
|
|
r_adm_ids_by_profile = _group_adm_by_profile(org_code, values, r_profiles)
|
|
w_adm_ids_by_profile = _group_adm_by_profile(org_code, values, w_profiles) if not same_profiles else r_adm_ids_by_profile
|
|
|
|
result = {}
|
|
for adm in values:
|
|
adm_id = adm.pk
|
|
r_profiles_adm = _get_profiles_for_adm(adm_id, r_profiles, r_adm_ids_by_profile)
|
|
w_profiles_adm = _get_profiles_for_adm(adm_id, w_profiles, w_adm_ids_by_profile) if not same_profiles else r_profiles_adm
|
|
result.setdefault(adm_id, {
|
|
KEY_READ: r_profiles_adm,
|
|
KEY_WRITE: w_profiles_adm
|
|
})
|
|
return result
|