import logging import numpy as np import pandas as pd from backend.models.administre import Administre, Administres_Pams from backend.models.administre import StatutPamChoices as StatutPam from backend.models.pam import PAM from backend.models.domaine import Domaine from backend.models.filiere import Filiere from backend.models.formation_emploi import FormationEmploi from backend.models.grade import Grade from django.db.models import Q from ..alimentation_decorators import data_perf_logger_factory, get_data_logger from ..decorators import execution_time from .commun import (InsertionCounters, batch_iterator, is_same_model, is_same_value) logger = get_data_logger(__name__) @execution_time(level=logging.INFO, logger_factory=data_perf_logger_factory) def update_administre_fmob(df) -> None: """ Met à jour le statut PAM des administrés quand un formulaire de mobilité est annulé. """ ModelType = Administres_Pams Cols = ModelType.Cols status = StatutPam.A_MAINTENIR annee_pam = str(int(df['fmob_millesime'].iloc[0])) updated = (ModelType.objects .filter(**{f'{Cols.O2M_FMOB}__fmob_annulation_fmob': True}) .filter(~Q(**{Cols.STATUT_PAM: status})) .filter(a_statut_pam_annee=StatutPam.NON_ETUDIE) .filter(pam__pam_id=annee_pam) .update(**{Cols.STATUT_PAM: status})) if updated: logger.info('%s | mis à jour car FMOB annulé : %s', ModelType.__name__, updated) logger.info('Début de la mise à jour des FMOB en "A traiter"') next_status = StatutPam.A_TRAITER next_updated = (ModelType.objects .filter(**{f'{Cols.O2M_FMOB}__isnull' : False}) .filter(~Q(**{Cols.STATUT_PAM: status})) .filter(a_statut_pam_annee=StatutPam.NON_ETUDIE) .filter(pam__pam_id=annee_pam) .update(**{Cols.STATUT_PAM: next_status})) if next_updated: logger.info('%s | mis à jour car FMOB existe: %s administrés qui ont un FORMOB', ModelType.__name__, next_updated) logger.info("%s[pk=%s] administré mis à jour", ModelType.__name__,next_updated) return updated, next_updated @execution_time(level=logging.INFO, logger_factory=data_perf_logger_factory) def insert_administre_bo(df: pd.DataFrame) -> None: """ Insère ou met à jour des données de la table des administrés. :param df: Dataframe contenant les données pretraités à inserer :type df: class:`pandas.DataFrame` """ logger.info('start update_administre') ModelType = Administre ModelType_2 = Administres_Pams Cols = ModelType.Cols Cols_2 = ModelType_2.Cols col_pk = Cols.PK fields_to_update = [ 'formation_emploi_id', 'a_code_fonction', 'grade_id', 'a_grade_date_debut', 'a_fonction', 'a_nom', 'a_prenom', 'a_sexe', 'a_id_def', 'a_eip', 'a_eip_fiche_detaille', 'a_eis', Cols.REL_DOMAINE, Cols.REL_FILIERE, 'a_nf', 'a_domaine_gestion', 'a_date_entree_service', 'a_arme', 'a_rg_origine_recrutement', 'a_date_rdc', 'a_date_dernier_acr', 'a_date_naissance', 'a_diplome_hl', 'a_dernier_diplome', 'a_credo_fe', 'a_date_arrivee_fe', 'a_date_pos_statuaire', 'a_pos_statuaire', 'a_interruption_service', 'a_situation_fam', 'a_date_mariage', 'a_nombre_enfants', 'a_enfants', 'a_sap_conjoint', 'a_fonction1', 'a_fonction2', 'a_fonction3', 'a_fonction4', 'a_fonction5', 'a_fonction6', 'a_fonction7', 'a_fonction8', 'a_fonction9', 'a_date_fonction1', 'a_date_fonction2', 'a_date_fonction3', 'a_date_fonction4', 'a_date_fonction5', 'a_date_fonction6', 'a_date_fonction7', 'a_date_fonction8', 'a_date_fonction9', 'a_pls_gb_max', 'a_marqueur_pn', 'a_profession_conjoint', 'a_id_def_conjoint', 'a_sexe_conjoint', 'a_origine_recrutement', Cols.STATUT_CONCERTO, Cols.DATE_STATUT_CONCERTO, Cols.STATUT_CONCERTO_FUTUR, Cols.DATE_STATUT_CONCERTO_FUTUR, 'a_fud', 'a_date_fud', 'a_lien_service', 'a_categorie', ] fields_to_update_2 = [ 'pam_id', 'administre_id', 'a_statut_pam_annee', ] models_in_db = {m.pk: m for m in ModelType.objects.only(col_pk, *fields_to_update)} fields_db = ['a_id_sap'] + fields_to_update # Integer ou Float Colonnes à mettre à jour fields_num = [ 'a_sap_conjoint', 'a_pls_gb_max', 'a_nombre_enfants', 'a_id_sap' ] # Dict pour convertir toutes les colonnes non Integer en chaîne de caractères. dict_conv_str = {a: str for a in fields_db if a not in fields_num} # Dict pour convertir toutes les colonnes Integer en Float. dict_conv_float = {a: float for a in fields_num } # Lire tous les administrés de la base de données adm_in_db_df = pd.DataFrame.from_records(Administre.objects.all().values_list(*tuple(fields_db)), columns = fields_db) df = df.drop(['_merge'],1) if not adm_in_db_df.empty : # Il va y avoir de modification de type donc c'est mieux de ne pas toucher df df_comparaison = df.copy() # Modification de type de quelque champs logger.debug('Conversion des types pour la fusion') df_comparaison = df_comparaison.fillna(np.nan) df_comparaison = df_comparaison.replace('None', np.nan) adm_in_db_df = adm_in_db_df.fillna(np.nan) adm_in_db_df = adm_in_db_df.replace('None', np.nan) adm_in_db_df = adm_in_db_df.astype(dict_conv_str) adm_in_db_df = adm_in_db_df.astype(dict_conv_float) df_comparaison = df_comparaison.astype(dict_conv_str) df_comparaison = df_comparaison.astype(dict_conv_float) compare = pd.DataFrame([df_comparaison[fields_db].dtypes,adm_in_db_df[fields_db].dtypes]).T logger.debug('Comparaison des types pour la fusion') # logger.debug(compare[compare[0]!=compare[1]].dropna()) logger.debug('------------------------------------') # Comparaison pour savoir ce qui doit etre creer, mis a jour ou supprimer comparing_adm_sap = pd.merge(df_comparaison, adm_in_db_df, how='outer', on = 'a_id_sap', suffixes=(None, "_x"), indicator=True) same_rows = comparing_adm_sap[comparing_adm_sap['_merge'] == 'both'].drop('_merge', axis=1) new_rows = comparing_adm_sap[comparing_adm_sap['_merge'] == 'left_only'].drop('_merge', axis=1) delete_rows = comparing_adm_sap[comparing_adm_sap['_merge'] == 'right_only'].drop('_merge', axis=1) # Comparaison pour savoir des ligne a mettre a jour, lequel est deja a jour et lequel doit se mettre a jour comparing_adm_both = pd.merge(same_rows, adm_in_db_df, how='left', on=fields_db, suffixes=(None, "_x"), indicator=True) not_updated_rows = comparing_adm_both[comparing_adm_both['_merge'] == 'both'].drop(['_merge'], axis=1) updated_rows = comparing_adm_both[comparing_adm_both['_merge'] == 'left_only'].drop(['_merge'], axis=1) # Creation du df final avec une colonne db_create_status qui dis si la ligne doit etre creer ou mis a jour update = df.loc[df['a_id_sap'].isin(list(updated_rows['a_id_sap']))] update['db_create_status'] = 0 create = df.loc[df['a_id_sap'].isin(list(new_rows['a_id_sap']))] create['db_create_status'] = 1 df = pd.concat([update,create]) else : df['db_create_status'] = 1 not_updated_rows = pd.DataFrame([]) # IDs existants pour les clés étrangères domaines_in_db = set(Domaine.objects.values_list('pk', flat=True)) filieres_in_db = set(Filiere.objects.values_list('pk', flat=True)) formations_in_db = set(FormationEmploi.objects.values_list('pk', flat=True)) grades_in_db = set(Grade.objects.values_list('pk', flat=True)) fields_not_validated = [f.name for f in ModelType._meta.get_fields() if f.is_relation] fields_not_validated_2 = [f.name for f in ModelType_2._meta.get_fields() if f.is_relation] dict_create = {} dict_update = {} dict_up_to_date = {} dict_create_2 = {} set_dom = set() set_fil = set() counters = InsertionCounters() annee_pam = PAM.objects.filter(pam_statut='PAM en cours')[0].pam_id annee_pam_suivant = PAM.objects.filter(pam_statut='PAM A+1')[0].pam_id def process_row(row): pk = str(row[col_pk]) pk_2 = pk + str(annee_pam) pk_3 = pk + str(annee_pam_suivant) try: domaine = row['a_domaine'] if domaine is not None and domaine not in domaines_in_db: # TODO déjà fait dans l'extraction, serait mieux placé ici logger.warning("%s[pk=%s] domaine ignoré car absent du référentiel : %s", ModelType.__name__, pk, domaine) set_dom.add(domaine) counters.ignored += 1 return filiere = row['a_filiere'] if filiere is not None and filiere not in filieres_in_db: # TODO déjà fait dans l'extraction, serait mieux placé ici logger.warning("%s[pk=%s] filière ignoré car absente du référentiel : %s", ModelType.__name__, pk, filiere) set_fil.add(filiere) counters.ignored += 1 return grade = row['grade_id'] if grade is not None and grade not in grades_in_db: # TODO déjà fait dans l'extraction, serait mieux placé ici logger.warning("%s[pk=%s] ignoré car grade inconnu : %s", ModelType.__name__, pk, grade) counters.ignored += 1 return formation = row['formation_emploi_id'] if formation is not None and formation not in formations_in_db: # TODO déjà fait dans l'extraction, serait mieux placé ici logger.warning("%s[pk=%s] ignoré car formation-emploi inconnue : %s", ModelType.__name__, pk, formation) counters.ignored += 1 return model_2 = ModelType_2(**{ 'id' :pk_2, 'pam_id' :annee_pam, 'administre_id' :pk, 'a_statut_pam_annee' :StatutPam.NON_ETUDIE, }) model_3 = ModelType_2(**{ 'id' :pk_3, 'pam_id' :annee_pam_suivant, 'administre_id' :pk, 'a_statut_pam_annee' :StatutPam.NON_ETUDIE, }) in_db = models_in_db.get(pk) model = ModelType(**{ 'pk': pk, 'formation_emploi_id': formation, 'grade_id': grade, 'a_grade_date_debut': row['a_grade_date_debut'], 'a_liste_id_marques': row['a_liste_id_marques'], 'a_nom': row['a_nom'], 'a_prenom': row['a_prenom'], 'a_sexe': row['a_sexe'], 'a_id_def': row['a_id_def'], 'a_eip': row['a_eip'], 'a_eip_fiche_detaille': row['a_eip_fiche_detaille'], f'{Cols.REL_DOMAINE}_id': domaine, 'a_domaine_futur_id': domaine, f'{Cols.REL_FILIERE}_id': filiere, 'a_filiere_futur_id': filiere, 'a_fonction': row['a_fonction'], 'a_code_fonction': row['a_code_fonction'], 'a_nf': row['a_nf'], 'a_nf_futur': row['a_nf'], 'a_categorie': row['a_categorie'], 'a_domaine_gestion': row['a_domaine_gestion'], 'a_date_entree_service': row['a_date_entree_service'], 'a_arme': row['a_arme'], 'a_rg_origine_recrutement': row['a_rg_origine_recrutement'], 'a_date_naissance': row['a_date_naissance'], 'a_diplome_hl': row['a_diplome_hl'], 'a_dernier_diplome': row['a_dernier_diplome'], 'a_credo_fe': row['a_credo_fe'], 'a_date_arrivee_fe': row['a_date_arrivee_fe'], 'a_date_pos_statuaire': row['a_date_pos_statuaire'], 'a_pos_statuaire': row['a_pos_statuaire'], 'a_interruption_service': row['a_interruption_service'], 'a_situation_fam': row['a_situation_fam'], 'a_date_mariage': row['a_date_mariage'], 'a_nombre_enfants': row['a_nombre_enfants'], 'a_enfants': row['a_enfants'], 'a_date_rdc': row['a_date_rdc'], 'a_date_dernier_acr': row['a_date_dernier_acr'], 'a_eis': row['a_eis'], 'a_sap_conjoint': row['a_sap_conjoint'], 'a_flag_particulier': row['a_flag_particulier'], 'a_notes_gestionnaire': row['a_notes_gestionnaire'], 'a_notes_partagees': row['a_notes_partagees'], 'a_fonction1': row['a_fonction1'], 'a_fonction2': row['a_fonction2'], 'a_fonction3': row['a_fonction3'], 'a_fonction4': row['a_fonction4'], 'a_fonction5': row['a_fonction5'], 'a_fonction6': row['a_fonction6'], 'a_fonction7': row['a_fonction7'], 'a_fonction8': row['a_fonction8'], 'a_fonction9': row['a_fonction9'], 'a_date_fonction1': row['a_date_fonction1'], 'a_date_fonction2': row['a_date_fonction2'], 'a_date_fonction3': row['a_date_fonction3'], 'a_date_fonction4': row['a_date_fonction4'], 'a_date_fonction5': row['a_date_fonction5'], 'a_date_fonction6': row['a_date_fonction6'], 'a_date_fonction7': row['a_date_fonction7'], 'a_date_fonction8': row['a_date_fonction8'], 'a_date_fonction9': row['a_date_fonction9'], 'a_pls_gb_max': row['a_pls_gb_max'], 'a_marqueur_pn': row['a_marqueur_pn'], 'a_profession_conjoint': row['a_profession_conjoint'], 'a_id_def_conjoint': row['a_id_def_conjoint'], 'a_sexe_conjoint': row['a_sexe_conjoint'], 'a_origine_recrutement': row['a_origine_recrutement'], 'a_lien_service': row['a_lien_service'], 'a_statut_concerto': row[Cols.STATUT_CONCERTO], 'a_date_statut_concerto': row[Cols.DATE_STATUT_CONCERTO], 'a_statut_concerto_futur': row[Cols.STATUT_CONCERTO_FUTUR], 'a_date_statut_concerto_futur': row[Cols.DATE_STATUT_CONCERTO_FUTUR], 'a_fud': row['a_fud'], 'a_date_fud': row['a_date_fud'], }) if row['db_create_status']: model_3.full_clean(exclude=fields_not_validated, validate_unique=False) dict_create_2.setdefault(pk_3, model_3) model_2.full_clean(exclude=fields_not_validated, validate_unique=False) dict_create_2.setdefault(pk_2, model_2) model.full_clean(exclude=fields_not_validated, validate_unique=False) dict_create.setdefault(pk, model) else: if not ModelType_2.objects.filter(Q(id=pk_2)).exists(): model_2.full_clean(exclude=fields_not_validated_2, validate_unique=False) dict_create_2.setdefault(pk_2, model_2) if not ModelType_2.objects.filter(Q(id=pk_3)).exists(): model_3.full_clean(exclude=fields_not_validated, validate_unique=False) dict_create_2.setdefault(pk_3, model_3) model.full_clean(exclude=fields_not_validated, validate_unique=False) dict_update.setdefault(pk, model) except Exception: counters.errors += 1 logger.exception('%s une erreur est survenue à la ligne : %s (pk=%s)', ModelType.__name__, row['index'], pk) df['index'] = df.index df.apply(process_row, axis=1) if counters.errors: logger.warning("%s | en erreur : %s", ModelType.__name__, counters.errors) if counters.ignored: logger.warning('%s | ignoré(s) : %s', ModelType.__name__, counters.ignored) if set_dom: logger.warning('%s %s(s) ignoré(s) : %s', len(set_dom), Domaine.__name__, set_dom) if set_fil: logger.warning('%s %s(s) ignorée(s) : %s', len(set_fil), Filiere.__name__, set_fil) if not not_updated_rows.empty and fields_to_update: logger.info('%s | déjà à jour : %s', ModelType.__name__, len(not_updated_rows)) batch_size = 50 if dict_create: logger.info('%s | à créer : %s', ModelType.__name__, len(dict_create)) for idx, data_batch in enumerate(batch_iterator(list(dict_create.values()), batch_size)): ModelType.objects.bulk_create(data_batch) logger.debug('créé(s) : %s (lot %s)', len(data_batch), idx + 1) logger.info('%s | créé(s) : %s', ModelType.__name__, len(dict_create)) else : logger.info('%s | rien à créer', ModelType.__name__) if dict_create_2: logger.info('%s | à créer dans la vue A+1 : (%s)', ModelType_2.__name__, len(dict_create_2)) for idx, data_batch in enumerate(batch_iterator(list(dict_create_2.values()), batch_size)): ModelType_2.objects.bulk_create(data_batch) logger.debug('créé(s) : %s (lot %s)', len(data_batch), idx + 1) logger.info('%s | créé(s) : %s', ModelType_2.__name__, len(dict_create_2)) if dict_update and fields_to_update: logger.info('%s | à mettre à jour : %s', ModelType.__name__, len(dict_update)) for idx, data_batch in enumerate(batch_iterator(list(dict_update.values()), batch_size)): ModelType.objects.bulk_update(data_batch, fields=fields_to_update) logger.debug('mis à jour : %s (lot %s)', len(data_batch), idx + 1) logger.info('%s | mis à jour : %s', ModelType.__name__, len(dict_update)) else : logger.info('%s | rien à mettre à jour', ModelType.__name__) adm_cree = len(dict_create) + len(dict_create_2) adm_modifie = len(dict_update) return adm_cree, adm_modifie, len(not_updated_rows), counters.errors, counters.ignored, set_dom, set_fil # pas de suppression ici, il est prévu de faire un upload de fichier spécifique