Imported from SVN by Bitbucket

This commit is contained in:
2015-03-31 20:26:20 +00:00
committed by bitbucket
commit ceb7984dec
212 changed files with 49537 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
Package for debugging and development tools
"""

View File

@@ -0,0 +1,79 @@
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
# (c) 2005 Clark C. Evans
# This module is part of the Python Paste Project and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
# This code was written with funding by http://prometheusresearch.com
"""
Various Applications for Debugging/Testing Purposes
"""
import time
__all__ = ['SimpleApplication', 'SlowConsumer']
class SimpleApplication(object):
"""
Produces a simple web page
"""
def __call__(self, environ, start_response):
body = "<html><body>simple</body></html>"
start_response("200 OK", [('Content-Type', 'text/html'),
('Content-Length', str(len(body)))])
return [body]
class SlowConsumer(object):
"""
Consumes an upload slowly...
NOTE: This should use the iterator form of ``wsgi.input``,
but it isn't implemented in paste.httpserver.
"""
def __init__(self, chunk_size = 4096, delay = 1, progress = True):
self.chunk_size = chunk_size
self.delay = delay
self.progress = True
def __call__(self, environ, start_response):
size = 0
total = environ.get('CONTENT_LENGTH')
if total:
remaining = int(total)
while remaining > 0:
if self.progress:
print "%s of %s remaining" % (remaining, total)
if remaining > 4096:
chunk = environ['wsgi.input'].read(4096)
else:
chunk = environ['wsgi.input'].read(remaining)
if not chunk:
break
size += len(chunk)
remaining -= len(chunk)
if self.delay:
time.sleep(self.delay)
body = "<html><body>%d bytes</body></html>" % size
else:
body = ('<html><body>\n'
'<form method="post" enctype="multipart/form-data">\n'
'<input type="file" name="file">\n'
'<input type="submit" >\n'
'</form></body></html>\n')
print "bingles"
start_response("200 OK", [('Content-Type', 'text/html'),
('Content-Length', len(body))])
return [body]
def make_test_app(global_conf):
return SimpleApplication()
make_test_app.__doc__ = SimpleApplication.__doc__
def make_slow_app(global_conf, chunk_size=4096, delay=1, progress=True):
from paste.deploy.converters import asbool
return SlowConsumer(
chunk_size=int(chunk_size),
delay=int(delay),
progress=asbool(progress))
make_slow_app.__doc__ = SlowConsumer.__doc__

View File

@@ -0,0 +1,435 @@
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
#!/usr/bin/env python2.4
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
These are functions for use when doctest-testing a document.
"""
try:
import subprocess
except ImportError:
from paste.util import subprocess24 as subprocess
import doctest
import os
import sys
import shutil
import re
import cgi
import rfc822
from cStringIO import StringIO
from paste.util import PySourceColor
here = os.path.abspath(__file__)
paste_parent = os.path.dirname(
os.path.dirname(os.path.dirname(here)))
def run(command):
data = run_raw(command)
if data:
print data
def run_raw(command):
"""
Runs the string command, returns any output.
"""
proc = subprocess.Popen(command, shell=True,
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE, env=_make_env())
data = proc.stdout.read()
proc.wait()
while data.endswith('\n') or data.endswith('\r'):
data = data[:-1]
if data:
data = '\n'.join(
[l for l in data.splitlines() if l])
return data
else:
return ''
def run_command(command, name, and_print=False):
output = run_raw(command)
data = '$ %s\n%s' % (command, output)
show_file('shell-command', name, description='shell transcript',
data=data)
if and_print and output:
print output
def _make_env():
env = os.environ.copy()
env['PATH'] = (env.get('PATH', '')
+ ':'
+ os.path.join(paste_parent, 'scripts')
+ ':'
+ os.path.join(paste_parent, 'paste', '3rd-party',
'sqlobject-files', 'scripts'))
env['PYTHONPATH'] = (env.get('PYTHONPATH', '')
+ ':'
+ paste_parent)
return env
def clear_dir(dir):
"""
Clears (deletes) the given directory
"""
shutil.rmtree(dir, True)
def ls(dir=None, recurse=False, indent=0):
"""
Show a directory listing
"""
dir = dir or os.getcwd()
fns = os.listdir(dir)
fns.sort()
for fn in fns:
full = os.path.join(dir, fn)
if os.path.isdir(full):
fn = fn + '/'
print ' '*indent + fn
if os.path.isdir(full) and recurse:
ls(dir=full, recurse=True, indent=indent+2)
default_app = None
default_url = None
def set_default_app(app, url):
global default_app
global default_url
default_app = app
default_url = url
def resource_filename(fn):
"""
Returns the filename of the resource -- generally in the directory
resources/DocumentName/fn
"""
return os.path.join(
os.path.dirname(sys.testing_document_filename),
'resources',
os.path.splitext(os.path.basename(sys.testing_document_filename))[0],
fn)
def show(path_info, example_name):
fn = resource_filename(example_name + '.html')
out = StringIO()
assert default_app is not None, (
"No default_app set")
url = default_url + path_info
out.write('<span class="doctest-url"><a href="%s">%s</a></span><br>\n'
% (url, url))
out.write('<div class="doctest-example">\n')
proc = subprocess.Popen(
['paster', 'serve' '--server=console', '--no-verbose',
'--url=' + path_info],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
env=_make_env())
stdout, errors = proc.communicate()
stdout = StringIO(stdout)
headers = rfc822.Message(stdout)
content = stdout.read()
for header, value in headers.items():
if header.lower() == 'status' and int(value.split()[0]) == 200:
continue
if header.lower() in ('content-type', 'content-length'):
continue
if (header.lower() == 'set-cookie'
and value.startswith('_SID_')):
continue
out.write('<span class="doctest-header">%s: %s</span><br>\n'
% (header, value))
lines = [l for l in content.splitlines() if l.strip()]
for line in lines:
out.write(line + '\n')
if errors:
out.write('<pre class="doctest-errors">%s</pre>'
% errors)
out.write('</div>\n')
result = out.getvalue()
if not os.path.exists(fn):
f = open(fn, 'wb')
f.write(result)
f.close()
else:
f = open(fn, 'rb')
expected = f.read()
f.close()
if not html_matches(expected, result):
print 'Pages did not match. Expected from %s:' % fn
print '-'*60
print expected
print '='*60
print 'Actual output:'
print '-'*60
print result
def html_matches(pattern, text):
regex = re.escape(pattern)
regex = regex.replace(r'\.\.\.', '.*')
regex = re.sub(r'0x[0-9a-f]+', '.*', regex)
regex = '^%s$' % regex
return re.search(regex, text)
def convert_docstring_string(data):
if data.startswith('\n'):
data = data[1:]
lines = data.splitlines()
new_lines = []
for line in lines:
if line.rstrip() == '.':
new_lines.append('')
else:
new_lines.append(line)
data = '\n'.join(new_lines) + '\n'
return data
def create_file(path, version, data):
data = convert_docstring_string(data)
write_data(path, data)
show_file(path, version)
def append_to_file(path, version, data):
data = convert_docstring_string(data)
f = open(path, 'a')
f.write(data)
f.close()
# I think these appends can happen so quickly (in less than a second)
# that the .pyc file doesn't appear to be expired, even though it
# is after we've made this change; so we have to get rid of the .pyc
# file:
if path.endswith('.py'):
pyc_file = path + 'c'
if os.path.exists(pyc_file):
os.unlink(pyc_file)
show_file(path, version, description='added to %s' % path,
data=data)
def show_file(path, version, description=None, data=None):
ext = os.path.splitext(path)[1]
if data is None:
f = open(path, 'rb')
data = f.read()
f.close()
if ext == '.py':
html = ('<div class="source-code">%s</div>'
% PySourceColor.str2html(data, PySourceColor.dark))
else:
html = '<pre class="source-code">%s</pre>' % cgi.escape(data, 1)
html = '<span class="source-filename">%s</span><br>%s' % (
description or path, html)
write_data(resource_filename('%s.%s.gen.html' % (path, version)),
html)
def call_source_highlight(input, format):
proc = subprocess.Popen(['source-highlight', '--out-format=html',
'--no-doc', '--css=none',
'--src-lang=%s' % format], shell=False,
stdout=subprocess.PIPE)
stdout, stderr = proc.communicate(input)
result = stdout
proc.wait()
return result
def write_data(path, data):
dir = os.path.dirname(os.path.abspath(path))
if not os.path.exists(dir):
os.makedirs(dir)
f = open(path, 'wb')
f.write(data)
f.close()
def change_file(path, changes):
f = open(os.path.abspath(path), 'rb')
lines = f.readlines()
f.close()
for change_type, line, text in changes:
if change_type == 'insert':
lines[line:line] = [text]
elif change_type == 'delete':
lines[line:text] = []
else:
assert 0, (
"Unknown change_type: %r" % change_type)
f = open(path, 'wb')
f.write(''.join(lines))
f.close()
class LongFormDocTestParser(doctest.DocTestParser):
"""
This parser recognizes some reST comments as commands, without
prompts or expected output, like:
.. run:
do_this(...
...)
"""
_EXAMPLE_RE = re.compile(r"""
# Source consists of a PS1 line followed by zero or more PS2 lines.
(?: (?P<source>
(?:^(?P<indent> [ ]*) >>> .*) # PS1 line
(?:\n [ ]* \.\.\. .*)*) # PS2 lines
\n?
# Want consists of any non-blank lines that do not start with PS1.
(?P<want> (?:(?![ ]*$) # Not a blank line
(?![ ]*>>>) # Not a line starting with PS1
.*$\n? # But any other line
)*))
|
(?: # This is for longer commands that are prefixed with a reST
# comment like '.. run:' (two colons makes that a directive).
# These commands cannot have any output.
(?:^\.\.[ ]*(?P<run>run):[ ]*\n) # Leading command/command
(?:[ ]*\n)? # Blank line following
(?P<runsource>
(?:(?P<runindent> [ ]+)[^ ].*$)
(?:\n [ ]+ .*)*)
)
|
(?: # This is for shell commands
(?P<shellsource>
(?:^(P<shellindent> [ ]*) [$] .*) # Shell line
(?:\n [ ]* [>] .*)*) # Continuation
\n?
# Want consists of any non-blank lines that do not start with $
(?P<shellwant> (?:(?![ ]*$)
(?![ ]*[$]$)
.*$\n?
)*))
""", re.MULTILINE | re.VERBOSE)
def _parse_example(self, m, name, lineno):
r"""
Given a regular expression match from `_EXAMPLE_RE` (`m`),
return a pair `(source, want)`, where `source` is the matched
example's source code (with prompts and indentation stripped);
and `want` is the example's expected output (with indentation
stripped).
`name` is the string's name, and `lineno` is the line number
where the example starts; both are used for error messages.
>>> def parseit(s):
... p = LongFormDocTestParser()
... return p._parse_example(p._EXAMPLE_RE.search(s), '<string>', 1)
>>> parseit('>>> 1\n1')
('1', {}, '1', None)
>>> parseit('>>> (1\n... +1)\n2')
('(1\n+1)', {}, '2', None)
>>> parseit('.. run:\n\n test1\n test2\n')
('test1\ntest2', {}, '', None)
"""
# Get the example's indentation level.
runner = m.group('run') or ''
indent = len(m.group('%sindent' % runner))
# Divide source into lines; check that they're properly
# indented; and then strip their indentation & prompts.
source_lines = m.group('%ssource' % runner).split('\n')
if runner:
self._check_prefix(source_lines[1:], ' '*indent, name, lineno)
else:
self._check_prompt_blank(source_lines, indent, name, lineno)
self._check_prefix(source_lines[2:], ' '*indent + '.', name, lineno)
if runner:
source = '\n'.join([sl[indent:] for sl in source_lines])
else:
source = '\n'.join([sl[indent+4:] for sl in source_lines])
if runner:
want = ''
exc_msg = None
else:
# Divide want into lines; check that it's properly indented; and
# then strip the indentation. Spaces before the last newline should
# be preserved, so plain rstrip() isn't good enough.
want = m.group('want')
want_lines = want.split('\n')
if len(want_lines) > 1 and re.match(r' *$', want_lines[-1]):
del want_lines[-1] # forget final newline & spaces after it
self._check_prefix(want_lines, ' '*indent, name,
lineno + len(source_lines))
want = '\n'.join([wl[indent:] for wl in want_lines])
# If `want` contains a traceback message, then extract it.
m = self._EXCEPTION_RE.match(want)
if m:
exc_msg = m.group('msg')
else:
exc_msg = None
# Extract options from the source.
options = self._find_options(source, name, lineno)
return source, options, want, exc_msg
def parse(self, string, name='<string>'):
"""
Divide the given string into examples and intervening text,
and return them as a list of alternating Examples and strings.
Line numbers for the Examples are 0-based. The optional
argument `name` is a name identifying this string, and is only
used for error messages.
"""
string = string.expandtabs()
# If all lines begin with the same indentation, then strip it.
min_indent = self._min_indent(string)
if min_indent > 0:
string = '\n'.join([l[min_indent:] for l in string.split('\n')])
output = []
charno, lineno = 0, 0
# Find all doctest examples in the string:
for m in self._EXAMPLE_RE.finditer(string):
# Add the pre-example text to `output`.
output.append(string[charno:m.start()])
# Update lineno (lines before this example)
lineno += string.count('\n', charno, m.start())
# Extract info from the regexp match.
(source, options, want, exc_msg) = \
self._parse_example(m, name, lineno)
# Create an Example, and add it to the list.
if not self._IS_BLANK_OR_COMMENT(source):
# @@: Erg, this is the only line I need to change...
output.append(doctest.Example(
source, want, exc_msg,
lineno=lineno,
indent=min_indent+len(m.group('indent') or m.group('runindent')),
options=options))
# Update lineno (lines inside this example)
lineno += string.count('\n', m.start(), m.end())
# Update charno.
charno = m.end()
# Add any remaining post-example text to `output`.
output.append(string[charno:])
return output
if __name__ == '__main__':
if sys.argv[1:] and sys.argv[1] == 'doctest':
doctest.testmod()
sys.exit()
if not paste_parent in sys.path:
sys.path.append(paste_parent)
for fn in sys.argv[1:]:
fn = os.path.abspath(fn)
# @@: OK, ick; but this module gets loaded twice
sys.testing_document_filename = fn
doctest.testfile(
fn, module_relative=False,
optionflags=doctest.ELLIPSIS|doctest.REPORT_ONLY_FIRST_FAILURE,
parser=LongFormDocTestParser())
new = os.path.splitext(fn)[0] + '.html'
assert new != fn
os.system('rst2html.py %s > %s' % (fn, new))

View File

@@ -0,0 +1,409 @@
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
Module to find differences over time in a filesystem
Basically this takes a snapshot of a directory, then sees what changes
were made. The contents of the files are not checked, so you can
detect that the content was changed, but not what the old version of
the file was.
"""
import os
from fnmatch import fnmatch
from datetime import datetime
from paste.util.UserDict24 import IterableUserDict
import operator
import re
__all__ = ['Diff', 'Snapshot', 'File', 'Dir', 'report_expected_diffs',
'show_diff']
class Diff(object):
"""
Represents the difference between two snapshots
"""
def __init__(self, before, after):
self.before = before
self.after = after
self._calculate()
def _calculate(self):
before = self.before.data
after = self.after.data
self.deleted = {}
self.updated = {}
self.created = after.copy()
for path, f in before.items():
if path not in after:
self.deleted[path] = f
continue
del self.created[path]
if f.mtime < after[path].mtime:
self.updated[path] = after[path]
def __str__(self):
return self.report()
def report(self, header=True, dates=False):
s = []
if header:
s.append('Difference in %s from %s to %s:' %
(self.before.base_path,
self.before.calculated,
self.after.calculated))
for name, files, show_size in [
('created', self.created, True),
('deleted', self.deleted, True),
('updated', self.updated, True)]:
if files:
s.append('-- %s: -------------------' % name)
files = files.items()
files.sort()
last = ''
for path, f in files:
t = ' %s' % _space_prefix(last, path, indent=4,
include_sep=False)
last = path
if show_size and f.size != 'N/A':
t += ' (%s bytes)' % f.size
if dates:
parts = []
if self.before.get(path):
parts.append(self.before[path].mtime)
if self.after.get(path):
parts.append(self.after[path].mtime)
t += ' (mtime: %s)' % ('->'.join(map(repr, parts)))
s.append(t)
if len(s) == 1:
s.append(' (no changes)')
return '\n'.join(s)
class Snapshot(IterableUserDict):
"""
Represents a snapshot of a set of files. Has a dictionary-like
interface, keyed relative to ``base_path``
"""
def __init__(self, base_path, files=None, ignore_wildcards=(),
ignore_paths=(), ignore_hidden=True):
self.base_path = base_path
self.ignore_wildcards = ignore_wildcards
self.ignore_hidden = ignore_hidden
self.ignore_paths = ignore_paths
self.calculated = None
self.data = files or {}
if files is None:
self.find_files()
############################################################
## File finding
############################################################
def find_files(self):
"""
Find all the files under the base path, and put them in
``self.data``
"""
self._find_traverse('', self.data)
self.calculated = datetime.now()
def _ignore_file(self, fn):
if fn in self.ignore_paths:
return True
if self.ignore_hidden and os.path.basename(fn).startswith('.'):
return True
for pat in self.ignore_wildcards:
if fnmatch(fn, pat):
return True
return False
def _ignore_file(self, fn):
if fn in self.ignore_paths:
return True
if self.ignore_hidden and os.path.basename(fn).startswith('.'):
return True
return False
def _find_traverse(self, path, result):
full = os.path.join(self.base_path, path)
if os.path.isdir(full):
if path:
# Don't actually include the base path
result[path] = Dir(self.base_path, path)
for fn in os.listdir(full):
fn = os.path.join(path, fn)
if self._ignore_file(fn):
continue
self._find_traverse(fn, result)
else:
result[path] = File(self.base_path, path)
def __repr__(self):
return '<%s in %r from %r>' % (
self.__class__.__name__, self.base_path,
self.calculated or '(no calculation done)')
def compare_expected(self, expected, comparison=operator.eq,
differ=None, not_found=None,
include_success=False):
"""
Compares a dictionary of ``path: content`` to the
found files. Comparison is done by equality, or the
``comparison(actual_content, expected_content)`` function given.
Returns dictionary of differences, keyed by path. Each
difference is either noted, or the output of
``differ(actual_content, expected_content)`` is given.
If a file does not exist and ``not_found`` is given, then
``not_found(path)`` is put in.
"""
result = {}
for path in expected:
orig_path = path
path = path.strip('/')
if path not in self.data:
if not_found:
msg = not_found(path)
else:
msg = 'not found'
result[path] = msg
continue
expected_content = expected[orig_path]
file = self.data[path]
actual_content = file.bytes
if not comparison(actual_content, expected_content):
if differ:
msg = differ(actual_content, expected_content)
else:
if len(actual_content) < len(expected_content):
msg = 'differ (%i bytes smaller)' % (
len(expected_content) - len(actual_content))
elif len(actual_content) > len(expected_content):
msg = 'differ (%i bytes larger)' % (
len(actual_content) - len(expected_content))
else:
msg = 'diff (same size)'
result[path] = msg
elif include_success:
result[path] = 'same!'
return result
def diff_to_now(self):
return Diff(self, self.clone())
def clone(self):
return self.__class__(base_path=self.base_path,
ignore_wildcards=self.ignore_wildcards,
ignore_paths=self.ignore_paths,
ignore_hidden=self.ignore_hidden)
class File(object):
"""
Represents a single file found as the result of a command.
Has attributes:
``path``:
The path of the file, relative to the ``base_path``
``full``:
The full path
``stat``:
The results of ``os.stat``. Also ``mtime`` and ``size``
contain the ``.st_mtime`` and ``st_size`` of the stat.
``bytes``:
The contents of the file.
You may use the ``in`` operator with these objects (tested against
the contents of the file), and the ``.mustcontain()`` method.
"""
file = True
dir = False
def __init__(self, base_path, path):
self.base_path = base_path
self.path = path
self.full = os.path.join(base_path, path)
self.stat = os.stat(self.full)
self.mtime = self.stat.st_mtime
self.size = self.stat.st_size
self._bytes = None
def bytes__get(self):
if self._bytes is None:
f = open(self.full, 'rb')
self._bytes = f.read()
f.close()
return self._bytes
bytes = property(bytes__get)
def __contains__(self, s):
return s in self.bytes
def mustcontain(self, s):
__tracebackhide__ = True
bytes = self.bytes
if s not in bytes:
print 'Could not find %r in:' % s
print bytes
assert s in bytes
def __repr__(self):
return '<%s %s:%s>' % (
self.__class__.__name__,
self.base_path, self.path)
class Dir(File):
"""
Represents a directory created by a command.
"""
file = False
dir = True
def __init__(self, base_path, path):
self.base_path = base_path
self.path = path
self.full = os.path.join(base_path, path)
self.size = 'N/A'
self.mtime = 'N/A'
def __repr__(self):
return '<%s %s:%s>' % (
self.__class__.__name__,
self.base_path, self.path)
def bytes__get(self):
raise NotImplementedError(
"Directory %r doesn't have content" % self)
bytes = property(bytes__get)
def _space_prefix(pref, full, sep=None, indent=None, include_sep=True):
"""
Anything shared by pref and full will be replaced with spaces
in full, and full returned.
Example::
>>> _space_prefix('/foo/bar', '/foo')
' /bar'
"""
if sep is None:
sep = os.path.sep
pref = pref.split(sep)
full = full.split(sep)
padding = []
while pref and full and pref[0] == full[0]:
if indent is None:
padding.append(' ' * (len(full[0]) + len(sep)))
else:
padding.append(' ' * indent)
full.pop(0)
pref.pop(0)
if padding:
if include_sep:
return ''.join(padding) + sep + sep.join(full)
else:
return ''.join(padding) + sep.join(full)
else:
return sep.join(full)
def report_expected_diffs(diffs, colorize=False):
"""
Takes the output of compare_expected, and returns a string
description of the differences.
"""
if not diffs:
return 'No differences'
diffs = diffs.items()
diffs.sort()
s = []
last = ''
for path, desc in diffs:
t = _space_prefix(last, path, indent=4, include_sep=False)
if colorize:
t = color_line(t, 11)
last = path
if len(desc.splitlines()) > 1:
cur_indent = len(re.search(r'^[ ]*', t).group(0))
desc = indent(cur_indent+2, desc)
if colorize:
t += '\n'
for line in desc.splitlines():
if line.strip().startswith('+'):
line = color_line(line, 10)
elif line.strip().startswith('-'):
line = color_line(line, 9)
else:
line = color_line(line, 14)
t += line+'\n'
else:
t += '\n' + desc
else:
t += ' '+desc
s.append(t)
s.append('Files with differences: %s' % len(diffs))
return '\n'.join(s)
def color_code(foreground=None, background=None):
"""
0 black
1 red
2 green
3 yellow
4 blue
5 magenta (purple)
6 cyan
7 white (gray)
Add 8 to get high-intensity
"""
if foreground is None and background is None:
# Reset
return '\x1b[0m'
codes = []
if foreground is None:
codes.append('[39m')
elif foreground > 7:
codes.append('[1m')
codes.append('[%im' % (22+foreground))
else:
codes.append('[%im' % (30+foreground))
if background is None:
codes.append('[49m')
else:
codes.append('[%im' % (40+background))
return '\x1b' + '\x1b'.join(codes)
def color_line(line, foreground=None, background=None):
match = re.search(r'^(\s*)', line)
return (match.group(1) + color_code(foreground, background)
+ line[match.end():] + color_code())
def indent(indent, text):
return '\n'.join(
[' '*indent + l for l in text.splitlines()])
def show_diff(actual_content, expected_content):
actual_lines = [l.strip() for l in actual_content.splitlines()
if l.strip()]
expected_lines = [l.strip() for l in expected_content.splitlines()
if l.strip()]
if len(actual_lines) == len(expected_lines) == 1:
return '%r not %r' % (actual_lines[0], expected_lines[0])
if not actual_lines:
return 'Empty; should have:\n'+expected_content
import difflib
return '\n'.join(difflib.ndiff(actual_lines, expected_lines))

View File

@@ -0,0 +1,148 @@
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
Middleware that displays everything that is printed inline in
application pages.
Anything printed during the request will get captured and included on
the page. It will usually be included as a floating element in the
top right hand corner of the page. If you want to override this
you can include a tag in your template where it will be placed::
<pre id="paste-debug-prints"></pre>
You might want to include ``style="white-space: normal"``, as all the
whitespace will be quoted, and this allows the text to wrap if
necessary.
"""
from cStringIO import StringIO
import re
import cgi
from paste.util import threadedprint
from paste import wsgilib
from paste import response
import sys
_threadedprint_installed = False
__all__ = ['PrintDebugMiddleware']
class TeeFile(object):
def __init__(self, files):
self.files = files
def write(self, v):
if isinstance(v, unicode):
# WSGI is picky in this case
v = str(v)
for file in self.files:
file.write(v)
class PrintDebugMiddleware(object):
"""
This middleware captures all the printed statements, and inlines
them in HTML pages, so that you can see all the (debug-intended)
print statements in the page itself.
There are two keys added to the environment to control this:
``environ['paste.printdebug_listeners']`` is a list of functions
that will be called everytime something is printed.
``environ['paste.remove_printdebug']`` is a function that, if
called, will disable printing of output for that request.
If you have ``replace_stdout=True`` then stdout is replaced, not
captured.
"""
log_template = (
'<pre style="width: 40%%; border: 2px solid #000; white-space: normal; '
'background-color: #ffd; color: #000; float: right;">'
'<b style="border-bottom: 1px solid #000">Log messages</b><br>'
'%s</pre>')
def __init__(self, app, global_conf=None, force_content_type=False,
print_wsgi_errors=True, replace_stdout=False):
# @@: global_conf should be handled separately and only for
# the entry point
self.app = app
self.force_content_type = force_content_type
if isinstance(print_wsgi_errors, basestring):
from paste.deploy.converters import asbool
print_wsgi_errors = asbool(print_wsgi_errors)
self.print_wsgi_errors = print_wsgi_errors
self.replace_stdout = replace_stdout
self._threaded_print_stdout = None
def __call__(self, environ, start_response):
global _threadedprint_installed
if environ.get('paste.testing'):
# In a testing environment this interception isn't
# useful:
return self.app(environ, start_response)
if (not _threadedprint_installed
or self._threaded_print_stdout is not sys.stdout):
# @@: Not strictly threadsafe
_threadedprint_installed = True
threadedprint.install(leave_stdout=not self.replace_stdout)
self._threaded_print_stdout = sys.stdout
removed = []
def remove_printdebug():
removed.append(None)
environ['paste.remove_printdebug'] = remove_printdebug
logged = StringIO()
listeners = [logged]
environ['paste.printdebug_listeners'] = listeners
if self.print_wsgi_errors:
listeners.append(environ['wsgi.errors'])
replacement_stdout = TeeFile(listeners)
threadedprint.register(replacement_stdout)
try:
status, headers, body = wsgilib.intercept_output(
environ, self.app)
if status is None:
# Some error occurred
status = '500 Server Error'
headers = [('Content-type', 'text/html')]
start_response(status, headers)
if not body:
body = 'An error occurred'
content_type = response.header_value(headers, 'content-type')
if (removed or
(not self.force_content_type and
(not content_type
or not content_type.startswith('text/html')))):
if replacement_stdout == logged:
# Then the prints will be lost, unless...
environ['wsgi.errors'].write(logged.getvalue())
start_response(status, headers)
return [body]
response.remove_header(headers, 'content-length')
body = self.add_log(body, logged.getvalue())
start_response(status, headers)
return [body]
finally:
threadedprint.deregister()
_body_re = re.compile(r'<body[^>]*>', re.I)
_explicit_re = re.compile(r'<pre\s*[^>]*id="paste-debug-prints".*?>',
re.I+re.S)
def add_log(self, html, log):
if not log:
return html
text = cgi.escape(log)
text = text.replace('\n', '<br>')
text = text.replace(' ', '&nbsp; ')
match = self._explicit_re.search(html)
if not match:
text = self.log_template % text
match = self._body_re.search(html)
if not match:
return text + html
else:
return html[:match.end()] + text + html[match.end():]

View File

@@ -0,0 +1,227 @@
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
Middleware that profiles the request and displays profiling
information at the bottom of each page.
"""
import sys
import os
import hotshot
import hotshot.stats
import threading
import cgi
import time
from cStringIO import StringIO
from paste import response
__all__ = ['ProfileMiddleware', 'profile_decorator']
class ProfileMiddleware(object):
"""
Middleware that profiles all requests.
All HTML pages will have profiling information appended to them.
The data is isolated to that single request, and does not include
data from previous requests.
This uses the ``hotshot`` module, which affects performance of the
application. It also runs in a single-threaded mode, so it is
only usable in development environments.
"""
style = ('clear: both; background-color: #ff9; color: #000; '
'border: 2px solid #000; padding: 5px;')
def __init__(self, app, global_conf=None,
log_filename='profile.log.tmp',
limit=40):
self.app = app
self.lock = threading.Lock()
self.log_filename = log_filename
self.limit = limit
def __call__(self, environ, start_response):
catch_response = []
body = []
def replace_start_response(status, headers, exc_info=None):
catch_response.extend([status, headers])
start_response(status, headers, exc_info)
return body.append
def run_app():
app_iter = self.app(environ, replace_start_response)
try:
body.extend(app_iter)
finally:
if hasattr(app_iter, 'close'):
app_iter.close()
self.lock.acquire()
try:
prof = hotshot.Profile(self.log_filename)
prof.addinfo('URL', environ.get('PATH_INFO', ''))
try:
prof.runcall(run_app)
finally:
prof.close()
body = ''.join(body)
headers = catch_response[1]
content_type = response.header_value(headers, 'content-type')
if content_type is None or not content_type.startswith('text/html'):
# We can't add info to non-HTML output
return [body]
stats = hotshot.stats.load(self.log_filename)
stats.strip_dirs()
stats.sort_stats('time', 'calls')
output = capture_output(stats.print_stats, self.limit)
output_callers = capture_output(
stats.print_callers, self.limit)
body += '<pre style="%s">%s\n%s</pre>' % (
self.style, cgi.escape(output), cgi.escape(output_callers))
return [body]
finally:
self.lock.release()
def capture_output(func, *args, **kw):
# Not threadsafe! (that's okay when ProfileMiddleware uses it,
# though, since it synchronizes itself.)
out = StringIO()
old_stdout = sys.stdout
sys.stdout = out
try:
func(*args, **kw)
finally:
sys.stdout = old_stdout
return out.getvalue()
def profile_decorator(**options):
"""
Profile a single function call.
Used around a function, like::
@profile_decorator(options...)
def ...
All calls to the function will be profiled. The options are
all keywords, and are:
log_file:
The filename to log to (or ``'stdout'`` or ``'stderr'``).
Default: stderr.
display_limit:
Only show the top N items, default: 20.
sort_stats:
A list of string-attributes to sort on. Default
``('time', 'calls')``.
strip_dirs:
Strip directories/module names from files? Default True.
add_info:
If given, this info will be added to the report (for your
own tracking). Default: none.
log_filename:
The temporary filename to log profiling data to. Default;
``./profile_data.log.tmp``
no_profile:
If true, then don't actually profile anything. Useful for
conditional profiling.
"""
if options.get('no_profile'):
def decorator(func):
return func
return decorator
def decorator(func):
def replacement(*args, **kw):
return DecoratedProfile(func, **options)(*args, **kw)
return replacement
return decorator
class DecoratedProfile(object):
lock = threading.Lock()
def __init__(self, func, **options):
self.func = func
self.options = options
def __call__(self, *args, **kw):
self.lock.acquire()
try:
return self.profile(self.func, *args, **kw)
finally:
self.lock.release()
def profile(self, func, *args, **kw):
ops = self.options
prof_filename = ops.get('log_filename', 'profile_data.log.tmp')
prof = hotshot.Profile(prof_filename)
prof.addinfo('Function Call',
self.format_function(func, *args, **kw))
if ops.get('add_info'):
prof.addinfo('Extra info', ops['add_info'])
exc_info = None
try:
start_time = time.time()
try:
result = prof.runcall(func, *args, **kw)
except:
exc_info = sys.exc_info()
end_time = time.time()
finally:
prof.close()
stats = hotshot.stats.load(prof_filename)
os.unlink(prof_filename)
if ops.get('strip_dirs', True):
stats.strip_dirs()
stats.sort_stats(*ops.get('sort_stats', ('time', 'calls')))
display_limit = ops.get('display_limit', 20)
output = capture_output(stats.print_stats, display_limit)
output_callers = capture_output(
stats.print_callers, display_limit)
output_file = ops.get('log_file')
if output_file in (None, 'stderr'):
f = sys.stderr
elif output_file in ('-', 'stdout'):
f = sys.stdout
else:
f = open(output_file, 'a')
f.write('\n%s\n' % ('-'*60))
f.write('Date: %s\n' % time.strftime('%c'))
f.write('Function call: %s\n'
% self.format_function(func, *args, **kw))
f.write('Wall time: %0.2f seconds\n'
% (end_time - start_time))
f.write(output)
f.write(output_callers)
if output_file not in (None, '-', 'stdout', 'stderr'):
f.close()
if exc_info:
# We captured an exception earlier, now we re-raise it
raise exc_info[0], exc_info[1], exc_info[2]
return result
def format_function(self, func, *args, **kw):
args = map(repr, args)
args.extend(
['%s=%r' % (k, v) for k, v in kw.items()])
return '%s(%s)' % (func.__name__, ', '.join(args))
def make_profile_middleware(
app, global_conf,
log_filename='profile.log.tmp',
limit=40):
"""
Wrap the application in a component that will profile each
request. The profiling data is then appended to the output
of each page.
Note that this serializes all requests (i.e., removing
concurrency). Therefore never use this in production.
"""
limit = int(limit)
return ProfileMiddleware(
app, log_filename=log_filename, limit=limit)

View File

@@ -0,0 +1,93 @@
# (c) 2005 Clark C. Evans
# This module is part of the Python Paste Project and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
# This code was written with funding by http://prometheusresearch.com
"""
WSGI Test Server
This builds upon paste.util.baseserver to customize it for regressions
where using raw_interactive won't do.
"""
import time
from paste.httpserver import *
class WSGIRegressionServer(WSGIServer):
"""
A threaded WSGIServer for use in regression testing. To use this
module, call serve(application, regression=True), and then call
server.accept() to let it handle one request. When finished, use
server.stop() to shutdown the server. Note that all pending requests
are processed before the server shuts down.
"""
defaulttimeout = 10
def __init__ (self, *args, **kwargs):
WSGIServer.__init__(self, *args, **kwargs)
self.stopping = []
self.pending = []
self.timeout = self.defaulttimeout
# this is a local connection, be quick
self.socket.settimeout(2)
def serve_forever(self):
from threading import Thread
thread = Thread(target=self.serve_pending)
thread.start()
def reset_expires(self):
if self.timeout:
self.expires = time.time() + self.timeout
def close_request(self, *args, **kwargs):
WSGIServer.close_request(self, *args, **kwargs)
self.pending.pop()
self.reset_expires()
def serve_pending(self):
self.reset_expires()
while not self.stopping or self.pending:
now = time.time()
if now > self.expires and self.timeout:
# note regression test doesn't handle exceptions in
# threads very well; so we just print and exit
print "\nWARNING: WSGIRegressionServer timeout exceeded\n"
break
if self.pending:
self.handle_request()
time.sleep(.1)
def stop(self):
""" stop the server (called from tester's thread) """
self.stopping.append(True)
def accept(self, count = 1):
""" accept another request (called from tester's thread) """
assert not self.stopping
[self.pending.append(True) for x in range(count)]
def serve(application, host=None, port=None, handler=None):
server = WSGIRegressionServer(application, host, port, handler)
print "serving on %s:%s" % server.server_address
server.serve_forever()
return server
if __name__ == '__main__':
import urllib
from paste.wsgilib import dump_environ
server = serve(dump_environ)
baseuri = ("http://%s:%s" % server.server_address)
def fetch(path):
# tell the server to humor exactly one more request
server.accept(1)
# not needed; but this is what you do if the server
# may not respond in a resonable time period
import socket
socket.setdefaulttimeout(5)
# build a uri, fetch and return
return urllib.urlopen(baseuri + path).read()
assert "PATH_INFO: /foo" in fetch("/foo")
assert "PATH_INFO: /womble" in fetch("/womble")
# ok, let's make one more final request...
server.accept(1)
# and then schedule a stop()
server.stop()
# and then... fetch it...
urllib.urlopen(baseuri)

View File

@@ -0,0 +1,347 @@
"""
Watches the key ``paste.httpserver.thread_pool`` to see how many
threads there are and report on any wedged threads.
"""
import sys
import cgi
import time
import traceback
from cStringIO import StringIO
from thread import get_ident
from paste import httpexceptions
from paste.request import construct_url, parse_formvars
from paste.util.template import HTMLTemplate, bunch
page_template = HTMLTemplate('''
<html>
<head>
<style type="text/css">
body {
font-family: sans-serif;
}
table.environ tr td {
border-bottom: #bbb 1px solid;
}
table.environ tr td.bottom {
border-bottom: none;
}
table.thread {
border: 1px solid #000;
margin-bottom: 1em;
}
table.thread tr td {
border-bottom: #999 1px solid;
padding-right: 1em;
}
table.thread tr td.bottom {
border-bottom: none;
}
table.thread tr.this_thread td {
background-color: #006;
color: #fff;
}
a.button {
background-color: #ddd;
border: #aaa outset 2px;
text-decoration: none;
margin-top: 10px;
font-size: 80%;
color: #000;
}
a.button:hover {
background-color: #eee;
border: #bbb outset 2px;
}
a.button:active {
border: #bbb inset 2px;
}
</style>
<title>{{title}}</title>
</head>
<body>
<h1>{{title}}</h1>
{{if kill_thread_id}}
<div style="background-color: #060; color: #fff;
border: 2px solid #000;">
Thread {{kill_thread_id}} killed
</div>
{{endif}}
<div>Pool size: {{nworkers}}
{{if actual_workers > nworkers}}
+ {{actual_workers-nworkers}} extra
{{endif}}
({{nworkers_used}} used including current request)<br>
idle: {{len(track_threads["idle"])}},
busy: {{len(track_threads["busy"])}},
hung: {{len(track_threads["hung"])}},
dying: {{len(track_threads["dying"])}},
zombie: {{len(track_threads["zombie"])}}</div>
{{for thread in threads}}
<table class="thread">
<tr {{if thread.thread_id == this_thread_id}}class="this_thread"{{endif}}>
<td>
<b>Thread</b>
{{if thread.thread_id == this_thread_id}}
(<i>this</i> request)
{{endif}}</td>
<td>
<b>{{thread.thread_id}}
{{if allow_kill}}
<form action="{{script_name}}/kill" method="POST"
style="display: inline">
<input type="hidden" name="thread_id" value="{{thread.thread_id}}">
<input type="submit" value="kill">
</form>
{{endif}}
</b>
</td>
</tr>
<tr>
<td>Time processing request</td>
<td>{{thread.time_html|html}}</td>
</tr>
<tr>
<td>URI</td>
<td>{{if thread.uri == 'unknown'}}
unknown
{{else}}<a href="{{thread.uri}}">{{thread.uri_short}}</a>
{{endif}}
</td>
<tr>
<td colspan="2" class="bottom">
<a href="#" class="button" style="width: 9em; display: block"
onclick="
var el = document.getElementById('environ-{{thread.thread_id}}');
if (el.style.display) {
el.style.display = '';
this.innerHTML = \'&#9662; Hide environ\';
} else {
el.style.display = 'none';
this.innerHTML = \'&#9656; Show environ\';
}
return false
">&#9656; Show environ</a>
<div id="environ-{{thread.thread_id}}" style="display: none">
{{if thread.environ:}}
<table class="environ">
{{for loop, item in looper(sorted(thread.environ.items()))}}
{{py:key, value=item}}
<tr>
<td {{if loop.last}}class="bottom"{{endif}}>{{key}}</td>
<td {{if loop.last}}class="bottom"{{endif}}>{{value}}</td>
</tr>
{{endfor}}
</table>
{{else}}
Thread is in process of starting
{{endif}}
</div>
{{if thread.traceback}}
<a href="#" class="button" style="width: 9em; display: block"
onclick="
var el = document.getElementById('traceback-{{thread.thread_id}}');
if (el.style.display) {
el.style.display = '';
this.innerHTML = \'&#9662; Hide traceback\';
} else {
el.style.display = 'none';
this.innerHTML = \'&#9656; Show traceback\';
}
return false
">&#9656; Show traceback</a>
<div id="traceback-{{thread.thread_id}}" style="display: none">
<pre class="traceback">{{thread.traceback}}</pre>
</div>
{{endif}}
</td>
</tr>
</table>
{{endfor}}
</body>
</html>
''', name='watchthreads.page_template')
class WatchThreads(object):
"""
Application that watches the threads in ``paste.httpserver``,
showing the length each thread has been working on a request.
If allow_kill is true, then you can kill errant threads through
this application.
This application can expose private information (specifically in
the environment, like cookies), so it should be protected.
"""
def __init__(self, allow_kill=False):
self.allow_kill = allow_kill
def __call__(self, environ, start_response):
if 'paste.httpserver.thread_pool' not in environ:
start_response('403 Forbidden', [('Content-type', 'text/plain')])
return ['You must use the threaded Paste HTTP server to use this application']
if environ.get('PATH_INFO') == '/kill':
return self.kill(environ, start_response)
else:
return self.show(environ, start_response)
def show(self, environ, start_response):
start_response('200 OK', [('Content-type', 'text/html')])
form = parse_formvars(environ)
if form.get('kill'):
kill_thread_id = form['kill']
else:
kill_thread_id = None
thread_pool = environ['paste.httpserver.thread_pool']
nworkers = thread_pool.nworkers
now = time.time()
workers = thread_pool.worker_tracker.items()
workers.sort(key=lambda v: v[1][0])
threads = []
for thread_id, (time_started, worker_environ) in workers:
thread = bunch()
threads.append(thread)
if worker_environ:
thread.uri = construct_url(worker_environ)
else:
thread.uri = 'unknown'
thread.thread_id = thread_id
thread.time_html = format_time(now-time_started)
thread.uri_short = shorten(thread.uri)
thread.environ = worker_environ
thread.traceback = traceback_thread(thread_id)
page = page_template.substitute(
title="Thread Pool Worker Tracker",
nworkers=nworkers,
actual_workers=len(thread_pool.workers),
nworkers_used=len(workers),
script_name=environ['SCRIPT_NAME'],
kill_thread_id=kill_thread_id,
allow_kill=self.allow_kill,
threads=threads,
this_thread_id=get_ident(),
track_threads=thread_pool.track_threads())
return [page]
def kill(self, environ, start_response):
if not self.allow_kill:
exc = httpexceptions.HTTPForbidden(
'Killing threads has not been enabled. Shame on you '
'for trying!')
return exc(environ, start_response)
vars = parse_formvars(environ)
thread_id = int(vars['thread_id'])
thread_pool = environ['paste.httpserver.thread_pool']
if thread_id not in thread_pool.worker_tracker:
exc = httpexceptions.PreconditionFailed(
'You tried to kill thread %s, but it is not working on '
'any requests' % thread_id)
return exc(environ, start_response)
thread_pool.kill_worker(thread_id)
script_name = environ['SCRIPT_NAME'] or '/'
exc = httpexceptions.HTTPFound(
headers=[('Location', script_name+'?kill=%s' % thread_id)])
return exc(environ, start_response)
def traceback_thread(thread_id):
"""
Returns a plain-text traceback of the given thread, or None if it
can't get a traceback.
"""
if not hasattr(sys, '_current_frames'):
# Only 2.5 has support for this, with this special function
return None
frames = sys._current_frames()
if not thread_id in frames:
return None
frame = frames[thread_id]
out = StringIO()
traceback.print_stack(frame, file=out)
return out.getvalue()
hide_keys = ['paste.httpserver.thread_pool']
def format_environ(environ):
if environ is None:
return environ_template.substitute(
key='---',
value='No environment registered for this thread yet')
environ_rows = []
for key, value in sorted(environ.items()):
if key in hide_keys:
continue
try:
if key.upper() != key:
value = repr(value)
environ_rows.append(
environ_template.substitute(
key=cgi.escape(str(key)),
value=cgi.escape(str(value))))
except Exception, e:
environ_rows.append(
environ_template.substitute(
key=cgi.escape(str(key)),
value='Error in <code>repr()</code>: %s' % e))
return ''.join(environ_rows)
def format_time(time_length):
if time_length >= 60*60:
# More than an hour
time_string = '%i:%02i:%02i' % (int(time_length/60/60),
int(time_length/60) % 60,
time_length % 60)
elif time_length >= 120:
time_string = '%i:%02i' % (int(time_length/60),
time_length % 60)
elif time_length > 60:
time_string = '%i sec' % time_length
elif time_length > 1:
time_string = '%0.1f sec' % time_length
else:
time_string = '%0.2f sec' % time_length
if time_length < 5:
return time_string
elif time_length < 120:
return '<span style="color: #900">%s</span>' % time_string
else:
return '<span style="background-color: #600; color: #fff">%s</span>' % time_string
def shorten(s):
if len(s) > 60:
return s[:40]+'...'+s[-10:]
else:
return s
def make_watch_threads(global_conf, allow_kill=False):
from paste.deploy.converters import asbool
return WatchThreads(allow_kill=asbool(allow_kill))
make_watch_threads.__doc__ = WatchThreads.__doc__
def make_bad_app(global_conf, pause=0):
pause = int(pause)
def bad_app(environ, start_response):
import thread
if pause:
time.sleep(pause)
else:
count = 0
while 1:
print "I'm alive %s (%s)" % (count, thread.get_ident())
time.sleep(10)
count += 1
start_response('200 OK', [('content-type', 'text/plain')])
return ['OK, paused %s seconds' % pause]
return bad_app

View File

@@ -0,0 +1,121 @@
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
Middleware that tests the validity of all generated HTML using the
`WDG HTML Validator <http://www.htmlhelp.com/tools/validator/>`_
"""
from cStringIO import StringIO
try:
import subprocess
except ImportError:
from paste.util import subprocess24 as subprocess
from paste.response import header_value
import re
import cgi
__all__ = ['WDGValidateMiddleware']
class WDGValidateMiddleware(object):
"""
Middleware that checks HTML and appends messages about the validity of
the HTML. Uses: http://www.htmlhelp.com/tools/validator/ -- interacts
with the command line client. Use the configuration ``wdg_path`` to
override the path (default: looks for ``validate`` in $PATH).
To install, in your web context's __init__.py::
def urlparser_wrap(environ, start_response, app):
return wdg_validate.WDGValidateMiddleware(app)(
environ, start_response)
Or in your configuration::
middleware.append('paste.wdg_validate.WDGValidateMiddleware')
"""
_end_body_regex = re.compile(r'</body>', re.I)
def __init__(self, app, global_conf=None, wdg_path='validate'):
self.app = app
self.wdg_path = wdg_path
def __call__(self, environ, start_response):
output = StringIO()
response = []
def writer_start_response(status, headers, exc_info=None):
response.extend((status, headers))
start_response(status, headers, exc_info)
return output.write
app_iter = self.app(environ, writer_start_response)
try:
for s in app_iter:
output.write(s)
finally:
if hasattr(app_iter, 'close'):
app_iter.close()
page = output.getvalue()
status, headers = response
v = header_value(headers, 'content-type') or ''
if (not v.startswith('text/html')
and not v.startswith('text/xhtml')
and not v.startswith('application/xhtml')):
# Can't validate
# @@: Should validate CSS too... but using what?
return [page]
ops = []
if v.startswith('text/xhtml+xml'):
ops.append('--xml')
# @@: Should capture encoding too
html_errors = self.call_wdg_validate(
self.wdg_path, ops, page)
if html_errors:
page = self.add_error(page, html_errors)[0]
headers.remove(
('Content-Length',
str(header_value(headers, 'content-length'))))
headers.append(('Content-Length', str(len(page))))
return [page]
def call_wdg_validate(self, wdg_path, ops, page):
if subprocess is None:
raise ValueError(
"This middleware requires the subprocess module from "
"Python 2.4")
proc = subprocess.Popen([wdg_path] + ops,
shell=False,
close_fds=True,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.STDOUT)
stdout = proc.communicate(page)[0]
proc.wait()
return stdout
def add_error(self, html_page, html_errors):
add_text = ('<pre style="background-color: #ffd; color: #600; '
'border: 1px solid #000;">%s</pre>'
% cgi.escape(html_errors))
match = self._end_body_regex.search(html_page)
if match:
return [html_page[:match.start()]
+ add_text
+ html_page[match.start():]]
else:
return [html_page + add_text]
def make_wdg_validate_middleware(
app, global_conf, wdg_path='validate'):
"""
Wraps the application in the WDG validator from
http://www.htmlhelp.com/tools/validator/
Validation errors are appended to the text of each page.
You can configure this by giving the path to the validate
executable (by default picked up from $PATH)
"""
return WDGValidateMiddleware(
app, global_conf, wdg_path=wdg_path)