""" Ce module contient les Utilitaires d'insertion des données par groupe dans une table fixé """ # import des pré requis from ast import Mod import time import logging import pandas as pd import numpy as np from django.db.models import Choices, Model, Q from django.forms import model_to_dict from itertools import islice import math from django.contrib.auth import get_user_model from .models import Poste,Postes_Pams, Administre, Grade, MarquesGroupe, Garnison, SousVivier, Domaine, PAM, Administres_Pams,\ Filiere, FMOB, Fonction, FormationEmploi, Marque, Notation, PreferencesListe, SousVivierAssociation, \ ZoneGeographique, RefGest, RefOrg, RefSvFil, Diplome, Affectation, FUD, Administre_Notation, \ StatutPamChoices as StatutPam, CustomUser from typing import List, Optional, Tuple, Union # Definition genrique des fonctions suivantes from .utils import intOrNone, cleanString, generate_sv_id from .utils.permissions import get_lvl4_org_codes_by_any_code from .utils.alimentation_decorators import get_data_logger, data_perf_logger_factory from .utils.decorators import execution_time from .utils.insertion.commun import batch_iterator, is_same_model from .utils_extraction import APP_NAN logger = get_data_logger(__name__) # TODO: formaliser les logs # fonction d'insertion dans la table marques def insert_PAM(df): """ Insertion des données de la table Marque dans la base :type df: dataframe :param df: Dataframe contenant les données pretraités à inserer :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ liste_create = [] liste_update = [] ModelType = PAM update_header = ['pam_date','pam_libelle','pam_statut'] for i in range(df.shape[0]): pams = ModelType.objects.filter(pam_id=df.at[i,'pam_id']) pam = PAM(pam_id=df.at[i, 'pam_id'],pam_date=df.at[i, 'pam_date'], pam_libelle=df.at[i, 'pam_libelle'], pam_statut=df.at[i, 'pam_statut']) if pam.pam_id in pams.values_list('pam_id', flat = True): liste_update.append(pam) else: cloture=PAM.objects.filter(pam_statut="PAM en cours") cloture.delete() liste_create.append(pam) if liste_create: PAM.objects.bulk_create(liste_create) if liste_update: PAM.objects.bulk_update(liste_update, fields=update_header) return 1 # fonction d'insertion dans la table Grade def insert_Grade(): """ Insertion des données de la table Grade dans la base. Cette fonction a été écrite en dur :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ liste_create = [] grades = [ { "id": "SDT", "code": "SDT", "categorie": "Militaires du rang", "ordre": "100" }, { "id": "SDT TA", "code": "SDT TA", "categorie": "Militaires du rang", "ordre": "101" }, { "id": "SDT PF", "code": "SDT PF", "categorie": "Militaires du rang", "ordre": "102" }, { "id": "1CL", "code": "1CL", "categorie": "Militaires du rang", "ordre": "110" }, { "id": "1CL TA", "code": "1CL TA", "categorie": "Militaires du rang", "ordre": "111" }, { "id": "1CL PF", "code": "1CL PF", "categorie": "Militaires du rang", "ordre": "112" }, { "id": "CPL", "code": "CPL", "categorie": "Militaires du rang", "ordre": "120" }, { "id": "CPL TA", "code": "CPL TA", "categorie": "Militaires du rang", "ordre": "121" }, { "id": "CPL PF", "code": "CPL PF", "categorie": "Militaires du rang", "ordre": "122" }, { "id": "CCH", "code": "CCH", "categorie": "Militaires du rang", "ordre": "130" }, { "id": "CCH TA", "code": "CCH TA", "categorie": "Militaires du rang", "ordre": "131" }, { "id": "CCH PF", "code": "CCH PF", "categorie": "Militaires du rang", "ordre": "132" }, { "id": "CC1", "code": "CC1", "categorie": "Militaires du rang", "ordre": "140" }, { "id": "CC1 TA", "code": "CC1 TA", "categorie": "Militaires du rang", "ordre": "141" }, { "id": "CC1 PF", "code": "CC1 PF", "categorie": "Militaires du rang", "ordre": "142" }, { "id": "SGT", "code": "SGT", "categorie": "Sous-officiers", "ordre": "210" }, { "id": "SGT TA", "code": "SGT TA", "categorie": "Sous-officiers", "ordre": "211" }, { "id": "SGT PF", "code": "SGT PF", "categorie": "Sous-officiers", "ordre": "212" }, { "id": "SCH", "code": "SCH", "categorie": "Sous-officiers", "ordre": "220" }, { "id": "SCH TA", "code": "SCH TA", "categorie": "Sous-officiers", "ordre": "221" }, { "id": "SCH PF", "code": "SCH PF", "categorie": "Sous-officiers", "ordre": "222" }, { "id": "ADJ", "code": "ADJ", "categorie": "Sous-officiers", "ordre": "230" }, { "id": "ADJ TA", "code": "ADJ TA", "categorie": "Sous-officiers", "ordre": "231" }, { "id": "ADJ PF", "code": "ADJ PF", "categorie": "Sous-officiers", "ordre": "232" }, { "id": "ADC", "code": "ADC", "categorie": "Sous-officiers", "ordre": "240" }, { "id": "ADC TA", "code": "ADC TA", "categorie": "Sous-officiers", "ordre": "241" }, { "id": "ADC TS", "code": "ADC TS", "categorie": "Sous-officiers", "ordre": "242" }, { "id": "ADC PF", "code": "ADC PF", "categorie": "Sous-officiers", "ordre": "243" }, { "id": "MAJ", "code": "MAJ", "categorie": "Sous-officiers", "ordre": "250" }, { "id": "MAJ TA", "code": "MAJ TA", "categorie": "Sous-officiers", "ordre": "251" }, { "id": "MAJ PF", "code": "MAJ PF", "categorie": "Sous-officiers", "ordre": "252" }, { "id": "ASP", "code": "ASP", "categorie": "Officiers subalternes", "ordre": "310" }, { "id": "ASP TA", "code": "ASP TA", "categorie": "Officiers subalternes", "ordre": "311" }, { "id": "ASP PF", "code": "ASP PF", "categorie": "Officiers subalternes", "ordre": "312" }, { "id": "SLT", "code": "SLT", "categorie": "Officiers subalternes", "ordre": "320" }, { "id": "SLT TA", "code": "SLT TA", "categorie": "Officiers subalternes", "ordre": "321" }, { "id": "SLT PF", "code": "SLT PF", "categorie": "Officiers subalternes", "ordre": "322" }, { "id": "EOX", "code": "EOX", "categorie": "Officiers subalternes", "ordre": "327" }, { "id": "LTN", "code": "LTN", "categorie": "Officiers subalternes", "ordre": "330" }, { "id": "LTN TA", "code": "LTN TA", "categorie": "Officiers subalternes", "ordre": "331" }, { "id": "LTN PF", "code": "LTN PF", "categorie": "Officiers subalternes", "ordre": "332" }, { "id": "CNE", "code": "CNE", "categorie": "Officiers subalternes", "ordre": "340" }, { "id": "CNE TA", "code": "CNE TA", "categorie": "Officiers subalternes", "ordre": "341" }, { "id": "CNE PF", "code": "CNE PF", "categorie": "Officiers subalternes", "ordre": "342" }, { "id": "CNE TS", "code": "CNE TS", "categorie": "Officiers subalternes", "ordre": "343" }, { "id": "CASP", "code": "CASP", "categorie": "Officiers subalternes", "ordre": "390" }, { "id": "CASP TA", "code": "CASP TA", "categorie": "Officiers subalternes", "ordre": "390" }, { "id": "CASP PF", "code": "CASP PF", "categorie": "Officiers subalternes", "ordre": "390" }, { "id": "CR3", "code": "CR3", "categorie": "Officiers subalternes", "ordre": "391" }, { "id": "CR3 TA", "code": "CR3 TA", "categorie": "Officiers subalternes", "ordre": "391" }, { "id": "CR3 PF", "code": "CR3 PF", "categorie": "Officiers subalternes", "ordre": "391" }, { "id": "CR2", "code": "CR2", "categorie": "Officiers subalternes", "ordre": "392" }, { "id": "CR2 TA", "code": "CR2 TA", "categorie": "Officiers subalternes", "ordre": "392" }, { "id": "CR2 PF", "code": "CR2 PF", "categorie": "Officiers subalternes", "ordre": "392" }, { "id": "CR1", "code": "CR1", "categorie": "Officiers subalternes", "ordre": "393" }, { "id": "CR1 TA", "code": "CR1 TA", "categorie": "Officiers subalternes", "ordre": "393" }, { "id": "CR1 PF", "code": "CR1 PF", "categorie": "Officiers subalternes", "ordre": "393" }, { "id": "CDT", "code": "CDT", "categorie": "Officiers supérieurs", "ordre": "400" }, { "id": "CDT PF", "code": "CDT PF", "categorie": "Officiers supérieurs", "ordre": "401" }, { "id": "CDT TA", "code": "CDT TA", "categorie": "Officiers supérieurs", "ordre": "402" }, { "id": "LCL", "code": "LCL", "categorie": "Officiers supérieurs", "ordre": "410" }, { "id": "LCL TA", "code": "LCL TA", "categorie": "Officiers supérieurs", "ordre": "411" }, { "id": "LCL PF", "code": "LCL PF", "categorie": "Officiers supérieurs", "ordre": "412" }, { "id": "COL", "code": "COL", "categorie": "Officiers supérieurs", "ordre": "420" }, { "id": "COL PF", "code": "COL PF", "categorie": "Officiers supérieurs", "ordre": "421" }, { "id": "COL TA", "code": "COL TA", "categorie": "Officiers supérieurs", "ordre": "422" } ] for i in range(len(grades)): grade = Grade.objects.filter(gr_code=grades[i]['code']) grade_create = Grade(gr_code=grades[i]['code'], gr_categorie=grades[i]['categorie'], gr_ordre=grades[i]['ordre']) if grade.exists(): # Update grade.update(gr_categorie=grades[i]['categorie'], gr_ordre=grades[i]['ordre']) else: liste_create.append(grade_create) Grade.objects.bulk_create(liste_create) return 1 def update_domaine(df): """ Met à jour la table des domaines à partir du DataFrame. :param df: données :type df: class:`pandas.DataFrame` """ ModelType = Domaine Cols = ModelType.Cols col_pk = Cols.PK fields_to_update = () models_in_db = {m.pk: m for m in ModelType.objects.only('pk', *fields_to_update)} batch_size = 100 dict_create = {} dict_update = {} dict_up_to_date = {} error_count = 0 for idx, rec in enumerate(df.to_dict('records')): pk = rec.get(col_pk) try: in_db = models_in_db.get(pk) model = ModelType(pk=pk, **{f: rec.get(f) for f in fields_to_update}) if not in_db: # model.full_clean(validate_unique=False) dict_create.setdefault(pk, model) elif not is_same_model(fields_to_update, in_db, model): # model.full_clean(validate_unique=False) dict_update.setdefault(pk, model) else: dict_up_to_date.setdefault(pk, model) except Exception: error_count = error_count + 1 logger.exception('%s une erreur est survenue à la ligne : %s (pk=%s)', ModelType.__name__, idx, pk) if error_count: logger.warning("%s(s) en erreur : %s", ModelType.__name__, error_count) if dict_create: ModelType.objects.bulk_create(dict_create.values(), batch_size=batch_size) logger.info('%s(s) créé(s) : %s', ModelType.__name__, len(dict_create)) if fields_to_update: if dict_update: ModelType.objects.bulk_update(dict_update.values(), batch_size=batch_size, fields=fields_to_update) logger.info('%s(s) mis(s) à jour : %s', ModelType.__name__, len(dict_update)) if dict_up_to_date: logger.info('%s(s) déjà à jour : %s', ModelType.__name__, len(dict_up_to_date)) deleted = ModelType.objects.filter(~Q(pk__in={*dict_create.keys(), *dict_update.keys(), *dict_up_to_date.keys()})).delete()[0] if deleted: logger.info('%s(s) supprimé(s) : %s', ModelType.__name__, deleted) # fonction d'insertion dans la table Filiere def insert_Filiere(df): """ Insertion des données de la table Filiere dans la base :type df: dataframe :param df: Dataframe contenant les données pretraités à inserer :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ ModelType = Filiere liste_create = [] liste_update = [] update_header = ['domaine_id'] error_count = 0 fil_dict = {o.f_code: o for o in ModelType.objects.all()} dom_ids = set(Domaine.objects.values_list('pk', flat=True)) for i in range(df.shape[0]): f_code = df.at[i, 'f_code'] try: filiere = ModelType.objects.filter(f_code=f_code) filiere_create = ModelType(f_code=f_code, domaine_id=df.at[i, 'domaine_id']) dom_id = df.at[i,'domaine_id'] if dom_id not in dom_ids: logger.warning("%s[pk=%s] filière ignorée car le domaine d'id '%s' est absent de la base", Filiere.__name__, f_code, dom_id) continue if filiere.exists(): fil_pop = fil_dict.pop(f_code) liste_update.append(filiere_create) else: liste_create.append(filiere_create) except Exception: error_count = error_count + 1 logger.exception('%s une erreur est survenue à la ligne : %s (pk=%s)', Filiere.__name__, i, f_code) if error_count: logger.warning('%s(s) en erreur : %s',ModelType.__name__, error_count) if liste_create: ModelType.objects.bulk_create(liste_create) logger.info('%s(s) créé(s) : %s', ModelType.__name__, len(liste_create)) if liste_update and update_header: ModelType.objects.bulk_update(liste_update, fields=update_header) logger.info('%s(s) mis à jour: %s', ModelType.__name__, len(liste_update)) deleted = ModelType.objects.filter(pk__in=fil_dict.keys()).delete()[0] if deleted: logger.info('%s(s) supprimé(s) : %s', ModelType.__name__, deleted) return len(liste_create), len(liste_update), error_count, deleted # fonction d'insertion dans la table Sous viviers def insert_SousVivier(df: pd.DataFrame) -> None: """ Insertion des données du Sous Vivier dans la base :param df: Dataframe contenant les données pretraitées à inserer :type df: class:`pandas.DataFrame` """ ModelType = SousVivier Cols = SousVivier.Cols update_header = [ Cols.LIBELLE, Cols.DOMAINE, Cols.FILIERE, Cols.CATEGORIE ] list_create = [] list_update = [] error_count = 0 sv_dict = {o.sv_id: o for o in ModelType.objects.all()} for i in range(df.shape[0]): sv_id = df.at[i, 'sv_id'] try: sous_viviers = ModelType.objects.filter(sv_id=sv_id) sous_vivier = ModelType(sv_id=sv_id, sv_libelle=df.at[i, 'sv_libelle'], sv_dom=df.at[i, 'sv_dom'], sv_fil=df.at[i, 'sv_fil'], sv_cat=df.at[i, 'sv_cat']) sous_vivier.full_clean(validate_unique=False) if sous_viviers.exists(): sous_vivier_pop = sv_dict.pop(sv_id) list_update.append(sous_vivier) else: list_create.append(sous_vivier) except Exception: error_count = error_count + 1 logger.exception('%s une erreur est survenue à la ligne : %s (pk=%s)', ModelType.__name__, i, sv_id) sv_bvt_id = 'BVT' sv_bvt = ModelType(sv_id=sv_bvt_id, sv_libelle=sv_bvt_id) sv_bvt.full_clean(validate_unique=False) if ModelType.objects.filter(sv_id=sv_bvt_id).exists(): sous_vivier_pop = sv_dict.pop(sv_bvt_id) list_update.append(sv_bvt) else: list_create.append(sv_bvt) if error_count: logger.warning("%s(s) en erreur : %s", ModelType.__name__, error_count) if list_create: ModelType.objects.bulk_create(list_create) logger.info('%s(s) créé(s) : %s', ModelType.__name__, len(list_create)) if list_update and update_header: ModelType.objects.bulk_update(list_update, fields=update_header) logger.info('%s(s) mis à jour : %s', ModelType.__name__, len(list_update)) deleted = ModelType.objects.filter(pk__in=sv_dict.keys()).delete()[0] if deleted: logger.info('%s(s) supprimé(s) : %s', ModelType.__name__, deleted) return len(list_create), len(list_update), error_count, deleted def insert_SousVivier_instances(): """ Insertion des données sous-viviers dans les tables des postes et des adminsitrés :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ Cols = SousVivier.Cols list_sv = SousVivier.objects.values_list( Cols.PK, Cols.DOMAINE, Cols.FILIERE, Cols.CATEGORIE ) sv_bvt_id = 'BVT' for (sv_id, dom, fil, cat) in list_sv: if sv_id != sv_bvt_id: a_qs = Administre.objects.filter(a_domaine_id=dom, a_filiere_id=fil, a_categorie=cat) p_qs = Poste.objects.filter(p_domaine_id=dom, p_filiere_id=fil, p_categorie=cat) a_qs.update(sous_vivier_id=sv_id) list_postes = list(p_qs.values_list('p_id', flat=True)) SousVivier.objects.get(sv_id=sv_id).poste.set(list_postes) if sv_bvt_id in np.array(list_sv)[:,0]: a_bvt_qs = Administre.objects.filter(a_domaine_gestion=sv_bvt_id) p_bvt_qs = Poste.objects.filter(p_nfs=sv_bvt_id) a_bvt_qs.update(sous_vivier_id=sv_bvt_id) list_postes_bvt = list(p_bvt_qs.values_list('p_id', flat=True)) SousVivier.objects.get(sv_id=sv_bvt_id).poste.set(list_postes_bvt) return 1 # fonction d'insertion dans la table RefGest def insert_RefGest(df: pd.DataFrame) -> None: """ Insertion des données de la table RefGest dans la base :param df: Dataframe contenant les données pretraitées à inserer :type df: class:`pandas.DataFrame` """ ModelType = RefGest UserType = get_user_model() list_create_ref_gest = [] list_update_ref_gest = [] list_tokeep_ref_gest = [] ref_gest_update_header = ['ref_gest_username', 'ref_gest_email', 'ref_gest_first_name', 'ref_gest_last_name', 'ref_gest_grade', 'ref_gest_niv_org', 'ref_gest_org_id'] dict_create_user = {} dict_update_user = {} users_to_keep = set() user_update_header = ['username', 'is_superuser', 'is_staff', 'first_name', 'last_name', 'email', 'grade'] users_in_db_by_id = {} users_in_db_by_username = {} for o in UserType.objects.all(): users_in_db_by_id[o.id] = o users_in_db_by_username[o.username] = o def add_user_to_dict(user_id: int, username: str) -> Tuple[bool, Optional[UserType], bool]: """ Vérifie l'existence d'un utilisateur en base (par ID ou par username) et ajoute si nécessaire un utilisateur dans le bon dictionnaire (création ou MAJ). Dans ce cas l'utilisateur est renvoyé par la fonction et il est possible de le modifier. Cas possibles : - l'utilisateur existe mais n'a pas le bon couple (ID, username) : renvoie (False, user, False) car c'est une donnée invalide - l'utilisateur existe avec le bon couple (ID, username) et est déjà dans le dictionnaire de MAJ : renvoie (True, None, False) - l'utilisateur existe avec le bon couple (ID, username) et n'est pas encore dans le dictionnaire de MAJ : renvoie (True, user, False) - l'utilisateur n'existe pas et est déjà dans le dictionnaire de création : renvoie (True, None, True) - l'utilisateur n'existe pas et n'est pas encore dans le dictionnaire de création : renvoie (True, user, True) :param user_id: ID de l'utilisateur :type user_id: int :param username: nom d'utilisateur :type username: str :return: tuple (valide, utilisateur, nouveau) :rtype: Tuple[bool, Optional[UserType], bool] """ user_in_db = users_in_db_by_id.get(user_id) or users_in_db_by_username.get(username) if user_in_db: expected = (user_id, username) actual = (user_in_db.id, user_in_db.username) if actual != expected: logger.warning('%s ignoré car le couple (id, username) ne correspond pas : %s au lieu de %s', UserType.__name__, actual, expected) return (False, user_in_db, False) if username not in dict_update_user: user = user_in_db dict_update_user[username] = user return (True, user, False) else: return (True, None, False) elif username not in dict_create_user: user = UserType(id=user_id, username=username) dict_create_user[username] = user return (True, user, True) return (True, None, True) error_count = 0 user_ignore_count = 0 for i in range(df.shape[0]): pk = int(df.at[i, 'ref_gest_sap']) try: ref_gest = ModelType.objects.filter(ref_gest_sap=pk) administre = Administre.objects.filter(a_id_sap=pk) username = df.at[i, 'ref_gest_username'].replace('\xa0', '') user_id = pk users_to_keep.add(pk) if pk not in list_tokeep_ref_gest: list_tokeep_ref_gest.append(pk) nom = df.at[i, 'ref_gest_last_name'] prenom = df.at[i, 'ref_gest_first_name'] if prenom == None: prenom = '' logger.warning("%s[pk=%s] le gestionnaire n'a pas de prénom", ModelType.__name__, pk) if nom == None: nom = '' logger.warning("%s[pk=%s] le gestionnaire n'a pas de nom", ModelType.__name__, pk) ref_gest_create = ModelType(ref_gest_sap=pk, ref_gest_username=username, ref_gest_email=df.at[i, 'ref_gest_email'], ref_gest_first_name=prenom, ref_gest_last_name=nom, ref_gest_grade=df.at[i, 'ref_gest_grade'], ref_gest_niv_org=df.at[i, 'ref_gest_niv_org'], ref_gest_org_id=df.at[i, 'ref_gest_org_id']) if ref_gest.exists(): list_update_ref_gest.append(ref_gest_create) else: list_create_ref_gest.append(ref_gest_create) valid, user, is_new = add_user_to_dict(user_id, username) if not valid: user_ignore_count = user_ignore_count + 1 continue if user: user.first_name = prenom user.last_name = nom user.email = df.at[i, 'ref_gest_email'] user.grade = df.at[i, 'ref_gest_grade'] if is_new: user.set_password(str(user_id)) user.full_clean(validate_unique=False) except Exception: error_count = error_count + 1 logger.exception('%s une erreur est survenue à la ligne : %s (pk=%s)', ModelType.__name__, i, pk) if error_count: logger.warning("%s(s) en erreur : %s", ModelType.__name__, error_count) if user_ignore_count: logger.warning('%s(s) ignoré(s) : %s', UserType.__name__, user_ignore_count) admin_username = 'admin' try: valid, user, is_new = add_user_to_dict(0, admin_username) if user: users_to_keep.add(user.id) if not is_new: dict_update_user.pop(admin_username, None) elif valid: user.first_name = 'Admin' user.last_name = 'Ogure' user.is_superuser = True user.is_staff = True user.set_password('admin') user.full_clean(validate_unique=False) except Exception: logger.exception("Une erreur est survenue lors du traitement de l'administrateur %s", admin_username) batch_size = 100 if list_create_ref_gest: ModelType.objects.bulk_create(list_create_ref_gest, batch_size=batch_size) logger.info('%s(s) créé(s) : %s', ModelType.__name__, len(list_create_ref_gest)) if list_update_ref_gest and ref_gest_update_header: ModelType.objects.bulk_update(list_update_ref_gest, batch_size=batch_size, fields=ref_gest_update_header) logger.info('%s(s) mis à jour : %s', ModelType.__name__, len(list_update_ref_gest)) if dict_create_user: UserType.objects.bulk_create(dict_create_user.values(), batch_size=batch_size) logger.info('%s(s) créé(s) : %s', UserType.__name__, len(dict_create_user)) if dict_update_user and user_update_header: UserType.objects.bulk_update(dict_update_user.values(), batch_size=batch_size, fields=user_update_header) logger.info('%s(s) mis à jour : %s', UserType.__name__, len(dict_update_user)) deleted_ref_gest = ModelType.objects.filter(~Q(ref_gest_sap__in=list_tokeep_ref_gest)).delete()[0] if deleted_ref_gest: logger.info('%s(s) supprimé(s) : %s', ModelType.__name__, deleted_ref_gest) deleted = UserType.objects.filter(~Q(id__in=users_to_keep)).delete()[0] if deleted: logger.info('%s(s) supprimé(s) : %s', UserType.__name__, deleted) return len(list_create_ref_gest), len(list_update_ref_gest), error_count, deleted_ref_gest, len(dict_create_user), len(dict_update_user), user_ignore_count, deleted def insert_RefOrg(df: pd.DataFrame) -> None: """ Insertion des données de la table RefOrg dans la base :param df: Dataframe contenant les données pretraitées à inserer :type df: class:`pandas.DataFrame` """ ModelType = RefOrg list_create = [] list_update = [] ref_org_dict = {o.ref_org_code: o for o in ModelType.objects.all()} update_header = ['ref_org_code_niv_org1', 'ref_org_lib_niv_org1', 'ref_org_code_niv_org2', 'ref_org_lib_niv_org2', 'ref_org_code_niv_org3', 'ref_org_lib_niv_org3', 'ref_org_code_niv_org4', 'ref_org_lib_niv_org4', 'ref_org_niv_org', 'ref_org_ref_fe', 'ref_org_ref_sv_fil', 'ref_org_droit_lect', 'ref_org_droit_ecr', 'ref_org_expert_hme', 'ref_org_bvt', 'ref_org_itd'] error_count = 0 for i, row in df.iterrows(): ref_org_code = row['ref_org_code'] try: ref_org = ModelType.objects.filter(ref_org_code=ref_org_code) ref_org_create = ModelType(ref_org_code=ref_org_code, ref_org_code_niv_org1=row['ref_org_code_niv_org1'], ref_org_lib_niv_org1=row['ref_org_lib_niv_org1'], ref_org_code_niv_org2=row['ref_org_code_niv_org2'], ref_org_lib_niv_org2=row['ref_org_lib_niv_org2'], ref_org_code_niv_org3=row['ref_org_code_niv_org3'], ref_org_lib_niv_org3=row['ref_org_lib_niv_org3'], ref_org_code_niv_org4=row['ref_org_code_niv_org4'], ref_org_lib_niv_org4=row['ref_org_lib_niv_org4'], ref_org_niv_org=int(row['ref_org_niv_org']), ref_org_ref_fe=row['ref_org_ref_fe'], ref_org_ref_sv_fil=row['ref_org_ref_sv_fil'], ref_org_droit_lect=row['ref_org_droit_lect'], ref_org_droit_ecr=row['ref_org_droit_ecr'], ref_org_expert_hme=row['ref_org_expert_hme'], ref_org_bvt=row['ref_org_bvt'], ref_org_itd=row['ref_org_itd']) ref_org_create.full_clean(validate_unique=False) if ref_org.exists(): ref_org_pop = ref_org_dict.pop(ref_org_code) list_update.append(ref_org_create) else: list_create.append(ref_org_create) except Exception: error_count = error_count + 1 logger.exception('%s une erreur est survenue à la ligne : %s (pk=%s)', ModelType.__name__, i, ref_org_code) if error_count: logger.warning("%s(s) en erreur : %s", ModelType.__name__, error_count) if list_create: ModelType.objects.bulk_create(list_create) logger.info('%s(s) créé(s) : %s', ModelType.__name__, len(list_create)) if list_update and update_header: ModelType.objects.bulk_update(list_update, fields=update_header) logger.info('%s(s) mis à jour : %s', ModelType.__name__, len(list_update)) deleted = ModelType.objects.filter(pk__in=ref_org_dict.keys()).delete()[0] if deleted: logger.info('%s(s) supprimé(s) : %s', ModelType.__name__, deleted) return len(list_create), len(list_update), error_count, deleted # fonction d'insertion dans la table RefSvFil def insert_RefSvFil(df): """ Insertion des données de la table RefSvFil dans la base :type df: dataframe :param df: Dataframe contenant les données pretraitées à inserer :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ ModelType = RefSvFil list_create = [] list_delete = [] ignore_count = 0 error_count = 0 ref_sv_fil_dict = dict((f"{o.ref_sv_fil_code}_{o.ref_sv_fil_dom_gest}_{o.ref_sv_fil_dom}_{o.ref_sv_fil_fil}_{o.ref_sv_fil_cat}_{o.sous_vivier_id}", o) for o in ModelType.objects.all()) sv_ids = list(SousVivier.objects.all().values_list('pk', flat=True)) for i, row in df.iterrows(): try: sv_id = row['sous_vivier_id'] if sv_id not in sv_ids: logger.warning("%s ignoré car le sous-vivier d'id '%s' n'est pas en base", ModelType.__name__, sv_id) ignore_count = ignore_count + 1 else: ref_sv_fil_id = f"{row['ref_sv_fil_code']}_{row['ref_sv_fil_dom_gest']}_{row['ref_sv_fil_dom']}_{row['ref_sv_fil_fil']}_{row['ref_sv_fil_cat']}_{sv_id}" if ref_sv_fil_id in ref_sv_fil_dict: ref_sv_fil_obj = ref_sv_fil_dict.pop(ref_sv_fil_id) else: dom = row['ref_sv_fil_dom'] fil = row['ref_sv_fil_fil'] cat = row['ref_sv_fil_cat'] ref_sv_fil = ModelType(ref_sv_fil_code=row['ref_sv_fil_code'], ref_sv_fil_dom_gest=row['ref_sv_fil_dom_gest'], ref_sv_fil_dom=dom, ref_sv_fil_fil=fil, ref_sv_fil_cat=cat, sous_vivier_id=sv_id) list_create.append(ref_sv_fil) except Exception: logger.exception('%s une erreur est survenue à la ligne : %s', ModelType.__name__, i) error_count = error_count + 1 list_delete = [ref_sv_fil_dict[key] for key in ref_sv_fil_dict.keys()] size_batch = 100 if ignore_count: logger.warning("%s(s) ignoré(s) : %s", ModelType.__name__, ignore_count) if error_count: logger.warning("%s(s) en erreur : %s", ModelType.__name__, error_count) if list_delete: logger.debug('%s(s) supprimé(s) : %s', ModelType.__name__, len(list_delete)) ModelType.objects.filter(id__in=[o.id for o in list_delete]).delete() if list_create: logger.debug("%s(s) créé(s) : %s", ModelType.__name__, len(list_create)) ModelType.objects.bulk_create(list_create, batch_size=size_batch) return len(list_create), len(list_delete), error_count, ignore_count def insert_RefFeMere(df_referentiel_fe): Cols = FormationEmploi.Cols col_pk = Cols.PK col_pk_mere = Cols.REL_MERE col_libelle_mere = Cols.LIBELLE col_zone_def = Cols.ZONE_DEFENSE def convertir_fe(df: pd.DataFrame) -> pd.DataFrame: """ Fonction de conversion du DataFrame de FE. :param df: DataFrame du référentiel FE :type df: class:`pandas.DataFrame` :return: DataFrame :rtype: class:`pandas.DataFrame` """ col_pk_avant = 'FE CREDO' # B col_pk_mere_avant = 'FE mère CREDO' # D col_libelle_mere_avant = 'FE mère LA' # E col_zone_def_avant = 'Zone de Défense' # O return ( df[[col_pk_avant, col_pk_mere_avant, col_libelle_mere_avant, col_zone_def_avant]] .drop_duplicates(subset=col_pk_avant, keep='first') .rename(columns={ col_pk_avant: col_pk, col_pk_mere_avant: col_pk_mere, col_libelle_mere_avant: col_libelle_mere, col_zone_def_avant: col_zone_def }) .astype({col_pk: 'str', col_pk_mere: 'str'}) ) def mettre_a_jour_fe(df: pd.DataFrame) -> None: """ Met à jour les FE base à partir du DataFrame de FE. :param df: DataFrame du référentiel FE :type df: class:`pandas.DataFrame` :return: DataFrame :rtype: class:`pandas.DataFrame` """ TypeModele = FormationEmploi Cols = TypeModele.Cols champs_maj = (Cols.REL_MERE, Cols.ZONE_DEFENSE) modeles_en_base = {m.pk: m for m in TypeModele.objects.select_related(Cols.REL_MERE).only('pk', *champs_maj)} taille_batch = 100 dict_update = {} dict_mere_create = {} for rec in df.to_dict('records'): pk = rec.get(col_pk) id_mere = rec.get(col_pk_mere) zone_defense = rec.get(col_zone_def) en_base = modeles_en_base.get(pk) # TODO pas de création pour l'instant (ni de suppression) quand en_base est falsy if en_base: mere = None if id_mere is not None: mere = modeles_en_base.get(id_mere) or dict_mere_create.get(id_mere) if not mere: try: # les FE mères manquantes seront créées mere = TypeModele(pk=id_mere, fe_libelle=rec.get(col_libelle_mere)) except Exception as e: raise RuntimeError(f'la création d\'un modèle de type "{TypeModele.__name__}" (mère) a échoué') from e dict_mere_create.setdefault(id_mere, mere) if mere != getattr(en_base, Cols.REL_MERE, None) or zone_defense != getattr(en_base, Cols.ZONE_DEFENSE, None): try: modele = TypeModele(pk=pk, mere=mere, zone_defense=zone_defense) except Exception as e: raise RuntimeError(f'la création d\'un modèle de type "{TypeModele.__name__}" a échoué') from e dict_update.setdefault(pk, modele) if dict_mere_create: TypeModele.objects.bulk_create(dict_mere_create.values(), batch_size=taille_batch) logger.debug(f"{TypeModele.__name__} créé(s): %s", len(dict_mere_create)) if dict_update and champs_maj: TypeModele.objects.bulk_update(dict_update.values(), batch_size=taille_batch, fields=champs_maj) logger.debug(f"{TypeModele.__name__} MAJ: %s", len(dict_update)) df_referentiel_fe = convertir_fe(df_referentiel_fe) mettre_a_jour_fe(df_referentiel_fe) return 1 # return len(dict_mere_create), len(dict_update) def update_m2m_links_gestionnaire(type: str) -> None: """ Met à jour tous les liens many-to-many entre les gestionnaires (= utilisateurs) et des FE ou des sous-viviers. Nécessite des données pour certains modèles : - commun : CustomUser, RefGest, RefOrg - FE : FormationEmploi - sous-vivier : SousVivier, RefSvFil :param type: type :type type: Literal['FE', 'SV'] """ UserType = get_user_model() if type == 'FE': ModelType = FormationEmploi LinkModelType = getattr(UserType, UserType.Cols.M2M_FORMATION_EMPLOIS).through def fn_linked_ids(lvl4_codes): return (ModelType.objects .filter(Q(fe_code_niv_org4__in=lvl4_codes) | Q(fe_code_niv_org4_mdr__in=lvl4_codes)) .values_list('pk', flat=True)) elif type == 'SV': ModelType = SousVivier LinkModelType = getattr(UserType, UserType.Cols.M2M_SOUS_VIVIERS).through user_bvts = list(RefGest.objects.filter(**{f'{RefGest.Cols.REL_ORG}_id__ref_org_bvt': True}).values_list('pk', flat=True)) def fn_linked_ids(lvl4_codes): return (RefSvFil.objects .filter(ref_sv_fil_code__in=lvl4_codes) .filter(**{f'{RefSvFil.Cols.REL_SOUS_VIVIER}_id__isnull': False}) .values_list(f'{RefSvFil.Cols.REL_SOUS_VIVIER}_id', flat=True) .distinct()) else: raise Exception(f'type invalide : {type}') org_codes_by_id = {o[0]: o[1] for o in RefGest.objects.values_list(RefGest.Cols.PK, RefGest.Cols.REL_ORG + '_id')} user_ids = list(UserType.objects.filter(pk__in=org_codes_by_id.keys()).values_list('pk', flat=True)) batch_size = 200 to_create = [] error_count = 0 sv_bvt_id = 'BVT' if 'BVT' in SousVivier.objects.all().values_list(SousVivier.Cols.PK, flat=True) else None lvl4_codes_by_code = {} # un cache for user_id, org_code in org_codes_by_id.items(): try: if user_id in user_ids: lvl4_codes = lvl4_codes_by_code.get(org_code) or lvl4_codes_by_code.setdefault(org_code, get_lvl4_org_codes_by_any_code(org_code)) for linked_id in fn_linked_ids(lvl4_codes): to_create.append(LinkModelType(**{f'{UserType.__name__.lower()}_id': user_id, f'{ModelType.__name__.lower()}_id': linked_id})) if type == 'SV': if user_id in user_bvts and sv_bvt_id: to_create.append(LinkModelType(**{f'{UserType.__name__.lower()}_id': user_id, f'{ModelType.__name__.lower()}_id': sv_bvt_id})) except Exception: error_count = error_count + 1 logger.exception("une erreur est survenue lors de l'ajout de lien(s) %s[pk=%s]/%s", UserType.__name__, user_id, ModelType.__name__) if error_count: logger.warning("%s(s) en erreur : %s", LinkModelType.__name__, error_count) deleted = LinkModelType.objects.all().delete()[0] if deleted: logger.info('lien(s) %s/%s supprimé(s) : %s', UserType.__name__, ModelType.__name__, deleted) if to_create: LinkModelType.objects.bulk_create(to_create, batch_size=batch_size) logger.info('lien(s) %s/%s créé(s) : %s', UserType.__name__, ModelType.__name__, len(to_create)) return 1 # fonction d'insertion dans la table diplome def insert_Diplome(df): """ Insertion des données de la table Diplome dans la base :type df: dataframe :param df: Dataframe contenant les données pretraités à inserer :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ liste_create = [] liste_update = [] liste_delete = [] update_header = ['diplome_note'] diplome_dict = dict ((f"{o.administre_id}_{o.diplome_libelle}_{o.diplome_date}", o) for o in Diplome.objects.all()) for i in range(df.shape[0]): dip_id = f"{df.at[i,'Matricule SAP']}_{df.at[i,'Diplôme militaire L']}_{df.at[i,'Diplôme militaire D']}" if dip_id in diplome_dict: diplome = diplome_dict.pop(dip_id) diplome.diplome_note = df.at[i, 'Diplôme militaire note'] liste_update.append(diplome) else: diplome = Diplome(administre_id = df.at[i,'Matricule SAP'], diplome_libelle=df.at[i, 'Diplôme militaire L'],diplome_date=df.at[i, 'Diplôme militaire D'], diplome_note=df.at[i, 'Diplôme militaire note']) liste_create.append(diplome) liste_delete = [diplome_dict[key] for key in diplome_dict.keys()] size_batch = 100 if liste_create: Diplome.objects.bulk_create(liste_create, batch_size=size_batch) if liste_update: Diplome.objects.bulk_update(liste_update, fields=update_header, batch_size=size_batch) if liste_delete: Diplome.objects.filter(id__in=[o.id for o in liste_delete]).delete() return len(liste_create), len(liste_update), len(liste_delete) # fonction d'insertion dans la table Affecation def insert_Affectation(df): """ Insertion des données de la table Affecation dans la base :type df: dataframe :param df: Dataframe contenant les données pretraités à inserer :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ liste_create = [] liste_delete = [] affectation_dict = dict ((f"{o.administre_id}_{o.affect_libelle}_{o.affect_date}", o) for o in Affectation.objects.all()) for i in range(df.shape[0]): affect_id = f"{df.at[i,'Matricule SAP']}_{df.at[i,'Affectation L']}_{df.at[i,'Affectation D']}" if affect_id in affectation_dict: affect_object = affectation_dict.pop(affect_id) else : affect = Affectation(administre_id = df.at[i, 'Matricule SAP'], affect_libelle=df.at[i, 'Affectation L'], affect_date=df.at[i, 'Affectation D']) liste_create.append(affect) liste_delete = [affectation_dict[key] for key in affectation_dict.keys()] size_batch = 100 if liste_create: Affectation.objects.bulk_create(liste_create, batch_size=size_batch) if liste_delete: Affectation.objects.filter(id__in=[o.id for o in liste_delete]).delete() return len(liste_create), len(liste_delete) # fonction d'insertion dans la table Affecation def insert_Fud(df): """ Insertion des données de la table FUD dans la base :type df: dataframe :param df: Dataframe contenant les données pretraités à inserer :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ liste_create = [] liste_delete = [] fud_dict = dict ((f"{o.administre_id}_{o.fud_libelle}_{o.fud_date_debut}_{o.fud_date_fin}", o) for o in FUD.objects.all()) for i in range(df.shape[0]): fud_id = f"{df.at[i,'Matricule SAP']}_{df.at[i,'FUD L']}_{df.at[i,'FUD DD']}_{df.at[i,'FUD DF']}" if fud_id in fud_dict: fud_object = fud_dict.pop(fud_id) else: fud = FUD(administre_id = df.at[i, 'Matricule SAP'], fud_libelle=df.at[i, 'FUD L'], fud_date_debut=df.at[i, 'FUD DD'], fud_date_fin=df.at[i, 'FUD DF']) liste_create.append(fud) liste_delete = [fud_dict[key] for key in fud_dict.keys()] size_batch = 100 if liste_create: FUD.objects.bulk_create(liste_create, batch_size=size_batch) if liste_delete: FUD.objects.filter(id__in=[o.id for o in liste_delete]).delete() return len(liste_create), len(liste_delete) # fonction d'insertion dans la table Garnison def insert_Garnison(df): """ Insertion des données de la table Garnison dans la base :type df: dataframe :param df: Dataframe contenant les données pretraités à inserer :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ liste_create = [] liste_update = [] error_count = 0 update_header = ['gar_lieu', 'gar_code_postal'] for i in range(df.shape[0]): try: garnisons = Garnison.objects.filter(gar_id=df.at[i, 'gar_id']) garnison = Garnison(gar_id=df.at[i, 'gar_id'], gar_lieu=df.at[i, 'gar_lieu'], gar_code_postal=df.at[i, 'gar_code_postal']) if garnisons.exists(): liste_update.append(garnison) else: liste_create.append(garnison) except: error_count +=1 raise Exception("Une erreur est survenue à la ligne : ", i) Garnison.objects.bulk_create(liste_create) Garnison.objects.bulk_update(liste_update, fields=update_header) return len(liste_create), len(liste_update), error_count # fonction d'insertion dans la table Formations emplois def insert_FormationEmploi(df: pd.DataFrame) -> None: """ Insertion des données de la table Formation Emploi dans la base :type df: dataframe :param df: Dataframe contenant les données pretraités à inserer """ ModelType = FormationEmploi update_header = ['fe_garnison_lieu', 'fe_code_postal', 'fe_libelle', 'fe_mere_credo', 'fe_mere_la', 'fe_fot', 'fe_abo_fe', 'fe_pilier_niv1', 'fe_code_niv_org4', 'fe_niv_org4', 'fe_code_niv_org4_mdr', 'fe_niv_org4_mdr', 'fe_nb_poste_reo_mdr'] nb_ligne_charger = 0 nb_ligne_lu = 0 nb_ligne_erroner = 0 nb_ligne_ignorer = 0 liste_create = [] liste_update = [] error_count = 0 model_ids_in_db = set(ModelType.objects.values_list('pk', flat=True)) for i, row in df.iterrows(): pk = row['fe_credo'] nb_ligne_lu += 1 try: model = ModelType(**{ 'fe_code': pk, 'fe_garnison_lieu': row['fe_garnison_lieu'], 'fe_code_postal': row['fe_code_postal'], 'fe_libelle': row['fe_libelle'], 'fe_mere_credo': row['fe_mere_credo'], 'fe_mere_la': row['fe_mere_la'], 'fe_fot': row['fe_fot'], 'fe_abo_fe': row['fe_abo_fe'], 'fe_pilier_niv1': row['fe_pilier_niv1'], 'fe_code_niv_org4': row['fe_code_niv_org4'], 'fe_niv_org4': row['fe_niv_org4'], 'fe_code_niv_org4_mdr': row['fe_code_niv_org4_mdr'], 'fe_niv_org4_mdr': row['fe_niv_org4_mdr'] }) nb_ligne_charger += 1 if pk in model_ids_in_db: liste_update.append(model) else: liste_create.append(model) for attr in [ 'fe_nb_poste_reo_mdr', 'fe_nb_poste_reevalue_mdr', 'fe_nb_poste_vacant_mdr', 'fe_nb_poste_occupe_mdr', 'fe_nb_poste_reo_off', 'fe_nb_poste_reevalue_off', 'fe_nb_poste_vacant_off', 'fe_nb_poste_occupe_off', 'fe_nb_poste_reo_soff', 'fe_nb_poste_reevalue_soff', 'fe_nb_poste_vacant_soff', 'fe_nb_poste_occupe_soff', ]: setattr(model, attr, row[attr]) except Exception: error_count +=1 nb_ligne_erroner += 1 logger.exception('%s une erreur est survenue à la ligne : %s (pk=%s)', ModelType.__name__, i, pk) if nb_ligne_ignorer: logger.warning('%s(s) ignorée(s) : %s', ModelType.__name__, nb_ligne_ignorer) if nb_ligne_erroner: logger.warning("%s(s) en erreur : %s", ModelType.__name__, nb_ligne_erroner) batch_size = 100 if liste_create: ModelType.objects.bulk_create(liste_create, batch_size=batch_size) logger.info('%s(s) créée(s) : %s', ModelType.__name__, len(liste_create)) if liste_update and update_header: ModelType.objects.bulk_update(liste_update, batch_size=batch_size, fields=update_header) logger.info('%s(s) mise(s) à jour : %s', ModelType.__name__, len(liste_update)) return len(liste_create), len(liste_update), error_count # fonction d'insertion dans la table Fonction def insert_Fonction(df): """ Insertion des données de la table Fonction dans la base :type df: dataframe :param df: Dataframe contenant les données pretraités à inserer :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ liste_create = [] for i in range(df.shape[0]): fonction = Fonction.objects.filter(fon_id=int(df.at[i, 'fon_id'])) fonction_create = Fonction(fon_id=int(df.at[i, 'fon_id']), fon_libelle=df.at[i, 'fon_libelle']) if fonction.exists(): fonction.update(fon_libelle=df.at[i, 'fon_libelle']) else: liste_create.append(fonction_create) Fonction.objects.bulk_create(liste_create) return len(liste_create) # fonction d'insertion dans la table marques def insert_Marque(df): """ Insertion des données de la table Marque dans la base :type df: dataframe :param df: Dataframe contenant les données pretraités à inserer :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ liste_create = [] for i in range(df.shape[0]): marque = Marque.objects.filter(mar_id=df.at[i, 'mar_id']) marque_create = Marque(mar_id=df.at[i, 'mar_id'], groupe_marques_id=df.at[i, 'groupe_marques_id'], mar_code=df.at[i, 'mar_code'], mar_libelle=df.at[i, 'mar_libelle'], mar_ordre=df.at[i, 'mar_ordre']) if marque.exists(): # Update marque.update(groupe_marques_id=df.at[i, 'groupe_marques_id'], mar_code=df.at[i, 'mar_code'], mar_libelle=df.at[i, 'mar_libelle'], mar_ordre=df.at[i, 'mar_ordre']) else: liste_create.append(marque_create) Marque.objects.bulk_create(liste_create) return len(liste_create) # fonction d'insertion dans la table Groupes marques def insert_MarquesGroupe(df): """ Insertion des données de la table MarqueGroupe dans la base :type df: dataframe :param df: Dataframe contenant les données pretraités à inserer :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ liste_create = [] for i in range(df.shape[0]): gm = MarquesGroupe.objects.filter(gm_id=df.at[i, 'gm_id']) gm_create = MarquesGroupe(gm_id=df.at[i, 'gm_id'], gm_type=df.at[i, 'gm_type'], gm_code=df.at[i, 'gm_code'], gm_libelle=df.at[i, 'gm_libelle'], gm_ordre=df.at[i, 'gm_ordre'], gm_selection_multiple=df.at[i, 'gm_selection_multiple']) if gm.exists(): # Update gm.update(gm_type=df.at[i, 'gm_type'], gm_code=df.at[i, 'gm_code'], gm_libelle=df.at[i, 'gm_libelle'], gm_ordre=df.at[i, 'gm_ordre'], gm_selection_multiple=df.at[i, 'gm_selection_multiple']) else: liste_create.append(gm_create) MarquesGroupe.objects.bulk_create(liste_create) return 1 def insert_ZoneGeographique(df: pd.DataFrame) -> None: """ Insertion des données de la table ZoneGeographique dans la base :param df: Dataframe contenant les données pretraités à inserer :type df: class:`pandas.DataFrame` """ ModelType = ZoneGeographique liste_create = [] error_count = 0 zones_dict = dict((f"{o.zone_id}", o) for o in ModelType.objects.all()) for i in range(df.shape[0]): zone_id = f"{df.at[i,'zone_id']}" try: if zone_id in zones_dict: zone = zones_dict.pop(zone_id) else: zone_create = ModelType(zone_id=df.at[i, 'zone_id'], zone_libelle=df.at[i, 'zone_libelle']) zone_create.full_clean(validate_unique=False) liste_create.append(zone_create) except Exception: error_count = error_count + 1 logger.exception('%s une erreur est survenue à la ligne : %s (pk=%s)', ModelType.__name__, i, zone_id) if error_count: logger.warning("%s(s) en erreur : %s", ModelType.__name__, error_count) size_batch = 100 if liste_create: ModelType.objects.bulk_create(liste_create, batch_size=size_batch) logger.info('%s(s) créée(s) : %s', ModelType.__name__, len(liste_create)) deleted = ModelType.objects.filter(zone_id__in=zones_dict.keys()).delete()[0] if deleted: logger.info('%s(s) supprimée(s) : %s', ModelType.__name__, deleted) return len(liste_create),deleted, error_count def insert_Commentaires(df: pd.DataFrame) -> None: """ Insertion des données de commentaires dans la table des administrés :param df: Dataframe contenant les données pretraités à inserer :type df: class:`pandas.DataFrame` """ try: data = list(df.to_dict('index').values()) fields = ['a_notes_gestionnaire'] objs = [Administre(**data[i]) for i in range(len(data))] Administre.objects.bulk_update(objs, fields=fields) except Exception as e: logger.exception("Une erreur est survenue lors de l'insertion %s", e) return 1 @execution_time(level=logging.INFO, logger_factory=data_perf_logger_factory) def insert_Poste(df: pd.DataFrame, annee): """ Insertion des données de la table Poste dans la base :type df: dataframe :param df: Dataframe contenant les données pretraités à inserer """ logger.info('start_insert_poste') ModelType = Poste ModelType_2 = Postes_Pams Cols = ModelType.Cols Cols_2 = ModelType_2.Cols col_pk = Cols.PK fields_to_update = [ 'formation_emploi_id', 'p_domaine', 'p_filiere', 'p_nf', 'fonction_id', 'p_fonction', 'p_categorie', 'p_nfs', 'p_dep', 'p_eip', 'p_annee', ] fields_to_update_2 = [ 'p_pam_id', 'poste_id', 'p_avis_pam', 'p_avis_fe_pam', 'p_direct_commissionne_pam', 'p_notes_gestionnaire_pam', 'p_priorisation_pcp_pam,' 'info_reo', ] fields_db = ['p_id'] + fields_to_update # Integer ou Float Colonnes à mettre à jour fields_num = [ ] # Dict pour convertir toutes les colonnes non Integer en chaîne de caractères. dict_conv_str = {a: str for a in fields_db if a not in fields_num} # Dict pour convertir toutes les colonnes Integer en Float. dict_conv_float = {a: float for a in fields_num } logger.info("Lecture de l'ensemble des postes en base") # Lire tous les postes de la base de données poste_in_db_df = pd.DataFrame.from_records(Poste.objects.all().values_list(*tuple(fields_db)), columns=fields_db) if not poste_in_db_df.empty: # Il va y avoir de modification de type donc c'est mieux de ne pas toucher df df_comparing = df.copy() # Modification de type de quelque champs logger.debug('Conversion des types pour la fusion') poste_in_db_df = (poste_in_db_df.fillna(APP_NAN) .replace({APP_NAN: None})) poste_in_db_df = poste_in_db_df.astype(dict_conv_str) poste_in_db_df = poste_in_db_df.astype(dict_conv_float) df_comparing = df_comparing.astype(dict_conv_str) df_comparing = df_comparing.astype(dict_conv_float) df_comparing['p_nf'] = df_comparing['p_nf'].str.upper() compare = pd.DataFrame([df_comparing[fields_db].dtypes,poste_in_db_df[fields_db].dtypes]).T logger.debug('Comparaison des types pour la fusion') logger.debug('------------------------------------') logger.debug(compare[compare[0]!=compare[1]].dropna()) logger.debug('------------------------------------') # Comparaison pour savoir ce qui doit etre creer, mis a jour ou supprimer comparing_poste_id = pd.merge(df_comparing, poste_in_db_df, how='outer', on='p_id', suffixes=(None, "_x"), indicator=True) same_rows = comparing_poste_id[comparing_poste_id['_merge']=='both'].drop('_merge', axis=1) new_rows = comparing_poste_id[comparing_poste_id['_merge']=='left_only'].drop('_merge', axis=1) delete_rows = comparing_poste_id[comparing_poste_id['_merge']=='right_only'].drop('_merge', axis=1) # Comparaison pour savoir des ligne a mettre a jour, lequel est deja a jour et lequel doit se mettre a jour comparing_poste_both = pd.merge(same_rows, poste_in_db_df, how='left', on=fields_db, suffixes=(None, "_x"), indicator=True) not_updated_rows = comparing_poste_both[comparing_poste_both['_merge']=='both'].drop('_merge', axis=1) updated_rows = comparing_poste_both[comparing_poste_both['_merge']=='left_only'].drop('_merge', axis=1) # Creation du df final avec une colonne db_create_status qui dis si la ligne doit etre creer ou mis a jour update = df.loc[df['p_id'].isin(list(updated_rows['p_id']))] update['db_create_status'] = 0 create = df.loc[df['p_id'].isin(list(new_rows['p_id']))] create['db_create_status'] = 1 df = pd.concat([update, create]) else: df['db_create_status'] = 1 not_updated_rows = pd.DataFrame([]) # IDs existants pour les clés étrangères fe_ids = set(FormationEmploi.objects.values_list('pk', flat=True)) dom_ids = set(Domaine.objects.values_list('pk', flat=True)) fil_ids = set(Filiere.objects.values_list('pk', flat=True)) fon_ids = set(Fonction.objects.values_list('pk', flat=True)) fields_not_validated = [f.name for f in ModelType._meta.get_fields() if f.is_relation] fields_not_validated_2 = [f.name for f in ModelType_2._meta.get_fields() if f.is_relation] dict_create = {} dict_create_2 = {} dict_update = {} set_dom = set() set_fil = set() set_fon = set() annee_a = PAM.objects.get(pam_statut='PAM en cours').pam_id error_count = 0 ignore_count = 0 for df_batch in batch_iterator(df, 1000): # itération par lot pour limiter le nombre d'objets en mémoire for idx, row in df_batch.iterrows(): try: annee_pam = annee pk = str(row[col_pk]) pk_2 = pk + str(annee_pam) if annee_pam == annee_a: annee_pam_suivant = int(annee)+1 pk_3 = pk + str(annee_pam_suivant) except Exception: logger.warning("Attention le fichier de REO est obsolète PAM A ou A+1") try: fe_id = row['formation_emploi_id'] if fe_id is not None and fe_id not in fe_ids: logger.warning("%s[pk=%s] ignoré car formation-emploi absente du référentiel : %s", ModelType.__name__, pk, fe_id) ignore_count = ignore_count + 1 continue dom_id = row['p_domaine'] if dom_id is not None and dom_id not in dom_ids: logger.warning("%s[pk=%s] domaine ignoré car absent du référentiel : %s", ModelType.__name__, pk, dom_id) set_dom.add(dom_id) dom_id = None fil_id = row['p_filiere'] if fil_id is not None and fil_id not in fil_ids: logger.warning("%s[pk=%s] filière ignorée car absente du référentiel : %s", ModelType.__name__, pk, fil_id) set_fil.add(fil_id) fil_id = None fon_id = row['fonction_id'] if fon_id is not None and fon_id not in fon_ids: logger.warning("%s[pk=%s] fonction ignorée car absente du référentiel : %s", ModelType.__name__, pk, fon_id) set_fon.add(fon_id) fon_id = None model_2 = ModelType_2(**{ 'id' :pk_2, 'p_pam_id' :annee_pam, 'poste_id' :pk, 'p_avis_pam' :StatutPam.NON_ETUDIE, 'p_avis_fe_pam' :StatutPam.NON_ETUDIE, 'p_direct_commissionne_pam' :None, 'p_notes_gestionnaire_pam' :None, 'p_priorisation_pcp_pam' :None, 'info_reo' :"SORG" if row['p_annee']=="SORG" else f'REO {annee_pam}', }) if annee_pam == annee_a: model_3 = ModelType_2(**{ 'id' :pk_3, 'p_pam_id' :annee_pam_suivant, 'poste_id' :pk, 'p_avis_pam' :StatutPam.NON_ETUDIE, 'p_avis_fe_pam' :StatutPam.NON_ETUDIE, 'p_direct_commissionne_pam' :None, 'p_notes_gestionnaire_pam' :None, 'p_priorisation_pcp_pam' :None, 'info_reo' :"SORG" if row['p_annee']=="SORG" else f'REO {annee_pam}', }) model = ModelType(**{ 'pk': pk, f'{Cols.REL_FORMATION_EMPLOI}_id': fe_id, f'{Cols.REL_DOMAINE}_id': dom_id, f'{Cols.REL_FILIERE}_id': fil_id, Cols.NIVEAU_FONCTIONNEL: row['p_nf'].upper(), f'{Cols.REL_FONCTION}_id': fon_id, 'p_fonction': row['p_fonction'], 'p_dep': row['p_dep'], Cols.CATEGORIE: row['p_categorie'], 'p_nfs': row['p_nfs'], 'p_eip': row['p_eip'], 'p_annee': row['p_annee'], }) if row['db_create_status']: model_2.full_clean(exclude=fields_not_validated_2, validate_unique=False) dict_create_2.setdefault(pk_2, model_2) if annee_pam == annee_a: model_3.full_clean(exclude=fields_not_validated_2, validate_unique=False) dict_create_2.setdefault(pk_3, model_3) model.full_clean(exclude=fields_not_validated, validate_unique=False) dict_create.setdefault(pk, model) else: if not ModelType_2.objects.filter(Q(id=pk_2)).exists(): model_2.full_clean(exclude=fields_not_validated_2, validate_unique=False) dict_create_2.setdefault(pk_2, model_2) if annee_pam == annee_a: if not ModelType_2.objects.filter(Q(id=pk_3)).exists(): model_3.full_clean(exclude=fields_not_validated_2, validate_unique=False) dict_create_2.setdefault(pk_3, model_3) model.full_clean(exclude=fields_not_validated, validate_unique=False) dict_update.setdefault(pk, model) except Exception: error_count = error_count + 1 logger.exception('%s une erreur est survenue à la ligne : %s (pk=%s)', ModelType.__name__, idx, pk) if error_count: logger.warning("%s(s) en erreur : %s", ModelType.__name__, error_count) if ignore_count: logger.warning('%s(s) ignoré(s) : %s', ModelType.__name__, ignore_count) if set_dom: logger.warning('%s %s(s) ignoré(s) : %s', len(set_dom), Domaine.__name__, set_dom) if set_fil: logger.warning('%s %s(s) ignorée(s) : %s', len(set_fil), Filiere.__name__, set_fil) if set_fon: logger.warning('%s %s(s) ignorée(s) : %s', len(set_fon), Filiere.__name__, set_fon) if not not_updated_rows.empty: logger.info('%s(s) déjà à jour : %s', ModelType.__name__, len(not_updated_rows)) batch_size = 100 if dict_create: logger.info('Création de %s %s(s)...', len(dict_create), ModelType.__name__) for idx, data_batch in enumerate(batch_iterator(list(dict_create.values()), batch_size)): ModelType.objects.bulk_create(data_batch) logger.debug('créé(s) : %s (lot %s)', len(data_batch), idx + 1) logger.info('%s(s) créé(s) : %s', ModelType.__name__, len(dict_create)) else: logger.info('Aucun %s(s) à créer', ModelType.__name__) if dict_create_2: logger.info('Création de %s %s(s)...', len(dict_create_2), ModelType_2.__name__) for idx, data_batch in enumerate(batch_iterator(list(dict_create_2.values()), batch_size)): ModelType_2.objects.bulk_create(data_batch) logger.debug('créé(s) : %s (lot %s)', len(data_batch), idx + 1) logger.info('%s(s) créé(s) : %s', ModelType_2.__name__, len(dict_create_2)) else: logger.info('Aucun %s(s) à créer', ModelType_2.__name__) if dict_update and fields_to_update: logger.info('Mise à jour de %s %s(s)...', len(dict_update), ModelType.__name__) for idx, data_batch in enumerate(batch_iterator(list(dict_update.values()), batch_size)): ModelType.objects.bulk_update(data_batch, fields=fields_to_update) logger.debug('mis à jour : %s (lot %s)', len(data_batch), idx + 1) logger.info('%s(s) mis à jour : %s', ModelType.__name__, len(dict_update)) else: logger.info('Aucun %s(s) à mettre à jour', ModelType.__name__) # pas de suppression ici pour l'instant return len(dict_create), len(dict_create_2), len(dict_update), error_count, ignore_count def insert_delta(df: pd.DataFrame) -> None: annee_pam = PAM.objects.filter(pam_statut='PAM en cours')[0].pam_id annee_pam_suivant = PAM.objects.filter(pam_statut='PAM A+1')[0].pam_id poste_init = dict((f"{o.poste_id}", o) for o in Postes_Pams.objects.all().filter(p_pam_id=annee_pam)) for i,row in df.iterrows(): p_suivant = str(row['p_id']) info_annee = str(row['p_annee']) if p_suivant in poste_init: if Postes_Pams.objects.filter(Q(poste_id=p_suivant) & Q(p_pam_id=annee_pam_suivant)& Q(info_reo="SORG")): Postes_Pams.objects.filter(Q(poste_id=p_suivant) & Q(p_pam_id=annee_pam_suivant)).update(info_reo=f'CREE {annee_pam_suivant}') poste_init.pop(p_suivant) else: Postes_Pams.objects.filter(Q(poste_id=p_suivant) & Q(p_pam_id=annee_pam_suivant)).update(info_reo=f'REO {annee_pam_suivant}') poste_init.pop(p_suivant) else: if info_annee !='SORG': Postes_Pams.objects.filter(Q(poste_id=p_suivant) & Q(p_pam_id=annee_pam_suivant)).update(info_reo=f'CREE {annee_pam_suivant}') for i in poste_init: Postes_Pams.objects.filter(Q(poste_id=i) & Q(p_pam_id=annee_pam_suivant)).update(info_reo='SUP REO') return 1 # Insertion de Id sap dans la table poste def update_poste_ocv(df: pd.DataFrame) -> None: """ Met à jour les postes : renseigne l'ID SAP de l'administré à partir des données d'OCV. :param df: Dataframe contenant les données pretraités à inserer :type df: class:`pandas.DataFrame` """ start_time_insert = time.time() update_header = [Poste.Cols.REL_ADMINISTRE] adm_ids = list(Administre.objects.values_list('pk', flat=True)) pos_ids = {t[0]: t[1] for t in Poste.objects.values_list('pk', f'{Poste.Cols.REL_ADMINISTRE}_id')} dict_update = {} up_to_date = set() ignore_count = 0 error_count = 0 for i, row in df.iterrows(): pk = row['p_id'] adm_id = int(row['Identifiant SAP']) try: if pk not in pos_ids: logger.warning("%s[pk=%s] non mis à jour car ID de poste inconnu", Poste.__name__, pk) ignore_count = ignore_count + 1 continue if adm_id not in adm_ids: logger.warning("%s[pk=%s] non mis à jour car ID SAP inconnu : %s", Poste.__name__, pk, adm_id) ignore_count = ignore_count + 1 continue if adm_id != pos_ids.get(pk): dict_update.setdefault(pk, Poste(pk=pk, p_administre_id=adm_id)) else: up_to_date.add(pk) except Exception: error_count = error_count + 1 logger.exception('%s une erreur est survenue à la ligne : %s (pk=%s)', Poste.__name__, i, pk) if ignore_count: logger.warning('%s(s) ignoré(s) : %s', Poste.__name__, ignore_count) if error_count: logger.warning("%s(s) en erreur : %s", Poste.__name__, error_count) if update_header: if dict_update: Poste.objects.bulk_update(dict_update.values(), batch_size=100, fields=update_header) logger.info('%s(s) mis à jour : %s', Poste.__name__, len(dict_update)) if up_to_date: logger.info('%s(s) déjà à jour : %s', Poste.__name__, len(up_to_date)) logger.debug("insert time %d seconds ", time.time() - start_time_insert) logger.info('Insert poste ocv end') return len(dict_update), error_count, ignore_count def insert_FMOB_fmob(df): """ Insertion des données du fichier FMOB pour la table FMOB dans la base :type df: dataframe :param df: Dataframe contenant les données du fichier FMOB pretraités à inserer Returns ------- results : list Liste de toutes les données insérées :return: - **results** (*list*): Liste de toutes les données insérées. """ logger.info("----- Début de l'insertion du fichier FMOB pour la table FMOB ----- ") liste_create = [] liste_update = [] # To do : ADD columns update_header = ['administre_id', 'fmob_millesime', "fmob_annulation_fmob", "fmob_reception_drhat_fmob", 'fmob_mobilite_bassin_externe', 'fmob_mobilite_bassin_interne', 'fmob_mobilite_centre_interet_adt', 'fmob_mobilite_dans_specialite', 'fmob_mobilite_hors_metropole', 'fmob_mobilite_recrutement_particulier_administre', "fmob_sans_suite_militaire_fmob", 'fmob_avis_cdc_mutation_administre', 'fmob_avis_cdc_mobilite_externe', 'fmob_avis_cdc_mobilite_interne', 'fmob_avis_cdc_mobilite_centre_interet', 'fmob_avis_cdc_mobilite_specialite', 'fmob_avis_cdc_mobilite_hors_metropole', 'fmob_avis_cdc_mobilite_recrutement_particulier_admin', "fmob_date_visa_militaire", "fmob_depart_institution_soff", "fmob_date_deb_fmob", "fmob_date_fin_fmob", "fmob_date_signature_admin_fmob", "fmob_date_signature_chef_de_corps", "fmob_motif_edition_la", "fmob_reconnaissance_parcours_pro_administre", "fmob_statut", "fmob_remarques_eventuelles_administres", "fmob_avis_commandant_formation", ] fmob_dict = dict((f"{o.administre_id}", o) for o in FMOB.objects.all()) adm_ids = list(Administres_Pams.objects.values_list('pk', flat=True)) ignore_count = 0 for i, row in df.iterrows(): adm_id = str(int(row['administre_id'])) + str(int(row['fmob_millesime'])) #adm_id = str(row['administre_id']) + str(int(row['fmob_millesime'])) if adm_id not in adm_ids: logger.warning("FMOB d'id %s ignoré car l'administré est absent de la base", adm_id) ignore_count = ignore_count + 1 continue if str(adm_id) in fmob_dict: # ADD columns fmob = fmob_dict.pop(str(adm_id)) fmob.fmob_millesime=row['fmob_millesime'] fmob.fmob_annulation_fmob=row['fmob_annulation_fmob'] fmob.fmob_reception_drhat_fmob=row['fmob_reception_drhat_fmob'] fmob.fmob_mobilite_bassin_externe=row['fmob_mobilite_bassin_externe'] fmob.fmob_mobilite_bassin_interne=row['fmob_mobilite_bassin_interne'] fmob.fmob_mobilite_centre_interet_adt=row['fmob_mobilite_centre_interet_adt'] fmob.fmob_mobilite_dans_specialite=row['fmob_mobilite_dans_specialite'] fmob.fmob_mobilite_hors_metropole=row['fmob_mobilite_hors_metropole'] fmob.fmob_mobilite_recrutement_particulier_administre=row['fmob_mobilite_recrutement_particulier_administre'] fmob.fmob_sans_suite_militaire_fmob=row['fmob_sans_suite_fmob'] fmob.fmob_date_visa_militaire=row['fmob_date_visa_militaire'] fmob.fmob_depart_institution_soff=row['fmob_depart_institution_soff'] fmob.fmob_avis_cdc_mutation_administre=row['fmob_avis_cdc_mutation_administre'] fmob.fmob_avis_cdc_mobilite_externe=row['fmob_avis_cdc_mobilite_externe'] fmob.fmob_avis_cdc_mobilite_interne=row['fmob_avis_cdc_mobilite_interne'] fmob.fmob_avis_cdc_mobilite_centre_interet=row['fmob_avis_cdc_mobilite_centre_interet'] fmob.fmob_avis_cdc_mobilite_specialite=row['fmob_avis_cdc_mobilite_specialite'] fmob.fmob_avis_cdc_mobilite_hors_metropole=row['fmob_avis_cdc_mobilite_hors_metropole'] fmob.fmob_avis_cdc_mobilite_recrutement_particulier_admin=row['fmob_avis_cdc_mobilite_recrutement_particulier_admin'] fmob.fmob_date_deb_fmob=row['fmob_date_deb_fmob'] fmob.fmob_date_fin_fmob=row['fmob_date_fin_fmob'] fmob.fmob_date_signature_admin_fmob=row['fmob_date_signature_admin_fmob'] fmob.fmob_date_signature_chef_de_corps=row['fmob_date_signature_chef_de_corps'] fmob.fmob_motif_edition_la=row['fmob_motif_edition_la'] fmob.fmob_reconnaissance_parcours_pro_administre=row['fmob_reconnaissance_parcours_pro_administre'] fmob.fmob_statut=row['fmob_statut'] fmob.fmob_remarques_eventuelles_administres = row['fmob_remarques_eventuelles_administres'] fmob.fmob_avis_commandant_formation = row['fmob_avis_commandant_formation'] liste_update.append(fmob) else: # ADD columns fmob = FMOB(fmob_id=str(int(row['administre_id'])) + '_' + str(int(row['fmob_millesime'])), administre_id=adm_id, fmob_millesime=row['fmob_millesime'], fmob_annulation_fmob=row['fmob_annulation_fmob'], fmob_reception_drhat_fmob=row['fmob_reception_drhat_fmob'], fmob_sans_suite_militaire_fmob=row['fmob_sans_suite_fmob'], fmob_mobilite_bassin_externe=row['fmob_mobilite_bassin_externe'], fmob_mobilite_bassin_interne=row['fmob_mobilite_bassin_interne'], fmob_mobilite_centre_interet_adt=row['fmob_mobilite_centre_interet_adt'], fmob_mobilite_dans_specialite=row['fmob_mobilite_dans_specialite'], fmob_mobilite_hors_metropole=row['fmob_mobilite_hors_metropole'], fmob_mobilite_recrutement_particulier_administre=row['fmob_mobilite_recrutement_particulier_administre'], fmob_date_visa_militaire=row['fmob_date_visa_militaire'], fmob_depart_institution_soff=row['fmob_depart_institution_soff'], fmob_avis_cdc_mutation_administre=row['fmob_avis_cdc_mutation_administre'], fmob_avis_cdc_mobilite_externe=row['fmob_avis_cdc_mobilite_externe'], fmob_avis_cdc_mobilite_interne=row['fmob_avis_cdc_mobilite_interne'], fmob_avis_cdc_mobilite_centre_interet=row['fmob_avis_cdc_mobilite_centre_interet'], fmob_avis_cdc_mobilite_specialite=row['fmob_avis_cdc_mobilite_specialite'], fmob_avis_cdc_mobilite_hors_metropole=row['fmob_avis_cdc_mobilite_hors_metropole'], fmob_avis_cdc_mobilite_recrutement_particulier_admin=row['fmob_avis_cdc_mobilite_recrutement_particulier_admin'], fmob_date_deb_fmob=row['fmob_date_deb_fmob'], fmob_date_fin_fmob=row['fmob_date_fin_fmob'], fmob_date_signature_admin_fmob=row['fmob_date_signature_admin_fmob'], fmob_date_signature_chef_de_corps=row['fmob_date_signature_chef_de_corps'], fmob_motif_edition_la=row['fmob_motif_edition_la'], fmob_reconnaissance_parcours_pro_administre=row['fmob_reconnaissance_parcours_pro_administre'], fmob_statut=row['fmob_statut'], fmob_proposition_affectation_verrouille=False, fmob_remarques_eventuelles_administres = row['fmob_remarques_eventuelles_administres'], fmob_avis_commandant_formation = row['fmob_avis_commandant_formation'], ) liste_create.append(fmob) if ignore_count: logger.info('%s(s) ignoré(s) : %s', FMOB.__name__, ignore_count) size_batch = 100 if liste_create: logger.info('%s(s) créé(s) : %s', FMOB.__name__, len(liste_create)) FMOB.objects.bulk_create(liste_create, batch_size=size_batch) if liste_update: logger.info('%s(s) mis à jour : %s', FMOB.__name__, len(liste_update)) FMOB.objects.bulk_update(liste_update, fields=update_header, batch_size=size_batch) logger.info("----- Fin de l'insertion du fichier FMOB pour la table FMOB ----- ") return len(liste_create), len(liste_update), ignore_count def insert_FMOB_femp(df): """ Insertion des données du fichier FEMP pour la table FMOB dans la base :type df: dataframe :param df: Dataframe contenant les données du fichier FEMP pretraités à inserer Returns ------- results : list Liste de toutes les données insérées :return: - **results** (*list*): Liste de toutes les données insérées. """ logger.info("----- Début de l'insertion du fichier FEMP pour la table FMOB ----- ") liste_create = [] liste_update = [] update_header = ['fmob_annulation_femp', 'fmob_proposition_affectation_verrouille', 'fmob_sans_suite_militaire_femp', 'fmob_date_signature_admin_femp', "fmob_millesime_femp","fmob_commentaire_ac"] fmob_dict = dict((f"{o.administre_id}", o) for o in FMOB.objects.all()) adm_ids = list(Administres_Pams.objects.values_list('pk', flat=True)) ignore_count = 0 for i, row in df.iterrows(): adm_id = str(int(row['administre_id'])) + str(int(row['fmob_millesime_femp'])) if adm_id not in adm_ids: logger.warning("FEMP d'id %s ignoré car l'administré est absent de la base", adm_id) ignore_count = ignore_count + 1 continue if str(adm_id) in fmob_dict: fmob = fmob_dict.pop(str(adm_id)) fmob.fmob_annulation_femp = row['fmob_annulation_femp'] fmob.fmob_proposition_affectation_verrouille = row['fmob_proposition_affectation_verrouille'] fmob.fmob_sans_suite_militaire_femp = row['fmob_sans_suite_femp'] fmob.fmob_date_signature_admin_femp = row['fmob_date_signature_admin_femp'] fmob.fmob_millesime_femp = row['fmob_millesime_femp'] fmob.fmob_commentaire_ac = row['fmob_commentaire_ac'] liste_update.append(fmob) else: fmob = FMOB(fmob_id=str(int(row['administre_id'])) + '_' + str(None), administre_id=adm_id, fmob_annulation_femp=row['fmob_annulation_femp'], fmob_proposition_affectation_verrouille=row['fmob_proposition_affectation_verrouille'], fmob_sans_suite_militaire_femp=row['fmob_sans_suite_femp'], fmob_statut='Non réceptionné', fmob_date_signature_admin_femp=row['fmob_date_signature_admin_femp'], fmob_commentaire_ac = row['fmob_commentaire_ac']) liste_create.append(fmob) # Administres_Pams.objects.update(pam_id=row['fmob_millesime']) if ignore_count: logger.info('FEMP(s) ignoré(s) : %s', ignore_count) size_batch = 100 if liste_create: logger.info('%s(s) créé(s) : %s', FMOB.__name__, len(liste_create)) FMOB.objects.bulk_create(liste_create, batch_size=size_batch) if liste_update: logger.info('%s(s) mis à jour : %s', FMOB.__name__, len(liste_update)) FMOB.objects.bulk_update(liste_update, fields=update_header, batch_size=size_batch) logger.info("----- Fin de l'insertion du fichier FEMP pour la table FMOB ----- ") return len(liste_create), len(liste_update), ignore_count @execution_time(level=logging.INFO, logger_factory=data_perf_logger_factory) def insert_administre_notation(df: pd.DataFrame) -> None: """ Insertion des données de la table Administre_Notation dans la base :param df: Dataframe contenant les données pretraités à inserer :type df: class:`pandas.DataFrame` """ logger.info('start insert_administre_notation') ModelType = Administre_Notation liste_create = [] liste_update = [] df['key'] = df['administre_id'].astype(float).astype(str) + '_' + df['no_age_annees'].astype(float).astype(str) fields_to_update = [ 'no_annne_de_notation', 'no_nr_ou_iris', 'no_rac_ou_iris_cumule', 'no_rf_qsr', 'no_aptitude_emploie_sup', 'no_potentiel_responsabilite_sup' ] fields_db = ['administre_id','no_age_annees'] + fields_to_update # Integer ou Float Colonnes à mettre à jour fields_num = [ 'administre_id', 'no_age_annees', 'no_annne_de_notation', 'no_nr_ou_iris', 'no_rac_ou_iris_cumule' ] # Dict pour convertir toutes les colonnes non Integer en chaîne de caractères. dict_conv_str = {a: str for a in fields_db if a not in fields_num} # Dict pour convertir toutes les colonnes Integer en Float. dict_conv_float = {a: float for a in fields_num } logger.info("Lecture de l'ensemble des notations en base") # Lire tous les postes de la base de données notation_in_db_df = pd.DataFrame.from_records(Administre_Notation.objects.all().values_list(*tuple(fields_db)), columns=fields_db) if not notation_in_db_df.empty: # Il va y avoir de modification de type donc c'est mieux de ne pas toucher df df_comparing = df.copy() # Modification de type de quelque champs logger.info('Conversion des types pour la fusion') notation_in_db_df = notation_in_db_df.fillna(np.NAN).replace('None',np.NAN) notation_in_db_df = notation_in_db_df.astype(dict_conv_str) notation_in_db_df = notation_in_db_df.astype(dict_conv_float) notation_in_db_df['key'] = notation_in_db_df['administre_id'].astype(str) + '_' + notation_in_db_df['no_age_annees'].astype(str) df_comparing = df_comparing.astype(dict_conv_str) df_comparing = df_comparing.astype(dict_conv_float) # Comparaison pour savoir ce qui doit etre creer, mis a jour ou supprimer comparing_poste_id = pd.merge(df_comparing, notation_in_db_df, how='outer', on='key', suffixes=(None, "_x"), indicator=True) same_rows = comparing_poste_id[comparing_poste_id['_merge']=='both'].drop('_merge', axis=1) new_rows = comparing_poste_id[comparing_poste_id['_merge']=='left_only'].drop('_merge', axis=1) delete_rows = comparing_poste_id[comparing_poste_id['_merge']=='right_only'].drop('_merge', axis=1) # Comparaison pour savoir des ligne a mettre a jour, lequel est deja a jour et lequel doit se mettre a jour comparing_poste_both = pd.merge(same_rows, notation_in_db_df, how='left', on=fields_db, suffixes=(None, "_x"), indicator=True) not_updated_rows = comparing_poste_both[comparing_poste_both['_merge']=='both'].drop('_merge', axis=1) updated_rows = comparing_poste_both[comparing_poste_both['_merge']=='left_only'].drop('_merge', axis=1) # Creation du df final avec une colonne db_create_status qui dis si la ligne doit etre creer ou mis a jour update = df.loc[df['key'].isin(list(updated_rows['key']))] update['db_create_status'] = 0 create = df.loc[df['key'].isin(list(new_rows['key']))] create['db_create_status'] = 1 df = pd.concat([update, create]) else: df['db_create_status'] = 1 not_updated_rows = pd.DataFrame([]) dict_create = {} dict_update = {} error_count = 0 for df_batch in batch_iterator(df, 1000): # itération par lot pour limiter le nombre d'objets en mémoire for idx, row in df_batch.iterrows(): pk = f"{row['administre_id']}_{row['no_age_annees']}" try: model = ModelType( id = pk, administre_id = row['administre_id'], no_nr_ou_iris=row['no_nr_ou_iris'], no_rac_ou_iris_cumule=row['no_rac_ou_iris_cumule'], no_rf_qsr=row['no_rf_qsr'], no_aptitude_emploie_sup=row['no_aptitude_emploie_sup'], no_potentiel_responsabilite_sup=row['no_potentiel_responsabilite_sup'], no_annne_de_notation=row['no_annne_de_notation'], no_age_annees=row['no_age_annees'] ) if row['db_create_status']: dict_create.setdefault(pk, model) else: dict_update.setdefault(pk, model) except Exception: error_count = error_count + 1 logger.exception('%s une erreur est survenue à la ligne : %s (pk=%s)', ModelType.__name__, idx, pk) if error_count: logger.warning("%s(s) en erreur : %s", ModelType.__name__, error_count) if not not_updated_rows.empty: logger.info('%s(s) déjà à jour : %s', ModelType.__name__, len(not_updated_rows)) batch_size = 100 if dict_create: logger.info('Création de %s %s(s)...', len(dict_create), ModelType.__name__) for idx, data_batch in enumerate(batch_iterator(list(dict_create.values()), batch_size)): ModelType.objects.bulk_create(data_batch) logger.debug('créé(s) : %s (lot %s)', len(data_batch), idx + 1) logger.info('%s(s) créé(s) : %s', ModelType.__name__, len(dict_create)) else: logger.info('Aucun %s(s) à créer', ModelType.__name__) if dict_update and fields_to_update: logger.info('Mise à jour de %s %s(s)...', len(dict_update), ModelType.__name__) for idx, data_batch in enumerate(batch_iterator(list(dict_update.values()), batch_size)): ModelType.objects.bulk_update(data_batch, fields=fields_to_update) logger.debug('mis à jour : %s (lot %s)', len(data_batch), idx + 1) logger.info('%s(s) mis à jour : %s', ModelType.__name__, len(dict_update)) else: logger.info('Aucun %s(s) à mettre à jour', ModelType.__name__) return len(dict_create), len(dict_update), error_count def suppression_administres(df: pd.DataFrame) -> None: """ Suppression des administrés de la table des administrés :param df: Dataframe contenant les données pretraités à supprimer :type df: class:`pandas.DataFrame` """ ModelType = Administre try: delete_list = df['a_id_sap'].tolist() logger.debug('%s(s) SAP à supprimer : %s', ModelType.__name__, len(delete_list)) deleted = ModelType.objects.filter(Q(a_id_sap__in=delete_list)).delete() if deleted[0]: logger.info('Instance(s) supprimée(s) : %s', deleted[0]) for k, v in deleted[1].items(): logger.info(' %s(s) supprimé(s) : %s', k, v) except Exception as e: logger.exception("Une erreur est survenue lors de la suppression %s", e) return len(delete_list) # fonction d'insertion dans la table Notation def insert_liste_preference(df): """ Insertion des données de la liste preference dans la base :type df: dataframe :param df: Dataframe contenant les données pretraités à inserer :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ liste_create = [] liste_update = [] update_header = ['administre_id','administre_pam_id','poste_id','lp_rang_poste','pam_id'] for i in range(df.shape[0]): try: pl = PreferencesListe.objects.get(lp_id=int(df.at[i, 'lp_id'])) # Update pl.administre_pam_id = df.at[i, 'administre_pam_id'] pl.administre_id = df.at[i, 'a_id_sap'] pl.poste_id = df.at[i, 'poste_id'] pl.lp_rang_poste = df.at[i, 'lp_rang_poste'] pl.pam_id = df.at[i, 'pam_id'] liste_update.append(pl) except: pl_create = PreferencesListe(lp_id=df.at[i, 'lp_id'], poste_id=df.at[i, 'poste_id'], administre_pam_id=df.at[i, 'administre_pam_id'], administre_id=df.at[i, 'a_id_sap'], lp_rang_poste=df.at[i, 'lp_rang_poste'], pam_id=df.at[i, 'pam_id']) liste_create.append(pl_create) PreferencesListe.objects.bulk_create(liste_create) PreferencesListe.objects.bulk_update(liste_update, fields=update_header) return 1 def insert_Notation(df, pam_id, sv_id): """ Insertion des données de la table Notation dans la base :type df: dataframe :param df: Dataframe contenant les données pretraités à inserer :return: - **1** (*int*): La fonction renvoie 1 si l'execution a été réalisé avec succés. """ start_time = time.time() liste_create = [] liste_update = [] update_header = ['no_date_execution', 'no_score_administre'] statut = [StatutPam.A_MUTER,StatutPam.A_ETUDIER] # Mettre le flag_cple de toutes les notations du sours-vivier à Faux dans la base de données Notation.objects.filter(pam_id=pam_id, administre__sous_vivier_id=sv_id, no_flag_cple_ideal=True).update(no_flag_cple_ideal=False) # Lire la table notation où les administrés appartiennent au sous_vivier all_notation_sv_id = Notation.objects.filter(pam_id=pam_id,administre__sous_vivier_id=sv_id).values() all_notation_sv_id_pd = pd.DataFrame.from_records(all_notation_sv_id) # Vérifiez si la notation dans la base de données est vide ; si elle est vide, toutes les notations calculées sont différentes. if not all_notation_sv_id_pd.empty: # Comparer la nouvelle notation calculée à la notation de la base de données, et créer un dataframe avec seulement les différents lignes Comparing_notation = pd.merge(df, all_notation_sv_id_pd, how='left', left_on=['administre_pam_id', 'poste_pam_id', 'pam_id', 'no_score_administre'], right_on=['administre_pam_id', 'poste_pam_id', 'pam_id', 'no_score_administre'], suffixes=(None, "_x"), indicator=True) Comparing_notation.drop(['no_date_execution_x', 'no_flag_cple_ideal_x'], 1, inplace=True) Different_rows = Comparing_notation[Comparing_notation['_merge'] != 'both'].drop('_merge', 1) Different_rows = Different_rows.drop(Different_rows.columns.difference(['poste_pam_id','poste_id','administre_pam_id','administre_id','pam_id','no_date_execution','no_score_administre','no_flag_cple_ideal']), 1) else : # Comme la notation est vide dans la base de données, les différentes lignes seront toutes les notations calculées et la notation de la base de données aura seulement les colonnes mais vide Different_rows = df.copy() all_notation_sv_id_pd = df.copy()[0:0] logger.debug("Il existe {} des lignes qui ont changé".format(len(Different_rows))) logger.debug("------------------Comparing Time -- %d seconds -----------------" % (time.time() - start_time)) start_time = time.time() # Si certaines lignes sont différentes, vérifiez si elles doivent être mises à jour ou créées. if not Different_rows.empty : Merging_notation = pd.merge(Different_rows, all_notation_sv_id_pd, how='left', left_on=['administre_pam_id', 'poste_pam_id', 'pam_id'], right_on=['administre_pam_id', 'poste_pam_id', 'pam_id'], suffixes=(None, "_x"), indicator=True) Merging_notation.drop(['no_date_execution_x', 'no_score_administre_x', 'no_flag_cple_ideal_x'], 1, inplace=True) update = Merging_notation[Merging_notation['_merge'] == 'both'].drop('_merge', 1) create = Merging_notation[Merging_notation['_merge'] != 'both'].drop('_merge', 1) logger.debug("------------------Merging Time -- %d seconds -----------------" % (time.time() - start_time)) start_time = time.time() # Création de la liste des objets notations qui doivent être créées if not create.empty: liste_create = list(create.apply(lambda x: Notation(poste_id=x['poste_id'], poste_pam_id = x['poste_pam_id'], administre_id=x['administre_id'], administre_pam_id=x['administre_pam_id'], pam_id=x['pam_id'], no_date_execution=x['no_date_execution'], no_score_administre=x['no_score_administre'], no_flag_cple_ideal=x['no_flag_cple_ideal']), axis=1)) # Création de la liste des objets notations qui doivent être mises à jour if not update.empty: liste_update = list(update.apply(lambda x: Notation(no_id=x['no_id'], poste_id=x['poste_id'], poste_pam_id = x['poste_pam_id'], administre_id=x['administre_id'], administre_pam_id=x['administre_pam_id'], pam_id=x['pam_id'], no_date_execution=x['no_date_execution'], no_score_administre=x['no_score_administre'], no_flag_cple_ideal=x['no_flag_cple_ideal']), axis=1)) logger.debug("------------------Creating Objects Time -- %d seconds -----------------" % (time.time() - start_time)) start_time = time.time() logger.debug("Dans les {} des lignes qui ont changé :".format(len(Different_rows))) logger.debug(" Il ya {} des lignes qui doivent être créées".format(len(liste_create))) logger.debug(" Il ya {} des lignes qui doivent être mises à jour".format(len(liste_update))) batch_size = 100 Notation.objects.bulk_create(liste_create, batch_size=batch_size) Notation.objects.bulk_update(liste_update, fields=update_header, batch_size=batch_size) logger.debug("------------------Inserting Objects Time -- %d seconds -----------------" % (time.time() - start_time)) else : logger.debug("Rien à mettre à jour ou à créer") return 1 # Fonction d'inertion des resultats du matching def insert_matching(matching_dict, pam_id): """ Fonction d'insertion en base des resultats du matching :type matching_dict: dictionnaire :param matching_dict: dictionnaire python presentant le resultats du matching Returns ------- 'insert_matching_result_done' : str La fonction renvoie 'insert_matching_result_done' si l'execution a été réalisé avec succés :return: - **insert_matching_result_done** (*str*): La fonction renvoie 'insert_matching_result_done' si l'execution a été réalisé avec succés. """ for poste, list_admin in matching_dict.items(): for i in range(len(list_admin)): couple_ideal = Notation.objects.get(poste_pam_id=str(poste)+str(pam_id), administre_pam_id=str(list_admin[i].name)+str(pam_id)) couple_ideal.no_flag_cple_ideal = True couple_ideal.save() return ('insert_matching_result_done')