Files
aem_moniteurs/aem_gestion/views/examens.py
2023-06-22 10:34:18 +02:00

438 lines
15 KiB
Python

# -*- coding: utf8 -*-
from pyramid.view import (
view_config,
)
from pyramid.httpexceptions import (
HTTPFound,
)
from datetime import date, datetime
from dateutil.relativedelta import *
from urllib.request import urlopen
import json
import os
import shutil
import imaplib
import email
from ..models.examens import *
from ..models.default import *
@view_config(route_name='examens_schd', renderer='../templates/examens/examens_schd.pt', permission='view')
def examens_schd(request):
def generer_planning(datedeb, permis):
rows = get_examens_byDate(request, datedeb, permis)
# construire la liste des events
events = []
for row in rows:
# déterminer la couleur de l'event
color = get_examens_aff_color(request, row)
debut = datetime.strptime('%s %s:%s:00' % (row.DATE.strftime('%Y-%m-%d'), row.HEURE.seconds//3600, (row.HEURE.seconds//60)%60),
'%Y-%m-%d %H:%M:00')
fin = debut + relativedelta(minutes = 60)
agence = getAgenceLib(request, row.AGENCE)
json_event = {
'title': '%s à %s' % (row.NO_EXA, row.LIEU),
'start': debut.strftime('%Y-%m-%d %H:%M:00'),
'end': fin.strftime('%Y-%m-%d %H:%M:00'),
'description': '%s | %s' % (row.CD_MON, agence[7:]),
'allDay': False,
'color': color,
'textColor': '#000000',
'url': '/examen_list/%s/%s' % (row.DATE.strftime('%Y-%m-%d'), row.NO_EXA),
}
events.append(json_event)
return json.dumps(events)
# récupérer les paramètres de l'appel de la view
datePlan = request.matchdict['date']
permis = request.matchdict['permis']
logged_in = request.authenticated_userid
types_permis = ['ETG','B','2R','GL']
if datePlan == 'today':
datePlan = date.today().strftime('%Y-%m-%d')
# si permis a été changé par le user
if 'permis' in request.params:
permis = request.params["permis"]
url = request.route_url('examens_schd', date=datePlan, permis=permis)
TODAY = date.today()
# début = aujourd'hui - 4 semaines
d = TODAY + relativedelta(weeks=-8)
datedeb = d.strftime('%Y-%m-%d')
# generer le planning
calendar_events = generer_planning(datedeb, permis)
return {
'page_title': "Examens %s" % permis,
'url': url,
'datePlan': datePlan,
'permis': permis,
'types_permis': types_permis,
'calendar_events': calendar_events,
}
@view_config(route_name='examen_list', renderer='../templates/examens/examen_list.pt', permission='view')
def examen_list(request):
date_exa = request.matchdict['date_exa']
no_exa = request.matchdict['no_exa']
# convertir date_exa en format date
date = datetime.strptime(date_exa, '%Y-%m-%d')
# lire examen
examen = get_examens(request, date_exa, no_exa)
if examen:
lieu_exa = examen.LIEU
permis = examen.PERMIS
accompgntr = examen.CD_MON
dateheure = datetime.strptime('%s %s:%s:00' % (examen.DATE.strftime('%Y-%m-%d'), examen.HEURE.seconds//3600, (examen.HEURE.seconds//60)%60),
'%Y-%m-%d %H:%M:00').strftime('%d-%m-%Y %H h')
else:
lieu_exa = 'Non trouvé'
permis = ""
accompgntr = ""
dateheure = ""
# lire les inscrits dans examen
items = get_examens_aff(request, date_exa, no_exa)
total_unites = 0
# construire la liste
liste=[]
for item in items:
# cumuler le total d'unité
total_unites += item.UNITE
if item.TR_T_OK:
ETG_OK = item.TR_T_OK.strftime('%d-%m-%Y')
else:
ETG_OK = ""
tel = ''
if item.TEL:
if len(item.TEL) > 0:
tel += item.TEL + ' '
if item.TEL2:
if len(item.TEL2) > 0:
tel += item.TEL2 + ' '
if item.TEL3:
if len(item.TEL3) > 0:
tel += 'P:' + item.TEL3 + ' '
if item.TEL4:
if len(item.TEL4) > 0:
tel += 'M:' + item.TEL4 + ' '
agence = getAgenceLib(request, item.AGENCE)
d = (agence[7:10], '%s - %s' % ("%06d" % item.CD_CLI, item.NOM), item.NOM_JF, item.filiere, item.CD_MON, tel, ETG_OK,
'%s - %s' % (item.INDICE, item.UNITE))
liste.append(d)
return {
'page_title': 'Examen N° %s du %s' % (no_exa, dateheure),
'lieu_exa': lieu_exa,
'permis': permis,
'total_unites': total_unites,
'accompgntr': accompgntr,
'dt_data': json.dumps(liste),
}
@view_config(route_name='results_import', renderer='../templates/examens/results_import.pt', permission='view')
def results_import(request):
def get_emails(lecture):
# créer la liste des entêtes des messages
liste = []
# connecter au serveur de mail
conn, data = mailbox_connect(request)
if conn == None:
request.session.flash("ERREUR de lecture de la boîte de réception", 'danger')
return liste
mail_ids = data[0]
for email_UID in mail_ids.split():
rv, msg_data = conn.fetch(email_UID, '(RFC822)')
if rv != 'OK':
request.session.flash("ERREUR de lecture du message %s" % email_UID, 'danger')
return liste
msg = email.message_from_bytes(msg_data[0][1])
hdr = email.header.make_header(email.header.decode_header(msg['Subject']))
email_subject = str(hdr)
email_from = email.utils.parseaddr(msg['from'])[1]
# Now convert to local date-time
date_tuple = email.utils.parsedate_tz(msg['Date'])
if date_tuple:
email_date = datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
else:
email_date = datetime.now()
d = {
"email_date": email_date,
"email_from": email_from.split('@')[1],
'email_subject':email_subject,
"email_uid": email_UID
}
liste.append(d)
if lecture == 'first':
break
# deconnexion du serveur
conn.close()
conn.logout()
return liste
def analyser_email(body):
neph = ""
date_exa = ''
resultat_exa = ''
# scan le body pour extraire les infos utiles
body = body.decode('latin-1')
# analyser la phrase suivante:
# Le candidat n°210269100510 que vous avez présenté à l'examen pratique (BC) du 19/07/2021 a été ajourné.
# récupère le neph (12 car.)
i = body.find('Le candidat n°')
if i > 0:
neph = body[i+14:i+26]
# récupère la date examen (10 car.)
i = body.find('du ', i)
if i > 0:
date_exa = body[i+3:i+13]
# récupère le résultat
if body.find("a été reçu") > 0:
resultat_exa = 10 # "BON"
resultat_lib = "BON"
elif body.find("a été ajourné") > 0:
resultat_exa = 1 # "ECHEC"
resultat_lib = "ECHEC"
else:
resultat_exa = 7 # "ABS"
resultat_lib = "ABS. EXCUSEE"
return neph, date_exa, resultat_exa, resultat_lib
def import_email(request, email_uid):
# importer le résulat dans la BD
# retour :
# - "completed" : importation réussi
# - "not_found" : éléve non trouvé
# - "error" : erreur détectée
# connecter au serveur de mail
conn, data = mailbox_connect(request)
if conn == None:
request.session.flash("ERREUR de lecture de la boîte de réception", 'danger')
return "error"
# lire le message avec UID
rv, msg_data = conn.fetch(email_uid, '(RFC822)')
if rv != 'OK':
request.session.flash("ERREUR de lecture du message %S" % (email_uid), 'danger')
return "error"
email_message = email.message_from_bytes(msg_data[0][1])
# get the message's body
body = ''
for part in email_message.walk():
ctype = part.get_content_type()
cdispo = str(part.get('Content-Disposition'))
# skip any text/plain (txt) attachments
if ctype == 'text/plain' and 'attachment' not in cdispo:
body = part.get_payload(decode=True) # decode
break
# Analyser le message pour récupérer les infos utiles
neph, date_exa, resultat_exa, resultat_lib = analyser_email(body)
# convertir la date en 'yyyy/mm/dd'
dateexa = datetime.strptime(date_exa, '%d/%m/%Y')
dateexa = dateexa.strftime('%Y-%m-%d')
if neph != '':
# éléve trouvé ?
eleve = get_eleves_by_neph(request, neph)
if eleve:
update_resultat(request, eleve.CD_CLI, dateexa, resultat_exa, logged_in)
try:
# envoyer un message interne au moniteur référent s'il
if len(eleve.CD_MON) > 0:
urlopen('https://Suivi-eleve.marietton.com/MSG-fromAEM.php?codeE=%s&type=cepcOK&date=%s&dest=%s&resultat=%s'
% (eleve.CD_CLI, dateexa, eleve.CD_MON, resultat_lib),timeout=30 )
except:
pass
else:
request.session.flash("NEPH %s non trouvé dans la BD, impossible d'importer l'examen" % neph, 'danger')
return "not_found"
# downloading attachment
temp_file_path = download_pdf_to_tmp(email_message)
if temp_file_path != '':
# si attachement existe, verifier qu'il n'est pas déjà importé
next_code = get_next_cepc(request, eleve.CD_CLI, dateexa)
if len(next_code) > 0:
# fabriquer le nom du document
filename = '%s_%s_%s_%s.PDF' % (eleve.CD_CLI, 'DOC', next_code, dateexa)
# l'insérer dans la fiche élève
cepcFile2Dossier(request, eleve.CD_CLI, temp_file_path, filename, next_code, logged_in)
# déplacer le message dans la corbeille
conn.store(email_uid, '+FLAGS', '\\Deleted')
conn.expunge()
conn.close()
# deconnexion du serveur
conn.logout()
return "completed"
# ------- main -------
logged_in = request.authenticated_userid.upper()
url = request.route_url('results_import')
message = ''
# -- importer les emails
if 'form.import' in request.params:
nbTraite = 0
for i in range(100):
# lire le premier email des résultats pratique
message = get_emails('first')
if message:
nbTraite += 1
res = import_email(request, message[0]['email_uid'])
if res == "not_found":
# si élève non trouvé, arrêter l'import
break
else:
# si plus d'email, arrêter
break
# message de fin
request.session.flash("%s Résultat(s) mis à jour avec succès." % str(nbTraite), 'success')
# lister les messages reçus
emails = []
# emails = emails + get_emails('objectif_code', 'all')
emails = emails + get_emails('all')
# ister les résultats mis à jour
examens_maj = get_examens_maj(request, logged_in)
return {
'page_title': "Liste des emails de résultats d'examen",
'url': url,
'emails': emails,
'examens_maj': examens_maj,
}
def download_pdf_to_tmp(email_message):
# import pdb;pdb.set_trace()
temp_file_path = ''
# downloading attachments
for part in email_message.walk():
# this part comes from the snipped I don't understand yet...
if part.get_content_maintype() == 'multipart':
continue
if part.get('Content-Disposition') is None:
continue
fileName = part.get_filename()
if bool(fileName):
# copier le fichier PDF dans le dossier /tmp
temp_file_path = os.path.join('/tmp/', fileName)
# if not os.path.isfile(temp_file_path) :
fp = open(temp_file_path, 'wb')
fp.write(part.get_payload(decode=True))
fp.close()
return temp_file_path
def cepcFile2Dossier(request, cd_cli, temp_file, filename, next_code, logged_in):
# créer le répertoire de eleve
path = '%s/%s' % (request.registry.settings['aem_gestion.justifs_dir'], cd_cli)
os.makedirs(path, exist_ok=True)
filepath = os.path.join('%s/%s' % (path, filename))
# Finally move the temporary file to folder
shutil.move(temp_file, filepath)
filesize = round(os.path.getsize(filepath) / 1024)
insert_eleve_cepc(request, cd_cli, next_code, filename, filesize, logged_in)
@view_config(route_name='result_del', renderer='../templates/examens/result_del.pt', permission='view')
def result_del(request):
email_uid = request.matchdict['email_uid']
url = request.route_url('result_del',email_uid=email_uid)
message = ""
if 'form.submitted' in request.params:
# connecter au serveur de mail
conn, data = mailbox_connect(request)
# déplacer le message dans la corbeille
conn.store(email_uid, '+FLAGS', '\\Deleted')
# deconnexion du serveur
conn.close()
conn.logout()
request.session.flash("Le message %s a été supprimée avec succès." % email_uid, 'success')
return HTTPFound(location=request.route_url('results_import'))
return {
'page_title': "Supprimer le message %s" % email_uid,
'url': url,
'message': message,
}
def mailbox_connect(request):
# Cette fonction effectue :
# 1. la connexion au serveur de mail
# 2. la lecture des messages de INBOX
# puis retourne 2 résultats :
# - conn : la connexion ou None si erreur rencontré
# - data : la liste UID des messages
# connecter au serveur IMAP
mbx_name = 'resultats-examens@marietton.com'
mbx_pwd = '9KmKPn36a'
conn = imaplib.IMAP4_SSL('SSL0.OVH.NET', 993)
#mbx_name = 'phuoc@caotek.fr'
#mbx_pwd = 'pcao.8211'
#conn = imaplib.IMAP4_SSL('imap.gmail.com')
try:
# se connecter à la mailbox
conn.login(mbx_name, mbx_pwd)
except imaplib.IMAP4.error:
request.session.flash("ERREUR d'authentification à %s" % mbx_name, 'danger')
return None, None
# select INBOX
conn.select('INBOX')
criteria = 'SUBJECT "des examens du permis de conduire" UNDELETED'
rv, data = conn.search(None, criteria)
if rv != 'OK':
request.session.flash("ERREUR de lecture de la boîte de réception", 'danger')
return None, None
return conn, data