remove form and model class

This commit is contained in:
2024-12-12 15:38:34 +01:00
parent 3fd525e0da
commit 3b41520da9
18 changed files with 248 additions and 349 deletions

View File

@@ -4,7 +4,7 @@ from pyramid.authorization import ACLAuthorizationPolicy
from pyramid.session import SignedCookieSessionFactory from pyramid.session import SignedCookieSessionFactory
from pyramid_mailer import mailer_factory_from_settings from pyramid_mailer import mailer_factory_from_settings
from .services.user import groupfinder from .models.users import groupfinder
def main(global_config, **settings): def main(global_config, **settings):

View File

@@ -3,11 +3,6 @@ from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import configure_mappers from sqlalchemy.orm import configure_mappers
import zope.sqlalchemy import zope.sqlalchemy
# import or define all models here to ensure they are attached to the
# Base.metadata prior to any initialization routines
from .user import User
from .blog_record import BlogRecord
# run configure_mappers after defining all of the models to ensure # run configure_mappers after defining all of the models to ensure
# all relationships can be setup # all relationships can be setup
configure_mappers() configure_mappers()

View File

@@ -1,56 +0,0 @@
import datetime #<- will be used to set default dates on models
from cao_sunyata.models.meta import Base #<- we need to import our sqlalchemy metadata from which model classes will inherit
from sqlalchemy import (
Column,
Integer,
Unicode, #<- will provide Unicode field
UnicodeText, #<- will provide Unicode text field
DateTime, #<- time abstraction field
Index,
ForeignKey,
)
from sqlalchemy.orm import relationship
from webhelpers2.text import urlify #<- will generate slugs
from webhelpers2.date import distance_of_time_in_words #<- human friendly dates
import unidecode
class BlogRecord(Base):
__tablename__ = 'entries'
id = Column(Integer, primary_key=True)
title = Column(Unicode(255), unique=True, nullable=False)
body = Column(UnicodeText, default='')
created = Column(DateTime, default=datetime.datetime.now)
creator = Column(Unicode(50), default='')
edited = Column(DateTime, default=datetime.datetime.now)
editor = Column(Unicode(50), default='')
topic_id = Column(ForeignKey('topics.topic'), nullable=False)
topic = relationship('Topics', backref='topic_pages')
tag = Column(Unicode(25))
author = Column(Unicode(50), default='')
status = Column(Unicode(50), default='brouillon')
@property
def slug(self):
# remove ascents
title = unidecode.unidecode(self.title)
return urlify(title)
@property
def created_in_words(self):
return distance_of_time_in_words(self.created, datetime.datetime.now())
class Topics(Base):
__tablename__ = 'topics'
topic = Column(Unicode(25), primary_key=True)
topic_name = Column(Unicode(25), nullable=False)
topic_quote = Column(Unicode(255), default='')
class Tags(Base):
__tablename__ = 'tags'
id = Column(Integer, primary_key=True)
topic = Column(Unicode(25))
tag = Column(Unicode(25))
tag_name = Column(Unicode(25), nullable=False)
__table_args__ = (Index('topic_index', "topic", "tag"), )

View File

@@ -0,0 +1,10 @@
# -*- coding: utf8 -*-
from zope.sqlalchemy import mark_changed
import transaction
def execute_query(request, query, params):
"""Execute query and mark session as changed"""
request.dbsession.execute(query, params)
mark_changed(request.dbsession)
transaction.commit()

View File

@@ -0,0 +1,103 @@
# -*- coding: utf8 -*-
from .default import (
execute_query,
)
def get_entries_by_topic(request, topic, tag):
query = f"SELECT * FROM entries WHERE topic_id = {topic}"
if request.authenticated_userid == None:
# if user is anonym, display only published posts
query = query + " AND status='publié'"
else:
if request.authenticated_userid != 'admin':
# if user is not 'admin', hide admin posts
query = query + " AND tag =! '_admin'"
if tag != '':
query = query + " AND tag = {tag}"
query = query + " ORDER BY tag, title;"
results = request.dbsession.execute(query).fetchall()
return results
def get_entries_by_criteria(request, criteria):
search = "%{}%".format(criteria)
query = f"SELECT * FROM entries WHERE title = {search} or body = {search}"
if request.authenticated_userid == None:
# if user is anonym, display only published posts
query = query + " AND status='publié'"
else:
if request.authenticated_userid != 'admin':
# if user is not 'admin', hide admin posts
query = query + " AND tag =! '_admin'"
query = query + " ORDER BY title;"
results = request.dbsession.execute(query).fetchall()
return results
def get_entries_by_id(request, _id):
query = "SELECT * FROM entries WHERE id=:id;"
results = request.dbsession.execute(query, {'id':_id}).first()
return results
def get_last_created(request):
# gest the 10 last created posts
query = "SELECT strftime('%d/%m/%Y', created) AS create_date, title, author, status FROM entries WHERE topic_id <> '_admin'"
if request.authenticated_userid == None:
# if user is anonym, display only published posts
query = query + " AND status='publié'"
query = query + " ORDER BY created DESC LIMIT 10;"
results = request.dbsession.execute(query).fetchall()
return results
def get_last_edited(request):
# gest the last edited posts
query = "SELECT strftime('%d/%m/%Y', edited) AS edit_date, title, author, status FROM entries WHERE topic_id <> '_admin'"
if request.authenticated_userid == None:
# if user is anonym, display only published posts
query = query + " AND status='publié'"
query = query + " ORDER BY edited LIMIT 10;"
results = request.dbsession.execute(query).fetchall()
return results
def get_activities(request):
# gest the Activities section
query = "SELECT * FROM entries WHERE topic_id = '_admin' AND tag = 'activities' AND status = 'publié' ORDER BY created;"
results = request.dbsession.execute(query,).first()
return results
def get_tags_byTopic(request, topic):
# get tags
query = "SELECT * FROM tags WHERE topic=:topic ORDER BY tag_name;"
results = request.dbsession.execute(query, {'topic':topic}).all()
return results
def get_tags_byId(request, id):
query = "SELECT * FROM tags WHERE id=:id;"
results = request.dbsession.execute(query, {'topic':id}).first()
return results
def get_topic_byTopic(request, id):
# get the name of a given topic
query = "SELECT * FROM topics WHERE topic=:topic;"
results = request.dbsession.execute(query, {'topic':id}).first()
return results
def get_topics(request):
# get all topics
query = "SELECT * FROM topics ORDER BY topic_name;"
results = request.dbsession.execute(query, {'topic':id}).all()
return results
def entries_delete(request, id):
query = "DELETE FROM entries WHERE id = :id ;"
execute_query(request, query, {'id': id})
def tags_delete(request, id):
query = "DELETE FROM tags WHERE id = :id ;"
execute_query(request, query, {'id': id})
def topics_delete(request, id):
query = "DELETE FROM topics WHERE topic = :id ;"
execute_query(request, query, {'id': id})

View File

@@ -1,16 +0,0 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import MetaData
# Recommended naming convention used by Alembic, as various different database
# providers will autogenerate vastly different names making migrations more
# difficult. See: http://alembic.zzzcomputing.com/en/latest/naming.html
NAMING_CONVENTION = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s"
}
metadata = MetaData(naming_convention=NAMING_CONVENTION)
Base = declarative_base(metadata=metadata)

View File

@@ -1,34 +0,0 @@
import datetime #<- will be used to set default dates on models
from cao_sunyata.models.meta import Base #<- we need to import our sqlalchemy metadata from which model classes will inherit
from sqlalchemy import (
Column,
Integer,
Unicode, #<- will provide Unicode field
UnicodeText, #<- will provide Unicode text field
DateTime, #<- time abstraction field
)
from passlib.apps import custom_app_context as blogger_pwd_context
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(Unicode(255), unique=True, nullable=False)
password = Column(Unicode(255), nullable=False)
last_logged = Column(DateTime, default=datetime.datetime.utcnow)
def verify_password(self, password):
# is it cleartext?
if password == self.password:
self.set_password(password)
# verify password
result = blogger_pwd_context.verify(password, self.password)
if result:
# pwd OK, set last login date
self.last_logged = datetime.datetime.now()
return result
def set_password(self, password):
password_hash = blogger_pwd_context.encrypt(password)
self.password = password_hash

View File

@@ -0,0 +1,57 @@
# -*- coding: utf8 -*-
from .default import (
execute_query,
)
import datetime #<- will be used to set default dates on models
def get_users_all(request):
query = "SELECT id, name, strftime('%d/%m/%Y %H:%M:%S', last_logged) as last_login FROM users ORDER BY name;"
results = results = request.dbsession.execute(query).fetchall()
return results
def get_users_by_name(request, name ):
query = "SELECT * FROM users WHERE name=:name;"
results = request.dbsession.execute(query, {'name': name}).first()
return results
def delete_user(request, id):
query = "DELETE FROM users WHERE id = :id ;"
execute_query(request, query, {'id': id})
return
def groupfinder(userid, request):
if userid:
# user name is 'admin' ?
if userid == 'admin':
return ['group:administrators']
else:
return [] # it means that userid is logged in
else:
# it returns None if userid isn't logged in
return None
def update_user(request, name, new_values):
# formater les champs
s = ''
for param in new_values.keys():
if s:
s += ",%s=:%s" % (param, param)
else:
s = "%s=:%s" % (param, param)
import pdb;pdb.set_trace()
if name == '0':
query = "INSERT INTO users (name, password) VALUES ('{0}', '{1}')".format(new_values['name'], new_values['password'])
else:
new_values['name'] = name
query = "UPDATE users SET %s WHERE name = :name;" % s
execute_query(request, query, new_values)
def update_last_connection(request, id):
"""Update last connection for login """
last_logged = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
query = "UPDATE users SET last_logged = '" + last_logged + "' WHERE id=:id;"
execute_query(request, query, {'id': id})

View File

@@ -1,110 +0,0 @@
import sqlalchemy as sa
import datetime #<- will be used to set default dates on models
from sqlalchemy import or_, and_
from ..models.blog_record import BlogRecord, Topics, Tags
class BlogRecordService(object):
@classmethod
def by_topic(cls, request, topic, tag):
# get posts by topic
query = request.dbsession.query(BlogRecord).filter(BlogRecord.topic_id == topic)
if request.authenticated_userid == None:
# if user is anonym, display only published posts
query = query.filter(BlogRecord.status == 'publié')
if request.authenticated_userid != 'admin':
# if user is not 'admin', hide admin posts
query = query.filter(BlogRecord.tag != 'admin')
if tag != '':
query = query.filter(BlogRecord.tag == tag)
return query.order_by(BlogRecord.tag, BlogRecord.title).all()
@classmethod
def by_criteria(cls, request, criteria):
search = "%{}%".format(criteria)
query = request.dbsession.query(BlogRecord).filter(or_(BlogRecord.title.like(search),
BlogRecord.body.like(search)))
# import pdb;pdb.set_trace()
if request.authenticated_userid == None:
# if user is anonym, display only published posts
query = query.filter(BlogRecord.status == 'publié')
if request.authenticated_userid != 'admin':
# if user is not 'admin', hide admin posts
query = query.filter(BlogRecord.topic_id != '_admin')
return query.order_by(BlogRecord.title).all()
@classmethod
def by_id(cls, request, _id):
query = request.dbsession.query(BlogRecord).filter(BlogRecord.id == _id).first()
return query
@classmethod
def get_last_created(cls, request):
# gest the 10 last created posts
query = request.dbsession.query(BlogRecord).filter(BlogRecord.topic_id != '_admin')
if request.authenticated_userid == None:
# if user is anonym, display only published posts
query = query.filter(BlogRecord.status == 'publié')
query = query.order_by(sa.desc(BlogRecord.created)).limit(10).all()
return query
@classmethod
def get_last_edited(cls, request):
# gest the last edited posts
query = request.dbsession.query(BlogRecord).filter(BlogRecord.topic_id != '_admin')
if request.authenticated_userid == None:
# if user is anonym, display only published posts
query = query.filter(BlogRecord.status == 'publié')
query = query.order_by(sa.desc(BlogRecord.edited)).limit(10).all()
return query
@classmethod
def get_activities(cls, request):
# gest the Activities section
query = request.dbsession.query(BlogRecord).filter(and_(BlogRecord.topic_id == '_admin',
BlogRecord.tag == 'activities',
BlogRecord.status == 'publié'))
query = query.order_by(sa.desc(BlogRecord.created)).first()
return query
@classmethod
def get_tags_byTopic(cls, request, topic):
# gest the last 5 items modified
query = request.dbsession.query(Tags).filter(Tags.topic == topic)
query = query.order_by(Tags.tag_name).all()
return query
@classmethod
def get_tags_byId(cls, request, id):
# gest the last 5 items modified
query = request.dbsession.query(Tags).filter(Tags.id == id).first()
return query
@classmethod
def get_topic_byTopic(cls, request, id):
# get the name of a given topic
query = request.dbsession.query(Topics).filter(Topics.topic == id).first()
return query
@classmethod
def get_topics(cls, request):
# get all topics
query = request.dbsession.query(Topics).order_by(Topics.topic_name).all()
return query
@classmethod
def delete(cls, request, id):
request.dbsession.query(BlogRecord).filter(BlogRecord.id == id).delete(synchronize_session=False)
return
@classmethod
def tag_delete(cls, request, id):
request.dbsession.query(Tags).filter(Tags.id == id).delete(synchronize_session=False)
return
@classmethod
def topic_delete(cls, request, id):
request.dbsession.query(Topics).filter(Topics.topic == id).delete(synchronize_session=False)
return

View File

@@ -1,32 +0,0 @@
import sqlalchemy as sa
from ..models.user import User
class UserService(object):
@classmethod
def all(cls, request):
items = request.dbsession.query(User).order_by(sa.asc(User.name)).all()
return items
@classmethod
def by_name(cls, request, name ):
item = request.dbsession.query(User).filter(User.name == name).first()
return item
@classmethod
def delete(cls, request, id):
request.dbsession.query(User).filter(User.id == id).delete(synchronize_session=False)
return
def groupfinder(userid, request):
if userid:
# user name is 'admin' ?
if userid == 'admin':
return ['group:administrators']
else:
return [] # it means that userid is logged in
else:
# it returns None if userid isn't logged in
return None

View File

@@ -76,7 +76,7 @@
<table id="users_list" class="table table-condensed"> <table id="users_list" class="table table-condensed">
{% for entry in last_ten %} {% for entry in last_ten %}
<tr> <tr>
<td>{{ entry.created.strftime("%d.%m.%Y") }}</td> <td>{{ entry.create_date }}</td>
<td> <td>
<a href="{{ request.route_url('blog', id=entry.id, slug=entry.slug) }}">{{ entry.title }}</a> <a href="{{ request.route_url('blog', id=entry.id, slug=entry.slug) }}">{{ entry.title }}</a>
</td> </td>

View File

@@ -40,7 +40,7 @@
<table id="users_list" class="table table-condensed"> <table id="users_list" class="table table-condensed">
{% for entry in items %} {% for entry in items %}
<tr> <tr>
<td>{{ entry.edited.strftime("%d-%m-%Y") }}</td> <td>{{ entry.edit_date }}</td>
<td>{{ entry.editor }}</td> <td>{{ entry.editor }}</td>
<td> <td>
<a href="{{ request.route_url('blog', id=entry.id, slug=entry.slug) }}">{{ entry.title }}</a> <a href="{{ request.route_url('blog', id=entry.id, slug=entry.slug) }}">{{ entry.title }}</a>

View File

@@ -8,33 +8,19 @@
</div> </div>
{% endif %} {% endif %}
<form action="{{ url }}" method="post" class="form"> <form action="{{ url }}" method="post" role="form">
{% for error in form.name.errors %}
<div class="label label-warning">{{ error }}</div>
{% endfor %}
<div class="form-group"> <div class="form-group">
<label class="required-field" for="name">{{form.name.label}}</label> <label class="required-field" for="name">Nom</label>
{% if form.id.data %} {% if name != '0' %}
<input class="form-control" name="name" readonly type="text" value="{{form.name.data}}"> <input class="form-control" name="name" type="text" readonly value="{{user.name}}">
{% else %} {% else %}
{{form.name(class_='form-control')}} <input class="form-control" name="name" type="text" value="{{user.name}}">
{% endif %} {% endif %}
</div> </div>
<div class="form-group"> <div class="form-group">
<label class="required-field" for="password">{{form.password.label}}</label> <label class="required-field" for="password">Mot de passe</label>
{{form.password(class_='form-control')}} <input class="form-control" name="password" type="password">
</div>
{% for error in form.confirm.errors %}
<div class="label label-danger">{{error}}</div>
{% endfor %}
<div class="form-group">
<label class="required-field" for="confirm">{{form.confirm.label}}</label>
{{form.confirm(class_='form-control')}}
</div> </div>
<br> <br>
@@ -42,7 +28,7 @@
<a class="btn btn-default" href="{{ url_retour }}"><span class="glyphicon glyphicon-chevron-left"></span> Retour</a> <a class="btn btn-default" href="{{ url_retour }}"><span class="glyphicon glyphicon-chevron-left"></span> Retour</a>
<button class="btn btn-primary" type="submit" name="form.submitted"> <button class="btn btn-primary" type="submit" name="form.submitted">
<span class="glyphicon glyphicon-ok"></span> Enregistrer</button> <span class="glyphicon glyphicon-ok"></span> Enregistrer</button>
{% if form.id.data and request.authenticated_userid == 'admin' %} {% if request.authenticated_userid == 'admin' %}
<button class="btn btn-warning" type="submit" name="form.deleted"> <button class="btn btn-warning" type="submit" name="form.deleted">
<span class="glyphicon glyphicon-remove"></span> Supprimer</button> <span class="glyphicon glyphicon-remove"></span> Supprimer</button>
{% endif %} {% endif %}

View File

@@ -24,7 +24,7 @@
{{ entry.name }} {{ entry.name }}
</a> </a>
</td> </td>
<td>{{ entry.last_logged.strftime("%d-%m-%Y - %H:%M") }}</td> <td>{{ entry.last_login }}</td>
</tr> </tr>
{% endfor %} {% endfor %}
</table> </table>

View File

@@ -3,8 +3,7 @@ from pyramid.httpexceptions import HTTPNotFound, HTTPFound
import markdown import markdown
import datetime #<- will be used to set default dates on models import datetime #<- will be used to set default dates on models
from ..models.blog_record import BlogRecord from ..models.entries import *
from ..services.blog_record import BlogRecordService
from ..forms import BlogCreateForm, BlogUpdateForm, BlogSearchForm from ..forms import BlogCreateForm, BlogUpdateForm, BlogSearchForm
@@ -12,7 +11,7 @@ from ..forms import BlogCreateForm, BlogUpdateForm, BlogSearchForm
def blog(request): def blog(request):
# get post id from request # get post id from request
blog_id = request.matchdict['id'] blog_id = request.matchdict['id']
entry = BlogRecordService.by_id(request, blog_id) entry = get_entries_by_id(request, blog_id)
if not entry: if not entry:
request.session.flash(u"Page non trouvée : %s" % blog_id, 'warning') request.session.flash(u"Page non trouvée : %s" % blog_id, 'warning')
return HTTPFound(location=request.route_url('home')) return HTTPFound(location=request.route_url('home'))
@@ -36,7 +35,7 @@ def blog_copy(request):
blog_id = request.matchdict['id'] blog_id = request.matchdict['id']
# get the post # get the post
entry = BlogRecordService.by_id(request, blog_id) entry = get_entries_by_id(request, blog_id)
if not entry: if not entry:
request.session.flash("Page non trouvée : %s" % blog_id, 'warning') request.session.flash("Page non trouvée : %s" % blog_id, 'warning')
return HTTPFound(location=request.route_url('topic', topic=topic)) return HTTPFound(location=request.route_url('topic', topic=topic))
@@ -65,7 +64,7 @@ def blog_edit(request):
url = request.route_url('blog_edit',topic=topic, id=blog_id) url = request.route_url('blog_edit',topic=topic, id=blog_id)
# get the list of tags of this topic # get the list of tags of this topic
tags = BlogRecordService.get_tags_byTopic(request, topic) tags = get_tags_byTopic(request, topic)
if blog_id == '0': if blog_id == '0':
# create a new post # create a new post
@@ -78,7 +77,7 @@ def blog_edit(request):
else: else:
# modify post # modify post
entry = BlogRecordService.by_id(request, blog_id) entry = get_entries_by_id(request, blog_id)
if not entry: if not entry:
request.session.flash("Page non trouvée : %s" % blog_id, 'warning') request.session.flash("Page non trouvée : %s" % blog_id, 'warning')
return HTTPFound(location=request.route_url('topic', topic=topic)) return HTTPFound(location=request.route_url('topic', topic=topic))
@@ -139,7 +138,7 @@ def blog_search(request):
if tags: if tags:
liste += '<ul>' liste += '<ul>'
for tag in tags: for tag in tags:
count = len(BlogRecordService.by_topic(request, tag.topic, tag.tag)) count = len(get_entries_by_topic(request, tag.topic, tag.tag))
liste += '<button type="button" class="btn">{0}&nbsp;&nbsp;&nbsp;<span class="badge">{1}</span></button>&nbsp;'.format( liste += '<button type="button" class="btn">{0}&nbsp;&nbsp;&nbsp;<span class="badge">{1}</span></button>&nbsp;'.format(
tag.tag_name, count) tag.tag_name, count)
liste += '</ul>' liste += '</ul>'
@@ -149,7 +148,7 @@ def blog_search(request):
if 'form.submitted' in request.params and form.validate(): if 'form.submitted' in request.params and form.validate():
criteria = request.params['criteria'] criteria = request.params['criteria']
# si afficher tous les fiches ? # si afficher tous les fiches ?
items = BlogRecordService.by_criteria(request, criteria) items = get_entries_by_criteria(request, criteria)
return { return {
'page_title': "Rechercher", 'page_title': "Rechercher",
@@ -179,7 +178,7 @@ def topic(request):
for tag in tags: for tag in tags:
liste += '<h3>' + tag.tag_name + '</h3>' liste += '<h3>' + tag.tag_name + '</h3>'
# lire toutes les docs du topic # lire toutes les docs du topic
items = BlogRecordService.by_topic(request, topic, tag.tag) items = get_entries_by_topic(request, topic, tag.tag)
if items: if items:
liste += '<ul><table class="table table-condensed">' liste += '<ul><table class="table table-condensed">'
for item in items: for item in items:

View File

@@ -7,23 +7,25 @@ from pyramid.httpexceptions import HTTPFound
from pyramid.security import remember, forget from pyramid.security import remember, forget
from pyramid_mailer.message import Message, Attachment from pyramid_mailer.message import Message, Attachment
from ..services.user import UserService
from ..services.blog_record import BlogRecordService
from ..forms import UserCreateForm, TopicForm, TagForm from ..forms import UserCreateForm, TopicForm, TagForm
from ..models.user import User from ..models.users import *
from ..models.blog_record import Topics, Tags from ..models.entries import *
import os from datetime import datetime
from PIL import Image from PIL import Image
from urllib import request, parse
from passlib.apps import custom_app_context as blogger_pwd_context
import os
import shutil import shutil
import magic import magic
import json import json
from urllib import request, parse
import pkg_resources import pkg_resources
import sys import sys
import sqlite3 import sqlite3
import transaction
@view_config(route_name='home',
renderer='cao_sunyata:templates/home.jinja2') @view_config(route_name='home', renderer='cao_sunyata:templates/home.jinja2')
def home(request): def home(request):
# images list for the carousel # images list for the carousel
dir = request.static_url('cao_sunyata:static/carousel') dir = request.static_url('cao_sunyata:static/carousel')
@@ -40,16 +42,18 @@ def home(request):
dir + '/S25.jpg'] dir + '/S25.jpg']
# get the Activities section # get the Activities section
activ = BlogRecordService.get_activities(request) activ = get_activities(request)
# insèrer le path de static/img # insèrer le path de static/img
activities = activ.body.replace('static/', "%s/static/" % request.application_url) activities = activ.body.replace('static/', "%s/static/" % request.application_url)
# get the last created posts # get the last created posts
last_ten = BlogRecordService.get_last_created(request) last_ten = get_last_created(request)
name = '' name = ''
email = '' email = ''
comments = '' comments = ''
# import pdb;pdb.set_trace()
if 'form.submitted' in request.params : if 'form.submitted' in request.params :
name = request.params['name'] name = request.params['name']
email = request.params['email'] email = request.params['email']
comments = request.params['comments'] comments = request.params['comments']
@@ -117,7 +121,7 @@ def settings(request):
# lire toutes les docs du topic # lire toutes les docs du topic
topic = '_admin' topic = '_admin'
items = BlogRecordService.get_last_edited(request) items = get_last_edited(request)
# informations sur les versions # informations sur les versions
pyramid_version = pkg_resources.get_distribution("pyramid").version pyramid_version = pkg_resources.get_distribution("pyramid").version
@@ -154,14 +158,20 @@ def login(request):
if 'form.submitted' in request.params: if 'form.submitted' in request.params:
username = request.POST.get('username') username = request.POST.get('username')
userpwd = request.POST.get('password') userpwd = request.POST.get('password')
user = UserService.by_name(request, username) user = get_users_by_name(request, username)
if user and user.verify_password(userpwd): # Is user existed ?
headers = remember(request, username) if user :
request.session.flash("Bienvenue %s !" % username, 'success') if blogger_pwd_context.verify(userpwd, user.password):
return HTTPFound(location=came_from, headers=headers) # pwd OK, set last login date
else: update_last_connection(request, user.id)
headers = forget(request) # force le commit car il ne se fait pas automatiquement après l'update
request.session.flash("Login et mot de passe invalides. La connexion a échoué.", "danger") transaction.commit()
headers = remember(request, username)
request.session.flash("Bienvenue %s !" % username, 'success')
return HTTPFound(location=came_from, headers=headers)
# pwd NOK, error message
headers = forget(request)
request.session.flash("Login et mot de passe invalides. La connexion a échoué.", "danger")
return { return {
'page_title': "", 'page_title': "",
@@ -183,7 +193,7 @@ def logout(request):
@view_config(route_name='users', renderer='cao_sunyata:templates/users.jinja2', permission='manage') @view_config(route_name='users', renderer='cao_sunyata:templates/users.jinja2', permission='manage')
def users(request): def users(request):
# get all users # get all users
users = UserService.all(request) users = get_users_all(request)
return { return {
'page_title': "Liste des utilisateurs", 'page_title': "Liste des utilisateurs",
'users': users 'users': users
@@ -202,66 +212,53 @@ def user_edit(request):
if name == '0': if name == '0':
# nouvel utilisateur # nouvel utilisateur
user = User() user = {}
form = UserCreateForm(request.POST, user) user['id'] = 0
user['name'] = ''
user['password'] = ''
user['last_logged'] = None
page_title = "Nouvel utilisateur" page_title = "Nouvel utilisateur"
else: else:
# lire la fiche du user # lire la fiche du user
user = UserService.by_name(request, name) user = get_users_by_name(request, name)
if not user: if not user:
request.session.flash("Utilisateur non trouvé : %s" % name, 'danger') request.session.flash("Utilisateur non trouvé : %s" % name, 'danger')
return HTTPFound(location=url_retour) return HTTPFound(location=url_retour)
form = UserCreateForm(request.POST, user)
page_title = "Modification utilisateur" page_title = "Modification utilisateur"
if 'form.submitted' in request.params:
new_values = {}
for param in user.keys():
if param in request.params and request.params[param] != user[param]:
new_values[param] = request.params[param]
if new_values:
update_user(request, name, new_values)
request.session.flash(u"La fiche a été mise à jour avec succès.", 'success')
return HTTPFound(location=url_retour)
if 'form.submitted' in request.params and form.validate():
# controle que le password a moins 6 car
if len(form.password.data) < 6 :
message = "Le mot de passe doit avoir au moins 6 caractères"
else:
if name == '0':
# création user
# controler que le nouvel user n'existe pas dans la BD
new_user = UserService.by_name(request, form.name.data)
if new_user:
message = "Utilisateur déjà créé : %s" % form.name.data
else:
form.populate_obj(user)
user.set_password(form.password.data.encode('utf8'))
# créer le nouveau
request.dbsession.add(user)
request.session.flash("La fiche a été créée avec succès.", 'success')
return HTTPFound(location=url_retour)
else:
# modification user
del form.name # SECURITY: prevent overwriting of primary key
form.populate_obj(user)
user.set_password(form.password.data.encode('utf8'))
request.session.flash("La fiche a été modifiée avec succès.", 'success')
return HTTPFound(location=url_retour)
if 'form.deleted' in request.params: if 'form.deleted' in request.params:
UserService.delete(request, user.id) import pdb;pdb.set_trace()
delete_user(request, user.id)
request.session.flash("La fiche a été supprimée avec succès.", 'success') request.session.flash("La fiche a été supprimée avec succès.", 'success')
return HTTPFound(location=url_retour) return HTTPFound(location=url_retour)
return { return {
'page_title': page_title, 'page_title': page_title,
'message': message, 'message': message,
'form': form,
'url': url, 'url': url,
'url_retour': url_retour, 'url_retour': url_retour,
'name': name, 'name': name,
'user': user,
} }
@view_config(route_name='topics', renderer='cao_sunyata:templates/topics.jinja2', permission='view') @view_config(route_name='topics', renderer='cao_sunyata:templates/topics.jinja2', permission='view')
def topics(request): def topics(request):
# get all topics # get all topics
topics = BlogRecordService.get_topics(request) topics = get_topics(request)
return { return {
'page_title': "Liste des Topics", 'page_title': "Liste des Topics",
'topics': topics 'topics': topics
@@ -275,7 +272,7 @@ def topic_edit(request):
url = request.route_url('topic_edit',topic=topic) url = request.route_url('topic_edit',topic=topic)
# get the list of tags of this topic # get the list of tags of this topic
tags = BlogRecordService.get_tags_byTopic(request, topic) tags = get_tags_byTopic(request, topic)
if topic == '0': if topic == '0':
# create a new topic # create a new topic
@@ -285,7 +282,7 @@ def topic_edit(request):
else: else:
# modify post # modify post
entry = BlogRecordService.get_topic_byTopic(request, topic) entry = get_topic_byTopic(request, topic)
if not entry: if not entry:
request.session.flash(u"Topic non trouvé : %s" % topic, 'warning') request.session.flash(u"Topic non trouvé : %s" % topic, 'warning')
return HTTPFound(location=request.route_url('topics')) return HTTPFound(location=request.route_url('topics'))
@@ -304,7 +301,7 @@ def topic_edit(request):
return HTTPFound(location=request.route_url('topics')) return HTTPFound(location=request.route_url('topics'))
if 'form.deleted' in request.params: if 'form.deleted' in request.params:
BlogRecordService.topic_delete(request, entry.topic) topic_delete(request, entry.topic)
request.session.flash("La fiche a été supprimée avec succès.", 'success') request.session.flash("La fiche a été supprimée avec succès.", 'success')
return HTTPFound(location=request.route_url('topics')) return HTTPFound(location=request.route_url('topics'))
@@ -331,7 +328,7 @@ def tag_edit(request):
else: else:
# modify post # modify post
entry = BlogRecordService.get_tags_byId(request, tag_id) entry = get_tags_byId(request, tag_id)
if not entry: if not entry:
request.session.flash(u"Tag non trouvé : %s" % tag_id, 'warning') request.session.flash(u"Tag non trouvé : %s" % tag_id, 'warning')
return HTTPFound(location=request.route_url('topic_edit', topic=topic)) return HTTPFound(location=request.route_url('topic_edit', topic=topic))
@@ -350,7 +347,7 @@ def tag_edit(request):
return HTTPFound(location=request.route_url('topic_edit', topic=topic)) return HTTPFound(location=request.route_url('topic_edit', topic=topic))
if 'form.deleted' in request.params: if 'form.deleted' in request.params:
BlogRecordService.tag_delete(request, entry.id) tag_delete(request, entry.id)
request.session.flash("La fiche a été supprimée avec succès.", 'success') request.session.flash("La fiche a été supprimée avec succès.", 'success')
return HTTPFound(location=request.route_url('topic_edit', topic=topic)) return HTTPFound(location=request.route_url('topic_edit', topic=topic))

View File

@@ -18,9 +18,9 @@ requires = [
'pyramid_mailer', 'pyramid_mailer',
'pyramid_retry', 'pyramid_retry',
'pyramid_tm', 'pyramid_tm',
'SQLAlchemy==1.4.49', 'SQLAlchemy==1.4.54',
'transaction', 'transaction',
'zope.sqlalchemy', 'zope.sqlalchemy==2.0',
'wtforms', # form library 'wtforms', # form library
'webhelpers2', # various web building related helpers 'webhelpers2', # various web building related helpers
'passlib', 'passlib',