Files
aem_moniteurs/aem_gestion/models/default.py
2023-06-22 10:34:18 +02:00

295 lines
11 KiB
Python

# -*- coding: utf8 -*-
from datetime import datetime
from sqlalchemy import text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import (
scoped_session,
sessionmaker,
)
from zope.sqlalchemy import ZopeTransactionExtension, mark_changed
import transaction
def update_eleve_solde(request, cd_cli):
    """Call the ``spUPD_ELEVES_SOLDE`` stored procedure for one student.

    :param request: request object exposing ``dbsession``.
    :param cd_cli: student code; it is converted to ``str`` before binding.
    """
    params = {
        'cd_cli': str(cd_cli),
        # NOTE(review): 'date' is not referenced by the statement below;
        # it is still passed so the call stays identical to before.
        'date': datetime.today(),
    }
    execute_query(request, """ CALL spUPD_ELEVES_SOLDE(:cd_cli); """, params)
def get_mode_mode_regl(request, search):
    """Return the ``p_mode_regl`` rows whose LIBELLE contains ``search``.

    The search text is sent as a bound parameter instead of being
    formatted into the SQL string, so quotes in ``search`` cannot alter
    the statement. The former debug ``print`` of the query was removed.

    :param request: request object exposing ``dbsession``.
    :param search: text to look for anywhere inside LIBELLE.
    :return: list of matching rows.
    """
    query = "SELECT * FROM p_mode_regl WHERE LIBELLE like :search;"
    # Wildcards go into the bound value, not into the SQL text.
    pattern = '%{0}%'.format(search)
    return request.dbsession.execute(query, {'search': pattern}).fetchall()
def get_libelle_mode_regl(request, search):
    """Return the ``p_libelle_regl`` rows whose LIBELLE contains ``search``.

    The search text is sent as a bound parameter instead of being
    formatted into the SQL string, so quotes in ``search`` cannot alter
    the statement.

    :param request: request object exposing ``dbsession``.
    :param search: text to look for anywhere inside LIBELLE.
    :return: list of matching rows.
    """
    query = "SELECT * FROM p_libelle_regl WHERE LIBELLE like :search;"
    # Wildcards go into the bound value, not into the SQL text.
    pattern = '%{0}%'.format(search)
    return request.dbsession.execute(query, {'search': pattern}).fetchall()
def get_list_activity(request, search):
    """Return ``eleves`` rows with AGENCE=0 and PERMIS_DEMANDE='ACTIV' whose NOM contains ``search``.

    :param request: request object exposing ``dbsession``.
    :param search: text to look for anywhere inside NOM.
    :return: list of matching rows.
    """
    sql = "SELECT * FROM `eleves` WHERE AGENCE=0 AND PERMIS_DEMANDE='ACTIV' AND NOM like :search "
    binds = {'search': '%' + search + '%'}
    cursor = request.dbsession.execute(sql, binds)
    return cursor.fetchall()
def execute_query(request, query, params):
    """Run ``query`` with ``params``, mark the session as changed, then commit.

    :param request: request object exposing ``dbsession``.
    :param query: SQL statement to execute.
    :param params: mapping of bind-parameter names to values.
    """
    session = request.dbsession
    session.execute(query, params)
    # Flag the session explicitly before committing the transaction.
    mark_changed(session)
    transaction.commit()
def get_formules(request, formule, validite):
    """Read the formulas whose code starts with ``formule``.

    :param request: request object exposing ``dbsession``.
    :param formule: prefix matched against the ``formule`` column.
    :param validite: ``'TOUTES'`` returns every match; any other value
        keeps only rows with ``DateFin >= CURRENT_DATE``.
    :return: list of rows ordered by ``formule``.
    """
    sql = (
        """SELECT * FROM formules WHERE formule like :code ORDER BY formule;"""
        if validite == 'TOUTES'
        else """SELECT * FROM formules WHERE DateFin >= CURRENT_DATE AND formule like :code ORDER BY formule;"""
    )
    binds = {'code': formule + "%"}
    return request.dbsession.execute(sql, binds).fetchall()
def get_formule(request, formule):
    """Return the first ``formules`` row whose code equals ``formule``.

    :param request: request object exposing ``dbsession``.
    :param formule: exact formula code.
    :return: whatever ``first()`` yields for the query.
    """
    sql = """SELECT * FROM formules WHERE formule=:code ORDER BY formule;"""
    return request.dbsession.execute(sql, {'code': formule}).first()
def get_dept_nais(request, code):
    """Return the ``p_departements`` rows whose code starts with ``code``.

    :param request: request object exposing ``dbsession``.
    :param code: prefix matched against the ``code`` column.
    :return: list of rows ordered by ``code``.
    """
    sql = """SELECT * FROM p_departements WHERE code like :code ORDER BY code;"""
    binds = {'code': code + "%"}
    return request.dbsession.execute(sql, binds).fetchall()
def get_departements(request):
    """Read every row of ``p_departements``, ordered by ``no_tri``.

    :param request: request object exposing ``dbsession``.
    :return: list of rows.
    """
    cursor = request.dbsession.execute("SELECT * FROM p_departements order by no_tri;")
    return cursor.fetchall()
def get_all_dept_nais(request):
    """Read every row of ``p_departements``, ordered by ``code``.

    :param request: request object exposing ``dbsession``.
    :return: list of rows.
    """
    sql = """SELECT * FROM p_departements ORDER BY code;"""
    return request.dbsession.execute(sql).fetchall()
def get_users_by_code(request, cd_uti):
    """Read one user, or all users when ``cd_uti`` is the string ``'0'``.

    :param request: request object exposing ``dbsession``.
    :param cd_uti: user code; ``'0'`` selects every row of ``p_users``.
    :return: list of rows ordered by ``cd_uti`` when ``cd_uti == '0'``,
        otherwise the first row matching ``cd_uti``.
    """
    session = request.dbsession
    if cd_uti == '0':
        return session.execute("""SELECT * FROM p_users order by cd_uti;""").fetchall()
    # Read the requested (logged-in) user only.
    single_sql = """SELECT * FROM p_users WHERE cd_uti=:cd_uti;"""
    return session.execute(single_sql, {'cd_uti': cd_uti}).first()
def get_user_access(request, cd_uti):
    """Return the ``secu`` value stored in ``p_users`` for ``cd_uti``.

    :param request: request object exposing ``dbsession``.
    :param cd_uti: user code.
    :return: the ``secu`` attribute of the first matching row.

    NOTE(review): there is no guard for a missing user; if ``first()``
    yields ``None`` the attribute access below raises.
    """
    sql = """SELECT secu FROM p_users WHERE cd_uti=:cd_uti;"""
    row = request.dbsession.execute(sql, {'cd_uti': cd_uti}).first()
    return row.secu
def get_user_cd_mon(request, cd_uti):
    """Read the instructor codes attached to user ``cd_uti``.

    :param request: request object exposing ``dbsession``.
    :param cd_uti: user code.
    :return: tuple ``(cd_mon, cd_mon_B78)``; falls back to
        ``('1JS', '')`` when the query yields no (truthy) row.
    """
    sql = """SELECT cd_mon, cd_mon_B78 FROM p_users WHERE cd_uti=:cd_uti;"""
    row = request.dbsession.execute(sql, {'cd_uti': cd_uti}).first()
    if not row:
        # No user found: use the hard-coded defaults.
        return '1JS', ''
    return row.cd_mon, row.cd_mon_B78
def get_user_cd_mon_B78(request, cd_uti):
    """Read the ``cd_mon_B78`` code attached to user ``cd_uti``.

    :param request: request object exposing ``dbsession``.
    :param cd_uti: user code.
    :return: the ``cd_mon_B78`` value, or ``''`` when no row is found.
    """
    sql = """SELECT cd_mon_B78 FROM p_users WHERE cd_uti=:cd_uti;"""
    row = request.dbsession.execute(sql, {'cd_uti': cd_uti}).first()
    return row.cd_mon_B78 if row else ''
def get_cd_mon_B78(request, cd_mon):
    """Read the ``cd_mon_B78`` code of the ``p_users`` row matching ``cd_mon``.

    :param request: request object exposing ``dbsession``.
    :param cd_mon: instructor code used as the lookup key.
    :return: the ``cd_mon_B78`` value, or ``''`` when no row is found.
    """
    sql = """SELECT cd_mon_B78 FROM p_users WHERE cd_mon=:cd_mon;"""
    row = request.dbsession.execute(sql, {'cd_mon': cd_mon}).first()
    return row.cd_mon_B78 if row else ''
def get_cd_mon_from_cd_b78(request, cd_mon_b78):
    """Read the ``cd_mon`` code of the ``p_users`` row matching ``cd_mon_b78``.

    :param request: request object exposing ``dbsession``.
    :param cd_mon_b78: B78 instructor code used as the lookup key.
    :return: the ``cd_mon`` value, or ``''`` when no row is found.
    """
    sql = """SELECT cd_mon FROM p_users WHERE cd_mon_b78=:cd_mon_b78;"""
    row = request.dbsession.execute(sql, {'cd_mon_b78': cd_mon_b78}).first()
    return row.cd_mon if row else ''
def isDelaiOK(request, date, cree_le, statut):
    """Check the deletion delay through the ``spCTL_DELAI_ANNUL`` procedure.

    :param request: request object exposing ``dbsession``.
    :param date: value bound to ``:date``.
    :param cree_le: value bound to ``:cree_le``.
    :param statut: value bound to ``:statut``.
    :return: the ``resultat`` attribute of the first returned row.
    """
    binds = {'date': date, 'cree_le': cree_le, 'statut': statut}
    row = request.dbsession.execute(
        "CALL spCTL_DELAI_ANNUL(:date, :cree_le, :statut)", binds
    ).first()
    return row.resultat
def update_last_connection(request, login, ua_string):
    """Store the current time and user-agent string for user ``login``.

    :param request: request object exposing ``dbsession``.
    :param login: user code matched against ``cd_uti``.
    :param ua_string: user-agent string written to ``ua_string``.
    """
    binds = {'login': login, 'ua_string': ua_string}
    execute_query(
        request,
        "UPDATE p_users SET dern_cnx_le=NOW(), ua_string=:ua_string WHERE cd_uti=:login;",
        binds,
    )
def update_password(request, login, password):
    """Set a new password for user ``login`` and clear the reset-link fields.

    :param request: request object exposing ``dbsession``.
    :param login: user code matched against ``cd_uti``.
    :param password: clear-text password; it is hashed in SQL with ``SHA1()``.

    NOTE(review): ``SHA1()`` without a salt is a weak password hash.
    Changing it needs a coordinated change wherever ``mdp_hash`` is
    verified, so it is left as is here.
    """
    binds = {'login': login, 'password': password}
    execute_query(
        request,
        "UPDATE p_users SET mdp_hash=SHA1(:password), mdp_oublie=NULL, mdp_oublie_date=NULL WHERE cd_uti=:login",
        binds,
    )
def is_lien_mdp_oublie(request, lien):
    """Tell whether ``lien`` is a known forgotten-password link.

    The previous version called ``len()`` on the result of ``first()``,
    which raises ``TypeError`` when no row matches (``first()`` yields
    ``None``). It now returns ``False`` in that case.

    :param request: request object exposing ``dbsession``.
    :param lien: link token compared with ``p_users.mdp_oublie``.
    :return: ``True`` when a row matches, ``False`` otherwise.
    """
    query = "SELECT mdp_oublie FROM p_users WHERE mdp_oublie=:lien;"
    row = request.dbsession.execute(query, {'lien': lien}).first()
    return row is not None
def get_mdp_oublie_infos(request, lien):
    """Return the first ``p_users`` row whose ``mdp_oublie`` equals ``lien``.

    :param request: request object exposing ``dbsession``.
    :param lien: forgotten-password link token.
    :return: whatever ``first()`` yields for the query.
    """
    sql = "SELECT * FROM p_users WHERE mdp_oublie=:lien;"
    return request.dbsession.execute(sql, {"lien": lien}).first()
def update_membre_mdp_oublie(request, cd_uti):
    """Generate and store a forgotten-password token for user ``cd_uti``.

    The token used to be derived from ``uuid.uuid1()``, which is built
    from the clock and the host's node id and is therefore guessable.
    It now comes from ``secrets.token_urlsafe``, which is meant for
    security tokens.

    :param request: request object exposing ``dbsession``.
    :param cd_uti: user code whose ``mdp_oublie`` fields are updated.
    :return: the generated URL-safe token.
    """
    import secrets

    # 16 random bytes give 22 URL-safe characters: the same length and
    # alphabet as the previous base64-encoded UUID slug.
    urlslug = secrets.token_urlsafe(16)
    query = "UPDATE p_users SET mdp_oublie=:urlslug, mdp_oublie_date=now() WHERE cd_uti=:cd_uti;"
    execute_query(request, query, {'urlslug': urlslug, 'cd_uti': cd_uti})
    return urlslug
def update_user_agency(request, cd_uti, agence):
    """Write ``agence`` into the ``p_users`` row of user ``cd_uti``.

    :param request: request object exposing ``dbsession``.
    :param cd_uti: user code.
    :param agence: value stored in the ``agence`` column.
    """
    binds = {'agence': agence, 'cd_uti': cd_uti}
    execute_query(request, "UPDATE p_users SET agence=:agence WHERE cd_uti = :cd_uti;", binds)
def get_all_moniteurs(request):
    """Read every row of ``moniteurs``, ordered by ``nom``.

    :param request: request object exposing ``dbsession``.
    :return: list of rows.
    """
    return request.dbsession.execute("SELECT * FROM moniteurs ORDER BY nom;").fetchall()
def get_moniteur_by_code(request, cd_mon):
    """Return the first ``moniteurs`` row whose ``cd_mon`` equals ``cd_mon``.

    :param request: request object exposing ``dbsession``.
    :param cd_mon: instructor code.
    :return: whatever ``first()`` yields for the query.
    """
    sql = "SELECT * FROM moniteurs WHERE cd_mon = :cd_mon;"
    return request.dbsession.execute(sql, {'cd_mon': cd_mon}).first()
def get_moniteurs(request, cd_mon):
    """Read one instructor, or all non-obsolete ones when ``cd_mon == 0``.

    :param request: request object exposing ``dbsession``.
    :param cd_mon: instructor code; ``0`` selects every row with
        ``obsolete = 0``.
    :return: list of rows ordered by ``nom`` when ``cd_mon == 0``,
        otherwise the first row matching ``cd_mon``.
    """
    session = request.dbsession
    if cd_mon == 0:
        listing = session.execute("SELECT * FROM moniteurs WHERE obsolete = 0 ORDER BY nom;")
        return listing.fetchall()
    single = session.execute("SELECT * FROM moniteurs WHERE cd_mon = :cd_mon;", {'cd_mon': cd_mon})
    return single.first()
def get_moniteur_aff(request, cd_mon):
    """Return the ``moniteurs_aff`` rows of instructor ``cd_mon``, ordered by ``DATED``.

    :param request: request object exposing ``dbsession``.
    :param cd_mon: instructor code.
    :return: list of rows.
    """
    sql = "SELECT * FROM moniteurs_aff WHERE cd_mon = :cd_mon ORDER BY DATED;"
    return request.dbsession.execute(sql, {'cd_mon': cd_mon}).fetchall()
def get_moniteur_aff_by_no_ligne(request, no_ligne):
    """Return the first ``moniteurs_aff`` row whose ``no_ligne`` equals ``no_ligne``.

    :param request: request object exposing ``dbsession``.
    :param no_ligne: line number used as the lookup key.
    :return: whatever ``first()`` yields for the query.
    """
    sql = "SELECT * FROM moniteurs_aff WHERE no_ligne = :no_ligne"
    return request.dbsession.execute(sql, {'no_ligne': no_ligne}).first()
def get_all_moniteur_aff(request):
    """Read every row of ``moniteurs_aff``, ordered by ``DATED`` ascending.

    :param request: request object exposing ``dbsession``.
    :return: list of rows.
    """
    return request.dbsession.execute("SELECT * FROM moniteurs_aff ORDER BY DATED ASC").fetchall()
def get_all_moniteur_active_by_name(request, name):
    """Return the ``moniteurs`` rows whose ``nom`` contains ``name``, ordered by ``NOM``.

    :param request: request object exposing ``dbsession``.
    :param name: text to look for anywhere inside ``nom``.
    :return: list of rows.

    NOTE(review): despite "active" in the function name, the query has
    no ``obsolete`` filter; confirm whether that is intended.
    """
    sql = "SELECT * FROM moniteurs WHERE nom like :name ORDER BY NOM ASC"
    pattern = "%" + name + "%"
    return request.dbsession.execute(sql, {"name": pattern}).fetchall()
def get_agences(request, code):
    """Read one agency, or all agencies when ``code == 0``.

    :param request: request object exposing ``dbsession``.
    :param code: agency code; ``0`` selects every row of ``p_agences``.
    :return: list of rows ordered by ``CODE`` when ``code == 0``,
        otherwise the first row matching ``code``.
    """
    session = request.dbsession
    if code == 0:
        return session.execute("SELECT * FROM p_agences ORDER BY CODE;").fetchall()
    return session.execute("SELECT * FROM p_agences WHERE code = :code;", {'code': code}).first()
def getAgenceLib(request, code):
    """Return the ``LIBELLE`` of the ``p_agences`` row matching ``code``.

    :param request: request object exposing ``dbsession``.
    :param code: agency code.
    :return: the ``LIBELLE`` attribute of the first matching row.

    NOTE(review): there is no guard for an unknown code; if ``first()``
    yields ``None`` the attribute access below raises.
    """
    sql = "SELECT * FROM p_agences WHERE code = :code;"
    row = request.dbsession.execute(sql, {'code': code}).first()
    return row.LIBELLE
def get_all_formules(request, permis):
    """Read the formulas returned by ``spGet_FORMULES_valide`` for ``permis``.

    ``permis`` is sent as a bound parameter instead of being formatted
    into the CALL statement, so quotes in it cannot alter the SQL.

    :param request: request object exposing ``dbsession``.
    :param permis: first argument of the stored procedure; the second
        argument is always the empty string.
    :return: list of rows.
    """
    query = "CALL spGet_FORMULES_valide(:permis,'')"
    results = request.dbsession.execute(query, {'permis': permis})
    return results.fetchall()
def get_codespostaux(request, code):
    """Return the ``p_codespostaux`` rows whose ``code_postal`` starts with ``code``.

    :param request: request object exposing ``dbsession``.
    :param code: prefix matched against ``code_postal``.
    :return: list of rows.
    """
    sql = "SELECT * FROM p_codespostaux WHERE code_postal like :code;"
    binds = {'code': code + "%"}
    return request.dbsession.execute(sql, binds).fetchall()
def get_tarifs_byName(request, ref):
    """Read the non-obsolete ``tarifs`` rows whose ``ref`` starts with ``ref``.

    :param request: request object exposing ``dbsession``.
    :param ref: prefix matched against the ``ref`` column.
    :return: list of rows.
    """
    sql = "SELECT * FROM tarifs where ref like :ref AND obsolete = 0;"
    binds = {'ref': ref + '%'}
    return request.dbsession.execute(sql, binds).fetchall()
def get_types_devis(request, type_devis):
    """Read quote types from ``p_types_devis``.

    :param request: request object exposing ``dbsession``.
    :param type_devis: selector:

        * ``'0'``   - every row;
        * ``'CPF'`` - rows whose ``type_devis`` matches ``'B_%'`` and ``dispo <> 0``;
        * ``'ALL'`` - rows with ``dispo <> 0``;
        * anything else - the single row whose ``type_devis`` equals it.
    :return: list of rows for the three keyword selectors, otherwise
        the first matching row.
    """
    session = request.dbsession
    # Keyword selectors map to parameter-less listing queries.
    listings = (
        ('0', "SELECT * FROM p_types_devis ORDER BY type_devis;"),
        ('CPF', "SELECT * FROM p_types_devis where type_devis like 'B_%' AND dispo <> 0 ORDER BY type_devis;"),
        ('ALL', "SELECT * FROM p_types_devis where dispo <> 0 ORDER BY type_devis;"),
    )
    for keyword, sql in listings:
        if type_devis == keyword:
            return session.execute(sql).fetchall()
    single_sql = "SELECT * FROM p_types_devis where type_devis = :type_devis;"
    return session.execute(single_sql, {'type_devis': type_devis}).first()
def delete_stage_lig_by_cd_cli(request, stage_type, semaine, groupe, cd_cli):
    """Call the ``spDel_PLA_STAGE_LIGNES`` stored procedure.

    :param request: request object exposing ``dbsession``.
    :param stage_type: value bound to ``:type``.
    :param semaine: value bound to ``:semaine``.
    :param groupe: value bound to ``:groupe``.
    :param cd_cli: value bound to ``:cd_cli``.
    """
    binds = {
        'type': stage_type,
        'semaine': semaine,
        'groupe': groupe,
        "cd_cli": cd_cli,
    }
    execute_query(request, """ CALL spDel_PLA_STAGE_LIGNES(:type,:semaine,:groupe,:cd_cli)""", binds)
def delete_moto_lig_by_cd_cli(request, noligne, ligne_cpt, cd_cli, cd_uti):
    """Call the ``spDel_PLA_MOTO_LIGNES`` stored procedure.

    :param request: request object exposing ``dbsession``.
    :param noligne: value bound to ``:noligne``.
    :param ligne_cpt: value bound to ``:ligne_cpt``.
    :param cd_cli: value bound to ``:cd_cli``.
    :param cd_uti: value bound to ``:cd_uti``.
    """
    binds = {
        'noligne': noligne,
        'ligne_cpt': ligne_cpt,
        'cd_cli': cd_cli,
        "cd_uti": cd_uti,
    }
    execute_query(request, """ CALL spDel_PLA_MOTO_LIGNES(:noligne,:ligne_cpt,:cd_cli,:cd_uti)""", binds)
def update_panning_moto_dispo(request, type_planning, date, groupe):
    """Call the ``spUpd_PLA_MOTO_DISPO`` stored procedure.

    :param request: request object exposing ``dbsession``.
    :param type_planning: value bound to ``:TYPE``.
    :param date: value bound to ``:DATE``.
    :param groupe: value bound to ``:GROUPE``.
    """
    binds = {"TYPE": type_planning, "DATE": date, "GROUPE": groupe}
    execute_query(request, "CALL spUpd_PLA_MOTO_DISPO(:TYPE,:DATE,:GROUPE)", binds)
def get_moniteur_aff_by_agence(request, cd_mon, agence):
    """Return the ``moniteurs_aff`` rows of instructor ``cd_mon`` for one agency.

    :param request: request object exposing ``dbsession``.
    :param cd_mon: instructor code.
    :param agence: agency value matched against the ``agence`` column.
    :return: list of rows ordered by ``DATED``.
    """
    sql = "SELECT * FROM moniteurs_aff WHERE cd_mon = :cd_mon AND agence=:agence ORDER BY DATED;"
    binds = {'cd_mon': cd_mon, 'agence': agence}
    return request.dbsession.execute(sql, binds).fetchall()