init
This commit is contained in:
404
backend-django/backend/utils/permissions.py
Normal file
404
backend-django/backend/utils/permissions.py
Normal file
@@ -0,0 +1,404 @@
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
|
||||
from django.db.models import Exists, OuterRef, Q, QuerySet
|
||||
|
||||
|
||||
from ..models import (Administre, CustomUser, Decision, FormationEmploi, Poste,
|
||||
RefOrg, RefSvFil, Administres_Pams, Postes_Pams)
|
||||
from ..utils.attributes import safe_rgetattr
|
||||
from ..utils.logging import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# clé d'un élément de "get_profiles_by_adm" : pour la lecture
|
||||
KEY_READ = 'read'
|
||||
|
||||
# clé d'un élément de "get_profiles_by_adm" : pour l'écriture
|
||||
KEY_WRITE = 'write'
|
||||
|
||||
|
||||
class Profiles(str, Enum):
|
||||
""" profils applicatifs """
|
||||
|
||||
# super utilisateur appelé administrateur
|
||||
SUPER = 'SUPER'
|
||||
|
||||
# gestionnaire de filière appelé gestionnaire BGCAT
|
||||
FILIERE = 'FILIERE'
|
||||
|
||||
# gestionnaire BVT
|
||||
BVT = 'BVT'
|
||||
|
||||
# gestionnaire inter-domaine
|
||||
ITD = 'ITD'
|
||||
|
||||
# gestionnaire PCP (sans détail actuel/futur) appelé pameur BMOB
|
||||
PCP = 'PCP'
|
||||
|
||||
# gestionnaire PCP actuel (ou gagnant), disponible quand l'administré change de FE
|
||||
PCP_ACTUEL = 'PCP_ACTUEL'
|
||||
|
||||
# gestionnaire PCP futur (ou perdant), disponible quand l'administré change de FE
|
||||
PCP_FUTUR = 'PCP_FUTUR'
|
||||
|
||||
# expert HME
|
||||
HME = 'HME'
|
||||
|
||||
def __repr__(self):
|
||||
return "%s.%s" % (self.__class__.__name__, self._name_)
|
||||
|
||||
|
||||
class ProfileSummary:
|
||||
"""
|
||||
résumé des profils de l'utilisateur
|
||||
il s'agit d'une vision simplifiée, sans le détail des sous-viviers ou des groupes FE
|
||||
"""
|
||||
|
||||
"""
|
||||
Méthode d'initialisation
|
||||
|
||||
:param org_code: code du référentiel organique lié à l'utilisateur, défaut : None
|
||||
:type org_code: str, optional
|
||||
|
||||
:param profiles: profils de l'utilisateur, dictionnaire {<KEY_READ>: <profils>, <KEY_WRITE>: <profils>} }
|
||||
:type profiles: Dict[str, Tuple[Profiles]], optional
|
||||
"""
|
||||
def __init__(self, org_code: str = None, profiles: Dict[str, Tuple[Profiles]] = {}):
|
||||
self.org_code = org_code
|
||||
self.profiles = profiles
|
||||
|
||||
|
||||
def is_truthy(val: Any) -> bool:
|
||||
"""indique si une valeur des référentiels est considérée comme vraie"""
|
||||
return val and val != 'nan'
|
||||
|
||||
def get_queryset_org_by_user(user: CustomUser) -> QuerySet:
|
||||
"""
|
||||
Crée un QuerySet pour récupérer le référentiel organique lié à l'utilisateur.
|
||||
|
||||
:param user: utilisateur
|
||||
:type user: class:`CustomUser`
|
||||
|
||||
:return: QuerySet pour récupérer le référentiel
|
||||
:rtype: class:`QuerySet`
|
||||
"""
|
||||
return RefOrg.objects.filter(ref_gest__isnull=False, ref_gest__pk=user.id)
|
||||
|
||||
def get_queryset_org_by_any_code(org_code: str) -> QuerySet:
|
||||
"""
|
||||
Crée un QuerySet pour récupérer les référentiel organiques liés au code donné.
|
||||
|
||||
:param org_code: code de référentiel (de niveau 1 à 4)
|
||||
:type org_code: str
|
||||
|
||||
:return: QuerySet pour récupérer les référentiels
|
||||
:rtype: class:`QuerySet`
|
||||
"""
|
||||
return RefOrg.objects.filter(Q(ref_org_code_niv_org1=org_code)
|
||||
| Q(ref_org_code_niv_org2=org_code)
|
||||
| Q(ref_org_code_niv_org3=org_code)
|
||||
| Q(ref_org_code_niv_org4=org_code))
|
||||
|
||||
def get_lvl4_org_codes_by_any_code(org_code: str) -> Tuple[str]:
|
||||
"""
|
||||
Retourne les codes de niveau 4 des référentiels organiques liés au code donné.
|
||||
|
||||
:param org_code: code de référentiel (de niveau 1 à 4)
|
||||
:type org_code: str
|
||||
|
||||
:return: codes de niveau 4
|
||||
:rtype: Tuple[str]
|
||||
"""
|
||||
return tuple(code for code in get_queryset_org_by_any_code(org_code).values_list('ref_org_code_niv_org4', flat=True).distinct() if is_truthy(code))
|
||||
|
||||
def get_adm_filter_by_lvl4_codes_fil(org_codes: Union[List[str], Tuple[str]]) -> Q:
|
||||
"""
|
||||
Crée un filtre pour récupérer les administrés liés à un gestionnaire de filière à partir de codes de référentiels de niveau 4.
|
||||
|
||||
:param org_codes: codes de référentiels de niveau 4
|
||||
:type org_code: str
|
||||
|
||||
:return: filtre pour récupérer les administrés
|
||||
:rtype: class:`Q`
|
||||
"""
|
||||
Cols = Administre.Cols
|
||||
Cols_Pams = Administres_Pams.Cols
|
||||
return Q(Exists(
|
||||
RefSvFil.objects
|
||||
.filter(ref_sv_fil_code__in=org_codes)
|
||||
.filter(ref_sv_fil_dom=OuterRef(f'{Cols_Pams.REL_ADMINISTRE}__{Cols.REL_DOMAINE}'),
|
||||
ref_sv_fil_fil=OuterRef(f'{Cols_Pams.REL_ADMINISTRE}__{Cols.REL_FILIERE}'),
|
||||
ref_sv_fil_cat=OuterRef(f'{Cols_Pams.REL_ADMINISTRE}__{Cols.CATEGORIE}'))
|
||||
))
|
||||
|
||||
def get_adm_filter_by_lvl4_codes_pcp(org_codes: Union[List[str], Tuple[str]]) -> Q:
|
||||
"""
|
||||
Crée un filtre pour récupérer les administrés liés à un gestionnaire PCP à partir de codes de référentiels de niveau 4.
|
||||
|
||||
:param org_codes: codes de référentiels de niveau 4
|
||||
:type org_code: str
|
||||
|
||||
:return: filtre pour récupérer les administrés
|
||||
:rtype: class:`Q`
|
||||
"""
|
||||
Cols = Administre.Cols
|
||||
Cols_Pams = Administres_Pams.Cols
|
||||
cat_filter = Q(**{f'{Cols_Pams.REL_ADMINISTRE}__{Cols.CATEGORIE}': 'MDR'})
|
||||
sub_qs = FormationEmploi.objects.filter(fe_code=OuterRef(f'{Cols_Pams.REL_ADMINISTRE}__{Cols.REL_FORMATION_EMPLOI}'))
|
||||
return (
|
||||
(~cat_filter & Exists(sub_qs.filter(fe_code_niv_org4__in=org_codes)))
|
||||
| (cat_filter & Exists(sub_qs.filter(fe_code_niv_org4_mdr__in=org_codes)))
|
||||
)
|
||||
|
||||
def get_adm_filter_by_lvl4_codes_future_pcp(org_codes: Union[List[str], Tuple[str]]) -> Q:
|
||||
"""
|
||||
Crée un filtre pour récupérer les administrés liés à un futur gestionnaire PCP à partir de codes de référentiels de niveau 4.
|
||||
|
||||
:param org_codes: codes de référentiels de niveau 4
|
||||
:type org_code: str
|
||||
|
||||
:return: filtre pour récupérer les administrés
|
||||
:rtype: class:`Q`
|
||||
"""
|
||||
Cols = Administres_Pams.Cols
|
||||
cat_filter = Q(**{f'{Cols.REL_ADMINISTRE}__{Administre.Cols.CATEGORIE}': 'MDR'})
|
||||
fe_code_path = f'{Cols.REL_DECISION}__{Decision.Cols.REL_POSTE}__{Poste.Cols.REL_FORMATION_EMPLOI}'
|
||||
sub_qs = FormationEmploi.objects.filter(fe_code=OuterRef(fe_code_path))
|
||||
return (Q(**{f'{fe_code_path}__isnull': False}) & (
|
||||
(~cat_filter & Exists(sub_qs.filter(fe_code_niv_org4__in=org_codes)))
|
||||
| (cat_filter & Exists(sub_qs.filter(fe_code_niv_org4_mdr__in=org_codes)))
|
||||
))
|
||||
|
||||
def get_poste_filter_by_lvl4_codes_fil(org_codes: Union[List[str], Tuple[str]]) -> Q:
|
||||
"""
|
||||
Crée un filtre pour récupérer les postes liés à un gestionnaire fil à partir de codes de référentiels de niveau 4.
|
||||
|
||||
:param org_codes: codes de référentiels de niveau 4
|
||||
:type org_code: str
|
||||
|
||||
:return: filtre pour récupérer les postes
|
||||
:rtype: class:`Q`
|
||||
"""
|
||||
Cols = Poste.Cols
|
||||
Cols_Pam = Postes_Pams.Cols
|
||||
return Q(Exists(
|
||||
RefSvFil.objects
|
||||
.filter(ref_sv_fil_code__in=org_codes)
|
||||
.filter(sous_vivier_id=OuterRef(f'{Cols_Pam.REL_POSTE}__{Cols.M2M_SOUS_VIVIERS}'))
|
||||
))
|
||||
|
||||
def get_poste_filter_by_lvl4_codes_pcp(org_codes: Union[List[str], Tuple[str]]) -> Q:
|
||||
"""
|
||||
Crée un filtre pour récupérer les postes liés à un gestionnaire PCP à partir de codes de référentiels de niveau 4.
|
||||
|
||||
:param org_codes: codes de référentiels de niveau 4
|
||||
:type org_code: str
|
||||
|
||||
:return: filtre pour récupérer les postes
|
||||
:rtype: class:`Q`
|
||||
"""
|
||||
Cols = Poste.Cols
|
||||
Cols_Pam = Postes_Pams.Cols
|
||||
cat_filter = Q(**{f'{Cols_Pam.REL_POSTE}__{Cols.CATEGORIE}': 'MDR'})
|
||||
sub_qs = FormationEmploi.objects.filter(fe_code=OuterRef(f'{Cols_Pam.REL_POSTE}__{Cols.REL_FORMATION_EMPLOI}'))
|
||||
return (
|
||||
(~cat_filter & Exists(sub_qs.filter(fe_code_niv_org4__in=org_codes)))
|
||||
| (cat_filter & Exists(sub_qs.filter(fe_code_niv_org4_mdr__in=org_codes)))
|
||||
)
|
||||
|
||||
def get_profile_summary(user: Optional[CustomUser]) -> ProfileSummary:
|
||||
"""
|
||||
Renvoie le résumé des profils de l'utilisateur.
|
||||
|
||||
:param user: utilisateur
|
||||
:type user: class:`CustomUser`
|
||||
|
||||
:return: résumé des profils
|
||||
:rtype: class:`ProfileSummary`
|
||||
"""
|
||||
if not user or not user.is_authenticated:
|
||||
return ProfileSummary()
|
||||
|
||||
org = (get_queryset_org_by_user(user)
|
||||
.only('pk', 'ref_org_droit_lect', 'ref_org_droit_ecr', 'ref_org_ref_fe', 'ref_org_ref_sv_fil', 'ref_org_expert_hme', 'ref_org_bvt', 'ref_org_itd')
|
||||
.first())
|
||||
|
||||
is_super = user.is_superuser
|
||||
is_read = org and is_truthy(org.ref_org_droit_lect)
|
||||
is_write = org and is_truthy(org.ref_org_droit_ecr)
|
||||
is_fil = org and is_truthy(org.ref_org_ref_sv_fil)
|
||||
is_hme = org and is_truthy(org.ref_org_expert_hme)
|
||||
is_bvt = org and is_truthy(org.ref_org_bvt)
|
||||
is_itd = org and is_truthy(org.ref_org_itd)
|
||||
is_pcp = org and is_truthy(org.ref_org_ref_fe)
|
||||
# logger.debug('user %s => superuser: %s, READ: %s, WRITE: %s, FIL: %s, PCP: %s, BVT: %s, ITD: %s, HME: %s', user.pk, is_super, is_read, is_write, is_fil, is_pcp, is_bvt, is_itd, is_hme)
|
||||
|
||||
profiles = []
|
||||
if is_super:
|
||||
profiles.append(Profiles.SUPER)
|
||||
if is_fil:
|
||||
profiles.append(Profiles.FILIERE)
|
||||
if is_pcp:
|
||||
profiles.append(Profiles.PCP)
|
||||
if is_bvt:
|
||||
profiles.append(Profiles.BVT)
|
||||
if is_itd:
|
||||
profiles.append(Profiles.ITD)
|
||||
if is_hme:
|
||||
profiles.append(Profiles.HME)
|
||||
|
||||
|
||||
return ProfileSummary(org_code=org.pk if org else None, profiles={
|
||||
KEY_READ: tuple(profiles) if is_read else (Profiles.SUPER,) if is_super else (),
|
||||
KEY_WRITE: tuple(profiles) if is_write else (Profiles.SUPER,) if is_super else(),
|
||||
})
|
||||
|
||||
def _group_adm_by_profile(org_code: Optional[str], administres: Tuple[Administres_Pams], profiles: Tuple[Profiles]) -> Dict[Profiles, Tuple[int]]:
|
||||
"""
|
||||
Groupe les administrés liés à certains profils, uniquement parmi les administrés en paramètre.
|
||||
Il ne s'agit pas d'un partitionnement car un même administré peut être lié à plusieurs profils.
|
||||
|
||||
:param org_code: code de référentiel (de niveau 1 à 4)
|
||||
:type org_code: str
|
||||
|
||||
:param administres: administrés à grouper
|
||||
:type administres: Tuple[Administres_Pams]
|
||||
|
||||
:param profiles: profils utilisés
|
||||
:type profiles: Tuple[Profiles]
|
||||
|
||||
:return: dictionnaire {<profil>: <ID d'administrés>}
|
||||
:rtype: Dict[Profiles, Tuple[int]]
|
||||
"""
|
||||
A = Administre.Cols
|
||||
AP = Administres_Pams.Cols
|
||||
D = Decision.Cols
|
||||
P = Profiles
|
||||
|
||||
adm_ids_fil = ()
|
||||
adm_ids_bvt = ()
|
||||
adm_ids_pcp = []
|
||||
adm_ids_current_pcp = []
|
||||
adm_ids_future_pcp = ()
|
||||
|
||||
if org_code:
|
||||
is_fil = P.FILIERE in profiles
|
||||
is_bvt = P.BVT in profiles
|
||||
is_pcp = P.PCP in profiles
|
||||
if is_fil or is_bvt or is_pcp:
|
||||
adm_by_id = {adm.pk: adm for adm in administres}
|
||||
qs = Administres_Pams.objects
|
||||
if is_bvt:
|
||||
adm_ids_bvt = tuple(
|
||||
qs.filter(pk__in=adm_by_id.keys())
|
||||
.filter(Q(**{f'{AP.REL_ADMINISTRE}__{A.REL_SOUS_VIVIER}': 'BVT'})).values_list('pk', flat=True))
|
||||
if is_fil or is_pcp:
|
||||
codes_lvl4 = get_lvl4_org_codes_by_any_code(org_code)
|
||||
if codes_lvl4:
|
||||
if is_fil:
|
||||
adm_ids_fil = tuple(
|
||||
qs.filter(pk__in=adm_by_id.keys())
|
||||
.filter(get_adm_filter_by_lvl4_codes_fil(codes_lvl4)).values_list('pk', flat=True))
|
||||
if is_pcp:
|
||||
raw_adm_ids_current_pcp = set(
|
||||
qs.filter(pk__in=adm_by_id.keys())
|
||||
.filter(get_adm_filter_by_lvl4_codes_pcp(codes_lvl4)).values_list('pk', flat=True))
|
||||
for adm_id in raw_adm_ids_current_pcp:
|
||||
adm = adm_by_id.get(adm_id)
|
||||
fe_avant = safe_rgetattr(adm, f'{AP.REL_ADMINISTRE}.{A.REL_FORMATION_EMPLOI}_id')
|
||||
fe_apres = safe_rgetattr(adm, f'{AP.REL_DECISION}.{D.REL_POSTE}.{Poste.Cols.REL_FORMATION_EMPLOI}_id')
|
||||
if fe_apres is None or fe_avant == fe_apres:
|
||||
# pas de décision ou même FE : juste PCP
|
||||
adm_ids_pcp.append(adm_id)
|
||||
else:
|
||||
# sinon PCP_ACTUEL
|
||||
adm_ids_current_pcp.append(adm_id)
|
||||
adm_ids_future_pcp = tuple(
|
||||
qs.filter(pk__in=[adm_id for adm_id in adm_by_id.keys() if adm_id not in adm_ids_pcp])
|
||||
.filter(get_adm_filter_by_lvl4_codes_future_pcp(codes_lvl4)).values_list('pk', flat=True))
|
||||
return {
|
||||
P.FILIERE: adm_ids_fil,
|
||||
P.BVT: adm_ids_bvt,
|
||||
P.PCP: tuple(adm_ids_pcp),
|
||||
P.PCP_ACTUEL: tuple(adm_ids_current_pcp),
|
||||
P.PCP_FUTUR: tuple(adm_ids_future_pcp)
|
||||
}
|
||||
|
||||
def _get_profiles_for_adm(adm_id: int, global_profiles: Tuple[Profiles], adm_ids_by_profile: Dict[Profiles, Tuple[int]]) -> Tuple[Profiles]:
|
||||
"""
|
||||
Renvoie les profils qui concernent l'administré. Ils sont différents des profils globaux car un utilisateur qui a le profil PCP
|
||||
n'est pas forcément PCP pour tous les administrés, il peut être PCP_ACTUEL et/ou PCP_FUTUR en fonction de l'administré.
|
||||
note : Profiles.SUPER est ignoré car il n'a pas de sens dans ce contexte
|
||||
|
||||
:param adm_id: ID de l'administré
|
||||
:type adm_id: int
|
||||
|
||||
:param global_profiles: profils globaux, ceux qu'on trouve dans un class:`ProfileSummary`.
|
||||
:type global_profiles: Tuple[Administre]
|
||||
|
||||
:param adm_ids_by_profile: IDs d'administrés groupés par profil, dictionnaire {<profil>: <ID d'administrés>}
|
||||
:type adm_ids_by_profile: Dict[Profiles, Tuple[int]]
|
||||
|
||||
:return: profils qui concernent l'administré
|
||||
:rtype: Tuple[Profiles]
|
||||
"""
|
||||
P = Profiles
|
||||
profiles = set()
|
||||
if P.HME in global_profiles:
|
||||
profiles.add(P.HME)
|
||||
if P.FILIERE in global_profiles and adm_id in adm_ids_by_profile.get(P.FILIERE):
|
||||
profiles.add(P.FILIERE)
|
||||
if P.BVT in global_profiles and adm_id in adm_ids_by_profile.get(P.BVT):
|
||||
profiles.add(P.BVT)
|
||||
if P.PCP in global_profiles:
|
||||
if adm_id in adm_ids_by_profile.get(P.PCP):
|
||||
profiles.add(P.PCP)
|
||||
if adm_id in adm_ids_by_profile.get(P.PCP_ACTUEL):
|
||||
profiles.add(P.PCP_ACTUEL)
|
||||
if adm_id in adm_ids_by_profile.get(P.PCP_FUTUR):
|
||||
profiles.add(P.PCP_FUTUR)
|
||||
return tuple(profiles)
|
||||
|
||||
def get_profiles_by_adm(user: Optional[CustomUser], administre: Administres_Pams, *args: Administres_Pams) -> Dict[int, Dict[str, Tuple[Profiles]]]:
|
||||
"""
|
||||
Renvoie un dictionnaire dont les clés sont les ID SAP des administrés donnés et les valeurs sont les profils de l'utilisateur.
|
||||
|
||||
:param user: utilisateur
|
||||
:type user: class:`CustomUser`
|
||||
|
||||
:param administre: administré
|
||||
:type administre: class:`Administres_Pams`
|
||||
|
||||
:param args: administrés supplémentaires
|
||||
:type args: class:`Administres_Pams` (multiple)
|
||||
|
||||
:return: dictionnaire de dictionnaires {<ID SAP>: {<KEY_READ>: <profils>, <KEY_WRITE>: <profils>} }
|
||||
:rtype: Dict[int, Dict[str, Tuple[Profiles]]]
|
||||
"""
|
||||
values = (administre, *args)
|
||||
if not values:
|
||||
return {}
|
||||
|
||||
summary = get_profile_summary(user)
|
||||
r_profiles = summary.profiles.get(KEY_READ, ())
|
||||
w_profiles = summary.profiles.get(KEY_WRITE, ())
|
||||
|
||||
if not r_profiles and not w_profiles:
|
||||
return {adm.administre.pk: {KEY_READ: (), KEY_WRITE: ()} for adm in values}
|
||||
|
||||
same_profiles = sorted(set(r_profiles)) == sorted(set(w_profiles))
|
||||
org_code = summary.org_code
|
||||
r_adm_ids_by_profile = _group_adm_by_profile(org_code, values, r_profiles)
|
||||
w_adm_ids_by_profile = _group_adm_by_profile(org_code, values, w_profiles) if not same_profiles else r_adm_ids_by_profile
|
||||
|
||||
result = {}
|
||||
for adm in values:
|
||||
adm_id = adm.pk
|
||||
r_profiles_adm = _get_profiles_for_adm(adm_id, r_profiles, r_adm_ids_by_profile)
|
||||
w_profiles_adm = _get_profiles_for_adm(adm_id, w_profiles, w_adm_ids_by_profile) if not same_profiles else r_profiles_adm
|
||||
result.setdefault(adm_id, {
|
||||
KEY_READ: r_profiles_adm,
|
||||
KEY_WRITE: w_profiles_adm
|
||||
})
|
||||
return result
|
||||
Reference in New Issue
Block a user