295 lines
11 KiB
Python
295 lines
11 KiB
Python
# -*- coding: utf8 -*-
|
|
from datetime import datetime
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.orm import (
|
|
scoped_session,
|
|
sessionmaker,
|
|
)
|
|
|
|
from zope.sqlalchemy import ZopeTransactionExtension, mark_changed
|
|
import transaction
|
|
|
|
def update_eleve_solde(request,cd_cli):
|
|
query = """ CALL spUPD_ELEVES_SOLDE(:cd_cli); """
|
|
execute_query(request,query,{'cd_cli':str(cd_cli),'date':datetime.today()})
|
|
|
|
def get_mode_mode_regl(request,search):
|
|
# lire l'eleve connecte
|
|
query = """SELECT * FROM p_mode_regl WHERE LIBELLE like '%{0}%';""".format(search)
|
|
print(query)
|
|
results = request.dbsession.execute(query).fetchall()
|
|
return results
|
|
|
|
def get_libelle_mode_regl(request,search):
|
|
# lire l'eleve connecte
|
|
query = """SELECT * FROM p_libelle_regl WHERE LIBELLE like '%{0}%';""".format(search)
|
|
results = request.dbsession.execute(query).fetchall()
|
|
return results
|
|
|
|
def get_list_activity(request,search):
|
|
query = "SELECT * FROM `eleves` WHERE AGENCE=0 AND PERMIS_DEMANDE='ACTIV' AND NOM like :search "
|
|
results = request.dbsession.execute(query,{'search':'%'+search+'%'}).fetchall()
|
|
return results
|
|
|
|
def execute_query(request, query, params):
|
|
"""Execute query and mark session as changed"""
|
|
request.dbsession.execute(query, params)
|
|
mark_changed(request.dbsession)
|
|
transaction.commit()
|
|
|
|
|
|
def get_formules(request, formule, validite):
|
|
"""Lire les formules"""
|
|
if validite == 'TOUTES':
|
|
query = """SELECT * FROM formules WHERE formule like :code ORDER BY formule;"""
|
|
else:
|
|
query = """SELECT * FROM formules WHERE DateFin >= CURRENT_DATE AND formule like :code ORDER BY formule;"""
|
|
results = request.dbsession.execute(query, {'code': formule + "%"})
|
|
return results.fetchall()
|
|
|
|
def get_formule(request,formule):
|
|
query = """SELECT * FROM formules WHERE formule=:code ORDER BY formule;"""
|
|
results = request.dbsession.execute(query, {'code': formule })
|
|
return results.first()
|
|
|
|
def get_dept_nais(request, code):
|
|
query = """SELECT * FROM p_departements WHERE code like :code ORDER BY code;"""
|
|
results = request.dbsession.execute(query, {'code': code + "%"})
|
|
return results.fetchall()
|
|
|
|
def get_departements(request):
|
|
""" Lire toutes les départements """
|
|
query = "SELECT * FROM p_departements order by no_tri;"
|
|
results = request.dbsession.execute(query).fetchall()
|
|
return results
|
|
|
|
def get_all_dept_nais(request):
|
|
query = """SELECT * FROM p_departements ORDER BY code;"""
|
|
results = request.dbsession.execute(query)
|
|
return results.fetchall()
|
|
|
|
|
|
def get_users_by_code(request, cd_uti):
|
|
if cd_uti == '0':
|
|
query = """SELECT * FROM p_users order by cd_uti;"""
|
|
results = request.dbsession.execute(query).fetchall()
|
|
else:
|
|
# lire l'utilisateur connecte
|
|
query = """SELECT * FROM p_users WHERE cd_uti=:cd_uti;"""
|
|
results = request.dbsession.execute(query, {'cd_uti': cd_uti}).first()
|
|
return results
|
|
|
|
|
|
def get_user_access(request, cd_uti):
|
|
# lire l'utilisateur connecte
|
|
query = """SELECT secu FROM p_users WHERE cd_uti=:cd_uti;"""
|
|
results = request.dbsession.execute(query, {'cd_uti': cd_uti}).first()
|
|
return results.secu
|
|
|
|
|
|
def get_user_cd_mon(request, cd_uti):
|
|
# lire le code moniteur de l'utilisateur connecte
|
|
query = """SELECT cd_mon, cd_mon_B78 FROM p_users WHERE cd_uti=:cd_uti;"""
|
|
results = request.dbsession.execute(query, {'cd_uti': cd_uti}).first()
|
|
# lire le code moniteur de l'utilisateur
|
|
if results:
|
|
cd_mon = results.cd_mon
|
|
cd_mon_b78 = results.cd_mon_B78
|
|
else:
|
|
cd_mon = '1JS'
|
|
cd_mon_b78 = ''
|
|
return cd_mon,cd_mon_b78
|
|
|
|
def get_user_cd_mon_B78(request, cd_uti):
|
|
# lire le code moniteur de l'utilisateur connecte
|
|
query = """SELECT cd_mon_B78 FROM p_users WHERE cd_uti=:cd_uti;"""
|
|
results = request.dbsession.execute(query, {'cd_uti': cd_uti}).first()
|
|
# lire le code moniteur de l'utilisateur
|
|
if results:
|
|
cd_mon_b78 = results.cd_mon_B78
|
|
else:
|
|
cd_mon_b78 = ''
|
|
return cd_mon_b78
|
|
|
|
def get_cd_mon_B78(request, cd_mon):
|
|
# lire le code moniteur de l'utilisateur connecte
|
|
query = """SELECT cd_mon_B78 FROM p_users WHERE cd_mon=:cd_mon;"""
|
|
results = request.dbsession.execute(query, {'cd_mon': cd_mon}).first()
|
|
# lire le code moniteur de l'utilisateur
|
|
if results:
|
|
cd_mon_b78 = results.cd_mon_B78
|
|
else:
|
|
cd_mon_b78 = ''
|
|
return cd_mon_b78
|
|
|
|
def get_cd_mon_from_cd_b78(request, cd_mon_b78):
|
|
# lire le code moniteur de l'utilisateur connecte
|
|
query = """SELECT cd_mon FROM p_users WHERE cd_mon_b78=:cd_mon_b78;"""
|
|
results = request.dbsession.execute(query, {'cd_mon_b78': cd_mon_b78}).first()
|
|
# lire le code moniteur de l'utilisateur
|
|
if results:
|
|
cd_mon = results.cd_mon
|
|
else:
|
|
cd_mon = ''
|
|
return cd_mon
|
|
|
|
|
|
def isDelaiOK(request, date, cree_le, statut):
|
|
# controler que le delai de suppression est OK
|
|
query = "CALL spCTL_DELAI_ANNUL(:date, :cree_le, :statut)"
|
|
results = request.dbsession.execute(query, {'date': date, 'cree_le': cree_le, 'statut': statut}).first()
|
|
return results.resultat
|
|
|
|
|
|
def update_last_connection(request, login, ua_string):
|
|
"""Update last connection for login """
|
|
query = "UPDATE p_users SET dern_cnx_le=NOW(), ua_string=:ua_string WHERE cd_uti=:login;"
|
|
execute_query(request, query, {'login': login, 'ua_string': ua_string})
|
|
|
|
|
|
def update_password(request, login, password):
|
|
"""Update password for member login"""
|
|
query = "UPDATE p_users SET mdp_hash=SHA1(:password), mdp_oublie=NULL, mdp_oublie_date=NULL WHERE cd_uti=:login"
|
|
execute_query(request, query, {'login': login, 'password': password})
|
|
|
|
|
|
def is_lien_mdp_oublie(request, lien):
|
|
query = "SELECT mdp_oublie FROM p_users WHERE mdp_oublie=:lien;"
|
|
results = request.dbsession.execute(query, {'lien': lien}).first()
|
|
return len(results) > 0
|
|
|
|
|
|
def get_mdp_oublie_infos(request, lien):
|
|
query = "SELECT * FROM p_users WHERE mdp_oublie=:lien;"
|
|
results = request.dbsession.execute(query, {"lien": lien}).first()
|
|
return results
|
|
|
|
|
|
def update_membre_mdp_oublie(request, cd_uti):
|
|
import uuid, base64
|
|
|
|
# get a UUID - URL safe, Base64
|
|
uid = uuid.uuid1()
|
|
urlslug = base64.urlsafe_b64encode(uid.bytes).decode("utf-8").rstrip('=\n').replace('/', '_')
|
|
query = "UPDATE p_users SET mdp_oublie=:urlslug, mdp_oublie_date=now() WHERE cd_uti=:cd_uti;"
|
|
execute_query(request, query, {'urlslug': urlslug, 'cd_uti': cd_uti})
|
|
return urlslug
|
|
|
|
|
|
def update_user_agency(request, cd_uti, agence):
|
|
query = "UPDATE p_users SET agence=:agence WHERE cd_uti = :cd_uti;"
|
|
execute_query(request, query, {'agence': agence, 'cd_uti': cd_uti})
|
|
|
|
|
|
def get_all_moniteurs(request):
|
|
query = "SELECT * FROM moniteurs ORDER BY nom;"
|
|
results = request.dbsession.execute(query)
|
|
return results.fetchall()
|
|
|
|
def get_moniteur_by_code(request,cd_mon):
|
|
query = "SELECT * FROM moniteurs WHERE cd_mon = :cd_mon;"
|
|
results = request.dbsession.execute(query, {'cd_mon': cd_mon})
|
|
return results.first()
|
|
|
|
def get_moniteurs(request, cd_mon):
|
|
if cd_mon == 0:
|
|
query = "SELECT * FROM moniteurs WHERE obsolete = 0 ORDER BY nom;"
|
|
results = request.dbsession.execute(query)
|
|
return results.fetchall()
|
|
else:
|
|
query = "SELECT * FROM moniteurs WHERE cd_mon = :cd_mon;"
|
|
results = request.dbsession.execute(query, {'cd_mon': cd_mon})
|
|
return results.first()
|
|
|
|
def get_moniteur_aff(request, cd_mon):
|
|
query = "SELECT * FROM moniteurs_aff WHERE cd_mon = :cd_mon ORDER BY DATED;"
|
|
results = request.dbsession.execute(query, {'cd_mon': cd_mon})
|
|
return results.fetchall()
|
|
|
|
def get_moniteur_aff_by_no_ligne(request, no_ligne):
|
|
query = "SELECT * FROM moniteurs_aff WHERE no_ligne = :no_ligne"
|
|
results = request.dbsession.execute(query, {'no_ligne': no_ligne})
|
|
return results.first()
|
|
|
|
def get_all_moniteur_aff(request):
|
|
query = "SELECT * FROM moniteurs_aff ORDER BY DATED ASC"
|
|
results = request.dbsession.execute(query,)
|
|
return results.fetchall()
|
|
|
|
def get_all_moniteur_active_by_name(request,name):
|
|
query = "SELECT * FROM moniteurs WHERE nom like :name ORDER BY NOM ASC"
|
|
results = request.dbsession.execute(query,{"name" : "%"+name+"%"})
|
|
return results.fetchall()
|
|
|
|
|
|
def get_agences(request, code):
|
|
if code == 0:
|
|
query = "SELECT * FROM p_agences ORDER BY CODE;"
|
|
results = request.dbsession.execute(query)
|
|
return results.fetchall()
|
|
else:
|
|
query = "SELECT * FROM p_agences WHERE code = :code;"
|
|
results = request.dbsession.execute(query, {'code': code})
|
|
return results.first()
|
|
|
|
|
|
def getAgenceLib(request, code):
|
|
query = "SELECT * FROM p_agences WHERE code = :code;"
|
|
results = request.dbsession.execute(query, {'code': code})
|
|
return results.first().LIBELLE
|
|
|
|
|
|
def get_all_formules(request, permis):
|
|
"""Lire les formules"""
|
|
query = """CALL spGet_FORMULES_valide('{0}','')""".format(permis)
|
|
results = request.dbsession.execute(query)
|
|
return results.fetchall()
|
|
|
|
|
|
def get_codespostaux(request, code):
|
|
query = "SELECT * FROM p_codespostaux WHERE code_postal like :code;"
|
|
results = request.dbsession.execute(query, {'code': code + "%"})
|
|
return results.fetchall()
|
|
|
|
|
|
def get_tarifs_byName(request, ref):
|
|
"""Lire les tarifs commencant par"""
|
|
query = "SELECT * FROM tarifs where ref like :ref AND obsolete = 0;"
|
|
results = request.dbsession.execute(query, {'ref': ref + '%'})
|
|
return results.fetchall()
|
|
|
|
|
|
def get_types_devis(request, type_devis):
|
|
"""Lire les types_devis"""
|
|
if type_devis == '0':
|
|
query = "SELECT * FROM p_types_devis ORDER BY type_devis;"
|
|
results = request.dbsession.execute(query).fetchall()
|
|
elif type_devis == 'CPF':
|
|
query = "SELECT * FROM p_types_devis where type_devis like 'B_%' AND dispo <> 0 ORDER BY type_devis;"
|
|
results = request.dbsession.execute(query).fetchall()
|
|
elif type_devis == 'ALL':
|
|
query = "SELECT * FROM p_types_devis where dispo <> 0 ORDER BY type_devis;"
|
|
results = request.dbsession.execute(query).fetchall()
|
|
else:
|
|
query = "SELECT * FROM p_types_devis where type_devis = :type_devis;"
|
|
results = request.dbsession.execute(query, {'type_devis': type_devis}).first()
|
|
return results
|
|
|
|
def delete_stage_lig_by_cd_cli(request,stage_type,semaine,groupe,cd_cli):
|
|
query = """ CALL spDel_PLA_STAGE_LIGNES(:type,:semaine,:groupe,:cd_cli)"""
|
|
execute_query(request,query, {'type': stage_type, 'semaine': semaine, 'groupe': groupe, "cd_cli":cd_cli})
|
|
|
|
def delete_moto_lig_by_cd_cli(request,noligne,ligne_cpt,cd_cli,cd_uti):
|
|
query = """ CALL spDel_PLA_MOTO_LIGNES(:noligne,:ligne_cpt,:cd_cli,:cd_uti)"""
|
|
execute_query(request,query, {'noligne': noligne, 'ligne_cpt': ligne_cpt, 'cd_cli': cd_cli, "cd_uti":cd_uti})
|
|
|
|
def update_panning_moto_dispo(request,type_planning,date,groupe):
|
|
query2 = "CALL spUpd_PLA_MOTO_DISPO(:TYPE,:DATE,:GROUPE)"
|
|
execute_query(request, query2, {"TYPE":type_planning,"DATE":date,"GROUPE":groupe})
|
|
|
|
def get_moniteur_aff_by_agence(request, cd_mon, agence):
|
|
query = "SELECT * FROM moniteurs_aff WHERE cd_mon = :cd_mon AND agence=:agence ORDER BY DATED;"
|
|
results = request.dbsession.execute(
|
|
query, {'cd_mon': cd_mon, 'agence': agence})
|
|
return results.fetchall() |