Files
2022-11-08 21:19:51 +01:00

293 lines
12 KiB
Python

# --------------------------------------------------------------------------------
"""
Définitions communes à plusieurs vues
"""
# --------------------------------------------------------------------------------
from django.db.models import Q
from rest_framework.exceptions import APIException
from rest_framework.permissions import SAFE_METHODS, BasePermission
from ..models import Administre, Decision, Poste, SpecifiqueChoices, Administres_Pams, Postes_Pams
from ..utils.decorators import (class_logger, decorate_functions,
execution_time, query_count)
from ..utils.permissions import (Profiles, KEY_READ, KEY_WRITE,
get_profile_summary,
get_adm_filter_by_lvl4_codes_fil,
get_adm_filter_by_lvl4_codes_future_pcp,
get_adm_filter_by_lvl4_codes_pcp,
get_lvl4_org_codes_by_any_code,
get_poste_filter_by_lvl4_codes_fil,
get_poste_filter_by_lvl4_codes_pcp,
get_queryset_org_by_user, is_truthy)
from ..utils.view_predicates import viewset_functions
# décorateur pour tracer le temps d'exécution d'une vue
execution_time_viewset = decorate_functions(
lambda cls: execution_time(warn_after=5000, logger_name=cls),
viewset_functions,
factory=True
)
# décorateur pour tracer le nombre de requêtes d'une vue
query_count_viewset = decorate_functions(
lambda cls: query_count(warn_after=20, logger_name=cls),
viewset_functions,
factory=True
)
def api_exception(status_code: int, message: str = None) -> APIException:
"""
Crée une APIException avec un statut personnalisé.
note : inutile d'appeler cette fonction pour un code 500 car APIException(message) suffit
:param status_code: code de statut HTTP
:type status_code: int
:param message: message de l'exception
:type message: str, optional
:return: exception
:rtype: class:`APIException`
"""
return type('MyException', (APIException,), {'status_code': status_code, 'default_detail': message})
@class_logger
class GestionnairePermission(BasePermission):
"""
Classe de gestion des droits de lecture et d'écriture d'un gestionnaire
"""
message = "Le gestionnaire n'a pas les droits de lecture ou d'écriture"
def is_in_list_ok(self, list, list_ok): # A modif en utilisant set
"""
Cette fonction vérifie que tous les objets de la liste list soient dans la liste list_ok
:param list: liste 1
:type list: list
:param list: liste 2
:type list: list
:return: booléen indiquant si les objets de la liste 1 sont dans la liste 2
:rtype: bool
"""
for obj in list:
if obj not in list_ok:
return False
return True
def verif_droits_adm(self, pk, codes_lvl4, is_fil, is_pcp, is_bvt):
"""
Cette fonction vérifie si le gestionnaire a bien le droit de modifier (mettre à jour)
les attributs du ou des administres d'identifiant pk.
:param pk: id sap du ou des administrés édités
:type org_code: int ou list
:param codes_lvl4: codes de niveau 4 des référentiels organiques liés à l'utilisateur
:type codes_lvl4: Tuple[str]
:param is_fil: profil filiere
:type is_fil: bool
:param is_pcp: profil pcp
:type is_pcp: bool
:param is_bvt: profil bvt
:type is_bvt: bool
:return: booléen pour indiqué si l'utilisateur a les droits ou non sur l'édition
du ou des administrés concernés
:rtype: bool
"""
# On récupère le ou les administrés dont l'id match avec pk (liste ou non)
qs = Administre.objects.filter(a_id_sap__in=pk) if isinstance(pk, list) else Administre.objects.filter(a_id_sap=pk)
list_a = list(qs.values_list('pk', flat=True))
# On récupère le ou les administrés autorisés
adm_filter = None
if is_fil:
adm_filter = get_adm_filter_by_lvl4_codes_fil(codes_lvl4)
if is_pcp:
pcp_filter = get_adm_filter_by_lvl4_codes_pcp(codes_lvl4) | get_adm_filter_by_lvl4_codes_future_pcp(codes_lvl4)
adm_filter = adm_filter | pcp_filter if adm_filter else pcp_filter
if is_bvt:
bvt_filter = Q(**{f'{Administres_Pams.Cols.REL_ADMINISTRE}__{Administre.Cols.REL_SOUS_VIVIER}': 'BVT'})
adm_filter = adm_filter | bvt_filter if adm_filter else bvt_filter
list_a_ok = Administres_Pams.objects.filter(adm_filter).values_list('administre_id', flat=True)
perm = self.is_in_list_ok(list_a, list_a_ok)
msg = 'Administre rights verification ok\n' if perm else 'Administre rights verification not ok\n'
self.logger.debug(msg)
return perm
def verif_droits_poste(self, pk, codes_lvl4, is_fil, is_pcp, is_bvt, is_itd):
"""
Cette fonction vérifie si le gestionnaire a bien le droit de modifier (mettre à jour)
les attributs du ou des postes d'identifiant pk.
:param pk: id sap du ou des administrés édités
:type org_code: int ou list
:param codes_lvl4: codes de niveau 4 des référentiels organiques liés à l'utilisateur
:type codes_lvl4: Tuple[str]
:param is_fil: profil filiere
:type is_fil: bool
:param is_pcp: profil pcp
:type is_pcp: bool
:param is_bvt: profil bvt
:type is_bvt: bool
:param is_itd: profil itd
:type is_itd: bool
:return: booléen pour indiqué si l'utilisateur a les droits ou non sur l'édition
du ou des postes concernés
:rtype: bool
"""
# On récupère le ou les postes dont l'id match avec pk (liste ou non)
qs = Poste.objects.filter(p_id__in=pk) if isinstance(pk, list) else Poste.objects.filter(p_id=pk)
list_p = list(qs.values_list('pk', flat=True))
# On créer le filtre avec le ou les postes autorisés
poste_filter = None
if is_fil:
poste_filter = get_poste_filter_by_lvl4_codes_fil(codes_lvl4)
if is_pcp:
pcp_filter = get_poste_filter_by_lvl4_codes_pcp(codes_lvl4)
poste_filter = poste_filter | pcp_filter if poste_filter else pcp_filter
if is_itd:
itd_filter = Q(**{f'{Postes_Pams.Cols.REL_POSTE}__p_specifique' : SpecifiqueChoices.ITD})
poste_filter = poste_filter | itd_filter if poste_filter else itd_filter
if is_bvt:
bvt_filter = Q(**{f'{Postes_Pams.Cols.REL_POSTE}__{Poste.Cols.M2M_SOUS_VIVIERS}': 'BVT'})
poste_filter = poste_filter | bvt_filter if poste_filter else bvt_filter
list_p_ok = Postes_Pams.objects.filter(poste_filter).values_list('poste_id', flat=True)
perm = self.is_in_list_ok(list_p, list_p_ok)
msg = 'Poste rights verification ok\n' if perm else 'Poste rights verification not ok\n'
self.logger.debug(msg)
return perm
def commun_verif(self, request, view, obj=None):
"""
Partie commune de la vérification des droits de lecture aux fonctions has_permission et has_object_permission
"""
P = Profiles
user = request.user
if not user or not user.is_authenticated:
self.logger.debug('No user connected\n')
return False
is_super = user.is_superuser
if is_super and str(view) in ['ReportingView']:
self.logger.debug('Rights verification ok : the user is a superuser\n')
return True
summary = get_profile_summary(user)
profiles = summary.profiles
r_profiles = profiles.get(KEY_READ, ())
w_profiles = profiles.get(KEY_WRITE, ())
if not profiles:
self.logger.debug('The user as no profile\n')
return False
if request.method in SAFE_METHODS:
resp = True if (r_profiles and r_profiles!=(P.SUPER,)) else False
if not resp:
msg = 'The user does not have reading rights\n'
self.logger.debug(msg)
return resp
# La gestion de droits des administrateurs est faite via IsAdminUser pour les api d'alimentation
is_fil = P.FILIERE in w_profiles
is_pcp = P.PCP in w_profiles
is_bvt = P.BVT in w_profiles
is_itd = P.ITD in w_profiles
is_hme = P.HME in w_profiles
codes_lvl4 = get_lvl4_org_codes_by_any_code(summary.org_code)
if obj: # Modification d'une seule instance
if not w_profiles or w_profiles==(P.SUPER,):
self.logger.debug('The user does not have writing rights\n')
return False
if isinstance(obj, Administre):
pk = obj.pk
return self.verif_droits_adm(pk, codes_lvl4, is_fil, is_pcp, is_bvt)
elif isinstance(obj, Poste):
pk = obj.pk
return self.verif_droits_poste(pk, codes_lvl4, is_fil, is_pcp, is_bvt, is_itd)
elif isinstance(obj, Decision):
a_pk = obj.administre.pk
p_pk = obj.poste.pk
return self.verif_droits_poste(p_pk, codes_lvl4, is_fil, is_pcp, is_bvt, is_itd) or \
self.verif_droits_adm(a_pk, codes_lvl4, is_fil, is_pcp, is_bvt)
else: # Vérification globale ou modification multiple
self.logger.debug('---- Writing profiles ----')
self.logger.debug('Expert HME : %s', is_hme)
self.logger.debug('Pameur BMOB : %s', is_pcp)
self.logger.debug('Gestionnaire BVT : %s', is_bvt)
self.logger.debug('Gestionnaire ITD : %s', is_itd)
self.logger.debug('Gestionnaire BGCAT : %s', is_fil)
self.logger.debug('--------------------------')
if isinstance(request.data, list):
if not w_profiles or w_profiles==(P.SUPER,):
self.logger.debug('The user does not have writing rights\n')
return False
if 'a_id_sap' in request.data[0].keys():
pk = [adm['a_id_sap'] for adm in request.data]
return self.verif_droits_adm(pk, codes_lvl4, is_fil, is_pcp, is_bvt)
elif 'p_id' in request.data[0].keys():
pk = [p['p_id'] for p in request.data]
return self.verif_droits_poste(pk, codes_lvl4, is_fil, is_pcp, is_bvt, is_itd)
self.logger.debug('Global rights verification ok\n')
return True # On autorise une permission globale pour ensuite permettre des vérifications au niveau des instances avec has_object_permission
self.logger.debug('User rights verification failed\n')
return False
def has_permission(self, request, view):
if request.method != 'GET':
self.logger.debug('------------------------- Permissions verification ---------------------')
self.logger.debug('Request method : %s', request.method)
self.logger.debug('Request data : %s', request.data)
self.logger.debug('View : %s', view)
return self.commun_verif(request=request, view=view)
def has_object_permission(self, request, view, obj):
self.logger.debug('--------------------- Object permissions verification ------------------')
self.logger.debug('Request method : %s', request.method)
self.logger.debug('Request data : %s', request.data)
self.logger.debug('View : %s', view)
self.logger.debug('Object : %s', obj)
return self.commun_verif(request=request, view=view, obj=obj)