Revision: 5f50bfd68dcd
Author: Rodrigo Moraes <
rodrigo...@gmail.com>
Date: Sun Apr 3 07:55:49 2011
Log: Editor was set to use tabs. Moving back to spaces.
http://code.google.com/p/tipfy/source/detail?r=5f50bfd68dcd
Modified:
/CHANGES.txt
/run_tests.py
/tests/app_test.py
/tests/auth_test.py
/tests/config_test.py
/tests/gae_blobstore_test.py
/tests/gae_db_test.py
/tests/gae_mail_test.py
/tests/gae_taskqueue_test.py
/tests/gae_xmpp_test.py
/tests/manage_test.py
/tests/routing_test.py
/tests/secure_cookie_test.py
/tests/template_test.py
/tests/utils_test.py
/tipfy/app.py
/tipfy/utils.py
=======================================
--- /CHANGES.txt Sun Apr 3 07:25:05 2011
+++ /CHANGES.txt Sun Apr 3 07:55:49 2011
@@ -30,7 +30,7 @@
- APPENGINE: True is the app is running on App Engine, False otherwise.
- APPLICATION_ID: the application ID as defined in *app.yaml*.
- CURRENT_VERSION_ID: the deployed version ID. Always '1' when using
- the dev server.
+ the dev server.
- NEW: auth, debugger, i18n and sessions are now part of tipfy core. They
are
all optional, lazily loaded, default and overridable implementations.
@@ -41,9 +41,9 @@
- NEW: Auth is now part of tipfy core. The auth store is configured in
tipfy
module and uses an App Engine store by default::
- config['tipfy'] = {
- 'auth_store_class': 'tipfy.appengine.auth.AuthStore',
- }
+ config['tipfy'] = {
+ 'auth_store_class': 'tipfy.appengine.auth.AuthStore',
+ }
Then it is available through lazy loading in the RequestHandler:
@@ -51,13 +51,13 @@
Example::
- class MyHandler(RequestHandler):
- def get(self, **kwargs):
- # Get the current user, if any.
- current_user = self.auth.user
-
- # Generate a login URL.
- login_url = self.auth.login_url()
+ class MyHandler(RequestHandler):
+ def get(self, **kwargs):
+ # Get the current user, if any.
+ current_user = self.auth.user
+
+ # Generate a login URL.
+ login_url = self.auth.login_url()
- NEW: AppEngineMixedAuthStore, which allows login using App Engine Auth
across subdomains. It mixes App Engine auth with own sessions to achieve
@@ -73,9 +73,9 @@
- NEW: i18n is now part of tipfy core. The i18n store is configured in
tipfy module and uses the shipped store by default::
- config['tipfy'] = {
- 'i18n_store_class': 'tipfy.i18n.I18nStore',
- }
+ config['tipfy'] = {
+ 'i18n_store_class': 'tipfy.i18n.I18nStore',
+ }
Then it is available through lazy loading in the RequestHandler:
@@ -83,10 +83,10 @@
Example::
- class MyHandler(RequestHandler):
- def get(self, **kwargs):
- # Get a translated string.
- message = self.i18n.gettext('Translate me')
+ class MyHandler(RequestHandler):
+ def get(self, **kwargs):
+ # Get a translated string.
+ message = self.i18n.gettext('Translate me')
Global i18n functions are still available in tipfy.i18n.
@@ -96,34 +96,34 @@
- NEW: Sessions are now part of tipfy core. The auth store is configured in
tipfy module and uses the shipped store by default::
- config['tipfy'] = {
- 'session_store_class': 'tipfy.sessions.SessionStore',
- }
+ config['tipfy'] = {
+ 'session_store_class': 'tipfy.sessions.SessionStore',
+ }
Then it is available through lazy loading in the RequestHandler:
- RequestHandler.session provides access to the session using the
configured
- backend and session key.
+ backend and session key.
- RequestHandler.session_store provides session related utilities such as
- getting sessions from different backends and setting/deleting cookies.
+ getting sessions from different backends and setting/deleting cookies.
Example::
- class MyHandler(RequestHandler):
- middleware = [SessionMiddleware()]
-
- def get(self, **kwargs):
- # Set a value in the current session.
- self.session['foo'] = 'bar'
-
- # Get a list of flash messages.
- flashes = self.session.get_flashes()
-
- # Add a flash message.
- self.session.flash('Hello, world!')
-
- # Get a session from a different backend.
- session = self.session_store.get_session(backend='memcache')
+ class MyHandler(RequestHandler):
+ middleware = [SessionMiddleware()]
+
+ def get(self, **kwargs):
+ # Set a value in the current session.
+ self.session['foo'] = 'bar'
+
+ # Get a list of flash messages.
+ flashes = self.session.get_flashes()
+
+ # Add a flash message.
+ self.session.flash('Hello, world!')
+
+ # Get a session from a different backend.
+ session = self.session_store.get_session(backend='memcache')
- NEW: secure cookie has a faster and easier to use implementation. See
tipfy.sessions.SecureCookieStore.
@@ -138,9 +138,9 @@
template engine. The debugger is enabled by default when in development
mode,
and can be disabled with a configuration:
- config['tipfy'] = {
- 'enable_debugger': False,
- }
+ config['tipfy'] = {
+ 'enable_debugger': False,
+ }
- IMPROVED: the debugger now works even if the libraries are imported using
zipimport.
@@ -152,17 +152,17 @@
method is defined using a 'Handler:method' notation in the Rule
definition. Example:
- # handlers.py
-
- class MyHandler(RequestHandler):
- def my_method(self, **kwargs):
- return Response('I am not coming from a get() method!')
-
- # main.py
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler='handlers.MyHandler:my_method')
- ])
+ # handlers.py
+
+ class MyHandler(RequestHandler):
+ def my_method(self, **kwargs):
+ return Response('I am not coming from a get() method!')
+
+ # main.py
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler='handlers.MyHandler:my_method')
+ ])
This opens a whole new set of possibilities for routing and handler
definitions, as you can match the HTTP method scheme and custom method
@@ -170,43 +170,43 @@
a single class can handle listing, viewing, inserting, editting and
deleting::
- rules = [
- HandlerPrefix('handlers.ImageHandler', [
- # List images is handled by ImageHandler.index()
- Rule('/images/', name='images/index', handler=':index'),
- # Adding an image is handled by ImageHandler.new()
- Rule('/images/new', name='images/edit', handler=':new'),
- # Displaying an image is handled by ImageHandler.get()
- Rule('/image/<key>', name='images/view', handler=''),
- # Editting an image is handled by ImageHandler.edit()
- Rule('/images/<key>/edit', name='images/edit', handler=':edit'),
- # Deleting an image is handled by ImageHandler.delete()
- Rule('/images/<key>/delete', name='images/delete', handler=':delete'),
- ],
- ]
+ rules = [
+ HandlerPrefix('handlers.ImageHandler', [
+ # List images is handled by ImageHandler.index()
+ Rule('/images/', name='images/index', handler=':index'),
+ # Adding an image is handled by ImageHandler.new()
+ Rule('/images/new', name='images/edit', handler=':new'),
+ # Displaying an image is handled by ImageHandler.get()
+ Rule('/image/<key>', name='images/view', handler=''),
+ # Editting an image is handled by ImageHandler.edit()
+ Rule('/images/<key>/edit', name='images/edit',
handler=':edit'),
+ # Deleting an image is handled by ImageHandler.delete()
+ Rule('/images/<key>/delete', name='images/delete',
handler=':delete'),
+ ],
+ ]
- NEW: added a router object that centralizes URL matching, dispatching and
building URLs. It makes it more flexible to implement alternative routing
schemes. Some implications:
- Moved methods from Tipfy that are now part of the router functionality:
- get_url_map(), add_url_rule(), match_url(), pre_dispatch(), dispatch() and
- post_dispatch(). If you extended them in some way, you must now extend
- the router.
+ get_url_map(), add_url_rule(), match_url(), pre_dispatch(), dispatch()
and
+ post_dispatch(). If you extended them in some way, you must now extend
+ the router.
- URL building is centralized in the router, so url_for() methods
- (from RequestHandler or Tipfy) call router.build().
+ (from RequestHandler or Tipfy) call router.build().
- NEW: added a new rule converter, RegexConverter, which can be usefull
specially to match subdomains excluding some default ones (e.g., 'www').
For example, here we define a rule for the root URL of all non-www
subdomains, and a different one for www::
- rules = [
- Rule('/', endpoint='index', handler='IndexHandler'),
- Subdomain(r'<regex("(?!www\\b)\\w+"):namespace>', [
- Rule('/', endpoint='index', handler='SubdomainIndexHandler'),
- ]),
- ]
+ rules = [
+ Rule('/', endpoint='index', handler='IndexHandler'),
+ Subdomain(r'<regex("(?!www\\b)\\w+"):namespace>', [
+ Rule('/', endpoint='index', handler='SubdomainIndexHandler'),
+ ]),
+ ]
In this example, the request method is ignored since the Rule defines
which
handler method will handle all requests.
@@ -216,88 +216,88 @@
- **_full**: If True, builds an absolute URL.
- **_method**: Uses a rule defined to handle specific request
- methods, if any are defined.
+ methods, if any are defined.
- **_scheme**: URL scheme, e.g., `http` or `https`. If defined,
- an absolute URL is always returned.
+ an absolute URL is always returned.
- **_netloc**: Network location, e.g., `
www.google.com`. If
- defined, an absolute URL is always returned.
+ defined, an absolute URL is always returned.
- **_anchor**: If set, appends an anchor to generated URL.
- IMPROVED: Rule now accepts 'name' as keyword argument. It is an alias to
'endpoint'. This is just for semantic correctness in tipfy context and
'endpoint' will still work::
- Rule('/', name='home', handler='handlers.HomeHandler')
- # is the same as...
- Rule('/', endpoint='home', handler='handlers.HomeHandler')
+ Rule('/', name='home', handler='handlers.HomeHandler')
+ # is the same as...
+ Rule('/', endpoint='home', handler='handlers.HomeHandler')
- CHANGED: URL rules are not automatically loaded from urls.py. Instead
tipfy
is instantiated passing the URL rules, like webapp. Later other rules can
be added to the router. For example:
- app = Tipfy(rules=[
- Rule('/', name='home', handler='handlers.HomeHandler'),
- Rule('/about', name='about', handler='handlers.AboutHandler'),
- ])
-
- # Add an extra rule.
- app.router.add(Rule('/contact', name='contact',
handler='handlers.ContactHandler'))
-
- # Or add a list of rules.
- app.router.add([
- Rule('/products', name='products', handler='handlers.ProductsHandler'),
- Rule('/history', name='history', handler='handlers.HistorytHandler'),
- ])
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler='handlers.HomeHandler'),
+ Rule('/about', name='about', handler='handlers.AboutHandler'),
+ ])
+
+ # Add an extra rule.
+ app.router.add(Rule('/contact', name='contact',
handler='handlers.ContactHandler'))
+
+ # Or add a list of rules.
+ app.router.add([
+ Rule('/products', name='products',
handler='handlers.ProductsHandler'),
+ Rule('/history', name='history',
handler='handlers.HistorytHandler'),
+ ])
The previous behavior using urls.py is easy to reproduce. For example,
given a urls.py like the following:
- from tipfy import Rule
-
- def get_rules(app):
- rules = [
- Rule('/', name='home', handler='handlers.HomeHandler'),
- Rule('/about', name='about', handler='handlers.AboutHandler'),
- Rule('/contact', name='contact', handler='handlers.ContactHandler'),
- ]
-
- return rules
+ from tipfy import Rule
+
+ def get_rules(app):
+ rules = [
+ Rule('/', name='home', handler='handlers.HomeHandler'),
+ Rule('/about', name='about',
handler='handlers.AboutHandler'),
+ Rule('/contact', name='contact',
handler='handlers.ContactHandler'),
+ ]
+
+ return rules
You can still import and use it in main.py:
- from urls import get_rules
-
- app = Tipfy()
-
- # Add a list of rules.
- app.router.add(get_rules(app))
+ from urls import get_rules
+
+ app = Tipfy()
+
+ # Add a list of rules.
+ app.router.add(get_rules(app))
Or you can simply have a list of rules defined in urls.py, outside of a
function, and import it:
- from urls import rules
-
- app = Tipfy(rules=rules)
+ from urls import rules
+
+ app = Tipfy(rules=rules)
- IMPROVED: redirect() (or self.redirect() in handlers) now accepts
relative
URLs. If a relative URL is passed, it'll be joined to the current request
URL. This is mostly useful to redirect to the result of url_for() without
needing to build a full URL::
- # current URL is
http://localhost/foo/bar
-
- # redirects to
http://localhost/baz
- return redirect('/baz')
+ # current URL is
http://localhost/foo/bar
+
+ # redirects to
http://localhost/baz
+ return redirect('/baz')
...but it also allows curious results for relative paths:
- # current URL is
http://localhost/foo/bar
-
- # redirects to
http://localhost/foo/baz
- return redirect('./baz')
-
- # redirects to
http://localhost/baz
- return redirect('../baz')
+ # current URL is
http://localhost/foo/bar
+
+ # redirects to
http://localhost/foo/baz
+ return redirect('./baz')
+
+ # redirects to
http://localhost/baz
+ return redirect('../baz')
Config
@@ -306,15 +306,15 @@
auto-loading configuration values when needed and honoring required
configs.
For example, we always used this::
- secret_key = self.app.get_config('tipfy.sessions', 'secret_key')
+ secret_key = self.app.get_config('tipfy.sessions', 'secret_key')
Now it is also possible to use direct access and dict methods::
- secret_key = self.app.config['tipfy.sessions']['secret_key']
- # or...
- secret_key = self.app.config['tipfy.sessions'].get('secret_key')
- # or...
- secret_key = self.app.config.get('tipfy.sessions').get('secret_key')
+ secret_key = self.app.config['tipfy.sessions']['secret_key']
+ # or...
+ secret_key = self.app.config['tipfy.sessions'].get('secret_key')
+ # or...
+ secret_key = self.app.config.get('tipfy.sessions').get('secret_key')
The previous get_config() method works as always.
@@ -327,32 +327,32 @@
build a response. HTTPExceptions are also converted to a proper response
when returned. Example:
- class MyHandler(RequestHandler):
- def get(self, **kwargs):
- # Returns a simple string.
- return 'Hello, World'
-
- # Returns a tuple of values to create a response.
- return 'Not Found', 404
-
- # Return an HTTPException which will be converted to a response.
- return MethodNotAllowed()
+ class MyHandler(RequestHandler):
+ def get(self, **kwargs):
+ # Returns a simple string.
+ return 'Hello, World'
+
+ # Returns a tuple of values to create a response.
+ return 'Not Found', 404
+
+ # Return an HTTPException which will be converted to a
response.
+ return MethodNotAllowed()
- NEW: added Tipfy.get_test_handler(). This provides a context to test a
handler, matching it from a custom request if needed. This is intended
to be
used with a `with` statement::
- from __future__ import with_statement
-
- from tipfy import Tipfy, Rule
-
- app = Tipfy(rules=[
- Rule('/about', name='home', handler='handlers.AboutHandler'),
- ])
-
- with app.get_test_handler('/about') as handler:
- self.assertEqual(handler.url_for('/', _full=True),
- '
http://localhost/about')
+ from __future__ import with_statement
+
+ from tipfy import Tipfy, Rule
+
+ app = Tipfy(rules=[
+ Rule('/about', name='home', handler='handlers.AboutHandler'),
+ ])
+
+ with app.get_test_handler('/about') as handler:
+ self.assertEqual(handler.url_for('/', _full=True),
+ '
http://localhost/about')
- NEW: app has now a debug attribute, set when it is initialized with
debug=True.
@@ -419,19 +419,19 @@
- Added HandlerPrefix, a rule factory to prefix handler definition in
rules.
For example, take these rules::
- rules = [
- HandlerPrefix('my_app.handlers.', [
- Rule('/', endpoint='index', handler='IndexHandler'),
- Rule('/entry/<entry_slug>', endpoint='show', handler='ShowHandler'),
- ]),
- ]
+ rules = [
+ HandlerPrefix('my_app.handlers.', [
+ Rule('/', endpoint='index', handler='IndexHandler'),
+ Rule('/entry/<entry_slug>', endpoint='show',
handler='ShowHandler'),
+ ]),
+ ]
These are the same as::
- rules = [
- Rule('/', endpoint='index', handler='my_app.handlers.IndexHandler'),
- Rule('/entry/<entry_slug>', endpoint='show',
handler='my_app.handlers.ShowHandler'),
- ]
+ rules = [
+ Rule('/', endpoint='index',
handler='my_app.handlers.IndexHandler'),
+ Rule('/entry/<entry_slug>', endpoint='show',
handler='my_app.handlers.ShowHandler'),
+ ]
- If the requested method is not in ALLOWED_METHODS, a NotImplemented
exception
("501 Not Implemented") is raised, instead of MethodNotAllowed ("405
Method
@@ -458,56 +458,56 @@
defined in a single file mapped in app.yaml, without the need for a
separate
urls.py::
- import sys
- sys.path.insert(0, 'distlib')
-
- from tipfy import RequestHandler, Rule, make_wsgi_app, run_wsgi_app
-
- class HelloWorldHandler(RequestHandler):
- def get(self, **kwargs):
- return 'Hello, World!'
-
- class ByeWorldHandler(RequestHandler):
- def get(self, **kwargs):
- return 'Bye, World!'
-
- app = make_wsgi_app(rules=[
- Rule('/', endpoint='hello', handler=HelloWorldHandler),
- Rule('/bye', endpoint='bye', handler=ByeWorldHandler),
- ])
-
- def main():
- run_wsgi_app(app)
-
- if __name__ == '__main__':
- main()
+ import sys
+ sys.path.insert(0, 'distlib')
+
+ from tipfy import RequestHandler, Rule, make_wsgi_app, run_wsgi_app
+
+ class HelloWorldHandler(RequestHandler):
+ def get(self, **kwargs):
+ return 'Hello, World!'
+
+ class ByeWorldHandler(RequestHandler):
+ def get(self, **kwargs):
+ return 'Bye, World!'
+
+ app = make_wsgi_app(rules=[
+ Rule('/', endpoint='hello', handler=HelloWorldHandler),
+ Rule('/bye', endpoint='bye', handler=ByeWorldHandler),
+ ])
+
+ def main():
+ run_wsgi_app(app)
+
+ if __name__ == '__main__':
+ main()
This also helps to test handlers defined in a test file::
- import unittest
-
- from tipfy import RequestHandler, Rule, Tipfy, redirect
-
- class TestHandler(unittest.TestCase):
- def test_redirect(self):
- class Handler1(RequestHandler):
- def get(self, **kwargs):
- return redirect_to('test2')
-
- class Handler2(RequestHandler):
- def get(self, **kwargs):
- return 'Hello, World!'
-
- rules = [
- Rule('/1', endpoint='test1', handler=Handler1),
- Rule('/2', endpoint='test2', handler=Handler2),
- ]
-
- app = Tipfy(rules=rules)
- client = app.get_test_client()
- response = client.get('/1', follow_redirects=True)
-
- assert response.data == 'Hello, World!'
+ import unittest
+
+ from tipfy import RequestHandler, Rule, Tipfy, redirect
+
+ class TestHandler(unittest.TestCase):
+ def test_redirect(self):
+ class Handler1(RequestHandler):
+ def get(self, **kwargs):
+ return redirect_to('test2')
+
+ class Handler2(RequestHandler):
+ def get(self, **kwargs):
+ return 'Hello, World!'
+
+ rules = [
+ Rule('/1', endpoint='test1', handler=Handler1),
+ Rule('/2', endpoint='test2', handler=Handler2),
+ ]
+
+ app = Tipfy(rules=rules)
+ client = app.get_test_client()
+ response = client.get('/1', follow_redirects=True)
+
+ assert response.data == 'Hello, World!'
- Added get_url_map() and add_url_rule() methods to the WSGI app.
@@ -515,11 +515,11 @@
object if needed. So a simple string can also be returned, among other
options::
- from tipfy import RequestHandler, Response
-
- class HelloWorldHandler(RequestHandler):
- def get(self):
- return 'Hello, World!'
+ from tipfy import RequestHandler, Response
+
+ class HelloWorldHandler(RequestHandler):
+ def get(self):
+ return 'Hello, World!'
Backwards compatibility warnings
@@ -532,7 +532,7 @@
* request, local.request: Tipfy.request, a class attribute in the WSGI
app.
* app,
local.app: Tipfy.app, a class attribute in the WSGI app.
* local: request.context and request.registry, dictionaries set for a
- request.
+ request.
- REMOVED: old aliases: WSGIApplication (Tipfy) and REQUIRED_CONFIG
(REQUIRED_VALUE)
@@ -580,16 +580,16 @@
my_middleware = MyMiddleware(foo='bar')
class MyRequestHandler(RequestHandler):
- middleware = [my_middleware]
-
- # ...
+ middleware = [my_middleware]
+
+ # ...
my_middleware2 = MyMiddleware(foo='zing')
class MyRequestHandler2(RequestHandler):
- middleware = [my_middleware2]
-
- # ...
+ middleware = [my_middleware2]
+
+ # ...
- Fixed using tipfy with zipimport. For whatever reason py_zipimport fails
to
load werkzeug modules if werkzeug package is not imported first. This is
@@ -609,11 +609,11 @@
- REMOVED 'pre_run_app' hook. Use post_make_app instead:
- def post_make_app(self, app):
- # Wrap the callable, so we keep a reference to the app...
- app.wsgi_app = my_middleware(app.wsgi_app)
- # ...and return the original app.
- return app
+ def post_make_app(self, app):
+ # Wrap the callable, so we keep a reference to the app...
+ app.wsgi_app = my_middleware(app.wsgi_app)
+ # ...and return the original app.
+ return app
- IMPROVED tipfy.ext.appstatts and tipfy.ext.debugger: middleware is set on
'post_make_app'.
@@ -629,10 +629,10 @@
WSGIApplication and Request in the handler. If you implemented
__init__() on
RequestHandler, please update it to accept these arguments:
- class MyRequestHandler(RequestHandler):
- def __init__(self, app, request):
- super(MyRequestHandler, self).__init__(app, request)
- ...
+ class MyRequestHandler(RequestHandler):
+ def __init__(self, app, request):
+ super(MyRequestHandler, self).__init__(app, request)
+ ...
We now have access to the URL rule and rule arguments that matched on
__init__(), and can read app configuration without relying on local.
@@ -660,24 +660,24 @@
So an application using Tipfy outside of App Engine needs to setup a
proxy on
these attributes pointing to a thread local. This monkeypatch covers it
all:
- from werkzeug import Local
-
- local = Local()
-
- def set_wsgi_app(self):
-
local.app = self
-
- def set_request(self, request):
- local.request = request
-
- def cleanup(self):
- local.__release_local__()
-
- Tipfy.set_wsgi_app = set_wsgi_app
- Tipfy.set_request = set_request
- Tipfy.cleanup = cleanup
- Tipfy.app = local('app')
- Tipfy.request = local('request')
+ from werkzeug import Local
+
+ local = Local()
+
+ def set_wsgi_app(self):
+
local.app = self
+
+ def set_request(self, request):
+ local.request = request
+
+ def cleanup(self):
+ local.__release_local__()
+
+ Tipfy.set_wsgi_app = set_wsgi_app
+ Tipfy.set_request = set_request
+ Tipfy.cleanup = cleanup
+ Tipfy.app = local('app')
+ Tipfy.request = local('request')
This makes Tipfy.app and Tipfy.request thread-safe outside of App Engine.
@@ -688,8 +688,8 @@
append an anchor to the generated URL. Also keyword arguments in these
two
functions are now prefixed with a '_':
- url_for(endpoint, _full=False, _method=None, _anchor=None, **kwargs)
- redirect_to(endpoint, _method=None, _anchor=None, _code=302, **kwargs)
+ url_for(endpoint, _full=False, _method=None, _anchor=None, **kwargs)
+ redirect_to(endpoint, _method=None, _anchor=None, _code=302, **kwargs)
This is just a sanity measure to avoid collisions with variables set in
rules. The old, non-prefixed keywords are still accepted, but they should
@@ -727,22 +727,22 @@
- ADDED some ideas from Flask:
- Local variables are not cleaned when in development and an exception is
- raised. This allows the debugger to still access request and other
- variables from local in the interactive shell.
+ raised. This allows the debugger to still access request and other
+ variables from local in the interactive shell.
- Added a wsgi_app() function to WSGIApplication, wrapping __call__().
- Added a make_response() method to WSGIApplication, so that handlers
- can return not only a Response object.
+ can return not only a Response object.
- Refactoring: split pre_dispatch/dispatch/post_dispatch into separate
- functions. Makes it easier to override these methods in a middleware.
+ functions. Makes it easier to override these methods in a middleware.
- Added function get_test_client() to return a werkzeug.Client for the
wsgi
- app.
+ app.
- Added function ext.jinja2.get_template_attribute().
- ADDED: a Response object that extends werkzeug's Response, with default
- mimetype set to 'text/html'.
+ mimetype set to 'text/html'.
- REMOVED:
- tipfy.response and 'extensions' configuration key. They were deprecated
- since 0.5 and not useful at all since then.
+ since 0.5 and not useful at all since then.
- tipfy.local_manager. No longer needed since latest Werkzeug.
=======================================
--- /run_tests.py Sun Apr 3 07:14:04 2011
+++ /run_tests.py Sun Apr 3 07:55:49 2011
@@ -7,34 +7,34 @@
current_path = os.path.abspath(os.path.dirname(__file__))
tests_path = os.path.join(current_path, 'tests')
sys.path[0:0] = [
- tests_path,
- gae_path,
- os.path.join(gae_path, 'lib', 'django_0_96'),
- os.path.join(gae_path, 'lib', 'webob'),
- os.path.join(gae_path, 'lib', 'yaml', 'lib'),
+ tests_path,
+ gae_path,
+ os.path.join(gae_path, 'lib', 'django_0_96'),
+ os.path.join(gae_path, 'lib', 'webob'),
+ os.path.join(gae_path, 'lib', 'yaml', 'lib'),
]
all_tests = [f[:-8] for f in os.listdir(tests_path) if
f.endswith('_test.py')]
def get_suite(tests):
- tests = sorted(tests)
- suite = unittest.TestSuite()
- loader = unittest.TestLoader()
- for test in tests:
- suite.addTest(loader.loadTestsFromName(test))
- return suite
+ tests = sorted(tests)
+ suite = unittest.TestSuite()
+ loader = unittest.TestLoader()
+ for test in tests:
+ suite.addTest(loader.loadTestsFromName(test))
+ return suite
if __name__ == '__main__':
- # To run all tests:
- # $ python run_tests.py
- # To run a single test:
- # $ python run_tests.py app
- # To run a couple of tests:
- # $ python run_tests.py app config sessions
- tests = sys.argv[1:]
- if not tests:
- tests = all_tests
- tests = ['%s_test' % t for t in tests]
- suite = get_suite(tests)
- unittest.TextTestRunner(verbosity=2).run(suite)
-
+ # To run all tests:
+ # $ python run_tests.py
+ # To run a single test:
+ # $ python run_tests.py app
+ # To run a couple of tests:
+ # $ python run_tests.py app config sessions
+ tests = sys.argv[1:]
+ if not tests:
+ tests = all_tests
+ tests = ['%s_test' % t for t in tests]
+ suite = get_suite(tests)
+ unittest.TextTestRunner(verbosity=2).run(suite)
+
=======================================
--- /tests/app_test.py Sat Apr 2 13:52:07 2011
+++ /tests/app_test.py Sun Apr 3 07:55:49 2011
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
- Tests for
tipfy.app
+ Tests for
tipfy.app
"""
from __future__ import with_statement
@@ -22,518 +22,518 @@
import test_utils
class BaseTestCase(test_utils.BaseTestCase):
- def setUp(self):
- self.appengine = tipfy.app.APPENGINE
- self.dev_appserver = tipfy.app.DEV_APPSERVER
- test_utils.BaseTestCase.setUp(self)
-
- def tearDown(self):
- tipfy.app.APPENGINE = self.appengine
- tipfy.app.DEV_APPSERVER = self.dev_appserver
- test_utils.BaseTestCase.tearDown(self)
-
- def _set_dev_server_flag(self, flag):
- tipfy.app.APPENGINE = flag
- tipfy.app.DEV_APPSERVER = flag
+ def setUp(self):
+ self.appengine = tipfy.app.APPENGINE
+ self.dev_appserver = tipfy.app.DEV_APPSERVER
+ test_utils.BaseTestCase.setUp(self)
+
+ def tearDown(self):
+ tipfy.app.APPENGINE = self.appengine
+ tipfy.app.DEV_APPSERVER = self.dev_appserver
+ test_utils.BaseTestCase.tearDown(self)
+
+ def _set_dev_server_flag(self, flag):
+ tipfy.app.APPENGINE = flag
+ tipfy.app.DEV_APPSERVER = flag
class AllMethodsHandler(RequestHandler):
- def get(self, **kwargs):
- return Response('Method: %s' % self.request.method)
-
- delete = head = options = post = put = trace = get
+ def get(self, **kwargs):
+ return Response('Method: %s' % self.request.method)
+
+ delete = head = options = post = put = trace = get
class BrokenHandler(RequestHandler):
- def get(self, **kwargs):
- raise ValueError('booo!')
+ def get(self, **kwargs):
+ raise ValueError('booo!')
class BrokenButFixedHandler(BrokenHandler):
- def handle_exception(self, exception=None):
- # Let's fix it.
- return Response('That was close!', status=200)
+ def handle_exception(self, exception=None):
+ # Let's fix it.
+ return Response('That was close!', status=200)
class Handle404(RequestHandler):
- def handle_exception(self, exception=None):
- return Response('404 custom handler', status=404)
+ def handle_exception(self, exception=None):
+ return Response('404 custom handler', status=404)
class Handle405(RequestHandler):
- def handle_exception(self, exception=None):
- response = Response('405 custom handler', status=405)
- response.headers['Allow'] = 'GET'
- return response
+ def handle_exception(self, exception=None):
+ response = Response('405 custom handler', status=405)
+ response.headers['Allow'] = 'GET'
+ return response
class Handle500(RequestHandler):
- def handle_exception(self, exception=None):
- return Response('500 custom handler', status=500)
+ def handle_exception(self, exception=None):
+ return Response('500 custom handler', status=500)
class TestRequestHandler(BaseTestCase):
- def test_200(self):
- app = Tipfy(rules=[Rule('/', name='home', handler=AllMethodsHandler)])
- client = app.get_test_client()
-
- for method in app.allowed_methods:
- response = client.open('/', method=method)
- self.assertEqual(response.status_code, 200, method)
- if method == 'HEAD':
- self.assertEqual(response.data, '')
- else:
- self.assertEqual(response.data, 'Method: %s' % method)
-
- # App Engine mode.
- self._set_dev_server_flag(True)
- response = client.get('/')
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'Method: GET')
-
- def test_404(self):
- """No URL rules defined."""
- app = Tipfy()
- client = app.get_test_client()
-
- # Normal mode.
- response = client.get('/')
- self.assertEqual(response.status_code, 404)
-
- # Debug mode.
- app.debug = True
- response = client.get('/')
- self.assertEqual(response.status_code, 404)
-
- def test_500(self):
- """Handler import will fail."""
- app = Tipfy(rules=[Rule('/', name='home',
handler='non.existent.handler')])
- client = app.get_test_client()
-
- # Normal mode.
- response = client.get('/')
- self.assertEqual(response.status_code, 500)
-
- # Debug mode.
- app.debug = True
- app.config['tipfy']['enable_debugger'] = False
- self.assertRaises(ImportError, client.get, '/')
-
- def test_501(self):
- """Method is not in app.allowed_methods."""
- app = Tipfy()
- client = app.get_test_client()
-
- # Normal mode.
- response = client.open('/', method='CONNECT')
- self.assertEqual(response.status_code, 501)
-
- # Debug mode.
- app.debug = True
- response = client.open('/', method='CONNECT')
- self.assertEqual(response.status_code, 501)
-
- def test_abort(self):
- class HandlerWithAbort(RequestHandler):
- def get(self, **kwargs):
- self.abort(kwargs.get('status_code'))
-
- app = Tipfy(rules=[
- Rule('/<int:status_code>', name='abort-me', handler=HandlerWithAbort),
- ])
- client = app.get_test_client()
-
- response = client.get('/400')
- self.assertEqual(response.status_code, 400)
-
- response = client.get('/403')
- self.assertEqual(response.status_code, 403)
-
- response = client.get('/404')
- self.assertEqual(response.status_code, 404)
-
- def test_get_config(self):
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- ], config = {
- 'foo': {
- 'bar': 'baz',
- }
- })
- with app.get_test_handler('/') as handler:
- self.assertEqual(handler.get_config('foo', 'bar'), 'baz')
-
- def test_handle_exception(self):
- app = Tipfy([
- Rule('/', handler=AllMethodsHandler, name='home'),
- Rule('/broken', handler=BrokenHandler, name='broken'),
- Rule('/broken-but-fixed', handler=BrokenButFixedHandler,
name='broken-but-fixed'),
- ], debug=False)
- client = app.get_test_client()
-
- response = client.get('/broken')
- self.assertEqual(response.status_code, 500)
-
- response = client.get('/broken-but-fixed')
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'That was close!')
-
- def test_redirect(self):
- class HandlerWithRedirect(RequestHandler):
- def get(self, **kwargs):
- return self.redirect('/')
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- Rule('/redirect-me', name='redirect', handler=HandlerWithRedirect),
- ])
-
- client = app.get_test_client()
- response = client.get('/redirect-me', follow_redirects=True)
- self.assertEqual(response.data, 'Method: GET')
-
- def test_redirect_empty(self):
- class HandlerWithRedirect(RequestHandler):
- def get(self, **kwargs):
- return self.redirect('/', empty=True)
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- Rule('/redirect-me', name='redirect', handler=HandlerWithRedirect),
- ])
-
- client = app.get_test_client()
- response = client.get('/redirect-me', follow_redirects=False)
- self.assertEqual(response.data, '')
-
- def test_redirect_to(self):
- class HandlerWithRedirectTo(RequestHandler):
- def get(self, **kwargs):
- return self.redirect_to('home')
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- Rule('/redirect-me', name='redirect', handler=HandlerWithRedirectTo),
- ])
-
- client = app.get_test_client()
- response = client.get('/redirect-me', follow_redirects=True)
- self.assertEqual(response.data, 'Method: GET')
-
- def test_redirect_to_empty(self):
- class HandlerWithRedirectTo(RequestHandler):
- def get(self, **kwargs):
- return self.redirect_to('home', _empty=True)
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- Rule('/redirect-me', name='redirect', handler=HandlerWithRedirectTo),
- ])
-
- client = app.get_test_client()
- response = client.get('/redirect-me', follow_redirects=False)
- self.assertEqual(response.data, '')
-
- def test_redirect_relative_uris(self):
- class MyHandler(RequestHandler):
- def get(self):
- return self.redirect(self.request.args.get('redirect'))
-
- app = Tipfy(rules=[
- Rule('/foo/bar', name='test1', handler=MyHandler),
- Rule('/foo/bar/', name='test2', handler=MyHandler),
- ])
- client = app.get_test_client()
-
- response = client.get('/foo/bar/', query_string={'redirect': '/baz'})
- self.assertEqual(response.headers['Location'], '
http://localhost/baz')
-
- response = client.get('/foo/bar/', query_string={'redirect': './baz'})
-
self.assertEqual(response.headers['Location'], '
http://localhost/foo/bar/baz')
-
- response = client.get('/foo/bar/', query_string={'redirect': '../baz'})
-
self.assertEqual(response.headers['Location'], '
http://localhost/foo/baz')
-
- response = client.get('/foo/bar', query_string={'redirect': '/baz'})
- self.assertEqual(response.headers['Location'], '
http://localhost/baz')
-
- response = client.get('/foo/bar', query_string={'redirect': './baz'})
-
self.assertEqual(response.headers['Location'], '
http://localhost/foo/baz')
-
- response = client.get('/foo/bar', query_string={'redirect': '../baz'})
- self.assertEqual(response.headers['Location'], '
http://localhost/baz')
-
- def test_url_for(self):
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- Rule('/about', name='about', handler='handlers.About'),
- Rule('/contact', name='contact', handler='handlers.Contact'),
- ])
- with app.get_test_handler('/') as handler:
- self.assertEqual(handler.url_for('home'), '/')
- self.assertEqual(handler.url_for('about'), '/about')
- self.assertEqual(handler.url_for('contact'), '/contact')
-
- # Extras
- self.assertEqual(handler.url_for('about',
_anchor='history'), '/about#history')
- self.assertEqual(handler.url_for('about',
_full=True), '
http://localhost/about')
- self.assertEqual(handler.url_for('about',
_netloc='
www.google.com'), '
http://www.google.com/about')
- self.assertEqual(handler.url_for('about',
_scheme='https'), '
https://localhost/about')
-
- def test_store_instances(self):
- from tipfy.appengine.auth import AuthStore
- from tipfy.i18n import I18nStore
- from tipfy.sessions import SecureCookieSession
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- ], config={
- 'tipfy.sessions': {
- 'secret_key': 'secret',
- },
- })
- with app.get_test_handler('/') as handler:
- self.assertEqual(isinstance(handler.session, SecureCookieSession), True)
- self.assertEqual(isinstance(handler.auth, AuthStore), True)
- self.assertEqual(isinstance(handler.i18n, I18nStore), True)
+ def test_200(self):
+ app = Tipfy(rules=[Rule('/', name='home',
handler=AllMethodsHandler)])
+ client = app.get_test_client()
+
+ for method in app.allowed_methods:
+ response = client.open('/', method=method)
+ self.assertEqual(response.status_code, 200, method)
+ if method == 'HEAD':
+ self.assertEqual(response.data, '')
+ else:
+ self.assertEqual(response.data, 'Method: %s' % method)
+
+ # App Engine mode.
+ self._set_dev_server_flag(True)
+ response = client.get('/')
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'Method: GET')
+
+ def test_404(self):
+ """No URL rules defined."""
+ app = Tipfy()
+ client = app.get_test_client()
+
+ # Normal mode.
+ response = client.get('/')
+ self.assertEqual(response.status_code, 404)
+
+ # Debug mode.
+ app.debug = True
+ response = client.get('/')
+ self.assertEqual(response.status_code, 404)
+
+ def test_500(self):
+ """Handler import will fail."""
+ app = Tipfy(rules=[Rule('/', name='home',
handler='non.existent.handler')])
+ client = app.get_test_client()
+
+ # Normal mode.
+ response = client.get('/')
+ self.assertEqual(response.status_code, 500)
+
+ # Debug mode.
+ app.debug = True
+ app.config['tipfy']['enable_debugger'] = False
+ self.assertRaises(ImportError, client.get, '/')
+
+ def test_501(self):
+ """Method is not in app.allowed_methods."""
+ app = Tipfy()
+ client = app.get_test_client()
+
+ # Normal mode.
+ response = client.open('/', method='CONNECT')
+ self.assertEqual(response.status_code, 501)
+
+ # Debug mode.
+ app.debug = True
+ response = client.open('/', method='CONNECT')
+ self.assertEqual(response.status_code, 501)
+
+ def test_abort(self):
+ class HandlerWithAbort(RequestHandler):
+ def get(self, **kwargs):
+ self.abort(kwargs.get('status_code'))
+
+ app = Tipfy(rules=[
+ Rule('/<int:status_code>', name='abort-me',
handler=HandlerWithAbort),
+ ])
+ client = app.get_test_client()
+
+ response = client.get('/400')
+ self.assertEqual(response.status_code, 400)
+
+ response = client.get('/403')
+ self.assertEqual(response.status_code, 403)
+
+ response = client.get('/404')
+ self.assertEqual(response.status_code, 404)
+
+ def test_get_config(self):
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ ], config = {
+ 'foo': {
+ 'bar': 'baz',
+ }
+ })
+ with app.get_test_handler('/') as handler:
+ self.assertEqual(handler.get_config('foo', 'bar'), 'baz')
+
+ def test_handle_exception(self):
+ app = Tipfy([
+ Rule('/', handler=AllMethodsHandler, name='home'),
+ Rule('/broken', handler=BrokenHandler, name='broken'),
+ Rule('/broken-but-fixed', handler=BrokenButFixedHandler,
name='broken-but-fixed'),
+ ], debug=False)
+ client = app.get_test_client()
+
+ response = client.get('/broken')
+ self.assertEqual(response.status_code, 500)
+
+ response = client.get('/broken-but-fixed')
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'That was close!')
+
+ def test_redirect(self):
+ class HandlerWithRedirect(RequestHandler):
+ def get(self, **kwargs):
+ return self.redirect('/')
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ Rule('/redirect-me', name='redirect',
handler=HandlerWithRedirect),
+ ])
+
+ client = app.get_test_client()
+ response = client.get('/redirect-me', follow_redirects=True)
+ self.assertEqual(response.data, 'Method: GET')
+
+ def test_redirect_empty(self):
+ class HandlerWithRedirect(RequestHandler):
+ def get(self, **kwargs):
+ return self.redirect('/', empty=True)
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ Rule('/redirect-me', name='redirect',
handler=HandlerWithRedirect),
+ ])
+
+ client = app.get_test_client()
+ response = client.get('/redirect-me', follow_redirects=False)
+ self.assertEqual(response.data, '')
+
+ def test_redirect_to(self):
+ class HandlerWithRedirectTo(RequestHandler):
+ def get(self, **kwargs):
+ return self.redirect_to('home')
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ Rule('/redirect-me', name='redirect',
handler=HandlerWithRedirectTo),
+ ])
+
+ client = app.get_test_client()
+ response = client.get('/redirect-me', follow_redirects=True)
+ self.assertEqual(response.data, 'Method: GET')
+
+ def test_redirect_to_empty(self):
+ class HandlerWithRedirectTo(RequestHandler):
+ def get(self, **kwargs):
+ return self.redirect_to('home', _empty=True)
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ Rule('/redirect-me', name='redirect',
handler=HandlerWithRedirectTo),
+ ])
+
+ client = app.get_test_client()
+ response = client.get('/redirect-me', follow_redirects=False)
+ self.assertEqual(response.data, '')
+
+ def test_redirect_relative_uris(self):
+ class MyHandler(RequestHandler):
+ def get(self):
+ return self.redirect(self.request.args.get('redirect'))
+
+ app = Tipfy(rules=[
+ Rule('/foo/bar', name='test1', handler=MyHandler),
+ Rule('/foo/bar/', name='test2', handler=MyHandler),
+ ])
+ client = app.get_test_client()
+
+ response = client.get('/foo/bar/',
query_string={'redirect': '/baz'})
+
self.assertEqual(response.headers['Location'], '
http://localhost/baz')
+
+ response = client.get('/foo/bar/',
query_string={'redirect': './baz'})
+
self.assertEqual(response.headers['Location'], '
http://localhost/foo/bar/baz')
+
+ response = client.get('/foo/bar/',
query_string={'redirect': '../baz'})
+
self.assertEqual(response.headers['Location'], '
http://localhost/foo/baz')
+
+ response = client.get('/foo/bar',
query_string={'redirect': '/baz'})
+
self.assertEqual(response.headers['Location'], '
http://localhost/baz')
+
+ response = client.get('/foo/bar',
query_string={'redirect': './baz'})
+
self.assertEqual(response.headers['Location'], '
http://localhost/foo/baz')
+
+ response = client.get('/foo/bar',
query_string={'redirect': '../baz'})
+
self.assertEqual(response.headers['Location'], '
http://localhost/baz')
+
+ def test_url_for(self):
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ Rule('/about', name='about', handler='handlers.About'),
+ Rule('/contact', name='contact', handler='handlers.Contact'),
+ ])
+ with app.get_test_handler('/') as handler:
+ self.assertEqual(handler.url_for('home'), '/')
+ self.assertEqual(handler.url_for('about'), '/about')
+ self.assertEqual(handler.url_for('contact'), '/contact')
+
+ # Extras
+ self.assertEqual(handler.url_for('about',
_anchor='history'), '/about#history')
+ self.assertEqual(handler.url_for('about',
_full=True), '
http://localhost/about')
+ self.assertEqual(handler.url_for('about',
_netloc='
www.google.com'), '
http://www.google.com/about')
+ self.assertEqual(handler.url_for('about',
_scheme='https'), '
https://localhost/about')
+
+ def test_store_instances(self):
+ from tipfy.appengine.auth import AuthStore
+ from tipfy.i18n import I18nStore
+ from tipfy.sessions import SecureCookieSession
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ ], config={
+ 'tipfy.sessions': {
+ 'secret_key': 'secret',
+ },
+ })
+ with app.get_test_handler('/') as handler:
+ self.assertEqual(isinstance(handler.session,
SecureCookieSession), True)
+ self.assertEqual(isinstance(handler.auth, AuthStore), True)
+ self.assertEqual(isinstance(handler.i18n, I18nStore), True)
class TestHandlerMiddleware(BaseTestCase):
- def test_before_dispatch(self):
- res = 'Intercepted!'
-
- class MyMiddleware(object):
- def before_dispatch(self, handler):
- return Response(res)
-
- class MyHandler(RequestHandler):
- middleware = [MyMiddleware()]
-
- def get(self, **kwargs):
- return Response('default')
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=MyHandler),
- ])
- client = app.get_test_client()
- response = client.get('/')
- self.assertEqual(response.data, res)
-
- def test_after_dispatch(self):
- res = 'Intercepted!'
-
- class MyMiddleware(object):
- def after_dispatch(self, handler, response):
- response.data += res
- return response
-
- class MyHandler(RequestHandler):
- middleware = [MyMiddleware()]
-
- def get(self, **kwargs):
- return Response('default')
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=MyHandler),
- ])
- client = app.get_test_client()
- response = client.get('/')
- self.assertEqual(response.data, 'default' + res)
-
- def test_handle_exception(self):
- res = 'Catched!'
-
- class MyMiddleware(object):
- def handle_exception(self, handler, exception):
- return Response(res)
-
- class MyHandler(RequestHandler):
- middleware = [MyMiddleware()]
-
- def get(self, **kwargs):
- raise ValueError()
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=MyHandler),
- ])
- client = app.get_test_client()
- response = client.get('/')
- self.assertEqual(response.data, res)
-
- def test_handle_exception_2(self):
- res = 'I fixed it!'
-
- class MyMiddleware(object):
- def handle_exception(self, handler, exception):
- raise ValueError()
-
- class MyHandler(RequestHandler):
- middleware = [MyMiddleware()]
-
- def get(self, **kwargs):
- raise ValueError()
-
- class ErrorHandler(RequestHandler):
- def handle_exception(self, exception):
- return Response(res)
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=MyHandler),
- ], debug=False)
- app.error_handlers[500] = ErrorHandler
-
- client = app.get_test_client()
- response = client.get('/')
- self.assertEqual(response.data, res)
-
- def test_handle_exception_3(self):
- class MyMiddleware(object):
- def handle_exception(self, handler, exception):
- pass
-
- class MyHandler(RequestHandler):
- middleware = [MyMiddleware()]
-
- def get(self, **kwargs):
- raise ValueError()
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=MyHandler),
- ])
- client = app.get_test_client()
- response = client.get('/')
- self.assertEqual(response.status_code, 500)
-
- def test_handle_exception_with_app_error_handler(self):
- class MyMiddleware(object):
- def handle_exception(self, handler, exception):
- raise ValueError()
-
- class MyHandler(RequestHandler):
- middleware = [MyMiddleware()]
-
- def get(self, **kwargs):
- raise ValueError()
-
- class ErrorHandler(RequestHandler):
- def handle_exception(self, exception):
- raise ValueError()
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=MyHandler),
- ], debug=False)
- app.error_handlers[500] = ErrorHandler
-
- client = app.get_test_client()
- response = client.get('/')
- self.assertEqual(response.status_code, 500)
+ def test_before_dispatch(self):
+ res = 'Intercepted!'
+
+ class MyMiddleware(object):
+ def before_dispatch(self, handler):
+ return Response(res)
+
+ class MyHandler(RequestHandler):
+ middleware = [MyMiddleware()]
+
+ def get(self, **kwargs):
+ return Response('default')
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=MyHandler),
+ ])
+ client = app.get_test_client()
+ response = client.get('/')
+ self.assertEqual(response.data, res)
+
+ def test_after_dispatch(self):
+ res = 'Intercepted!'
+
+ class MyMiddleware(object):
+ def after_dispatch(self, handler, response):
+ response.data += res
+ return response
+
+ class MyHandler(RequestHandler):
+ middleware = [MyMiddleware()]
+
+ def get(self, **kwargs):
+ return Response('default')
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=MyHandler),
+ ])
+ client = app.get_test_client()
+ response = client.get('/')
+ self.assertEqual(response.data, 'default' + res)
+
+ def test_handle_exception(self):
+ res = 'Catched!'
+
+ class MyMiddleware(object):
+ def handle_exception(self, handler, exception):
+ return Response(res)
+
+ class MyHandler(RequestHandler):
+ middleware = [MyMiddleware()]
+
+ def get(self, **kwargs):
+ raise ValueError()
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=MyHandler),
+ ])
+ client = app.get_test_client()
+ response = client.get('/')
+ self.assertEqual(response.data, res)
+
+ def test_handle_exception_2(self):
+ res = 'I fixed it!'
+
+ class MyMiddleware(object):
+ def handle_exception(self, handler, exception):
+ raise ValueError()
+
+ class MyHandler(RequestHandler):
+ middleware = [MyMiddleware()]
+
+ def get(self, **kwargs):
+ raise ValueError()
+
+ class ErrorHandler(RequestHandler):
+ def handle_exception(self, exception):
+ return Response(res)
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=MyHandler),
+ ], debug=False)
+ app.error_handlers[500] = ErrorHandler
+
+ client = app.get_test_client()
+ response = client.get('/')
+ self.assertEqual(response.data, res)
+
+ def test_handle_exception_3(self):
+ class MyMiddleware(object):
+ def handle_exception(self, handler, exception):
+ pass
+
+ class MyHandler(RequestHandler):
+ middleware = [MyMiddleware()]
+
+ def get(self, **kwargs):
+ raise ValueError()
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=MyHandler),
+ ])
+ client = app.get_test_client()
+ response = client.get('/')
+ self.assertEqual(response.status_code, 500)
+
+ def test_handle_exception_with_app_error_handler(self):
+ class MyMiddleware(object):
+ def handle_exception(self, handler, exception):
+ raise ValueError()
+
+ class MyHandler(RequestHandler):
+ middleware = [MyMiddleware()]
+
+ def get(self, **kwargs):
+ raise ValueError()
+
+ class ErrorHandler(RequestHandler):
+ def handle_exception(self, exception):
+ raise ValueError()
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=MyHandler),
+ ], debug=False)
+ app.error_handlers[500] = ErrorHandler
+
+ client = app.get_test_client()
+ response = client.get('/')
+ self.assertEqual(response.status_code, 500)
class TestTipfy(BaseTestCase):
- def test_custom_error_handlers(self):
- app = Tipfy([
- Rule('/', handler=AllMethodsHandler, name='home'),
- Rule('/broken', handler=BrokenHandler, name='broken'),
- ], debug=False)
- app.error_handlers = {
- 404: Handle404,
- 405: Handle405,
- 500: Handle500,
- }
- client = app.get_test_client()
-
- res = client.get('/nowhere')
- self.assertEqual(res.status_code, 404)
- self.assertEqual(res.data, '404 custom handler')
-
- res = client.put('/broken')
- self.assertEqual(res.status_code, 405)
- self.assertEqual(res.data, '405 custom handler')
- self.assertEqual(res.headers.get('Allow'), 'GET')
-
- res = client.get('/broken')
- self.assertEqual(res.status_code, 500)
- self.assertEqual(res.data, '500 custom handler')
-
- def test_store_classes(self):
- from tipfy.appengine.auth import AuthStore
- from tipfy.i18n import I18nStore
- from tipfy.sessions import SessionStore
-
- app = Tipfy()
- self.assertEqual(app.auth_store_class, AuthStore)
- self.assertEqual(app.i18n_store_class, I18nStore)
- self.assertEqual(app.session_store_class, SessionStore)
-
- def test_make_response(self):
- app = Tipfy()
- request = Request.from_values()
-
- # Empty.
- response = app.make_response(request)
- self.assertEqual(isinstance(response, app.response_class), True)
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, '')
-
- # From Response.
- response = app.make_response(request, Response('Hello, World!'))
- self.assertEqual(isinstance(response, app.response_class), True)
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'Hello, World!')
-
- # From string.
- response = app.make_response(request, 'Hello, World!')
- self.assertEqual(isinstance(response, app.response_class), True)
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'Hello, World!')
-
- # From tuple.
- response = app.make_response(request, 'Hello, World!', 404)
- self.assertEqual(isinstance(response, app.response_class), True)
- self.assertEqual(response.status_code, 404)
- self.assertEqual(response.data, 'Hello, World!')
-
- # From None.
- self.assertRaises(ValueError, app.make_response, request, None)
-
- def test_dev_run(self):
- self._set_dev_server_flag(True)
-
- sys.stdout = StringIO.StringIO()
-
- os.environ['APPLICATION_ID'] = 'my-app'
- os.environ['SERVER_SOFTWARE'] = 'Development'
- os.environ['SERVER_NAME'] = 'localhost'
- os.environ['SERVER_PORT'] = '8080'
- os.environ['REQUEST_METHOD'] = 'GET'
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- ], debug=True)
-
- app.run()
- self.assertEqual(sys.stdout.getvalue(), 'Status: 200 OK\r\nContent-Type:
text/html; charset=utf-8\r\nContent-Length: 11\r\n\r\nMethod: GET')
-
- def test_get_config(self):
- app = Tipfy(config={'tipfy': {'foo': 'bar'}})
- self.assertEqual(app.get_config('tipfy', 'foo'), 'bar')
+ def test_custom_error_handlers(self):
+ app = Tipfy([
+ Rule('/', handler=AllMethodsHandler, name='home'),
+ Rule('/broken', handler=BrokenHandler, name='broken'),
+ ], debug=False)
+ app.error_handlers = {
+ 404: Handle404,
+ 405: Handle405,
+ 500: Handle500,
+ }
+ client = app.get_test_client()
+
+ res = client.get('/nowhere')
+ self.assertEqual(res.status_code, 404)
+ self.assertEqual(res.data, '404 custom handler')
+
+ res = client.put('/broken')
+ self.assertEqual(res.status_code, 405)
+ self.assertEqual(res.data, '405 custom handler')
+ self.assertEqual(res.headers.get('Allow'), 'GET')
+
+ res = client.get('/broken')
+ self.assertEqual(res.status_code, 500)
+ self.assertEqual(res.data, '500 custom handler')
+
+ def test_store_classes(self):
+ from tipfy.appengine.auth import AuthStore
+ from tipfy.i18n import I18nStore
+ from tipfy.sessions import SessionStore
+
+ app = Tipfy()
+ self.assertEqual(app.auth_store_class, AuthStore)
+ self.assertEqual(app.i18n_store_class, I18nStore)
+ self.assertEqual(app.session_store_class, SessionStore)
+
+ def test_make_response(self):
+ app = Tipfy()
+ request = Request.from_values()
+
+ # Empty.
+ response = app.make_response(request)
+ self.assertEqual(isinstance(response, app.response_class), True)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, '')
+
+ # From Response.
+ response = app.make_response(request, Response('Hello, World!'))
+ self.assertEqual(isinstance(response, app.response_class), True)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'Hello, World!')
+
+ # From string.
+ response = app.make_response(request, 'Hello, World!')
+ self.assertEqual(isinstance(response, app.response_class), True)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'Hello, World!')
+
+ # From tuple.
+ response = app.make_response(request, 'Hello, World!', 404)
+ self.assertEqual(isinstance(response, app.response_class), True)
+ self.assertEqual(response.status_code, 404)
+ self.assertEqual(response.data, 'Hello, World!')
+
+ # From None.
+ self.assertRaises(ValueError, app.make_response, request, None)
+
+ def test_dev_run(self):
+ self._set_dev_server_flag(True)
+
+ sys.stdout = StringIO.StringIO()
+
+ os.environ['APPLICATION_ID'] = 'my-app'
+ os.environ['SERVER_SOFTWARE'] = 'Development'
+ os.environ['SERVER_NAME'] = 'localhost'
+ os.environ['SERVER_PORT'] = '8080'
+ os.environ['REQUEST_METHOD'] = 'GET'
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ ], debug=True)
+
+ app.run()
+ self.assertEqual(sys.stdout.getvalue(), 'Status: 200
OK\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length:
11\r\n\r\nMethod: GET')
+
+ def test_get_config(self):
+ app = Tipfy(config={'tipfy': {'foo': 'bar'}})
+ self.assertEqual(app.get_config('tipfy', 'foo'), 'bar')
class TestRequest(BaseTestCase):
- def test_json(self):
- class JsonHandler(RequestHandler):
- def get(self, **kwargs):
- return Response(self.request.json['foo'])
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=JsonHandler),
- ], debug=True)
-
- data = json_encode({'foo': 'bar'})
- client = app.get_test_client()
- response = client.get('/', content_type='application/json', data=data)
- self.assertEqual(response.data, 'bar')
+ def test_json(self):
+ class JsonHandler(RequestHandler):
+ def get(self, **kwargs):
+ return Response(self.request.json['foo'])
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=JsonHandler),
+ ], debug=True)
***The diff for this file has been truncated for email.***
=======================================
--- /tests/auth_test.py Sun Apr 3 07:14:04 2011
+++ /tests/auth_test.py Sun Apr 3 07:55:49 2011
@@ -8,10 +8,10 @@
import tipfy.auth
from tipfy.auth import (AdminRequiredMiddleware, LoginRequiredMiddleware,
- UserRequiredMiddleware, UserRequiredIfAuthenticatedMiddleware,
- admin_required, login_required, user_required,
- user_required_if_authenticated, check_password_hash,
generate_password_hash,
- create_session_id, MultiAuthStore)
+ UserRequiredMiddleware, UserRequiredIfAuthenticatedMiddleware,
+ admin_required, login_required, user_required,
+ user_required_if_authenticated, check_password_hash,
generate_password_hash,
+ create_session_id, MultiAuthStore)
from tipfy.appengine.auth import AuthStore, MixedAuthStore
from tipfy.appengine.auth.model import User
@@ -19,692 +19,692 @@
class LoginHandler(RequestHandler):
- def get(self, **kwargs):
- return Response('login')
+ def get(self, **kwargs):
+ return Response('login')
class LogoutHandler(RequestHandler):
- def get(self, **kwargs):
- return Response('logout')
+ def get(self, **kwargs):
+ return Response('logout')
class SignupHandler(RequestHandler):
- def get(self, **kwargs):
- return Response('signup')
+ def get(self, **kwargs):
+ return Response('signup')
class HomeHandler(RequestHandler):
- def get(self, **kwargs):
- return Response('home sweet home')
+ def get(self, **kwargs):
+ return Response('home sweet home')
def get_app():
- app = Tipfy(rules=[
- Rule('/login', name='auth/login', handler=LoginHandler),
- Rule('/logout', name='auth/logout', handler=LogoutHandler),
- Rule('/signup', name='auth/signup', handler=SignupHandler),
- ])
- return app
+ app = Tipfy(rules=[
+ Rule('/login', name='auth/login', handler=LoginHandler),
+ Rule('/logout', name='auth/logout', handler=LogoutHandler),
+ Rule('/signup', name='auth/signup', handler=SignupHandler),
+ ])
+ return app
class TestAuthStore(test_utils.BaseTestCase):
- def test_user_model(self):
- app = get_app()
- app.router.add(Rule('/', name='home', handler=HomeHandler))
-
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- store = AuthStore(local.current_handler)
- self.assertEqual(store.user_model, User)
-
- def test_login_url(self):
- app = get_app()
- app.router.add(Rule('/', name='home', handler=HomeHandler))
-
- with app.get_test_context() as request:
- request.app.router.match(request)
-
- store = AuthStore(request)
- self.assertEqual(store.login_url(),
request.app.router.url_for(request, 'auth/login', dict(redirect='/')))
-
- tipfy.auth.DEV_APPSERVER_APPSERVER = False
- store.config['secure_urls'] = True
- self.assertEqual(store.login_url(),
request.app.router.url_for(request, 'auth/login', dict(redirect='/',
_scheme='https')))
- tipfy.auth.DEV_APPSERVER_APPSERVER = True
-
- def test_logout_url(self):
- app = get_app()
- app.router.add(Rule('/', name='home', handler=HomeHandler))
-
- with app.get_test_context() as request:
- request.app.router.match(request)
- store = AuthStore(request)
- self.assertEqual(store.logout_url(),
request.app.router.url_for(request, 'auth/logout', dict(redirect='/')))
-
- def test_signup_url(self):
- app = get_app()
- app.router.add(Rule('/', name='home', handler=HomeHandler))
-
- with app.get_test_context() as request:
- request.app.router.match(request)
- store = AuthStore(request)
- self.assertEqual(store.signup_url(),
request.app.router.url_for(request, 'auth/signup', dict(redirect='/')))
+ def test_user_model(self):
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=HomeHandler))
+
+ request = Request.from_values('/')
+ local.current_handler = RequestHandler(app, request)
+
+ store = AuthStore(local.current_handler)
+ self.assertEqual(store.user_model, User)
+
+ def test_login_url(self):
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=HomeHandler))
+
+ with app.get_test_context() as request:
+ request.app.router.match(request)
+
+ store = AuthStore(request)
+ self.assertEqual(store.login_url(),
request.app.router.url_for(request, 'auth/login', dict(redirect='/')))
+
+ tipfy.auth.DEV_APPSERVER_APPSERVER = False
+ store.config['secure_urls'] = True
+ self.assertEqual(store.login_url(),
request.app.router.url_for(request, 'auth/login', dict(redirect='/',
_scheme='https')))
+ tipfy.auth.DEV_APPSERVER_APPSERVER = True
+
+ def test_logout_url(self):
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=HomeHandler))
+
+ with app.get_test_context() as request:
+ request.app.router.match(request)
+ store = AuthStore(request)
+ self.assertEqual(store.logout_url(),
request.app.router.url_for(request, 'auth/logout', dict(redirect='/')))
+
+ def test_signup_url(self):
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=HomeHandler))
+
+ with app.get_test_context() as request:
+ request.app.router.match(request)
+ store = AuthStore(request)
+ self.assertEqual(store.signup_url(),
request.app.router.url_for(request, 'auth/signup', dict(redirect='/')))
class TestMiddleware(test_utils.BaseTestCase):
- def tearDown(self):
- os.environ.pop('USER_EMAIL', None)
- os.environ.pop('USER_ID', None)
- os.environ.pop('USER_IS_ADMIN', None)
- test_utils.BaseTestCase.tearDown(self)
-
- def test_login_required_middleware_invalid(self):
- class MyHandler(HomeHandler):
- middleware = [LoginRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
-
- def test_login_required_decorator_invalid(self):
- class MyHandler(HomeHandler):
- @login_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
-
- def test_login_required_middleware(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- middleware = [LoginRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_login_required_decorator(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- @login_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_user_required_middleware_invalid(self):
- class MyHandler(HomeHandler):
- middleware = [UserRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
-
- def test_user_required_decorator_invalid(self):
- class MyHandler(HomeHandler):
- @user_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
-
- def test_user_required_middleware_logged_in(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- middleware = [UserRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
-
- def test_user_required_decorator_logged_in(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- @user_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
-
- def test_user_required_middleware_with_user(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me')
-
- class MyHandler(HomeHandler):
- middleware = [UserRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_user_required_decorator_with_user(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me')
-
- class MyHandler(HomeHandler):
- @user_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_user_required_if_authenticated_middleware(self):
- class MyHandler(HomeHandler):
- middleware = [UserRequiredIfAuthenticatedMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_user_required_if_authenticate_decorator(self):
- class MyHandler(HomeHandler):
- @user_required_if_authenticated
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_user_required_if_authenticated_middleware_logged_in(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- middleware = [UserRequiredIfAuthenticatedMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
-
- def test_user_required_if_authenticate_decorator_logged_in(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- @user_required_if_authenticated
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
-
- def test_user_required_if_authenticated_middleware_with_user(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me')
-
- class MyHandler(HomeHandler):
- middleware = [UserRequiredIfAuthenticatedMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_user_required_if_authenticate_decorator_with_user(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me')
-
- class MyHandler(HomeHandler):
- @user_required_if_authenticated
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_admin_required_middleware(self):
- class MyHandler(HomeHandler):
- middleware = [AdminRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
-
- def test_admin_required_decorator(self):
- class MyHandler(HomeHandler):
- @admin_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
-
- def test_admin_required_middleware_logged_in(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- middleware = [AdminRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 403)
-
- def test_admin_required_decorator_logged_in(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- @admin_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 403)
-
- def test_admin_required_middleware_with_user(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me')
-
- class MyHandler(HomeHandler):
- middleware = [AdminRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 403)
-
- def test_admin_required_decorator_withd_user(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me')
-
- class MyHandler(HomeHandler):
- @admin_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 403)
-
- def test_admin_required_middleware_with_admin(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me', is_admin=True)
-
- class MyHandler(HomeHandler):
- middleware = [AdminRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_admin_required_decorator_with_admin(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me', is_admin=True)
-
- class MyHandler(HomeHandler):
- @admin_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
+ def tearDown(self):
+ os.environ.pop('USER_EMAIL', None)
+ os.environ.pop('USER_ID', None)
+ os.environ.pop('USER_IS_ADMIN', None)
+ test_utils.BaseTestCase.tearDown(self)
+
+ def test_login_required_middleware_invalid(self):
+ class MyHandler(HomeHandler):
+ middleware = [LoginRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
+
+ def test_login_required_decorator_invalid(self):
+ class MyHandler(HomeHandler):
+ @login_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
+
+ def test_login_required_middleware(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ middleware = [LoginRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_login_required_decorator(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ @login_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_user_required_middleware_invalid(self):
+ class MyHandler(HomeHandler):
+ middleware = [UserRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
+
+ def test_user_required_decorator_invalid(self):
+ class MyHandler(HomeHandler):
+ @user_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
+
+ def test_user_required_middleware_logged_in(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ middleware = [UserRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
+
+ def test_user_required_decorator_logged_in(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ @user_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
+
+ def test_user_required_middleware_with_user(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me')
+
+ class MyHandler(HomeHandler):
+ middleware = [UserRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_user_required_decorator_with_user(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me')
+
+ class MyHandler(HomeHandler):
+ @user_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_user_required_if_authenticated_middleware(self):
+ class MyHandler(HomeHandler):
+ middleware = [UserRequiredIfAuthenticatedMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_user_required_if_authenticate_decorator(self):
+ class MyHandler(HomeHandler):
+ @user_required_if_authenticated
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_user_required_if_authenticated_middleware_logged_in(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ middleware = [UserRequiredIfAuthenticatedMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
+
+ def test_user_required_if_authenticate_decorator_logged_in(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ @user_required_if_authenticated
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
+
+ def test_user_required_if_authenticated_middleware_with_user(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me')
+
+ class MyHandler(HomeHandler):
+ middleware = [UserRequiredIfAuthenticatedMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_user_required_if_authenticate_decorator_with_user(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me')
+
+ class MyHandler(HomeHandler):
+ @user_required_if_authenticated
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_admin_required_middleware(self):
+ class MyHandler(HomeHandler):
+ middleware = [AdminRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
+
+ def test_admin_required_decorator(self):
+ class MyHandler(HomeHandler):
+ @admin_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
+
+ def test_admin_required_middleware_logged_in(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ middleware = [AdminRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 403)
+
+ def test_admin_required_decorator_logged_in(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ @admin_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 403)
+
+ def test_admin_required_middleware_with_user(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me')
+
+ class MyHandler(HomeHandler):
+ middleware = [AdminRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 403)
+
+ def test_admin_required_decorator_withd_user(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me')
+
+ class MyHandler(HomeHandler):
+ @admin_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 403)
+
+ def test_admin_required_middleware_with_admin(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me', is_admin=True)
+
+ class MyHandler(HomeHandler):
+ middleware = [AdminRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_admin_required_decorator_with_admin(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me', is_admin=True)
+
+ class MyHandler(HomeHandler):
+ @admin_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
class TestUserModel(test_utils.BaseTestCase):
- def test_create(self):
- user = User.create('my_username', 'my_id')
- self.assertEqual(isinstance(user, User), True)
-
- # Second one will fail to be created.
- user = User.create('my_username', 'my_id')
- self.assertEqual(user, None)
-
- def test_create_with_password_hash(self):
- user = User.create('my_username', 'my_id', password_hash='foo')
-
- self.assertEqual(isinstance(user, User), True)
- self.assertEqual(user.password, 'foo')
-
- def test_create_with_password(self):
- user = User.create('my_username', 'my_id', password='foo')
-
- self.assertEqual(isinstance(user, User), True)
- self.assertNotEqual(user.password, 'foo')
- self.assertEqual(len(user.password.split('$')), 3)
-
- def test_set_password(self):
- user = User.create('my_username', 'my_id', password='foo')
- self.assertEqual(isinstance(user, User), True)
-
- password = user.password
-
- user.set_password('bar')
- self.assertNotEqual(user.password, password)
-
- self.assertNotEqual(user.password, 'bar')
- self.assertEqual(len(user.password.split('$')), 3)
-
- def test_check_password(self):
- app = Tipfy()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- user = User.create('my_username', 'my_id', password='foo')
-
- self.assertEqual(user.check_password('foo'), True)
- self.assertEqual(user.check_password('bar'), False)
-
- def test_check_session(self):
- app = Tipfy()
***The diff for this file has been truncated for email.***
=======================================
--- /tests/config_test.py Sat Apr 2 13:52:07 2011
+++ /tests/config_test.py Sun Apr 3 07:55:49 2011
@@ -14,357 +14,357 @@
class TestConfig(test_utils.BaseTestCase):
- def test_get(self):
- config = Config({'foo': {
- 'bar': 'baz',
- 'doo': 'ding',
- }})
-
- self.assertEqual(config.get('foo'), {
- 'bar': 'baz',
- 'doo': 'ding',
- })
-
- self.assertEqual(config.get('bar'), {})
-
- def test_get_existing_keys(self):
- config = Config({'foo': {
- 'bar': 'baz',
- 'doo': 'ding',
- }})
-
- self.assertEqual(config.get_config('foo', 'bar'), 'baz')
- self.assertEqual(config.get_config('foo', 'doo'), 'ding')
-
- def test_get_existing_keys_from_default(self):
- config = Config({}, {'foo': {
- 'bar': 'baz',
- 'doo': 'ding',
- }})
-
- self.assertEqual(config.get_config('foo', 'bar'), 'baz')
- self.assertEqual(config.get_config('foo', 'doo'), 'ding')
-
- def test_get_non_existing_keys(self):
- config = Config()
-
- self.assertRaises(KeyError, config.get_config, 'foo', 'bar')
-
- def test_get_dict_existing_keys(self):
- config = Config({'foo': {
- 'bar': 'baz',
- 'doo': 'ding',
- }})
-
- self.assertEqual(config.get_config('foo'), {
- 'bar': 'baz',
- 'doo': 'ding',
- })
-
- def test_get_dict_non_existing_keys(self):
- config = Config()
-
- self.assertRaises(KeyError, config.get_config, 'bar')
-
- def test_get_with_default(self):
- config = Config()
-
- self.assertRaises(KeyError, config.get_config, 'foo', 'bar', 'ooops')
- self.assertRaises(KeyError, config.get_config, 'foo', 'doo', 'wooo')
-
- def test_get_with_default_and_none(self):
- config = Config({'foo': {
- 'bar': None,
- }})
-
- self.assertEqual(config.get_config('foo', 'bar', 'ooops'), None)
-
- def test_update(self):
- config = Config({'foo': {
- 'bar': 'baz',
- 'doo': 'ding',
- }})
-
- self.assertEqual(config.get_config('foo', 'bar'), 'baz')
- self.assertEqual(config.get_config('foo', 'doo'), 'ding')
-
- config.update('foo', {'bar': 'other'})
-
- self.assertEqual(config.get_config('foo', 'bar'), 'other')
- self.assertEqual(config.get_config('foo', 'doo'), 'ding')
-
- def test_setdefault(self):
- config = Config()
-
- self.assertRaises(KeyError, config.get_config, 'foo')
-
- config.setdefault('foo', {
- 'bar': 'baz',
- 'doo': 'ding',
- })
-
- self.assertEqual(config.get_config('foo', 'bar'), 'baz')
- self.assertEqual(config.get_config('foo', 'doo'), 'ding')
-
- def test_setdefault2(self):
- config = Config({'foo': {
- 'bar': 'baz',
- }})
-
- self.assertEqual(config.get_config('foo'), {
- 'bar': 'baz',
- })
-
- config.setdefault('foo', {
- 'bar': 'wooo',
- 'doo': 'ding',
- })
-
- self.assertEqual(config.get_config('foo', 'bar'), 'baz')
- self.assertEqual(config.get_config('foo', 'doo'), 'ding')
-
- def test_setitem(self):
- config = Config()
- config['foo'] = {'bar': 'baz'}
-
- self.assertEqual(config, {'foo': {'bar': 'baz'}})
- self.assertEqual(config['foo'], {'bar': 'baz'})
-
- def test_init_no_dict_values(self):
- self.assertRaises(AssertionError, Config, {'foo': 'bar'})
- self.assertRaises(AssertionError, Config, {'foo': None})
- self.assertRaises(AssertionError, Config, 'foo')
-
- def test_init_no_dict_default(self):
- self.assertRaises(AssertionError, Config, {}, {'foo': 'bar'})
- self.assertRaises(AssertionError, Config, {}, {'foo': None})
- self.assertRaises(AssertionError, Config, {}, 'foo')
-
- def test_update_no_dict_values(self):
- config = Config()
-
- self.assertRaises(AssertionError, config.update, {'foo': 'bar'}, 'baz')
- self.assertRaises(AssertionError, config.update, {'foo': None}, 'baz')
- self.assertRaises(AssertionError, config.update, 'foo', 'bar')
-
- def test_setdefault_no_dict_values(self):
- config = Config()
-
- self.assertRaises(AssertionError, config.setdefault, 'foo', 'bar')
- self.assertRaises(AssertionError, config.setdefault, 'foo', None)
-
- def test_setitem_no_dict_values(self):
- config = Config()
-
- def setitem(key, value):
- config[key] = value
- return config
-
- self.assertRaises(AssertionError, setitem, 'foo', 'bar')
- self.assertRaises(AssertionError, setitem, 'foo', None)
+ def test_get(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ }})
+
+ self.assertEqual(config.get('foo'), {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ })
+
+ self.assertEqual(config.get('bar'), {})
+
+ def test_get_existing_keys(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ }})
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'baz')
+ self.assertEqual(config.get_config('foo', 'doo'), 'ding')
+
+ def test_get_existing_keys_from_default(self):
+ config = Config({}, {'foo': {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ }})
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'baz')
+ self.assertEqual(config.get_config('foo', 'doo'), 'ding')
+
+ def test_get_non_existing_keys(self):
+ config = Config()
+
+ self.assertRaises(KeyError, config.get_config, 'foo', 'bar')
+
+ def test_get_dict_existing_keys(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ }})
+
+ self.assertEqual(config.get_config('foo'), {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ })
+
+ def test_get_dict_non_existing_keys(self):
+ config = Config()
+
+ self.assertRaises(KeyError, config.get_config, 'bar')
+
+ def test_get_with_default(self):
+ config = Config()
+
+ self.assertRaises(KeyError,
config.get_config, 'foo', 'bar', 'ooops')
+ self.assertRaises(KeyError,
config.get_config, 'foo', 'doo', 'wooo')
+
+ def test_get_with_default_and_none(self):
+ config = Config({'foo': {
+ 'bar': None,
+ }})
+
+ self.assertEqual(config.get_config('foo', 'bar', 'ooops'), None)
+
+ def test_update(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ }})
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'baz')
+ self.assertEqual(config.get_config('foo', 'doo'), 'ding')
+
+ config.update('foo', {'bar': 'other'})
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'other')
+ self.assertEqual(config.get_config('foo', 'doo'), 'ding')
+
+ def test_setdefault(self):
+ config = Config()
+
+ self.assertRaises(KeyError, config.get_config, 'foo')
+
+ config.setdefault('foo', {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ })
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'baz')
+ self.assertEqual(config.get_config('foo', 'doo'), 'ding')
+
+ def test_setdefault2(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ }})
+
+ self.assertEqual(config.get_config('foo'), {
+ 'bar': 'baz',
+ })
+
+ config.setdefault('foo', {
+ 'bar': 'wooo',
+ 'doo': 'ding',
+ })
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'baz')
+ self.assertEqual(config.get_config('foo', 'doo'), 'ding')
+
+ def test_setitem(self):
+ config = Config()
+ config['foo'] = {'bar': 'baz'}
+
+ self.assertEqual(config, {'foo': {'bar': 'baz'}})
+ self.assertEqual(config['foo'], {'bar': 'baz'})
+
+ def test_init_no_dict_values(self):
+ self.assertRaises(AssertionError, Config, {'foo': 'bar'})
+ self.assertRaises(AssertionError, Config, {'foo': None})
+ self.assertRaises(AssertionError, Config, 'foo')
+
+ def test_init_no_dict_default(self):
+ self.assertRaises(AssertionError, Config, {}, {'foo': 'bar'})
+ self.assertRaises(AssertionError, Config, {}, {'foo': None})
+ self.assertRaises(AssertionError, Config, {}, 'foo')
+
+ def test_update_no_dict_values(self):
+ config = Config()
+
+ self.assertRaises(AssertionError, config.update,
{'foo': 'bar'}, 'baz')
+ self.assertRaises(AssertionError, config.update, {'foo':
None}, 'baz')
+ self.assertRaises(AssertionError, config.update, 'foo', 'bar')
+
+ def test_setdefault_no_dict_values(self):
+ config = Config()
+
+ self.assertRaises(AssertionError, config.setdefault, 'foo', 'bar')
+ self.assertRaises(AssertionError, config.setdefault, 'foo', None)
+
+ def test_setitem_no_dict_values(self):
+ config = Config()
+
+ def setitem(key, value):
+ config[key] = value
+ return config
+
+ self.assertRaises(AssertionError, setitem, 'foo', 'bar')
+ self.assertRaises(AssertionError, setitem, 'foo', None)
class TestLoadConfig(test_utils.BaseTestCase):
- def test_default_config(self):
- config = Config()
-
- from resources.template import default_config as template_config
- from resources.i18n import default_config as i18n_config
-
-
self.assertEqual(config.get_config('resources.template', 'templates_dir'),
template_config['templates_dir'])
- self.assertEqual(config.get_config('resources.i18n', 'locale'),
i18n_config['locale'])
- self.assertEqual(config.get_config('resources.i18n', 'timezone'),
i18n_config['timezone'])
-
- def test_default_config_with_non_existing_key(self):
- config = Config()
-
- from resources.i18n import default_config as i18n_config
-
- # In the first time the module config will be loaded normally.
- self.assertEqual(config.get_config('resources.i18n', 'locale'),
i18n_config['locale'])
-
- # In the second time it won't be loaded, but won't find the value and
then use the default.
-
self.assertEqual(config.get_config('resources.i18n', 'i_dont_exist', 'foo'), 'foo')
-
- def test_override_config(self):
- config = Config({
- 'resources.template': {
- 'templates_dir': 'apps/templates'
- },
- 'resources.i18n': {
- 'locale': 'pt_BR',
- 'timezone': 'America/Sao_Paulo',
- },
- })
-
-
self.assertEqual(config.get_config('resources.template', 'templates_dir'), 'apps/templates')
- self.assertEqual(config.get_config('resources.i18n', 'locale'), 'pt_BR')
-
self.assertEqual(config.get_config('resources.i18n', 'timezone'), 'America/Sao_Paulo')
-
- def test_override_config2(self):
- config = Config({
- 'resources.i18n': {
- 'timezone': 'America/Sao_Paulo',
- },
- })
-
- self.assertEqual(config.get_config('resources.i18n', 'locale'), 'en_US')
-
self.assertEqual(config.get_config('resources.i18n', 'timezone'), 'America/Sao_Paulo')
-
- def test_get(self):
- config = Config({'foo': {
- 'bar': 'baz',
- }})
-
- self.assertEqual(config.get_config('foo', 'bar'), 'baz')
-
- def test_get_with_default(self):
- config = Config()
-
-
self.assertEqual(config.get_config('resources.i18n', 'bar', 'baz'), 'baz')
-
- def test_get_with_default_and_none(self):
- config = Config({'foo': {
- 'bar': None,
- }})
-
- self.assertEqual(config.get_config('foo', 'bar', 'ooops'), None)
-
- def test_get_with_default_and_module_load(self):
- config = Config()
- self.assertEqual(config.get_config('resources.i18n', 'locale'), 'en_US')
-
self.assertEqual(config.get_config('resources.i18n', 'locale', 'foo'), 'en_US')
-
- def test_required_config(self):
- config = Config()
- self.assertRaises(KeyError, config.get_config, 'resources.i18n', 'foo')
-
- def test_missing_module(self):
- config = Config()
- self.assertRaises(KeyError,
config.get_config, 'i_dont_exist', 'i_dont_exist')
-
- def test_missing_module2(self):
- config = Config()
- self.assertRaises(KeyError, config.get_config, 'i_dont_exist')
-
- def test_missing_key(self):
- config = Config()
- self.assertRaises(KeyError,
config.get_config, 'resources.i18n', 'i_dont_exist')
-
- def test_missing_default_config(self):
- config = Config()
- self.assertRaises(KeyError, config.get_config, 'tipfy', 'foo')
-
- def test_request_handler_get_config(self):
- app = Tipfy()
- with app.get_test_context() as request:
- handler = RequestHandler(request)
-
-
self.assertEqual(handler.get_config('resources.i18n', 'locale'), 'en_US')
-
self.assertEqual(handler.get_config('resources.i18n', 'locale', 'foo'), 'en_US')
- self.assertEqual(handler.get_config('resources.i18n'), {
- 'locale': 'en_US',
- 'timezone': 'America/Chicago',
- 'required': REQUIRED_VALUE,
- })
+ def test_default_config(self):
+ config = Config()
+
+ from resources.template import default_config as template_config
+ from resources.i18n import default_config as i18n_config
+
+
self.assertEqual(config.get_config('resources.template', 'templates_dir'),
template_config['templates_dir'])
+ self.assertEqual(config.get_config('resources.i18n', 'locale'),
i18n_config['locale'])
+ self.assertEqual(config.get_config('resources.i18n', 'timezone'),
i18n_config['timezone'])
+
+ def test_default_config_with_non_existing_key(self):
+ config = Config()
+
+ from resources.i18n import default_config as i18n_config
+
+ # In the first time the module config will be loaded normally.
+ self.assertEqual(config.get_config('resources.i18n', 'locale'),
i18n_config['locale'])
+
+ # In the second time it won't be loaded, but won't find the value
and then use the default.
+
self.assertEqual(config.get_config('resources.i18n', 'i_dont_exist', 'foo'), 'foo')
+
+ def test_override_config(self):
+ config = Config({
+ 'resources.template': {
+ 'templates_dir': 'apps/templates'
+ },
+ 'resources.i18n': {
+ 'locale': 'pt_BR',
+ 'timezone': 'America/Sao_Paulo',
+ },
+ })
+
+
self.assertEqual(config.get_config('resources.template', 'templates_dir'), 'apps/templates')
+
self.assertEqual(config.get_config('resources.i18n', 'locale'), 'pt_BR')
+
self.assertEqual(config.get_config('resources.i18n', 'timezone'), 'America/Sao_Paulo')
+
+ def test_override_config2(self):
+ config = Config({
+ 'resources.i18n': {
+ 'timezone': 'America/Sao_Paulo',
+ },
+ })
+
+
self.assertEqual(config.get_config('resources.i18n', 'locale'), 'en_US')
+
self.assertEqual(config.get_config('resources.i18n', 'timezone'), 'America/Sao_Paulo')
+
+ def test_get(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ }})
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'baz')
+
+ def test_get_with_default(self):
+ config = Config()
+
+
self.assertEqual(config.get_config('resources.i18n', 'bar', 'baz'), 'baz')
+
+ def test_get_with_default_and_none(self):
+ config = Config({'foo': {
+ 'bar': None,
+ }})
+
+ self.assertEqual(config.get_config('foo', 'bar', 'ooops'), None)
+
+ def test_get_with_default_and_module_load(self):
+ config = Config()
+
self.assertEqual(config.get_config('resources.i18n', 'locale'), 'en_US')
+
self.assertEqual(config.get_config('resources.i18n', 'locale', 'foo'), 'en_US')
+
+ def test_required_config(self):
+ config = Config()
+ self.assertRaises(KeyError,
config.get_config, 'resources.i18n', 'foo')
+
+ def test_missing_module(self):
+ config = Config()
+ self.assertRaises(KeyError,
config.get_config, 'i_dont_exist', 'i_dont_exist')
+
+ def test_missing_module2(self):
+ config = Config()
+ self.assertRaises(KeyError, config.get_config, 'i_dont_exist')
+
+ def test_missing_key(self):
+ config = Config()
+ self.assertRaises(KeyError,
config.get_config, 'resources.i18n', 'i_dont_exist')
+
+ def test_missing_default_config(self):
+ config = Config()
+ self.assertRaises(KeyError, config.get_config, 'tipfy', 'foo')
+
+ def test_request_handler_get_config(self):
+ app = Tipfy()
+ with app.get_test_context() as request:
+ handler = RequestHandler(request)
+
+
self.assertEqual(handler.get_config('resources.i18n', 'locale'), 'en_US')
+
self.assertEqual(handler.get_config('resources.i18n', 'locale', 'foo'), 'en_US')
+ self.assertEqual(handler.get_config('resources.i18n'), {
+ 'locale': 'en_US',
+ 'timezone': 'America/Chicago',
+ 'required': REQUIRED_VALUE,
+ })
class TestLoadConfigGetItem(test_utils.BaseTestCase):
- def test_default_config(self):
- config = Config()
-
- from resources.template import default_config as template_config
- from resources.i18n import default_config as i18n_config
-
- self.assertEqual(config['resources.template']['templates_dir'],
template_config['templates_dir'])
- self.assertEqual(config['resources.i18n']['locale'],
i18n_config['locale'])
- self.assertEqual(config['resources.i18n']['timezone'],
i18n_config['timezone'])
-
- def test_default_config_with_non_existing_key(self):
- config = Config()
-
- from resources.i18n import default_config as i18n_config
-
- # In the first time the module config will be loaded normally.
- self.assertEqual(config['resources.i18n']['locale'],
i18n_config['locale'])
-
- # In the second time it won't be loaded, but won't find the value and
then use the default.
-
self.assertEqual(config['resources.i18n'].get('i_dont_exist', 'foo'), 'foo')
-
- def test_override_config(self):
- config = Config({
- 'resources.template': {
- 'templates_dir': 'apps/templates'
- },
- 'resources.i18n': {
- 'locale': 'pt_BR',
- 'timezone': 'America/Sao_Paulo',
- },
- })
-
-
self.assertEqual(config['resources.template']['templates_dir'], 'apps/templates')
- self.assertEqual(config['resources.i18n']['locale'], 'pt_BR')
-
self.assertEqual(config['resources.i18n']['timezone'], 'America/Sao_Paulo')
-
- def test_override_config2(self):
- config = Config({
- 'resources.i18n': {
- 'timezone': 'America/Sao_Paulo',
- },
- })
-
- self.assertEqual(config['resources.i18n']['locale'], 'en_US')
-
self.assertEqual(config['resources.i18n']['timezone'], 'America/Sao_Paulo')
-
- def test_get(self):
- config = Config({'foo': {
- 'bar': 'baz',
- }})
-
- self.assertEqual(config['foo']['bar'], 'baz')
-
- def test_get_with_default(self):
- config = Config()
-
- self.assertEqual(config['resources.i18n'].get('bar', 'baz'), 'baz')
-
- def test_get_with_default_and_none(self):
- config = Config({'foo': {
- 'bar': None,
- }})
-
- self.assertEqual(config['foo'].get('bar', 'ooops'), None)
-
- def test_get_with_default_and_module_load(self):
- config = Config()
- self.assertEqual(config['resources.i18n']['locale'], 'en_US')
- self.assertEqual(config['resources.i18n'].get('locale', 'foo'), 'en_US')
-
- def test_required_config(self):
- config = Config()
- self.assertRaises(KeyError, config['resources.i18n'].__getitem__, 'foo')
- self.assertRaises(KeyError,
config['resources.i18n'].__getitem__, 'required')
-
- def test_missing_module(self):
- config = Config()
- self.assertRaises(KeyError, config.__getitem__, 'i_dont_exist')
-
- def test_missing_key(self):
- config = Config()
- self.assertRaises(KeyError,
config['resources.i18n'].__getitem__, 'i_dont_exist')
-
- def test_missing_default_config(self):
- config = Config()
- self.assertRaises(KeyError, config['tipfy'].__getitem__, 'foo')
+ def test_default_config(self):
+ config = Config()
+
+ from resources.template import default_config as template_config
+ from resources.i18n import default_config as i18n_config
+
+ self.assertEqual(config['resources.template']['templates_dir'],
template_config['templates_dir'])
+ self.assertEqual(config['resources.i18n']['locale'],
i18n_config['locale'])
+ self.assertEqual(config['resources.i18n']['timezone'],
i18n_config['timezone'])
+
+ def test_default_config_with_non_existing_key(self):
+ config = Config()
+
+ from resources.i18n import default_config as i18n_config
+
+ # In the first time the module config will be loaded normally.
+ self.assertEqual(config['resources.i18n']['locale'],
i18n_config['locale'])
+
+ # In the second time it won't be loaded, but won't find the value
and then use the default.
+
self.assertEqual(config['resources.i18n'].get('i_dont_exist', 'foo'), 'foo')
+
+ def test_override_config(self):
+ config = Config({
+ 'resources.template': {
+ 'templates_dir': 'apps/templates'
+ },
+ 'resources.i18n': {
+ 'locale': 'pt_BR',
+ 'timezone': 'America/Sao_Paulo',
+ },
+ })
+
+
self.assertEqual(config['resources.template']['templates_dir'], 'apps/templates')
+ self.assertEqual(config['resources.i18n']['locale'], 'pt_BR')
+
self.assertEqual(config['resources.i18n']['timezone'], 'America/Sao_Paulo')
+
+ def test_override_config2(self):
+ config = Config({
+ 'resources.i18n': {
+ 'timezone': 'America/Sao_Paulo',
+ },
+ })
+
+ self.assertEqual(config['resources.i18n']['locale'], 'en_US')
+
self.assertEqual(config['resources.i18n']['timezone'], 'America/Sao_Paulo')
+
+ def test_get(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ }})
+
+ self.assertEqual(config['foo']['bar'], 'baz')
+
+ def test_get_with_default(self):
+ config = Config()
+
+ self.assertEqual(config['resources.i18n'].get('bar', 'baz'), 'baz')
+
+ def test_get_with_default_and_none(self):
+ config = Config({'foo': {
+ 'bar': None,
+ }})
+
+ self.assertEqual(config['foo'].get('bar', 'ooops'), None)
+
+ def test_get_with_default_and_module_load(self):
+ config = Config()
+ self.assertEqual(config['resources.i18n']['locale'], 'en_US')
+
self.assertEqual(config['resources.i18n'].get('locale', 'foo'), 'en_US')
+
+ def test_required_config(self):
+ config = Config()
+ self.assertRaises(KeyError,
config['resources.i18n'].__getitem__, 'foo')
+ self.assertRaises(KeyError,
config['resources.i18n'].__getitem__, 'required')
+
+ def test_missing_module(self):
+ config = Config()
+ self.assertRaises(KeyError, config.__getitem__, 'i_dont_exist')
+
+ def test_missing_key(self):
+ config = Config()
+ self.assertRaises(KeyError,
config['resources.i18n'].__getitem__, 'i_dont_exist')
+
+ def test_missing_default_config(self):
+ config = Config()
+ self.assertRaises(KeyError, config['tipfy'].__getitem__, 'foo')
class TestGetConfig(test_utils.BaseTestCase):
- '''
- def test_get_config(self):
- app = Tipfy()
- self.assertEqual(get_config('resources.i18n', 'locale'), 'en_US')
- '''
+ '''
+ def test_get_config(self):
+ app = Tipfy()
+ self.assertEqual(get_config('resources.i18n', 'locale'), 'en_US')
+ '''
if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/gae_blobstore_test.py Sat Apr 2 13:52:07 2011
+++ /tests/gae_blobstore_test.py Sun Apr 3 07:55:49 2011
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
- Tests for tipfy.appengine.blobstore
+ Tests for tipfy.appengine.blobstore
"""
import datetime
import decimal
@@ -12,7 +12,7 @@
from google.appengine.ext import blobstore
from tipfy.appengine.blobstore import (CreationFormatError,
parse_blob_info,
- parse_creation)
+ parse_creation)
from werkzeug import FileStorage
@@ -20,77 +20,77 @@
class TestParseCreation(test_utils.BaseTestCase):
- """YYYY-mm-dd HH:MM:SS.ffffff"""
- def test_invalid_format(self):
- self.assertRaises(CreationFormatError, parse_creation, '2010-05-20
6:20:35', 'my_field_name')
-
- def test_invalid_format2(self):
- self.assertRaises(CreationFormatError, parse_creation, '2010-05-20
6:20:35.1234.5678', 'my_field_name')
-
- def test_invalid_format3(self):
- self.assertRaises(CreationFormatError,
parse_creation, 'youcannot.parseme', 'my_field_name')
-
- def test_parse(self):
- timestamp = time.time()
- parts = str(timestamp).split('.', 1)
- ms = parts[1][:4]
- timestamp = decimal.Decimal(parts[0] + '.' + ms)
- curr_date = datetime.datetime.fromtimestamp(timestamp)
-
- to_convert = '%s.%s' % (curr_date.strftime('%Y-%m-%d %H:%M:%S'), ms)
-
- res = parse_creation(to_convert, 'my_field_name')
-
- self.assertEqual(res.timetuple(), curr_date.timetuple())
+ """YYYY-mm-dd HH:MM:SS.ffffff"""
+ def test_invalid_format(self):
+ self.assertRaises(CreationFormatError, parse_creation, '2010-05-20
6:20:35', 'my_field_name')
+
+ def test_invalid_format2(self):
+ self.assertRaises(CreationFormatError, parse_creation, '2010-05-20
6:20:35.1234.5678', 'my_field_name')
+
+ def test_invalid_format3(self):
+ self.assertRaises(CreationFormatError,
parse_creation, 'youcannot.parseme', 'my_field_name')
+
+ def test_parse(self):
+ timestamp = time.time()
+ parts = str(timestamp).split('.', 1)
+ ms = parts[1][:4]
+ timestamp = decimal.Decimal(parts[0] + '.' + ms)
+ curr_date = datetime.datetime.fromtimestamp(timestamp)
+
+ to_convert = '%s.%s' % (curr_date.strftime('%Y-%m-%d %H:%M:%S'),
ms)
+
+ res = parse_creation(to_convert, 'my_field_name')
+
+ self.assertEqual(res.timetuple(), curr_date.timetuple())
class TestParseBlobInfo(test_utils.BaseTestCase):
- def test_none(self):
- self.assertEqual(parse_blob_info(None, 'my_field_name'), None)
-
- def test_file(self):
- stream = StringIO.StringIO()
- stream.write("""\
+ def test_none(self):
+ self.assertEqual(parse_blob_info(None, 'my_field_name'), None)
+
+ def test_file(self):
+ stream = StringIO.StringIO()
+ stream.write("""\
Content-Type: application/octet-stream
Content-Length: 1
X-AppEngine-Upload-Creation: 2010-10-01 05:34:00.000000
""")
- stream.seek(0)
- headers = {}
- headers['Content-Type'] = 'image/png; blob-key=foo'
-
- f = FileStorage(stream=stream, headers=headers)
- self.assertNotEqual(parse_blob_info(f, 'my_field_name'), None)
-
- def test_invalid_size(self):
- stream = StringIO.StringIO()
- stream.write("""\
+ stream.seek(0)
+ headers = {}
+ headers['Content-Type'] = 'image/png; blob-key=foo'
+
+ f = FileStorage(stream=stream, headers=headers)
+ self.assertNotEqual(parse_blob_info(f, 'my_field_name'), None)
+
+ def test_invalid_size(self):
+ stream = StringIO.StringIO()
+ stream.write("""\
Content-Type: application/octet-stream
Content-Length: zzz
X-AppEngine-Upload-Creation: 2010-10-01 05:34:00.000000
""")
- stream.seek(0)
- headers = {}
- headers['Content-Type'] = 'image/png; blob-key=foo'
-
- f = FileStorage(stream=stream, headers=headers)
- self.assertRaises(blobstore.BlobInfoParseError,
parse_blob_info,f, 'my_field_name')
-
- def test_invalid_CREATION(self):
- stream = StringIO.StringIO()
- stream.write("""\
+ stream.seek(0)
+ headers = {}
+ headers['Content-Type'] = 'image/png; blob-key=foo'
+
+ f = FileStorage(stream=stream, headers=headers)
+ self.assertRaises(blobstore.BlobInfoParseError,
parse_blob_info,f, 'my_field_name')
+
+ def test_invalid_CREATION(self):
+ stream = StringIO.StringIO()
+ stream.write("""\
Content-Type: application/octet-stream
Content-Length: 1
X-AppEngine-Upload-Creation: XXX
""")
- stream.seek(0)
- headers = {}
- headers['Content-Type'] = 'image/png; blob-key=foo'
-
- f = FileStorage(stream=stream, headers=headers)
- self.assertRaises(blobstore.BlobInfoParseError,
parse_blob_info,f, 'my_field_name')
+ stream.seek(0)
+ headers = {}
+ headers['Content-Type'] = 'image/png; blob-key=foo'
+
+ f = FileStorage(stream=stream, headers=headers)
+ self.assertRaises(blobstore.BlobInfoParseError,
parse_blob_info,f, 'my_field_name')
if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/gae_db_test.py Sat Apr 2 13:52:07 2011
+++ /tests/gae_db_test.py Sun Apr 3 07:55:49 2011
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
- Tests for tipfy.appengine.db
+ Tests for tipfy.appengine.db
"""
import unittest
import hashlib
@@ -16,647 +16,647 @@
class FooModel(db.Model):
- name = db.StringProperty(required=True)
- name2 = db.StringProperty()
- age = db.IntegerProperty()
- married = db.BooleanProperty()
- data = ext_db.PickleProperty()
- slug = ext_db.SlugProperty(name)
- slug2 = ext_db.SlugProperty(name2, default='some-default-value',
max_length=20)
- etag = ext_db.EtagProperty(name)
- etag2 = ext_db.EtagProperty(name2)
- somekey = ext_db.KeyProperty()
+ name = db.StringProperty(required=True)
+ name2 = db.StringProperty()
+ age = db.IntegerProperty()
+ married = db.BooleanProperty()
+ data = ext_db.PickleProperty()
+ slug = ext_db.SlugProperty(name)
+ slug2 = ext_db.SlugProperty(name2, default='some-default-value',
max_length=20)
+ etag = ext_db.EtagProperty(name)
+ etag2 = ext_db.EtagProperty(name2)
+ somekey = ext_db.KeyProperty()
class FooExpandoModel(db.Expando):
- pass
+ pass
class BarModel(db.Model):
- foo = db.ReferenceProperty(FooModel)
+ foo = db.ReferenceProperty(FooModel)
class JsonModel(db.Model):
- data = ext_db.JsonProperty()
+ data = ext_db.JsonProperty()
class TimezoneModel(db.Model):
- data = ext_db.TimezoneProperty()
+ data = ext_db.TimezoneProperty()
@ext_db.retry_on_timeout(retries=3, interval=0.1)
def test_timeout_1(**kwargs):
- counter = kwargs.get('counter')
-
- # Let it pass only in the last attempt
- if counter[0] < 3:
- counter[0] += 1
- raise db.Timeout()
+ counter = kwargs.get('counter')
+
+ # Let it pass only in the last attempt
+ if counter[0] < 3:
+ counter[0] += 1
+ raise db.Timeout()
@ext_db.retry_on_timeout(retries=5, interval=0.1)
def test_timeout_2(**kwargs):
- counter = kwargs.get('counter')
-
- # Let it pass only in the last attempt
- if counter[0] < 5:
- counter[0] += 1
- raise db.Timeout()
-
- raise ValueError()
+ counter = kwargs.get('counter')
+
+ # Let it pass only in the last attempt
+ if counter[0] < 5:
+ counter[0] += 1
+ raise db.Timeout()
+
+ raise ValueError()
@ext_db.retry_on_timeout(retries=2, interval=0.1)
def test_timeout_3(**kwargs):
- # Never let it pass.
- counter = kwargs.get('counter')
- counter[0] += 1
- raise db.Timeout()
+ # Never let it pass.
+ counter = kwargs.get('counter')
+ counter[0] += 1
+ raise db.Timeout()
class TestModel(test_utils.BaseTestCase):
- def test_no_protobuf_from_entity(self):
- res_1 = ext_db.get_entity_from_protobuf([])
- self.assertEqual(res_1, None)
- res_2 = ext_db.get_protobuf_from_entity(None)
- self.assertEqual(res_2, None)
-
- def test_no_entity_from_protobuf(self):
- res_1 = ext_db.get_entity_from_protobuf([])
- self.assertEqual(res_1, None)
-
- def test_one_model_to_and_from_protobuf(self):
- entity_1 = FooModel(name='foo', age=15, married=False)
- entity_1.put()
-
- pb_1 = ext_db.get_protobuf_from_entity(entity_1)
-
- entity_1 = ext_db.get_entity_from_protobuf(pb_1)
- self.assertEqual(isinstance(entity_1, FooModel), True)
- self.assertEqual(
entity_1.name, 'foo')
- self.assertEqual(entity_1.age, 15)
- self.assertEqual(entity_1.married, False)
-
- def test_many_models_to_and_from_protobuf(self):
- entity_1 = FooModel(name='foo', age=15, married=False)
- entity_1.put()
- entity_2 = FooModel(name='bar', age=30, married=True)
- entity_2.put()
- entity_3 = FooModel(name='baz', age=45, married=False)
- entity_3.put()
-
- pbs = ext_db.get_protobuf_from_entity([entity_1, entity_2, entity_3])
- self.assertEqual(len(pbs), 3)
-
- entity_1, entity_2, entity_3 = ext_db.get_entity_from_protobuf(pbs)
- self.assertEqual(isinstance(entity_1, FooModel), True)
- self.assertEqual(
entity_1.name, 'foo')
- self.assertEqual(entity_1.age, 15)
- self.assertEqual(entity_1.married, False)
-
- self.assertEqual(isinstance(entity_2, FooModel), True)
- self.assertEqual(
entity_2.name, 'bar')
- self.assertEqual(entity_2.age, 30)
- self.assertEqual(entity_2.married, True)
-
- self.assertEqual(isinstance(entity_3, FooModel), True)
- self.assertEqual(
entity_3.name, 'baz')
- self.assertEqual(entity_3.age, 45)
- self.assertEqual(entity_3.married, False)
-
- def test_get_protobuf_from_entity_using_dict(self):
- entity_1 = FooModel(name='foo', age=15, married=False)
- entity_1.put()
- entity_2 = FooModel(name='bar', age=30, married=True)
- entity_2.put()
- entity_3 = FooModel(name='baz', age=45, married=False)
- entity_3.put()
-
- entity_dict = {'entity_1': entity_1, 'entity_2': entity_2, 'entity_3':
entity_3,}
-
- pbs = ext_db.get_protobuf_from_entity(entity_dict)
-
- entities = ext_db.get_entity_from_protobuf(pbs)
- entity_1 = entities['entity_1']
- entity_2 = entities['entity_2']
- entity_3 = entities['entity_3']
-
- self.assertEqual(isinstance(entity_1, FooModel), True)
- self.assertEqual(
entity_1.name, 'foo')
- self.assertEqual(entity_1.age, 15)
- self.assertEqual(entity_1.married, False)
-
- self.assertEqual(isinstance(entity_2, FooModel), True)
- self.assertEqual(
entity_2.name, 'bar')
- self.assertEqual(entity_2.age, 30)
- self.assertEqual(entity_2.married, True)
-
- self.assertEqual(isinstance(entity_3, FooModel), True)
- self.assertEqual(
entity_3.name, 'baz')
- self.assertEqual(entity_3.age, 45)
- self.assertEqual(entity_3.married, False)
-
- def test_get_or_insert_with_flag(self):
- entity, flag = ext_db.get_or_insert_with_flag(FooModel, 'foo',
name='foo', age=15, married=False)
- self.assertEqual(flag, True)
- self.assertEqual(
entity.name, 'foo')
- self.assertEqual(entity.age, 15)
- self.assertEqual(entity.married, False)
-
- entity, flag = ext_db.get_or_insert_with_flag(FooModel, 'foo',
name='bar', age=30, married=True)
- self.assertEqual(flag, False)
- self.assertEqual(
entity.name, 'foo')
- self.assertEqual(entity.age, 15)
- self.assertEqual(entity.married, False)
-
- def test_get_reference_key(self):
- entity_1 = FooModel(name='foo', age=15, married=False)
- entity_1.put()
- entity_1_key = str(entity_1.key())
-
- entity_2 = BarModel(key_name='first_bar', foo=entity_1)
- entity_2.put()
-
- entity_1.delete()
- entity_3 = BarModel.get_by_key_name('first_bar')
- # Won't resolve, but we can still get the key value.
- self.assertRaises(db.Error, getattr, entity_3, 'foo')
- self.assertEqual(str(ext_db.get_reference_key(entity_3, 'foo')),
entity_1_key)
-
- def test_get_reference_key_2(self):
- # Set a book entity with an author reference.
- class Author(db.Model):
- name = db.StringProperty()
-
- class Book(db.Model):
- title = db.StringProperty()
- author = db.ReferenceProperty(Author)
-
- author = Author(name='Stephen King')
- author.put()
-
- book = Book(key_name='the-shining', title='The Shining', author=author)
- book.put()
-
- # Now let's fetch the book and get the author key without fetching it.
- fetched_book = Book.get_by_key_name('the-shining')
- self.assertEqual(str(ext_db.get_reference_key(fetched_book, 'author')),
str(author.key()))
-
-
#===========================================================================
- # db.populate_entity
-
#===========================================================================
- def test_populate_entity(self):
- entity_1 = FooModel(name='foo', age=15, married=False)
- entity_1.put()
-
- self.assertEqual(
entity_1.name, 'foo')
- self.assertEqual(entity_1.age, 15)
- self.assertEqual(entity_1.married, False)
-
- ext_db.populate_entity(entity_1, name='bar', age=20, married=True,
city='Yukon')
- entity_1.put()
-
- self.assertEqual(
entity_1.name, 'bar')
- self.assertEqual(entity_1.age, 20)
- self.assertEqual(entity_1.married, True)
-
- def test_populate_entity_2(self):
- entity_1 = FooModel(name='foo', age=15, married=False)
- entity_1.put()
-
- self.assertEqual(
entity_1.name, 'foo')
- self.assertEqual(entity_1.age, 15)
- self.assertEqual(entity_1.married, False)
-
- ext_db.populate_entity(entity_1, name='bar', age=20, married=True,
city='Yukon')
- entity_1.put()
-
- self.assertRaises(AttributeError, getattr, entity_1, 'city')
-
- def test_populate_expando_entity(self):
- entity_1 = FooExpandoModel(name='foo', age=15, married=False)
- entity_1.put()
-
- self.assertEqual(
entity_1.name, 'foo')
- self.assertEqual(entity_1.age, 15)
- self.assertEqual(entity_1.married, False)
-
- ext_db.populate_entity(entity_1, name='bar', age=20, married=True,
city='Yukon')
- entity_1.put()
-
- self.assertEqual(
entity_1.name, 'bar')
- self.assertEqual(entity_1.age, 20)
- self.assertEqual(entity_1.married, True)
-
- def test_populate_expando_entity_2(self):
- entity_1 = FooExpandoModel(name='foo', age=15, married=False)
- entity_1.put()
-
- self.assertEqual(
entity_1.name, 'foo')
- self.assertEqual(entity_1.age, 15)
- self.assertEqual(entity_1.married, False)
-
- ext_db.populate_entity(entity_1, name='bar', age=20, married=True,
city='Yukon')
- entity_1.put()
-
- self.assertRaises(AttributeError, getattr, entity_1, 'city')
+ def test_no_protobuf_from_entity(self):
+ res_1 = ext_db.get_entity_from_protobuf([])
+ self.assertEqual(res_1, None)
+ res_2 = ext_db.get_protobuf_from_entity(None)
+ self.assertEqual(res_2, None)
+
+ def test_no_entity_from_protobuf(self):
+ res_1 = ext_db.get_entity_from_protobuf([])
+ self.assertEqual(res_1, None)
+
+ def test_one_model_to_and_from_protobuf(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ pb_1 = ext_db.get_protobuf_from_entity(entity_1)
+
+ entity_1 = ext_db.get_entity_from_protobuf(pb_1)
+ self.assertEqual(isinstance(entity_1, FooModel), True)
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ def test_many_models_to_and_from_protobuf(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+ entity_2 = FooModel(name='bar', age=30, married=True)
+ entity_2.put()
+ entity_3 = FooModel(name='baz', age=45, married=False)
+ entity_3.put()
+
+ pbs = ext_db.get_protobuf_from_entity([entity_1, entity_2,
entity_3])
+ self.assertEqual(len(pbs), 3)
+
+ entity_1, entity_2, entity_3 = ext_db.get_entity_from_protobuf(pbs)
+ self.assertEqual(isinstance(entity_1, FooModel), True)
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ self.assertEqual(isinstance(entity_2, FooModel), True)
+ self.assertEqual(
entity_2.name, 'bar')
+ self.assertEqual(entity_2.age, 30)
+ self.assertEqual(entity_2.married, True)
+
+ self.assertEqual(isinstance(entity_3, FooModel), True)
+ self.assertEqual(
entity_3.name, 'baz')
+ self.assertEqual(entity_3.age, 45)
+ self.assertEqual(entity_3.married, False)
+
+ def test_get_protobuf_from_entity_using_dict(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+ entity_2 = FooModel(name='bar', age=30, married=True)
+ entity_2.put()
+ entity_3 = FooModel(name='baz', age=45, married=False)
+ entity_3.put()
+
+ entity_dict = {'entity_1': entity_1, 'entity_2':
entity_2, 'entity_3': entity_3,}
+
+ pbs = ext_db.get_protobuf_from_entity(entity_dict)
+
+ entities = ext_db.get_entity_from_protobuf(pbs)
+ entity_1 = entities['entity_1']
+ entity_2 = entities['entity_2']
+ entity_3 = entities['entity_3']
+
+ self.assertEqual(isinstance(entity_1, FooModel), True)
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ self.assertEqual(isinstance(entity_2, FooModel), True)
+ self.assertEqual(
entity_2.name, 'bar')
+ self.assertEqual(entity_2.age, 30)
+ self.assertEqual(entity_2.married, True)
+
+ self.assertEqual(isinstance(entity_3, FooModel), True)
+ self.assertEqual(
entity_3.name, 'baz')
+ self.assertEqual(entity_3.age, 45)
+ self.assertEqual(entity_3.married, False)
+
+ def test_get_or_insert_with_flag(self):
+ entity, flag = ext_db.get_or_insert_with_flag(FooModel, 'foo',
name='foo', age=15, married=False)
+ self.assertEqual(flag, True)
+ self.assertEqual(
entity.name, 'foo')
+ self.assertEqual(entity.age, 15)
+ self.assertEqual(entity.married, False)
+
+ entity, flag = ext_db.get_or_insert_with_flag(FooModel, 'foo',
name='bar', age=30, married=True)
+ self.assertEqual(flag, False)
+ self.assertEqual(
entity.name, 'foo')
+ self.assertEqual(entity.age, 15)
+ self.assertEqual(entity.married, False)
+
+ def test_get_reference_key(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+ entity_1_key = str(entity_1.key())
+
+ entity_2 = BarModel(key_name='first_bar', foo=entity_1)
+ entity_2.put()
+
+ entity_1.delete()
+ entity_3 = BarModel.get_by_key_name('first_bar')
+ # Won't resolve, but we can still get the key value.
+ self.assertRaises(db.Error, getattr, entity_3, 'foo')
+ self.assertEqual(str(ext_db.get_reference_key(entity_3, 'foo')),
entity_1_key)
+
+ def test_get_reference_key_2(self):
+ # Set a book entity with an author reference.
+ class Author(db.Model):
+ name = db.StringProperty()
+
+ class Book(db.Model):
+ title = db.StringProperty()
+ author = db.ReferenceProperty(Author)
+
+ author = Author(name='Stephen King')
+ author.put()
+
+ book = Book(key_name='the-shining', title='The Shining',
author=author)
+ book.put()
+
+ # Now let's fetch the book and get the author key without fetching
it.
+ fetched_book = Book.get_by_key_name('the-shining')
+
self.assertEqual(str(ext_db.get_reference_key(fetched_book, 'author')),
str(author.key()))
+
+
#===========================================================================
+ # db.populate_entity
+
#===========================================================================
+ def test_populate_entity(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ ext_db.populate_entity(entity_1, name='bar', age=20, married=True,
city='Yukon')
+ entity_1.put()
+
+ self.assertEqual(
entity_1.name, 'bar')
+ self.assertEqual(entity_1.age, 20)
+ self.assertEqual(entity_1.married, True)
+
+ def test_populate_entity_2(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ ext_db.populate_entity(entity_1, name='bar', age=20, married=True,
city='Yukon')
+ entity_1.put()
+
+ self.assertRaises(AttributeError, getattr, entity_1, 'city')
+
+ def test_populate_expando_entity(self):
+ entity_1 = FooExpandoModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ ext_db.populate_entity(entity_1, name='bar', age=20, married=True,
city='Yukon')
+ entity_1.put()
+
+ self.assertEqual(
entity_1.name, 'bar')
+ self.assertEqual(entity_1.age, 20)
+ self.assertEqual(entity_1.married, True)
+
+ def test_populate_expando_entity_2(self):
+ entity_1 = FooExpandoModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ ext_db.populate_entity(entity_1, name='bar', age=20, married=True,
city='Yukon')
+ entity_1.put()
+
+ self.assertRaises(AttributeError, getattr, entity_1, 'city')
-
#===========================================================================
- # db.get_entity_dict
-
#===========================================================================
- def test_get_entity_dict(self):
- class MyModel(db.Model):
- animal = db.StringProperty()
- species = db.IntegerProperty()
- description = db.TextProperty()
-
- entity = MyModel(animal='duck', species=12,
- description='A duck, a bird that swims well.')
- values = ext_db.get_entity_dict(entity)
-
- self.assertEqual(values, {
- 'animal': 'duck',
- 'species': 12,
- 'description': 'A duck, a bird that swims well.',
- })
-
- def test_get_entity_dict_multiple(self):
- class MyModel(db.Model):
- animal = db.StringProperty()
- species = db.IntegerProperty()
- description = db.TextProperty()
-
- entity = MyModel(animal='duck', species=12,
- description='A duck, a bird that swims well.')
- entity2 = MyModel(animal='bird', species=7,
- description='A bird, an animal that flies well.')
- values = ext_db.get_entity_dict([entity, entity2])
-
- self.assertEqual(values, [
- {
- 'animal': 'duck',
- 'species': 12,
- 'description': 'A duck, a bird that swims well.',
- },
- {
- 'animal': 'bird',
- 'species': 7,
- 'description': 'A bird, an animal that flies well.',
- }
- ])
-
- def test_get_entity_dict_with_expando(self):
- class MyModel(db.Expando):
- animal = db.StringProperty()
- species = db.IntegerProperty()
- description = db.TextProperty()
-
- entity = MyModel(animal='duck', species=12,
- description='A duck, a bird that swims well.',
- most_famous='Daffy Duck')
- values = ext_db.get_entity_dict(entity)
-
- self.assertEqual(values, {
- 'animal': 'duck',
- 'species': 12,
- 'description': 'A duck, a bird that swims well.',
- 'most_famous': 'Daffy Duck',
- })
-
-
#===========================================================================
- # get..._or_404
-
#===========================================================================
- def test_get_by_key_name_or_404(self):
- entity_1 = FooModel(key_name='foo', name='foo', age=15, married=False)
- entity_1.put()
-
- entity = ext_db.get_by_key_name_or_404(FooModel, 'foo')
- self.assertEqual(str(entity.key()), str(entity_1.key()))
-
- def test_get_by_key_name_or_404_2(self):
- self.assertRaises(NotFound, ext_db.get_by_key_name_or_404,
FooModel, 'bar')
-
- def test_get_by_id_or_404(self):
- entity_1 = FooModel(name='foo', age=15, married=False)
- entity_1.put()
-
- entity = ext_db.get_by_id_or_404(FooModel, entity_1.key().id())
- self.assertEqual(str(entity.key()), str(entity_1.key()))
-
- def test_get_by_id_or_404_2(self):
- self.assertRaises(NotFound, ext_db.get_by_id_or_404, FooModel, -1)
-
- def test_get_or_404(self):
- entity_1 = FooModel(name='foo', age=15, married=False)
- entity_1.put()
-
- entity = ext_db.get_or_404(entity_1.key())
- self.assertEqual(str(entity.key()), str(entity_1.key()))
-
- def test_get_or_404_2(self):
- self.assertRaises(NotFound, ext_db.get_or_404,
db.Key.from_path('FooModel', 'bar'))
-
- def test_get_or_404_3(self):
- self.assertRaises(NotFound, ext_db.get_or_404, 'this, not a valid key')
-
-
#===========================================================================
- # db.Property
-
#===========================================================================
- def test_pickle_property(self):
- data_1 = {'foo': 'bar'}
- entity_1 = FooModel(key_name='foo', name='foo', data=data_1)
- entity_1.put()
-
- data_2 = [1, 2, 3, 'baz']
- entity_2 = FooModel(key_name='bar', name='bar', data=data_2)
- entity_2.put()
-
- entity_1 = FooModel.get_by_key_name('foo')
- self.assertEqual(entity_1.data, data_1)
-
- entity_2 = FooModel.get_by_key_name('bar')
- self.assertEqual(entity_2.data, data_2)
-
- def test_slug_property(self):
- entity_1 = FooModel(key_name='foo', name=u'Mary Björk')
- entity_1.put()
-
- entity_2 = FooModel(key_name='bar', name=u'Tião Macalé')
- entity_2.put()
-
- entity_1 = FooModel.get_by_key_name('foo')
- entity_2 = FooModel.get_by_key_name('bar')
- self.assertEqual(entity_1.slug, 'mary-bjork')
- self.assertEqual(entity_2.slug, 'tiao-macale')
-
- def test_slug_property2(self):
- entity_1 = FooModel(key_name='foo', name=u'---')
- entity_1.put()
-
- entity_2 = FooModel(key_name='bar', name=u'___')
- entity_2.put()
-
- entity_1 = FooModel.get_by_key_name('foo')
- entity_2 = FooModel.get_by_key_name('bar')
- self.assertEqual(entity_1.slug, None)
- self.assertEqual(entity_2.slug, None)
-
- def test_slug_property3(self):
- entity_1 = FooModel(key_name='foo', name=u'---', name2=u'---')
- entity_1.put()
-
- entity_2 = FooModel(key_name='bar', name=u'___', name2=u'___')
- entity_2.put()
-
- entity_1 = FooModel.get_by_key_name('foo')
- entity_2 = FooModel.get_by_key_name('bar')
- self.assertEqual(entity_1.slug2, 'some-default-value')
- self.assertEqual(entity_2.slug2, 'some-default-value')
-
- def test_slug_property4(self):
- entity_1 = FooModel(key_name='foo', name=u'---', name2=u'Some really
very big and maybe enormous string')
- entity_1.put()
-
- entity_2 = FooModel(key_name='bar', name=u'___',
name2=u'abcdefghijklmnopqrstuwxyz')
- entity_2.put()
-
- entity_1 = FooModel.get_by_key_name('foo')
- entity_2 = FooModel.get_by_key_name('bar')
- self.assertEqual(entity_1.slug2, 'some-really-very-big')
- self.assertEqual(entity_2.slug2, 'abcdefghijklmnopqrst')
-
- def test_etag_property(self):
- entity_1 = FooModel(key_name='foo', name=u'Mary Björk')
- entity_1.put()
-
- entity_2 = FooModel(key_name='bar', name=u'Tião Macalé')
- entity_2.put()
-
- entity_1 = FooModel.get_by_key_name('foo')
- entity_2 = FooModel.get_by_key_name('bar')
- self.assertEqual(entity_1.etag,
hashlib.sha1(entity_1.name.encode('utf8')).hexdigest())
- self.assertEqual(entity_2.etag,
hashlib.sha1(entity_2.name.encode('utf8')).hexdigest())
-
- def test_etag_property2(self):
- entity_1 = FooModel(key_name='foo', name=u'Mary Björk')
- entity_1.put()
-
- entity_2 = FooModel(key_name='bar', name=u'Tião Macalé')
- entity_2.put()
-
- entity_1 = FooModel.get_by_key_name('foo')
- entity_2 = FooModel.get_by_key_name('bar')
- self.assertEqual(entity_1.etag2, None)
- self.assertEqual(entity_2.etag2, None)
-
- def test_json_property(self):
- entity_1 = JsonModel(key_name='foo', data={'foo': 'bar'})
- entity_1.put()
-
- entity_1 = JsonModel.get_by_key_name('foo')
- self.assertEqual(entity_1.data, {'foo': 'bar'})
-
- def test_json_property2(self):
- self.assertRaises(db.BadValueError, JsonModel, key_name='foo',
data='foo')
-
- def test_timezone_property(self):
- zone = 'America/Chicago'
- entity_1 = TimezoneModel(key_name='foo', data=zone)
- entity_1.put()
-
- entity_1 = TimezoneModel.get_by_key_name('foo')
- self.assertEqual(entity_1.data, ext_db.pytz.timezone(zone))
-
- def test_timezone_property2(self):
- self.assertRaises(db.BadValueError, TimezoneModel, key_name='foo',
data=[])
-
- def test_timezone_property3(self):
- self.assertRaises(ext_db.pytz.UnknownTimeZoneError, TimezoneModel,
key_name='foo', data='foo')
-
- def test_key_property(self):
- key = db.Key.from_path('Bar', 'bar-key')
- entity_1 = FooModel(name='foo', key_name='foo', somekey=key)
- entity_1.put()
-
- entity_1 = FooModel.get_by_key_name('foo')
- self.assertEqual(entity_1.somekey, key)
-
- def test_key_property2(self):
- key = db.Key.from_path('Bar', 'bar-key')
- entity_1 = FooModel(name='foo', key_name='foo', somekey=str(key))
- entity_1.put()
-
- entity_1 = FooModel.get_by_key_name('foo')
- self.assertEqual(entity_1.somekey, key)
-
- def test_key_property3(self):
- key = db.Key.from_path('Bar', 'bar-key')
- entity_1 = FooModel(name='foo', key_name='foo', somekey=str(key))
- entity_1.put()
-
- entity_2 = FooModel(name='bar', key_name='bar', somekey=entity_1)
- entity_2.put()
-
- entity_2 = FooModel.get_by_key_name('bar')
- self.assertEqual(entity_2.somekey, entity_1.key())
-
- def test_key_property4(self):
- key = db.Key.from_path('Bar', 'bar-key')
- entity_1 = FooModel(name='foo', somekey=str(key))
- self.assertRaises(db.BadValueError, FooModel, name='bar',
key_name='bar', somekey=entity_1)
-
- def test_key_property5(self):
- self.assertRaises(TypeError, FooModel, name='foo', key_name='foo',
somekey=['foo'])
-
- def test_key_property6(self):
- self.assertRaises(datastore_errors.BadKeyError, FooModel, name='foo',
key_name='foo', somekey='foo')
-
-
#===========================================================================
- # @db.retry_on_timeout
-
#===========================================================================
- def test_retry_on_timeout_1(self):
- counter = [0]
- test_timeout_1(counter=counter)
- self.assertEqual(counter[0], 3)
-
- def test_retry_on_timeout_2(self):
- counter = [0]
- self.assertRaises(ValueError, test_timeout_2, counter=counter)
- self.assertEqual(counter[0], 5)
-
- def test_retry_on_timeout_3(self):
- counter = [0]
- self.assertRaises(db.Timeout, test_timeout_3, counter=counter)
- self.assertEqual(counter[0], 3)
-
-
#===========================================================================
- # @db.load_entity
-
#===========================================================================
- def test_load_entity_with_key(self):
- @ext_db.load_entity(FooModel, 'foo_key', 'foo', 'key')
- def get(*args, **kwargs):
- return kwargs['foo']
-
- foo = FooModel(name='foo')
- foo.put()
-
- loaded_foo = get(foo_key=str(foo.key()))
- self.assertEqual(str(loaded_foo.key()), str(foo.key()))
- self.assertEqual(get(foo_key=None), None)
-
- def test_load_entity_with_key_2(self):
- @ext_db.load_entity(FooModel, 'foo_key', 'foo', 'key')
- def get(*args, **kwargs):
- return kwargs['foo']
-
- self.assertRaises(NotFound, get,
foo_key=str(db.Key.from_path('FooModel', 'bar')))
-
- def test_load_entity_with_id(self):
- @ext_db.load_entity(FooModel, 'foo_id', 'foo', 'id')
- def get(*args, **kwargs):
- return kwargs['foo']
-
- foo = FooModel(name='foo')
- foo.put()
-
- loaded_foo = get(foo_id=foo.key().id())
- self.assertEqual(str(loaded_foo.key()), str(foo.key()))
-
- def test_load_entity_with_id_2(self):
- @ext_db.load_entity(FooModel, 'foo_id', 'foo', 'id')
- def get(*args, **kwargs):
- return kwargs['foo']
-
- self.assertRaises(NotFound, get, foo_id=-1)
-
- def test_load_entity_with_key_name(self):
- @ext_db.load_entity(FooModel, 'foo_key_name', 'foo', 'key_name')
- def get(*args, **kwargs):
- return kwargs['foo']
-
- foo = FooModel(key_name='foo', name='foo')
- foo.put()
-
- loaded_foo = get(foo_key_name='foo')
- self.assertEqual(str(loaded_foo.key()), str(foo.key()))
-
- def test_load_entity_with_key_name_2(self):
- @ext_db.load_entity(FooModel, 'foo_key_name', 'foo', 'key_name')
- def get(*args, **kwargs):
- return kwargs['foo']
-
- self.assertRaises(NotFound, get, foo_key_name='bar')
-
- def test_load_entity_with_key_with_guessed_fetch_mode(self):
- @ext_db.load_entity(FooModel, 'foo_key')
- def get(*args, **kwargs):
- return kwargs['foo']
-
- foo = FooModel(name='foo')
- foo.put()
-
- loaded_foo = get(foo_key=str(foo.key()))
- self.assertEqual(str(loaded_foo.key()), str(foo.key()))
- self.assertEqual(get(foo_key=None), None)
-
- def test_load_entity_with_key_with_impossible_fetch_mode(self):
- def test():
- @ext_db.load_entity(FooModel, 'foo_bar')
- def get(*args, **kwargs):
- return kwargs['foo']
-
- self.assertRaises(NotImplementedError, test)
-
-
#===========================================================================
- # db.run_in_namespace
-
#===========================================================================
- def test_run_in_namespace(self):
- class MyModel(db.Model):
- name = db.StringProperty()
-
- def create_entity(name):
- entity = MyModel(key_name=name, name=name)
- entity.put()
-
- def get_entity(name):
- return MyModel.get_by_key_name(name)
-
- entity = ext_db.run_in_namespace('ns1', get_entity, 'foo')
- self.assertEqual(entity, None)
-
- ext_db.run_in_namespace('ns1', create_entity, 'foo')
-
- entity = ext_db.run_in_namespace('ns1', get_entity, 'foo')
- self.assertNotEqual(entity, None)
-
- entity = ext_db.run_in_namespace('ns2', get_entity, 'foo')
- self.assertEqual(entity, None)
-
-
#===========================================================================
- # db.to_key
-
#===========================================================================
- def test_to_key(self):
- class MyModel(db.Model):
- pass
-
- # None.
- self.assertEqual(ext_db.to_key(None), None)
- # Model without key.
- self.assertEqual(ext_db.to_key(MyModel()), None)
- # Model with key.
- self.assertEqual(ext_db.to_key(MyModel(key_name='foo')),
db.Key.from_path('MyModel', 'foo'))
- # Key.
- self.assertEqual(ext_db.to_key(db.Key.from_path('MyModel', 'foo')),
db.Key.from_path('MyModel', 'foo'))
- # Key as string.
- self.assertEqual(ext_db.to_key(str(db.Key.from_path('MyModel', 'foo'))),
db.Key.from_path('MyModel', 'foo'))
- # All mixed.
- keys = [None, MyModel(), MyModel(key_name='foo'),
db.Key.from_path('MyModel', 'foo'), str(db.Key.from_path('MyModel', 'foo'))]
- result = [None, None, db.Key.from_path('MyModel', 'foo'),
db.Key.from_path('MyModel', 'foo'), db.Key.from_path('MyModel', 'foo')]
- self.assertEqual(ext_db.to_key(keys), result)
-
- self.assertRaises(datastore_errors.BadArgumentError, ext_db.to_key, {})
+
#===========================================================================
+ # db.get_entity_dict
+
#===========================================================================
+ def test_get_entity_dict(self):
+ class MyModel(db.Model):
+ animal = db.StringProperty()
+ species = db.IntegerProperty()
+ description = db.TextProperty()
+
+ entity = MyModel(animal='duck', species=12,
+ description='A duck, a bird that swims well.')
+ values = ext_db.get_entity_dict(entity)
+
+ self.assertEqual(values, {
+ 'animal': 'duck',
+ 'species': 12,
+ 'description': 'A duck, a bird that swims well.',
+ })
+
+ def test_get_entity_dict_multiple(self):
+ class MyModel(db.Model):
+ animal = db.StringProperty()
+ species = db.IntegerProperty()
+ description = db.TextProperty()
+
+ entity = MyModel(animal='duck', species=12,
+ description='A duck, a bird that swims well.')
+ entity2 = MyModel(animal='bird', species=7,
+ description='A bird, an animal that flies well.')
+ values = ext_db.get_entity_dict([entity, entity2])
+
+ self.assertEqual(values, [
+ {
+ 'animal': 'duck',
+ 'species': 12,
+ 'description': 'A duck, a bird that swims well.',
+ },
+ {
+ 'animal': 'bird',
+ 'species': 7,
+ 'description': 'A bird, an animal that flies well.',
+ }
+ ])
+
+ def test_get_entity_dict_with_expando(self):
+ class MyModel(db.Expando):
+ animal = db.StringProperty()
+ species = db.IntegerProperty()
+ description = db.TextProperty()
+
+ entity = MyModel(animal='duck', species=12,
+ description='A duck, a bird that swims well.',
+ most_famous='Daffy Duck')
+ values = ext_db.get_entity_dict(entity)
+
+ self.assertEqual(values, {
+ 'animal': 'duck',
+ 'species': 12,
+ 'description': 'A duck, a bird that swims well.',
+ 'most_famous': 'Daffy Duck',
+ })
+
+
#===========================================================================
+ # get..._or_404
+
#===========================================================================
+ def test_get_by_key_name_or_404(self):
+ entity_1 = FooModel(key_name='foo', name='foo', age=15,
married=False)
+ entity_1.put()
+
+ entity = ext_db.get_by_key_name_or_404(FooModel, 'foo')
+ self.assertEqual(str(entity.key()), str(entity_1.key()))
+
+ def test_get_by_key_name_or_404_2(self):
+ self.assertRaises(NotFound, ext_db.get_by_key_name_or_404,
FooModel, 'bar')
+
+ def test_get_by_id_or_404(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ entity = ext_db.get_by_id_or_404(FooModel, entity_1.key().id())
+ self.assertEqual(str(entity.key()), str(entity_1.key()))
+
+ def test_get_by_id_or_404_2(self):
+ self.assertRaises(NotFound, ext_db.get_by_id_or_404, FooModel, -1)
+
+ def test_get_or_404(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ entity = ext_db.get_or_404(entity_1.key())
+ self.assertEqual(str(entity.key()), str(entity_1.key()))
+
+ def test_get_or_404_2(self):
+ self.assertRaises(NotFound, ext_db.get_or_404,
db.Key.from_path('FooModel', 'bar'))
+
+ def test_get_or_404_3(self):
+ self.assertRaises(NotFound, ext_db.get_or_404, 'this, not a valid
key')
+
+
#===========================================================================
+ # db.Property
+
#===========================================================================
+ def test_pickle_property(self):
+ data_1 = {'foo': 'bar'}
+ entity_1 = FooModel(key_name='foo', name='foo', data=data_1)
+ entity_1.put()
+
+ data_2 = [1, 2, 3, 'baz']
+ entity_2 = FooModel(key_name='bar', name='bar', data=data_2)
+ entity_2.put()
+
+ entity_1 = FooModel.get_by_key_name('foo')
+ self.assertEqual(entity_1.data, data_1)
+
+ entity_2 = FooModel.get_by_key_name('bar')
+ self.assertEqual(entity_2.data, data_2)
+
+ def test_slug_property(self):
+ entity_1 = FooModel(key_name='foo', name=u'Mary Björk')
+ entity_1.put()
+
+ entity_2 = FooModel(key_name='bar', name=u'Tião Macalé')
+ entity_2.put()
+
+ entity_1 = FooModel.get_by_key_name('foo')
+ entity_2 = FooModel.get_by_key_name('bar')
+ self.assertEqual(entity_1.slug, 'mary-bjork')
+ self.assertEqual(entity_2.slug, 'tiao-macale')
+
+ def test_slug_property2(self):
+ entity_1 = FooModel(key_name='foo', name=u'---')
***The diff for this file has been truncated for email.***
=======================================
--- /tests/gae_mail_test.py Sat Apr 2 13:52:07 2011
+++ /tests/gae_mail_test.py Sun Apr 3 07:55:49 2011
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
- Tests for tipfy.appengine.mail
+ Tests for tipfy.appengine.mail
"""
import os
import sys
@@ -22,29 +22,29 @@
def get_app():
- return Tipfy(rules=[
- Rule('/', name='xmpp-test',
handler='resources.mail_handlers.MailHandler'),
- Rule('/test2', name='xmpp-test',
handler='resources.mail_handlers.MailHandler2'),
- ], debug=True)
+ return Tipfy(rules=[
+ Rule('/', name='xmpp-test',
handler='resources.mail_handlers.MailHandler'),
+ Rule('/test2', name='xmpp-test',
handler='resources.mail_handlers.MailHandler2'),
+ ], debug=True)
class TestInboundMailHandler(test_utils.BaseTestCase):
- def test_mail(self):
- app = get_app()
- client = app.get_test_client()
-
- response = client.open(method='POST', path='/', data=MESSAGE,
content_type='text/plain')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'Test message!')
-
- def test_not_implemented(self):
- app = get_app()
- app.config['tipfy']['enable_debugger'] = False
- client = app.get_test_client()
-
- self.assertRaises(NotImplementedError, client.open, method='POST',
path='/test2', data=MESSAGE, content_type='text/plain')
+ def test_mail(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ response = client.open(method='POST', path='/', data=MESSAGE,
content_type='text/plain')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'Test message!')
+
+ def test_not_implemented(self):
+ app = get_app()
+ app.config['tipfy']['enable_debugger'] = False
+ client = app.get_test_client()
+
+ self.assertRaises(NotImplementedError, client.open, method='POST',
path='/test2', data=MESSAGE, content_type='text/plain')
if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/gae_taskqueue_test.py Sat Apr 2 13:52:07 2011
+++ /tests/gae_taskqueue_test.py Sun Apr 3 07:55:49 2011
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
- Tests for tipfyext.appenginetaskqueue
+ Tests for tipfyext.appenginetaskqueue
"""
import time
import unittest
@@ -17,62 +17,62 @@
def get_rules():
- # Fake get_rules() for testing.
- return [
- Rule('/_tasks/process-mymodel/', name='tasks/process-mymodel',
- handler='%s.TaskTestModelEntityTaskHandler' % __name__),
- Rule('/_tasks/process-mymodel/<string:key>',
name='tasks/process-mymodel',
- handler='%s.TaskTestModelEntityTaskHandler' % __name__),
- ]
+ # Fake get_rules() for testing.
+ return [
+ Rule('/_tasks/process-mymodel/', name='tasks/process-mymodel',
+ handler='%s.TaskTestModelEntityTaskHandler' % __name__),
+ Rule('/_tasks/process-mymodel/<string:key>',
name='tasks/process-mymodel',
+ handler='%s.TaskTestModelEntityTaskHandler' % __name__),
+ ]
def get_url_rules():
- # Fake get_rules() for testing.
- rules = [
- Rule('/_ah/queue/deferred', name='tasks/deferred',
handler='tipfy.appengine.taskqueue.DeferredHandler'),
- ]
-
- return Map(rules)
+ # Fake get_rules() for testing.
+ rules = [
+ Rule('/_ah/queue/deferred', name='tasks/deferred',
handler='tipfy.appengine.taskqueue.DeferredHandler'),
+ ]
+
+ return Map(rules)
def get_app():
- return Tipfy({
- 'tipfy': {
- 'dev': True,
- },
- }, rules=get_url_rules())
+ return Tipfy({
+ 'tipfy': {
+ 'dev': True,
+ },
+ }, rules=get_url_rules())
class TaskTestModel(db.Model):
- number = db.IntegerProperty()
+ number = db.IntegerProperty()
def save_entities(numbers):
- entities = []
- for number in numbers:
- entities.append(TaskTestModel(key_name=str(number), number=number))
-
- res = db.put(entities)
-
- import sys
- sys.exit(res)
+ entities = []
+ for number in numbers:
+ entities.append(TaskTestModel(key_name=str(number), number=number))
+
+ res = db.put(entities)
+
+ import sys
+ sys.exit(res)
class TestDeferredHandler(test_utils.BaseTestCase):
- """TODO"""
-
- def test_simple_deferred(self):
- numbers = [1234, 1577, 988]
- keys = [db.Key.from_path('TaskTestModel', str(number)) for number in
numbers]
- entities = db.get(keys)
- self.assertEqual(entities, [None, None, None])
-
- deferred.defer(save_entities, numbers)
+ """TODO"""
+
+ def test_simple_deferred(self):
+ numbers = [1234, 1577, 988]
+ keys = [db.Key.from_path('TaskTestModel', str(number)) for number
in numbers]
+ entities = db.get(keys)
+ self.assertEqual(entities, [None, None, None])
+
+ deferred.defer(save_entities, numbers)
class TestTasks(test_utils.BaseTestCase):
- """TODO"""
+ """TODO"""
if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/gae_xmpp_test.py Sat Apr 2 13:52:07 2011
+++ /tests/gae_xmpp_test.py Sun Apr 3 07:55:49 2011
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
- Tests for tipfy.appengine.xmpp
+ Tests for tipfy.appengine.xmpp
"""
import os
import sys
@@ -17,143 +17,143 @@
def get_app():
- return Tipfy(rules=[
- Rule('/', name='xmpp-test',
handler='resources.xmpp_handlers.XmppHandler'),
- Rule('/test2', name='xmpp-test',
handler='resources.xmpp_handlers.XmppHandler2'),
- ], debug=True)
+ return Tipfy(rules=[
+ Rule('/', name='xmpp-test',
handler='resources.xmpp_handlers.XmppHandler'),
+ Rule('/test2', name='xmpp-test',
handler='resources.xmpp_handlers.XmppHandler2'),
+ ], debug=True)
def send_message(jids, body, from_jid=None, message_type='chat',
- raw_xml=False):
- fake_local['message'] = {
- 'body': body,
- }
+ raw_xml=False):
+ fake_local['message'] = {
+ 'body': body,
+ }
class InvalidMessageError(Exception):
- pass
+ pass
class Message(ApiMessage):
- """Encapsulates an XMPP message received by the application."""
- def __init__(self, vars):
- """Constructs a new XMPP Message from an HTTP request.
-
- Args:
- vars: A dict-like object to extract message arguments from.
- """
- try:
- self.__sender = vars["from"]
- self.__to = vars["to"]
- self.__body = vars["body"]
- except KeyError, e:
- raise InvalidMessageError(e[0])
-
- self.__command = None
- self.__arg = None
-
- def reply(self, body, message_type='chat', raw_xml=False,
- send_message=send_message):
- """Convenience function to reply to a message.
-
- Args:
- body: str: The body of the message
- message_type, raw_xml: As per send_message.
- send_message: Used for testing.
-
- Returns:
- A status code as per send_message.
-
- Raises:
- See send_message.
- """
- return send_message([self.sender], body, from_jid=
self.to,
- message_type=message_type, raw_xml=raw_xml)
+ """Encapsulates an XMPP message received by the application."""
+ def __init__(self, vars):
+ """Constructs a new XMPP Message from an HTTP request.
+
+ Args:
+ vars: A dict-like object to extract message arguments from.
+ """
+ try:
+ self.__sender = vars["from"]
+ self.__to = vars["to"]
+ self.__body = vars["body"]
+ except KeyError, e:
+ raise InvalidMessageError(e[0])
+
+ self.__command = None
+ self.__arg = None
+
+ def reply(self, body, message_type='chat', raw_xml=False,
+ send_message=send_message):
+ """Convenience function to reply to a message.
+
+ Args:
+ body: str: The body of the message
+ message_type, raw_xml: As per send_message.
+ send_message: Used for testing.
+
+ Returns:
+ A status code as per send_message.
+
+ Raises:
+ See send_message.
+ """
+ return send_message([self.sender], body, from_jid=
self.to,
+ message_type=message_type, raw_xml=raw_xml)
class TestCommandHandler(test_utils.BaseTestCase):
- def setUp(self):
- from tipfy.appengine import xmpp
- self.xmpp_module = xmpp.xmpp
- xmpp.xmpp = sys.modules[__name__]
- test_utils.BaseTestCase.setUp(self)
-
- def tearDown(self):
- fake_local.clear()
-
- from tipfy.appengine import xmpp
- xmpp.xmpp = self.xmpp_module
- test_utils.BaseTestCase.tearDown(self)
-
- def test_no_command(self):
- app = get_app()
- client = app.get_test_client()
-
- data = {}
- client.open(method='POST', data=data)
-
- self.assertEqual(fake_local.get('message', None), None)
-
- def test_not_implemented(self):
- app = get_app()
- app.config['tipfy']['enable_debugger'] = False
- client = app.get_test_client()
-
- data = {
- 'from': '
m...@myself.com',
- 'to': '
y...@yourself.com',
- 'body': '/inexistent_command foo bar',
- }
- self.assertRaises(NotImplementedError,
client.post, path='/test2',
data=data)
-
- def test_unknown_command(self):
- app = get_app()
- client = app.get_test_client()
-
- data = {
- 'from': '
m...@myself.com',
- 'to': '
y...@yourself.com',
- 'body': '/inexistent_command foo bar',
- }
- client.open(method='POST', data=data)
-
- self.assertEqual(fake_local.get('message', None), {'body': 'Unknown
command'})
-
- def test_command(self):
- app = get_app()
- client = app.get_test_client()
-
- data = {
- 'from': '
m...@myself.com',
- 'to': '
y...@yourself.com',
- 'body': '/foo foo bar',
- }
- client.open(method='POST', data=data)
-
- self.assertEqual(fake_local.get('message', None), {'body': 'Foo
command!'})
-
- data = {
- 'from': '
m...@myself.com',
- 'to': '
y...@yourself.com',
- 'body': '/bar foo bar',
- }
- client.open(method='POST', data=data)
-
- self.assertEqual(fake_local.get('message', None), {'body': 'Bar
command!'})
-
- def test_text_message(self):
- app = get_app()
- client = app.get_test_client()
-
- data = {
- 'from': '
m...@myself.com',
- 'to': '
y...@yourself.com',
- 'body': 'Hello, text message!',
- }
- client.open(method='POST', data=data)
-
- self.assertEqual(fake_local.get('message', None), {'body': 'Hello, text
message!'})
+ def setUp(self):
+ from tipfy.appengine import xmpp
+ self.xmpp_module = xmpp.xmpp
+ xmpp.xmpp = sys.modules[__name__]
+ test_utils.BaseTestCase.setUp(self)
+
+ def tearDown(self):
+ fake_local.clear()
+
+ from tipfy.appengine import xmpp
+ xmpp.xmpp = self.xmpp_module
+ test_utils.BaseTestCase.tearDown(self)
+
+ def test_no_command(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ data = {}
+ client.open(method='POST', data=data)
+
+ self.assertEqual(fake_local.get('message', None), None)
+
+ def test_not_implemented(self):
+ app = get_app()
+ app.config['tipfy']['enable_debugger'] = False
+ client = app.get_test_client()
+
+ data = {
+ 'from': '
m...@myself.com',
+ 'to': '
y...@yourself.com',
+ 'body': '/inexistent_command foo bar',
+ }
+ self.assertRaises(NotImplementedError,
client.post, path='/test2',
data=data)
+
+ def test_unknown_command(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ data = {
+ 'from': '
m...@myself.com',
+ 'to': '
y...@yourself.com',
+ 'body': '/inexistent_command foo bar',
+ }
+ client.open(method='POST', data=data)
+
+ self.assertEqual(fake_local.get('message', None),
{'body': 'Unknown command'})
+
+ def test_command(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ data = {
+ 'from': '
m...@myself.com',
+ 'to': '
y...@yourself.com',
+ 'body': '/foo foo bar',
+ }
+ client.open(method='POST', data=data)
+
+ self.assertEqual(fake_local.get('message', None), {'body': 'Foo
command!'})
+
+ data = {
+ 'from': '
m...@myself.com',
+ 'to': '
y...@yourself.com',
+ 'body': '/bar foo bar',
+ }
+ client.open(method='POST', data=data)
+
+ self.assertEqual(fake_local.get('message', None), {'body': 'Bar
command!'})
+
+ def test_text_message(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ data = {
+ 'from': '
m...@myself.com',
+ 'to': '
y...@yourself.com',
+ 'body': 'Hello, text message!',
+ }
+ client.open(method='POST', data=data)
+
+ self.assertEqual(fake_local.get('message', None), {'body': 'Hello,
text message!'})
if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/manage_test.py Sat Apr 2 13:52:07 2011
+++ /tests/manage_test.py Sun Apr 3 07:55:49 2011
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
- Tests for tipfy.scripts.manage
+ Tests for tipfy.scripts.manage
"""
from __future__ import with_statement
@@ -18,179 +18,179 @@
class TestConfig(test_utils.BaseTestCase):
- def get_fp(self, config):
- return StringIO.StringIO(textwrap.dedent(config))
-
- def test_get(self):
- fp = self.get_fp("""\
- [DEFAULT]
- foo = bar
-
- [section_1]
- baz = ding
- """)
- config = Config()
- config.readfp(fp)
-
- self.assertEqual(config.get('section_1', 'foo'), 'bar')
- self.assertEqual(config.get('section_1', 'baz'), 'ding')
-
- # Invalid key.
- self.assertEqual(config.get('section_1', 'invalid'), None)
-
- def test_getboolean(self):
- fp = self.get_fp("""\
- [DEFAULT]
- true_1 = 1
- true_2 = yes
- false_1 = 0
- false_2 = no
-
- [section_1]
- true_3 = on
- true_4 = true
- false_3 = off
- false_4 = false
- invalid = bar
- """)
- config = Config()
- config.readfp(fp)
-
- self.assertEqual(config.getboolean('section_1', 'true_1'), True)
- self.assertEqual(config.getboolean('section_1', 'true_2'), True)
- self.assertEqual(config.getboolean('section_1', 'true_3'), True)
- self.assertEqual(config.getboolean('section_1', 'true_4'), True)
- self.assertEqual(config.getboolean('section_1', 'false_1'), False)
- self.assertEqual(config.getboolean('section_1', 'false_2'), False)
- self.assertEqual(config.getboolean('section_1', 'false_3'), False)
- self.assertEqual(config.getboolean('section_1', 'false_4'), False)
-
- # Invalid boolean.
- self.assertEqual(config.getboolean('section_1', 'invalid'), None)
-
- def test_getfloat(self):
- fp = self.get_fp("""\
- [DEFAULT]
- foo = 0.1
-
- [section_1]
- baz = 0.2
- invalid = bar
- """)
- config = Config()
- config.readfp(fp)
-
- self.assertEqual(config.getfloat('section_1', 'foo'), 0.1)
- self.assertEqual(config.getfloat('section_1', 'baz'), 0.2)
-
- # Invalid float.
- self.assertEqual(config.getboolean('section_1', 'invalid'), None)
-
- def test_getint(self):
- fp = self.get_fp("""\
- [DEFAULT]
- foo = 999
-
- [section_1]
- baz = 1999
- invalid = bar
- """)
- config = Config()
- config.readfp(fp)
-
- self.assertEqual(config.getint('section_1', 'foo'), 999)
- self.assertEqual(config.getint('section_1', 'baz'), 1999)
-
- # Invalid int.
- self.assertEqual(config.getboolean('section_1', 'invalid'), None)
-
- def test_getlist(self):
- fp = self.get_fp("""\
- [DEFAULT]
- animals =
- rhino
- rhino
- hamster
- hamster
- goat
- goat
-
- [section_1]
- fruits =
- orange
- watermellow
- grape
- """)
- config = Config()
- config.readfp(fp)
-
- # Non-unique values.
- self.assertEqual(config.getlist('section_1', 'animals'), [
- 'rhino',
- 'rhino',
- 'hamster',
- 'hamster',
- 'goat',
- 'goat',
- ])
- self.assertEqual(config.getlist('section_1', 'fruits'), [
- 'orange',
- 'watermellow',
- 'grape',
- ])
-
- # Unique values.
- self.assertEqual(config.getlist('section_1', 'animals', unique=True), [
- 'rhino',
- 'hamster',
- 'goat',
- ])
-
- def test_interpolation(self):
- fp = self.get_fp("""\
- [DEFAULT]
- path = /path/to/%(path_name)s
-
- [section_1]
- path_name = foo
- path_1 = /special%(path)s
- path_2 = /special/%(path_name)s
-
- [section_2]
- path_name = bar
-
- [section_3]
- path_1 = /path/to/%(section_1|path_name)s
- path_2 = /path/to/%(section_2|path_name)s
- path_3 = /%(section_1|path_name)s/%(section_2|path_name)s/%(section_1|
path_name)s/%(section_2|path_name)s
- path_error_1 = /path/to/%(section_3|path_error_1)s
- path_error_2 = /path/to/%(section_3|path_error_3)s
- path_error_3 = /path/to/%(section_3|path_error_2)s
- path_not_really = /path/to/%(foo
- """)
- config = Config()
- config.readfp(fp)
-
- self.assertEqual(config.get('section_1', 'path'), '/path/to/foo')
-
self.assertEqual(config.get('section_1', 'path_1'), '/special/path/to/foo')
- self.assertEqual(config.get('section_1', 'path_2'), '/special/foo')
- self.assertEqual(config.get('section_2', 'path'), '/path/to/bar')
-
- self.assertEqual(config.get('section_3', 'path_1'), '/path/to/foo')
- self.assertEqual(config.get('section_3', 'path_2'), '/path/to/bar')
- self.assertEqual(config.get('section_3', 'path_3'), '/foo/bar/foo/bar')
-
- # Failed interpolation (recursive)
- self.assertRaises(ConfigParser.InterpolationError, config.get,
- 'section_3', 'path_error_1')
- self.assertRaises(ConfigParser.InterpolationError, config.get,
- 'section_3', 'path_error_2')
- self.assertRaises(ConfigParser.InterpolationError, config.get,
- 'section_3', 'path_error_3')
-
-
self.assertEqual(config.get('section_3', 'path_not_really'), '/path/to/%(foo')
+ def get_fp(self, config):
+ return StringIO.StringIO(textwrap.dedent(config))
+
+ def test_get(self):
+ fp = self.get_fp("""\
+ [DEFAULT]
+ foo = bar
+
+ [section_1]
+ baz = ding
+ """)
+ config = Config()
+ config.readfp(fp)
+
+ self.assertEqual(config.get('section_1', 'foo'), 'bar')
+ self.assertEqual(config.get('section_1', 'baz'), 'ding')
+
+ # Invalid key.
+ self.assertEqual(config.get('section_1', 'invalid'), None)
+
+ def test_getboolean(self):
+ fp = self.get_fp("""\
+ [DEFAULT]
+ true_1 = 1
+ true_2 = yes
+ false_1 = 0
+ false_2 = no
+
+ [section_1]
+ true_3 = on
+ true_4 = true
+ false_3 = off
+ false_4 = false
+ invalid = bar
+ """)
+ config = Config()
+ config.readfp(fp)
+
+ self.assertEqual(config.getboolean('section_1', 'true_1'), True)
+ self.assertEqual(config.getboolean('section_1', 'true_2'), True)
+ self.assertEqual(config.getboolean('section_1', 'true_3'), True)
+ self.assertEqual(config.getboolean('section_1', 'true_4'), True)
+ self.assertEqual(config.getboolean('section_1', 'false_1'), False)
+ self.assertEqual(config.getboolean('section_1', 'false_2'), False)
+ self.assertEqual(config.getboolean('section_1', 'false_3'), False)
+ self.assertEqual(config.getboolean('section_1', 'false_4'), False)
+
+ # Invalid boolean.
+ self.assertEqual(config.getboolean('section_1', 'invalid'), None)
+
+ def test_getfloat(self):
+ fp = self.get_fp("""\
+ [DEFAULT]
+ foo = 0.1
+
+ [section_1]
+ baz = 0.2
+ invalid = bar
+ """)
+ config = Config()
+ config.readfp(fp)
+
+ self.assertEqual(config.getfloat('section_1', 'foo'), 0.1)
+ self.assertEqual(config.getfloat('section_1', 'baz'), 0.2)
+
+ # Invalid float.
+ self.assertEqual(config.getboolean('section_1', 'invalid'), None)
+
+ def test_getint(self):
+ fp = self.get_fp("""\
+ [DEFAULT]
+ foo = 999
+
+ [section_1]
+ baz = 1999
+ invalid = bar
+ """)
+ config = Config()
+ config.readfp(fp)
+
+ self.assertEqual(config.getint('section_1', 'foo'), 999)
+ self.assertEqual(config.getint('section_1', 'baz'), 1999)
+
+ # Invalid int.
+ self.assertEqual(config.getboolean('section_1', 'invalid'), None)
+
+ def test_getlist(self):
+ fp = self.get_fp("""\
+ [DEFAULT]
+ animals =
+ rhino
+ rhino
+ hamster
+ hamster
+ goat
+ goat
+
+ [section_1]
+ fruits =
+ orange
+ watermellow
+ grape
+ """)
+ config = Config()
+ config.readfp(fp)
+
+ # Non-unique values.
+ self.assertEqual(config.getlist('section_1', 'animals'), [
+ 'rhino',
+ 'rhino',
+ 'hamster',
+ 'hamster',
+ 'goat',
+ 'goat',
+ ])
+ self.assertEqual(config.getlist('section_1', 'fruits'), [
+ 'orange',
+ 'watermellow',
+ 'grape',
+ ])
+
+ # Unique values.
+ self.assertEqual(config.getlist('section_1', 'animals',
unique=True), [
+ 'rhino',
+ 'hamster',
+ 'goat',
+ ])
+
+ def test_interpolation(self):
+ fp = self.get_fp("""\
+ [DEFAULT]
+ path = /path/to/%(path_name)s
+
+ [section_1]
+ path_name = foo
+ path_1 = /special%(path)s
+ path_2 = /special/%(path_name)s
+
+ [section_2]
+ path_name = bar
+
+ [section_3]
+ path_1 = /path/to/%(section_1|path_name)s
+ path_2 = /path/to/%(section_2|path_name)s
+ path_3 = /%(section_1|path_name)s/%(section_2|
path_name)s/%(section_1|path_name)s/%(section_2|path_name)s
+ path_error_1 = /path/to/%(section_3|path_error_1)s
+ path_error_2 = /path/to/%(section_3|path_error_3)s
+ path_error_3 = /path/to/%(section_3|path_error_2)s
+ path_not_really = /path/to/%(foo
+ """)
+ config = Config()
+ config.readfp(fp)
+
+ self.assertEqual(config.get('section_1', 'path'), '/path/to/foo')
+
self.assertEqual(config.get('section_1', 'path_1'), '/special/path/to/foo')
+ self.assertEqual(config.get('section_1', 'path_2'), '/special/foo')
+ self.assertEqual(config.get('section_2', 'path'), '/path/to/bar')
+
+ self.assertEqual(config.get('section_3', 'path_1'), '/path/to/foo')
+ self.assertEqual(config.get('section_3', 'path_2'), '/path/to/bar')
+
self.assertEqual(config.get('section_3', 'path_3'), '/foo/bar/foo/bar')
+
+ # Failed interpolation (recursive)
+ self.assertRaises(ConfigParser.InterpolationError, config.get,
+ 'section_3', 'path_error_1')
+ self.assertRaises(ConfigParser.InterpolationError, config.get,
+ 'section_3', 'path_error_2')
+ self.assertRaises(ConfigParser.InterpolationError, config.get,
+ 'section_3', 'path_error_3')
+
+
self.assertEqual(config.get('section_3', 'path_not_really'), '/path/to/%(foo')
'''
if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/routing_test.py Sat Apr 2 13:52:07 2011
+++ /tests/routing_test.py Sun Apr 3 07:55:49 2011
@@ -8,162 +8,162 @@
class TestRouter(test_utils.BaseTestCase):
- def test_add(self):
- app = Tipfy()
- router = Router(app)
- self.assertEqual(len(list(router.map.iter_rules())), 0)
-
- router.add(Rule('/', name='home', handler='HomeHandler'))
- self.assertEqual(len(list(router.map.iter_rules())), 1)
-
- router.add([
- Rule('/about', name='about', handler='AboutHandler'),
- Rule('/contact', name='contact', handler='ContactHandler'),
- ])
- self.assertEqual(len(list(router.map.iter_rules())), 3)
+ def test_add(self):
+ app = Tipfy()
+ router = Router(app)
+ self.assertEqual(len(list(router.map.iter_rules())), 0)
+
+ router.add(Rule('/', name='home', handler='HomeHandler'))
+ self.assertEqual(len(list(router.map.iter_rules())), 1)
+
+ router.add([
+ Rule('/about', name='about', handler='AboutHandler'),
+ Rule('/contact', name='contact', handler='ContactHandler'),
+ ])
+ self.assertEqual(len(list(router.map.iter_rules())), 3)
class TestRouting(test_utils.BaseTestCase):
-
#==========================================================================
- # HandlerPrefix
-
#==========================================================================
- def test_handler_prefix(self):
- rules = [
- HandlerPrefix('resources.handlers.', [
- Rule('/', name='home', handler='HomeHandler'),
- Rule('/defaults', name='defaults', handler='HandlerWithRuleDefaults',
defaults={'foo': 'bar'}),
- ])
- ]
-
- app = Tipfy(rules)
- client = app.get_test_client()
-
- response = client.get('/')
- self.assertEqual(response.data, 'Hello, World!')
-
- response = client.get('/defaults')
- self.assertEqual(response.data, 'bar')
-
-
#==========================================================================
- # NamePrefix
-
#==========================================================================
- def test_name_prefix(self):
- class DummyHandler(RequestHandler):
- def get(self, **kwargs):
- return ''
-
- rules = [
- NamePrefix('company-', [
- Rule('/', name='home', handler=DummyHandler),
- Rule('/about', name='about', handler=DummyHandler),
- Rule('/contact', name='contact', handler=DummyHandler),
- ]),
- ]
-
- app = Tipfy(rules)
-
- with app.get_test_handler('/') as handler:
- self.assertEqual(handler.url_for('company-home'), '/')
- self.assertEqual(handler.url_for('company-about'), '/about')
- self.assertEqual(handler.url_for('company-contact'), '/contact')
-
- with app.get_test_handler('/') as handler:
- self.assertEqual(
handler.request.rule.name, 'company-home')
-
- with app.get_test_handler('/about') as handler:
- self.assertEqual(
handler.request.rule.name, 'company-about')
-
- with app.get_test_handler('/contact') as handler:
- self.assertEqual(
handler.request.rule.name, 'company-contact')
-
-
#==========================================================================
- # RegexConverter
-
#==========================================================================
- def test_regex_converter(self):
- class TestHandler(RequestHandler):
- def get(self, **kwargs):
- return Response(kwargs.get('path'))
-
- app = Tipfy([
- Rule('/<regex(".*"):path>', name='home', handler=TestHandler),
- ])
- client = app.get_test_client()
-
- response = client.get('/foo')
- self.assertEqual(response.data, 'foo')
-
- response = client.get('/foo/bar')
- self.assertEqual(response.data, 'foo/bar')
-
- response = client.get('/foo/bar/baz')
- self.assertEqual(response.data, 'foo/bar/baz')
-
- def test_url_for(self):
- class DummyHandler(RequestHandler):
- def get(self, **kwargs):
- return ''
-
- rules = [
- NamePrefix('company-', [
- Rule('/', name='home', handler=DummyHandler),
- Rule('/about', name='about', handler=DummyHandler),
- Rule('/contact', name='contact', handler=DummyHandler),
- ]),
- ]
-
- app = Tipfy(rules)
-
- with app.get_test_handler('/') as handler:
- self.assertEqual(url_for('company-home'), '/')
- self.assertEqual(url_for('company-about'), '/about')
- self.assertEqual(url_for('company-contact'), '/contact')
+
#==========================================================================
+ # HandlerPrefix
+
#==========================================================================
+ def test_handler_prefix(self):
+ rules = [
+ HandlerPrefix('resources.handlers.', [
+ Rule('/', name='home', handler='HomeHandler'),
+ Rule('/defaults', name='defaults',
handler='HandlerWithRuleDefaults', defaults={'foo': 'bar'}),
+ ])
+ ]
+
+ app = Tipfy(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'Hello, World!')
+
+ response = client.get('/defaults')
+ self.assertEqual(response.data, 'bar')
+
+
#==========================================================================
+ # NamePrefix
+
#==========================================================================
+ def test_name_prefix(self):
+ class DummyHandler(RequestHandler):
+ def get(self, **kwargs):
+ return ''
+
+ rules = [
+ NamePrefix('company-', [
+ Rule('/', name='home', handler=DummyHandler),
+ Rule('/about', name='about', handler=DummyHandler),
+ Rule('/contact', name='contact', handler=DummyHandler),
+ ]),
+ ]
+
+ app = Tipfy(rules)
+
+ with app.get_test_handler('/') as handler:
+ self.assertEqual(handler.url_for('company-home'), '/')
+ self.assertEqual(handler.url_for('company-about'), '/about')
+
self.assertEqual(handler.url_for('company-contact'), '/contact')
+
+ with app.get_test_handler('/') as handler:
+ self.assertEqual(
handler.request.rule.name, 'company-home')
+
+ with app.get_test_handler('/about') as handler:
+ self.assertEqual(
handler.request.rule.name, 'company-about')
+
+ with app.get_test_handler('/contact') as handler:
+ self.assertEqual(
handler.request.rule.name, 'company-contact')
+
+
#==========================================================================
+ # RegexConverter
+
#==========================================================================
+ def test_regex_converter(self):
+ class TestHandler(RequestHandler):
+ def get(self, **kwargs):
+ return Response(kwargs.get('path'))
+
+ app = Tipfy([
+ Rule('/<regex(".*"):path>', name='home', handler=TestHandler),
+ ])
+ client = app.get_test_client()
+
+ response = client.get('/foo')
+ self.assertEqual(response.data, 'foo')
+
+ response = client.get('/foo/bar')
+ self.assertEqual(response.data, 'foo/bar')
+
+ response = client.get('/foo/bar/baz')
+ self.assertEqual(response.data, 'foo/bar/baz')
+
+ def test_url_for(self):
+ class DummyHandler(RequestHandler):
+ def get(self, **kwargs):
+ return ''
+
+ rules = [
+ NamePrefix('company-', [
+ Rule('/', name='home', handler=DummyHandler),
+ Rule('/about', name='about', handler=DummyHandler),
+ Rule('/contact', name='contact', handler=DummyHandler),
+ ]),
+ ]
+
+ app = Tipfy(rules)
+
+ with app.get_test_handler('/') as handler:
+ self.assertEqual(url_for('company-home'), '/')
+ self.assertEqual(url_for('company-about'), '/about')
+ self.assertEqual(url_for('company-contact'), '/contact')
class TestAlternativeRouting(test_utils.BaseTestCase):
- def test_handler(self):
- rules = [
- HandlerPrefix('resources.alternative_routing.', [
- Rule('/', name='home', handler='HomeHandler'),
- Rule('/foo', name='home/foo', handler='HomeHandler:foo'),
- Rule('/bar', name='home/bar', handler='HomeHandler:bar'),
- Rule('/other/foo', name='other/foo', handler='OtherHandler:foo'),
- Rule('/other/bar', name='other/bar', handler='OtherHandler:bar'),
- ])
- ]
-
- app = Tipfy(rules)
- client = app.get_test_client()
-
- response = client.get('/')
- self.assertEqual(response.data, 'home-get')
- response = client.get('/foo')
- self.assertEqual(response.data, 'home-foo')
- response = client.get('/bar')
- self.assertEqual(response.data, 'home-bar')
- response = client.get('/other/foo')
- self.assertEqual(response.data, 'other-foo')
- response = client.get('/other/bar')
- self.assertEqual(response.data, 'other-bar')
-
- def test_function_handler(self):
- rules = [
- HandlerPrefix('resources.alternative_routing.', [
- Rule('/', name='home', handler='home'),
- Rule('/foo', name='home/foo', handler='foo'),
- Rule('/bar', name='home/bar', handler='bar'),
- ])
- ]
-
- app = Tipfy(rules)
- client = app.get_test_client()
-
- response = client.get('/')
- self.assertEqual(response.data, 'home')
- response = client.get('/foo')
- self.assertEqual(response.data, 'foo')
- response = client.get('/bar')
- self.assertEqual(response.data, 'bar')
+ def test_handler(self):
+ rules = [
+ HandlerPrefix('resources.alternative_routing.', [
+ Rule('/', name='home', handler='HomeHandler'),
+ Rule('/foo', name='home/foo', handler='HomeHandler:foo'),
+ Rule('/bar', name='home/bar', handler='HomeHandler:bar'),
+ Rule('/other/foo', name='other/foo',
handler='OtherHandler:foo'),
+ Rule('/other/bar', name='other/bar',
handler='OtherHandler:bar'),
+ ])
+ ]
+
+ app = Tipfy(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'home-get')
+ response = client.get('/foo')
+ self.assertEqual(response.data, 'home-foo')
+ response = client.get('/bar')
+ self.assertEqual(response.data, 'home-bar')
+ response = client.get('/other/foo')
+ self.assertEqual(response.data, 'other-foo')
+ response = client.get('/other/bar')
+ self.assertEqual(response.data, 'other-bar')
+
+ def test_function_handler(self):
+ rules = [
+ HandlerPrefix('resources.alternative_routing.', [
+ Rule('/', name='home', handler='home'),
+ Rule('/foo', name='home/foo', handler='foo'),
+ Rule('/bar', name='home/bar', handler='bar'),
+ ])
+ ]
+
+ app = Tipfy(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'home')
+ response = client.get('/foo')
+ self.assertEqual(response.data, 'foo')
+ response = client.get('/bar')
+ self.assertEqual(response.data, 'bar')
if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/secure_cookie_test.py Sat Apr 2 13:52:07 2011
+++ /tests/secure_cookie_test.py Sun Apr 3 07:55:49 2011
@@ -8,48 +8,48 @@
class TestSecureCookie(test_utils.BaseTestCase):
- def _get_app(self):
- return Tipfy(config={
- 'tipfy.sessions': {
- 'secret_key': 'something very secret',
- }
- })
-
- def test_get_cookie_no_cookie(self):
- store = SecureCookieStore('secret')
- request = Request.from_values('/')
- self.assertEqual(store.get_cookie(request, 'session'), None)
-
- def test_get_cookie_invalid_parts(self):
- store = SecureCookieStore('secret')
- request = Request.from_values('/',
headers=[('Cookie', 'session="invalid"; Path=/')])
- self.assertEqual(store.get_cookie(request, 'session'), None)
-
- def test_get_cookie_invalid_signature(self):
- store = SecureCookieStore('secret')
- request = Request.from_values('/', headers=[('Cookie', 'session="foo|bar|
baz"; Path=/')])
- self.assertEqual(store.get_cookie(request, 'session'), None)
-
- def test_get_cookie_expired(self):
- store = SecureCookieStore('secret')
- request = Request.from_values('/',
headers=[('Cookie', 'session="eyJmb28iOiJiYXIifQ==|1284849476|
847b472f2fabbf1efef55748a394b6f182acd8be"; Path=/')])
- self.assertEqual(store.get_cookie(request, 'session', max_age=-86400),
None)
-
- def test_get_cookie_badly_encoded(self):
- store = SecureCookieStore('secret')
- timestamp = str(int(time.time()))
- value = 'foo'
- signature = store._get_signature('session', value, timestamp)
- cookie_value = '|'.join([value, timestamp, signature])
-
- request = Request.from_values('/', headers=[('Cookie', 'session="%s";
Path=/' % cookie_value)])
- self.assertEqual(store.get_cookie(request, 'session'), None)
-
- def test_get_cookie_valid(self):
- store = SecureCookieStore('secret')
- request = Request.from_values('/',
headers=[('Cookie', 'session="eyJmb28iOiJiYXIifQ==|1284849476|
847b472f2fabbf1efef55748a394b6f182acd8be"; Path=/')])
- self.assertEqual(store.get_cookie(request, 'session'), {'foo': 'bar'})
+ def _get_app(self):
+ return Tipfy(config={
+ 'tipfy.sessions': {
+ 'secret_key': 'something very secret',
+ }
+ })
+
+ def test_get_cookie_no_cookie(self):
+ store = SecureCookieStore('secret')
+ request = Request.from_values('/')
+ self.assertEqual(store.get_cookie(request, 'session'), None)
+
+ def test_get_cookie_invalid_parts(self):
+ store = SecureCookieStore('secret')
+ request = Request.from_values('/',
headers=[('Cookie', 'session="invalid"; Path=/')])
+ self.assertEqual(store.get_cookie(request, 'session'), None)
+
+ def test_get_cookie_invalid_signature(self):
+ store = SecureCookieStore('secret')
+ request = Request.from_values('/',
headers=[('Cookie', 'session="foo|bar|baz"; Path=/')])
+ self.assertEqual(store.get_cookie(request, 'session'), None)
+
+ def test_get_cookie_expired(self):
+ store = SecureCookieStore('secret')
+ request = Request.from_values('/',
headers=[('Cookie', 'session="eyJmb28iOiJiYXIifQ==|1284849476|
847b472f2fabbf1efef55748a394b6f182acd8be"; Path=/')])
+ self.assertEqual(store.get_cookie(request, 'session',
max_age=-86400), None)
+
+ def test_get_cookie_badly_encoded(self):
+ store = SecureCookieStore('secret')
+ timestamp = str(int(time.time()))
+ value = 'foo'
+ signature = store._get_signature('session', value, timestamp)
+ cookie_value = '|'.join([value, timestamp, signature])
+
+ request = Request.from_values('/',
headers=[('Cookie', 'session="%s"; Path=/' % cookie_value)])
+ self.assertEqual(store.get_cookie(request, 'session'), None)
+
+ def test_get_cookie_valid(self):
+ store = SecureCookieStore('secret')
+ request = Request.from_values('/',
headers=[('Cookie', 'session="eyJmb28iOiJiYXIifQ==|1284849476|
847b472f2fabbf1efef55748a394b6f182acd8be"; Path=/')])
+ self.assertEqual(store.get_cookie(request, 'session'),
{'foo': 'bar'})
if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/template_test.py Sat Apr 2 13:52:07 2011
+++ /tests/template_test.py Sun Apr 3 07:55:49 2011
@@ -6,26 +6,26 @@
import test_utils
TEMPLATES_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
- 'resources', 'templates'))
+ 'resources', 'templates'))
TEMPLATES_ZIP_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
- 'resources', 'templates.zip'))
+ 'resources', 'templates.zip'))
class TestTemplate(test_utils.BaseTestCase):
- def test_generate(self):
- t = template.Template('<html>{{ myvalue }}</html>')
- self.assertEqual(t.generate(myvalue='XXX'), '<html>XXX</html>')
-
- def test_loader(self):
- loader = template.Loader(TEMPLATES_DIR)
- t = loader.load('template_tornado1.html')
-
self.assertEqual(t.generate(students=['calvin', 'hobbes', 'moe']), '\n\ncalvin\n\n\n\nhobbes\n\n\n\nmoe\n\n\n')
-
- def test_loader2(self):
- loader = template.ZipLoader(TEMPLATES_ZIP_DIR, 'templates')
- t = loader.load('template1.html')
- self.assertEqual(t.generate(message='Hello, World!'), 'Hello, World!\n')
+ def test_generate(self):
+ t = template.Template('<html>{{ myvalue }}</html>')
+ self.assertEqual(t.generate(myvalue='XXX'), '<html>XXX</html>')
+
+ def test_loader(self):
+ loader = template.Loader(TEMPLATES_DIR)
+ t = loader.load('template_tornado1.html')
+
self.assertEqual(t.generate(students=['calvin', 'hobbes', 'moe']), '\n\ncalvin\n\n\n\nhobbes\n\n\n\nmoe\n\n\n')
+
+ def test_loader2(self):
+ loader = template.ZipLoader(TEMPLATES_ZIP_DIR, 'templates')
+ t = loader.load('template1.html')
+ self.assertEqual(t.generate(message='Hello, World!'), 'Hello,
World!\n')
if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/utils_test.py Sun Apr 3 07:14:04 2011
+++ /tests/utils_test.py Sun Apr 3 07:55:49 2011
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
- Tests for tipfy utils
+ Tests for tipfy utils
"""
from __future__ import with_statement
@@ -12,183 +12,183 @@
from
tipfy.app import local
from tipfy.utils import (xhtml_escape, xhtml_unescape, json_encode,
- json_decode, render_json_response, url_escape, url_unescape, utf8,
- _unicode)
+ json_decode, render_json_response, url_escape, url_unescape, utf8,
+ _unicode)
import test_utils
class HomeHandler(RequestHandler):
- def get(self, **kwargs):
- return 'Hello, World!'
+ def get(self, **kwargs):
+ return 'Hello, World!'
class ProfileHandler(RequestHandler):
- def get(self, **kwargs):
- return 'Username: %s' % kwargs.get('username')
+ def get(self, **kwargs):
+ return 'Username: %s' % kwargs.get('username')
class RedirectToHandler(RequestHandler):
- def get(self, **kwargs):
- username = kwargs.get('username', None)
- if username:
- return redirect_to('profile', username=username)
- else:
- return redirect_to('home')
+ def get(self, **kwargs):
+ username = kwargs.get('username', None)
+ if username:
+ return redirect_to('profile', username=username)
+ else:
+ return redirect_to('home')
class RedirectTo301Handler(RequestHandler):
- def get(self, **kwargs):
- username = kwargs.get('username', None)
- if username:
- return redirect_to('profile', username=username, _code=301)
- else:
- return redirect_to('home', _code=301)
+ def get(self, **kwargs):
+ username = kwargs.get('username', None)
+ if username:
+ return redirect_to('profile', username=username, _code=301)
+ else:
+ return redirect_to('home', _code=301)
class RedirectToInvalidCodeHandler(RequestHandler):
- def get(self, **kwargs):
- return redirect_to('home', _code=405)
+ def get(self, **kwargs):
+ return redirect_to('home', _code=405)
def get_app():
- return Tipfy(rules=[
- Rule('/', name='home', handler=HomeHandler),
- Rule('/people/<string:username>', name='profile',
handler=ProfileHandler),
- Rule('/redirect_to/', name='redirect_to', handler=RedirectToHandler),
- Rule('/redirect_to/<string:username>', name='redirect_to',
handler=RedirectToHandler),
- Rule('/redirect_to_301/', name='redirect_to',
handler=RedirectTo301Handler),
- Rule('/redirect_to_301/<string:username>', name='redirect_to',
handler=RedirectTo301Handler),
- Rule('/redirect_to_invalid', name='redirect_to_invalid',
handler=RedirectToInvalidCodeHandler),
- ])
+ return Tipfy(rules=[
+ Rule('/', name='home', handler=HomeHandler),
+ Rule('/people/<string:username>', name='profile',
handler=ProfileHandler),
+ Rule('/redirect_to/', name='redirect_to',
handler=RedirectToHandler),
+ Rule('/redirect_to/<string:username>', name='redirect_to',
handler=RedirectToHandler),
+ Rule('/redirect_to_301/', name='redirect_to',
handler=RedirectTo301Handler),
+ Rule('/redirect_to_301/<string:username>', name='redirect_to',
handler=RedirectTo301Handler),
+ Rule('/redirect_to_invalid', name='redirect_to_invalid',
handler=RedirectToInvalidCodeHandler),
+ ])
class TestRedirect(test_utils.BaseTestCase):
- '''
-
#===========================================================================
- # redirect()
-
#===========================================================================
- def test_redirect(self):
- response = redirect('
http://www.google.com/')
-
- self.assertEqual(response.headers['location'], '
http://www.google.com/')
- self.assertEqual(response.status_code, 302)
-
- def test_redirect_301(self):
- response = redirect('
http://www.google.com/', 301)
-
- self.assertEqual(response.headers['location'], '
http://www.google.com/')
- self.assertEqual(response.status_code, 301)
-
- def test_redirect_no_response(self):
- response = redirect('
http://www.google.com/')
-
- self.assertEqual(isinstance(response, werkzeug.BaseResponse), True)
- self.assertEqual(response.headers['location'], '
http://www.google.com/')
- self.assertEqual(response.status_code, 302)
-
- def test_redirect_no_response_301(self):
- response = redirect('
http://www.google.com/', 301)
-
- self.assertEqual(isinstance(response, werkzeug.BaseResponse), True)
- self.assertEqual(response.headers['location'], '
http://www.google.com/')
- self.assertEqual(response.status_code, 301)
-
- def test_redirect_invalid_code(self):
- self.assertRaises(AssertionError, redirect, '
http://www.google.com/',
404)
-
-
#===========================================================================
- # redirect_to()
-
#===========================================================================
- def test_redirect_to(self):
- app = get_app()
- client = app.get_test_client()
-
- response = client.get('/redirect_to/', base_url='
http://foo.com')
- self.assertEqual(response.headers['location'], '
http://foo.com/')
- self.assertEqual(response.status_code, 302)
+ '''
+
#===========================================================================
+ # redirect()
+
#===========================================================================
+ def test_redirect(self):
+ response = redirect('
http://www.google.com/')
+
+
self.assertEqual(response.headers['location'], '
http://www.google.com/')
+ self.assertEqual(response.status_code, 302)
+
+ def test_redirect_301(self):
+ response = redirect('
http://www.google.com/', 301)
+
+
self.assertEqual(response.headers['location'], '
http://www.google.com/')
+ self.assertEqual(response.status_code, 301)
+
+ def test_redirect_no_response(self):
+ response = redirect('
http://www.google.com/')
+
+ self.assertEqual(isinstance(response, werkzeug.BaseResponse), True)
+
self.assertEqual(response.headers['location'], '
http://www.google.com/')
+ self.assertEqual(response.status_code, 302)
+
+ def test_redirect_no_response_301(self):
+ response = redirect('
http://www.google.com/', 301)
+
+ self.assertEqual(isinstance(response, werkzeug.BaseResponse), True)
+
self.assertEqual(response.headers['location'], '
http://www.google.com/')
+ self.assertEqual(response.status_code, 301)
+
+ def test_redirect_invalid_code(self):
+ self.assertRaises(AssertionError,
redirect, '
http://www.google.com/', 404)
+
+
#===========================================================================
+ # redirect_to()
+
#===========================================================================
+ def test_redirect_to(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ response = client.get('/redirect_to/', base_url='
http://foo.com')
+ self.assertEqual(response.headers['location'], '
http://foo.com/')
+ self.assertEqual(response.status_code, 302)
- def test_redirect_to2(self):
- app = get_app()
- client = app.get_test_client()
-
- response = client.get('/redirect_to/calvin', base_url='
http://foo.com')
-
self.assertEqual(response.headers['location'], '
http://foo.com/people/calvin')
- self.assertEqual(response.status_code, 302)
-
- response = client.get('/redirect_to/hobbes', base_url='
http://foo.com')
-
self.assertEqual(response.headers['location'], '
http://foo.com/people/hobbes')
- self.assertEqual(response.status_code, 302)
-
- response = client.get('/redirect_to/moe', base_url='
http://foo.com')
-
self.assertEqual(response.headers['location'], '
http://foo.com/people/moe')
- self.assertEqual(response.status_code, 302)
-
- def test_redirect_to_301(self):
- app = get_app()
- client = app.get_test_client()
-
- response = client.get('/redirect_to_301/calvin',
base_url='
http://foo.com')
-
self.assertEqual(response.headers['location'], '
http://foo.com/people/calvin')
- self.assertEqual(response.status_code, 301)
-
- response = client.get('/redirect_to_301/hobbes',
base_url='
http://foo.com')
-
self.assertEqual(response.headers['location'], '
http://foo.com/people/hobbes')
- self.assertEqual(response.status_code, 301)
-
- response = client.get('/redirect_to_301/moe', base_url='
http://foo.com')
-
self.assertEqual(response.headers['location'], '
http://foo.com/people/moe')
- self.assertEqual(response.status_code, 301)
-
- def test_redirect_to_invalid_code(self):
- app = get_app()
- client = app.get_test_client()
-
- response = client.get('/redirect_to_invalid', base_url='
http://foo.com')
- self.assertEqual(response.status_code, 500)
- '''
+ def test_redirect_to2(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ response = client.get('/redirect_to/calvin',
base_url='
http://foo.com')
+
self.assertEqual(response.headers['location'], '
http://foo.com/people/calvin')
+ self.assertEqual(response.status_code, 302)
+
+ response = client.get('/redirect_to/hobbes',
base_url='
http://foo.com')
+
self.assertEqual(response.headers['location'], '
http://foo.com/people/hobbes')
+ self.assertEqual(response.status_code, 302)
+
+ response = client.get('/redirect_to/moe',
base_url='
http://foo.com')
+
self.assertEqual(response.headers['location'], '
http://foo.com/people/moe')
+ self.assertEqual(response.status_code, 302)
+
+ def test_redirect_to_301(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ response = client.get('/redirect_to_301/calvin',
base_url='
http://foo.com')
+
self.assertEqual(response.headers['location'], '
http://foo.com/people/calvin')
+ self.assertEqual(response.status_code, 301)
+
+ response = client.get('/redirect_to_301/hobbes',
base_url='
http://foo.com')
+
self.assertEqual(response.headers['location'], '
http://foo.com/people/hobbes')
+ self.assertEqual(response.status_code, 301)
+
+ response = client.get('/redirect_to_301/moe',
base_url='
http://foo.com')
+
self.assertEqual(response.headers['location'], '
http://foo.com/people/moe')
+ self.assertEqual(response.status_code, 301)
+
+ def test_redirect_to_invalid_code(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ response = client.get('/redirect_to_invalid',
base_url='
http://foo.com')
+ self.assertEqual(response.status_code, 500)
+ '''
class TestRenderJson(test_utils.BaseTestCase):
-
#===========================================================================
- # render_json_response()
-
#===========================================================================
- def test_render_json_response(self):
- with Tipfy().get_test_context() as request:
- response = render_json_response({'foo': 'bar'})
-
- self.assertEqual(isinstance(response, Response), True)
- self.assertEqual(response.mimetype, 'application/json')
- self.assertEqual(response.data, '{"foo":"bar"}')
+
#===========================================================================
+ # render_json_response()
+
#===========================================================================
+ def test_render_json_response(self):
+ with Tipfy().get_test_context() as request:
+ response = render_json_response({'foo': 'bar'})
+
+ self.assertEqual(isinstance(response, Response), True)
+ self.assertEqual(response.mimetype, 'application/json')
+ self.assertEqual(response.data, '{"foo":"bar"}')
class TestUtils(test_utils.BaseTestCase):
- def test_xhtml_escape(self):
- self.assertEqual(xhtml_escape('"foo"'), '"foo"')
-
- def test_xhtml_unescape(self):
- self.assertEqual(xhtml_unescape('"foo"'), '"foo"')
-
- def test_json_encode(self):
-
self.assertEqual(json_encode('<script>alert("hello")</script>'), '"<script>alert(\\"hello\\")<\\/script>"')
-
- def test_json_decode(self):
-
self.assertEqual(json_decode('"<script>alert(\\"hello\\")<\\/script>"'), '<script>alert("hello")</script>')
-
- def test_url_escape(self):
- self.assertEqual(url_escape('somewords&some more
words'), 'somewords%26some+more+words')
-
- def test_url_unescape(self):
-
self.assertEqual(url_unescape('somewords%26some+more+words'), 'somewords&some
more words')
-
- def test_utf8(self):
- self.assertEqual(isinstance(utf8(u'ááá'), str), True)
- self.assertEqual(isinstance(utf8('ááá'), str), True)
-
- def test_unicode(self):
- self.assertEqual(isinstance(_unicode(u'ááá'), unicode), True)
- self.assertEqual(isinstance(_unicode('ááá'), unicode), True)
+ def test_xhtml_escape(self):
+ self.assertEqual(xhtml_escape('"foo"'), '"foo"')
+
+ def test_xhtml_unescape(self):
+ self.assertEqual(xhtml_unescape('"foo"'), '"foo"')
+
+ def test_json_encode(self):
+
self.assertEqual(json_encode('<script>alert("hello")</script>'), '"<script>alert(\\"hello\\")<\\/script>"')
+
+ def test_json_decode(self):
+
self.assertEqual(json_decode('"<script>alert(\\"hello\\")<\\/script>"'), '<script>alert("hello")</script>')
+
+ def test_url_escape(self):
+ self.assertEqual(url_escape('somewords&some more
words'), 'somewords%26some+more+words')
+
+ def test_url_unescape(self):
+
self.assertEqual(url_unescape('somewords%26some+more+words'), 'somewords&some
more words')
+
+ def test_utf8(self):
+ self.assertEqual(isinstance(utf8(u'ááá'), str), True)
+ self.assertEqual(isinstance(utf8('ááá'), str), True)
+
+ def test_unicode(self):
+ self.assertEqual(isinstance(_unicode(u'ááá'), unicode), True)
+ self.assertEqual(isinstance(_unicode('ááá'), unicode), True)
if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tipfy/app.py Sun Apr 3 07:25:05 2011
+++ /tipfy/app.py Sun Apr 3 07:55:49 2011
@@ -1,12 +1,12 @@
# -*- coding: utf-8 -*-
"""
-
tipfy.app
- ~~~~~~~~~
-
- WSGI Application and RequestHandler.
-
- :copyright: 2011 by
tipfy.org.
- :license: BSD, see LICENSE.txt for more details.
+
tipfy.app
+ ~~~~~~~~~
+
+ WSGI Application and RequestHandler.
+
+ :copyright: 2011 by
tipfy.org.
+ :license: BSD, see LICENSE.txt for more details.
"""
from __future__ import with_statement
@@ -34,469 +34,469 @@
#: TODO: remove from here.
from tipfy.appengine import (APPENGINE, APPLICATION_ID, CURRENT_VERSION_ID,
- DEV_APPSERVER)
+ DEV_APPSERVER)
class Request(werkzeug.wrappers.Request):
- """Provides all environment variables for the current request: GET, POST,
- FILES, cookies and headers.
- """
- #: The WSGI app.
- app = None
- #: Exception caught by the WSGI app during dispatch().
- exception = None
- #: URL adapter.
- rule_adapter = None
- #: Matched :class:`tipfy.routing.Rule`.
- rule = None
- #: Keyword arguments from the matched rule.
- rule_args = None
- #: A dictionary for request variables.
- registry = None
-
- def __init__(self, *args, **kwargs):
- super(Request, self).__init__(*args, **kwargs)
- self.registry = {}
-
- @werkzeug.utils.cached_property
- def auth(self):
- """The auth store which provides access to the authenticated user and
- auth related functions.
-
- :returns:
- An auth store instance.
- """
- return self.app.auth_store_class(self)
-
- @werkzeug.utils.cached_property
- def json(self):
- """If the mimetype is `application/json` this will contain the
- parsed JSON data.
-
- This function is borrowed from `Flask`_.
-
- :returns:
- The decoded JSON request data.
- """
- if self.mimetype == 'application/json':
- from tipfy.json import json_decode
- return json_decode(self.data)
-
- @werkzeug.utils.cached_property
- def i18n(self):
- """The internationalization store which provides access to several
- translation and localization utilities.
-
- :returns:
- An i18n store instance.
- """
- return self.app.i18n_store_class(self)
-
- @werkzeug.utils.cached_property
- def session(self):
- """A session dictionary using the default session configuration.
-
- :returns:
- A dictionary-like object with the current session data.
- """
- return self.session_store.get_session()
-
- @werkzeug.utils.cached_property
- def session_store(self):
- """The session store, responsible for managing sessions and flashes.
-
- :returns:
- A session store instance.
- """
- return self.app.session_store_class(self)
-
- def _get_rule_adapter(self):
- from warnings import warn
- warn(DeprecationWarning("Request.url_adapter: this attribute "
- "is deprecated. Use Request.rule_adapter instead."))
- return self.rule_adapter
-
- def _set_rule_adapter(self, adapter):
- from warnings import warn
- warn(DeprecationWarning("Request.url_adapter: this attribute "
- "is deprecated. Use Request.rule_adapter instead."))
- self.rule_adapter = adapter
-
- # Old name
- url_adapter = property(_get_rule_adapter, _set_rule_adapter)
+ """Provides all environment variables for the current request: GET,
POST,
+ FILES, cookies and headers.
+ """
+ #: The WSGI app.
+ app = None
+ #: Exception caught by the WSGI app during dispatch().
+ exception = None
+ #: URL adapter.
+ rule_adapter = None
+ #: Matched :class:`tipfy.routing.Rule`.
+ rule = None
+ #: Keyword arguments from the matched rule.
+ rule_args = None
+ #: A dictionary for request variables.
+ registry = None
+
+ def __init__(self, *args, **kwargs):
+ super(Request, self).__init__(*args, **kwargs)
+ self.registry = {}
+
+ @werkzeug.utils.cached_property
+ def auth(self):
+ """The auth store which provides access to the authenticated user
and
+ auth related functions.
+
+ :returns:
+ An auth store instance.
+ """
+ return self.app.auth_store_class(self)
+
+ @werkzeug.utils.cached_property
+ def json(self):
+ """If the mimetype is `application/json` this will contain the
+ parsed JSON data.
+
+ This function is borrowed from `Flask`_.
+
+ :returns:
+ The decoded JSON request data.
+ """
+ if self.mimetype == 'application/json':
+ from tipfy.json import json_decode
+ return json_decode(self.data)
+
+ @werkzeug.utils.cached_property
+ def i18n(self):
+ """The internationalization store which provides access to several
+ translation and localization utilities.
+
+ :returns:
+ An i18n store instance.
+ """
+ return self.app.i18n_store_class(self)
+
+ @werkzeug.utils.cached_property
+ def session(self):
+ """A session dictionary using the default session configuration.
+
+ :returns:
+ A dictionary-like object with the current session data.
+ """
+ return self.session_store.get_session()
+
+ @werkzeug.utils.cached_property
+ def session_store(self):
+ """The session store, responsible for managing sessions and
flashes.
+
+ :returns:
+ A session store instance.
+ """
+ return self.app.session_store_class(self)
+
+ def _get_rule_adapter(self):
+ from warnings import warn
+ warn(DeprecationWarning("Request.url_adapter: this attribute "
+ "is deprecated. Use Request.rule_adapter instead."))
+ return self.rule_adapter
+
+ def _set_rule_adapter(self, adapter):
+ from warnings import warn
+ warn(DeprecationWarning("Request.url_adapter: this attribute "
+ "is deprecated. Use Request.rule_adapter instead."))
+ self.rule_adapter = adapter
+
+ # Old name
+ url_adapter = property(_get_rule_adapter, _set_rule_adapter)
class Response(werkzeug.wrappers.Response):
- """A response object with default mimetype set to ``text/html``."""
- default_mimetype = 'text/html'
+ """A response object with default mimetype set to ``text/html``."""
+ default_mimetype = 'text/html'
class RequestContext(object):
- """Sets and releases the context locals used during a request.
-
- User meth:`App.get_test_context` to build a `RequestContext` for
- testing purposes.
- """
- def __init__(self, app, environ):
- """Initializes the request context.
-
- :param app:
- An :class:`App` instance.
- :param environ:
- A WSGI environment.
- """
-
self.app = app
- self.environ = environ
-
- def __enter__(self):
- """Enters the request context.
-
- :returns:
- A :class:`Request` instance.
- """
- local.request = request = self.app.request_class(self.environ)
-
local.app =
request.app =
self.app
- return request
-
- def __exit__(self, exc_type, exc_value, traceback):
- """Exits the request context.
-
- This will release the context locals except if an exception is caught
- in debug mode. In this case the locals are kept to be inspected.
- """
- if exc_type is None or not self.app.debug:
- local.__release_local__()
+ """Sets and releases the context locals used during a request.
+
+ User meth:`App.get_test_context` to build a `RequestContext` for
+ testing purposes.
+ """
+ def __init__(self, app, environ):
+ """Initializes the request context.
+
+ :param app:
+ An :class:`App` instance.
+ :param environ:
+ A WSGI environment.
+ """
+
self.app = app
+ self.environ = environ
+
+ def __enter__(self):
+ """Enters the request context.
+
+ :returns:
+ A :class:`Request` instance.
+ """
+ local.request = request = self.app.request_class(self.environ)
+
local.app =
request.app =
self.app
+ return request
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """Exits the request context.
+
+ This will release the context locals except if an exception is
caught
+ in debug mode. In this case the locals are kept to be inspected.
+ """
+ if exc_type is None or not self.app.debug:
+ local.__release_local__()
class App(object):
- """The WSGI application."""
- # Allowed request methods.
- allowed_methods = frozenset(['DELETE', 'GET', 'HEAD', 'OPTIONS', 'POST',
- 'PUT', 'TRACE'])
- #: Default class for requests.
- request_class = Request
- #: Default class for responses.
- response_class = Response
- #: Default class for the configuration object.
- config_class = Config
- #: Default class for the configuration object.
- router_class = Router
- #: Context class used when a request comes in.
- request_context_class = RequestContext
-
- def __init__(self, rules=None, config=None, debug=False):
- """Initializes the application.
-
- :param rules:
- URL rules definitions for the application.
- :param config:
- Dictionary with configuration for the application modules.
- :param debug:
- True if this is debug mode, False otherwise.
- """
- local.current_app = self
- self.debug = debug
- self.registry = {}
- self.error_handlers = {}
- self.config = self.config_class(config, {'tipfy': default_config})
- self.router = self.router_class(self, rules)
-
- if debug:
- logging.getLogger().setLevel(logging.DEBUG)
-
- def __call__(self, environ, start_response):
- """Called when a request comes in."""
- if self.debug and self.config['tipfy']['enable_debugger']:
- return self._debugged_wsgi_app(environ, start_response)
-
- return self.dispatch(environ, start_response)
-
- def dispatch(self, environ, start_response):
- """This is the actual WSGI application. This is not implemented in
- :meth:`__call__` so that middlewares can be applied without losing a
- reference to the class. So instead of doing this::
-
- app = MyMiddleware(app)
-
- It's a better idea to do this instead::
-
- app.dispatch = MyMiddleware(app.dispatch)
-
- Then you still have the original application object around and
- can continue to call methods on it.
-
- This idea comes from `Flask`_.
-
- :param environ:
- A WSGI environment.
- :param start_response:
- A callable accepting a status code, a list of headers and an
- optional exception context to start the response.
- """
- with self.request_context_class(self, environ) as request:
- try:
- if request.method not in self.allowed_methods:
- abort(501)
-
- rv = self.router.dispatch(request)
- response = self.make_response(request, rv)
- except Exception, e:
- try:
- rv = self.handle_exception(request, e)
- response = self.make_response(request, rv)
- except HTTPException, e:
- response = self.make_response(request, e)
- except Exception, e:
- if self.debug:
- raise
-
- logging.exception(e)
- rv = werkzeug.exceptions.InternalServerError()
- response = self.make_response(request, rv)
-
- return response(environ, start_response)
-
- def handle_exception(self, request, exception):
- """Handles an exception. To set app-wide error handlers, define them
- using the corresponent HTTP status code in the ``error_handlers``
- dictionary of :class:`App`. For example, to set a custom
- `Not Found` page::
-
- class Handle404(RequestHandler):
- def __call__(self):
- logging.exception(self.request.exception)
- return Response('Oops! I could swear this page was here!',
- status=404)
-
- app = App([
- Rule('/', handler=MyHandler, name='home'),
- ])
- app.error_handlers[404] = Handle404
-
- When an ``HTTPException`` is raised using :func:`abort` or because the
- app could not fulfill the request, the error handler defined for the
- exception HTTP status code will be called. If it is not set, the
- exception is reraised.
-
- .. note::
- Although being a :class:`RequestHandler`, the error handler will
- execute the ``handle_exception`` method after instantiation, instead
- of the method corresponding to the current request.
-
- Also, the error handler is responsible for setting the response
- status code and logging the exception, as shown in the example
- above.
-
- :param request:
- A :attr:`request_class` instance.
- :param exception:
- The raised exception.
- """
- if isinstance(exception, HTTPException):
- code = exception.code
- else:
- code = 500
-
- handler = self.error_handlers.get(code)
- if not handler:
- raise
-
- request.exception = exception
- rv = handler(request)
- if not isinstance(rv, werkzeug.wrappers.BaseResponse):
- if hasattr(rv, '__call__'):
- # If it is a callable but not a response, we call it again.
- rv = rv()
-
- return rv
-
- def make_response(self, request, *rv):
- """Converts the returned value from a :class:`RequestHandler` to a
- response object that is an instance of :attr:`response_class`.
-
- This function is borrowed from `Flask`_.
-
- :param rv:
- - If no arguments are passed, returns an empty response.
- - If a single argument is passed, the returned value varies
- according to its type:
-
- - :attr:`response_class`: the response is returned unchanged.
- - :class:`str`: a response is created with the string as body.
- - :class:`unicode`: a response is created with the string
- encoded to utf-8 as body.
- - a WSGI function: the function is called as WSGI application
- and buffered as response object.
- - None: a ValueError exception is raised.
-
- - If multiple arguments are passed, a response is created using
- the arguments.
-
- :returns:
- A :attr:`response_class` instance.
- """
- if not rv:
- return self.response_class()
-
- if len(rv) == 1:
- rv = rv[0]
-
- if isinstance(rv, self.response_class):
- return rv
-
- if isinstance(rv, basestring):
- return self.response_class(rv)
-
- if rv is None:
- raise ValueError('RequestHandler did not return a response.')
-
- return self.response_class.force_type(rv, request.environ)
-
- return self.response_class(*rv)
-
- def get_config(self, module, key=None, default=REQUIRED_VALUE):
- """Returns a configuration value for a module.
-
- .. seealso:: :meth:`Config.get_config`.
- """
- from warnings import warn
- warn(DeprecationWarning("App.get_config(): this method "
- "is deprecated. Use App.config['module']['key'] instead."))
- return self.config.get_config(module, key=key, default=default)
-
- def get_test_client(self):
- """Creates a test client for this application.
-
- :returns:
- A ``werkzeug.Client`` with the WSGI application wrapped for tests.
- """
- from werkzeug.test import Client
- return Client(self, self.response_class, use_cookies=True)
-
- def get_test_context(self, *args, **kwargs):
- """Creates a test client for this application.
-
- :param args:
- Positional arguments to construct a `werkzeug.test.EnvironBuilder`.
- :param kwargs:
- Keyword arguments to construct a `werkzeug.test.EnvironBuilder`.
- :returns:
- A :class:``RequestContext`` instance.
- """
- from werkzeug.test import EnvironBuilder
- builder = EnvironBuilder(*args, **kwargs)
- return self.request_context_class(self, builder.get_environ())
-
- def get_test_handler(self, *args, **kwargs):
- """Returns a handler set as a current handler for testing purposes.
-
- .. seealso:: :class:`tipfy.testing.CurrentHandlerContext`.
-
- :returns:
- A :class:`tipfy.testing.CurrentHandlerContext` instance.
- """
- from tipfy.testing import CurrentHandlerContext
- return CurrentHandlerContext(self, *args, **kwargs)
-
- def run(self):
- """Runs the app using ``CGIHandler``. This must be called inside a
- ``main()`` function in the file defined in *app.yaml* to run the
- application::
-
- # ...
-
- app = App(rules=[
- Rule('/', name='home', handler=HelloWorldHandler),
- ])
-
- def main():
- app.run()
-
- if __name__ == '__main__':
- main()
-
- """
- wsgiref.handlers.CGIHandler().run(self)
-
- @werkzeug.utils.cached_property
- def _debugged_wsgi_app(self):
- """Returns the WSGI app wrapped by an interactive debugger."""
- from tipfy.debugger import DebuggedApplication
- return DebuggedApplication(self.dispatch, evalex=True)
-
- @werkzeug.utils.cached_property
- def auth_store_class(self):
- """Returns the configured auth store class.
-
- :returns:
- An auth store class.
- """
- cls = self.config['tipfy']['auth_store_class']
- return werkzeug.utils.import_string(cls)
-
- @werkzeug.utils.cached_property
- def i18n_store_class(self):
- """Returns the configured i18n store class.
-
- :returns:
- An i18n store class.
- """
- cls = self.config['tipfy']['i18n_store_class']
- return werkzeug.utils.import_string(cls)
-
- @werkzeug.utils.cached_property
- def session_store_class(self):
- """Returns the configured session store class.
-
- :returns:
- A session store class.
- """
- cls = self.config['tipfy']['session_store_class']
- return werkzeug.utils.import_string(cls)
-
- # Old names
- wsgi_app = dispatch
+ """The WSGI application."""
+ # Allowed request methods.
+ allowed_methods =
frozenset(['DELETE', 'GET', 'HEAD', 'OPTIONS', 'POST',
+ 'PUT', 'TRACE'])
+ #: Default class for requests.
+ request_class = Request
+ #: Default class for responses.
+ response_class = Response
+ #: Default class for the configuration object.
+ config_class = Config
+ #: Default class for the configuration object.
+ router_class = Router
+ #: Context class used when a request comes in.
+ request_context_class = RequestContext
+
+ def __init__(self, rules=None, config=None, debug=False):
+ """Initializes the application.
+
+ :param rules:
+ URL rules definitions for the application.
+ :param config:
+ Dictionary with configuration for the application modules.
+ :param debug:
+ True if this is debug mode, False otherwise.
+ """
+ local.current_app = self
+ self.debug = debug
+ self.registry = {}
+ self.error_handlers = {}
+ self.config = self.config_class(config, {'tipfy': default_config})
+ self.router = self.router_class(self, rules)
+
+ if debug:
+ logging.getLogger().setLevel(logging.DEBUG)
+
+ def __call__(self, environ, start_response):
+ """Called when a request comes in."""
+ if self.debug and self.config['tipfy']['enable_debugger']:
+ return self._debugged_wsgi_app(environ, start_response)
+
+ return self.dispatch(environ, start_response)
+
+ def dispatch(self, environ, start_response):
+ """This is the actual WSGI application. This is not implemented in
+ :meth:`__call__` so that middlewares can be applied without losing
a
+ reference to the class. So instead of doing this::
+
+ app = MyMiddleware(app)
+
+ It's a better idea to do this instead::
+
+ app.dispatch = MyMiddleware(app.dispatch)
+
+ Then you still have the original application object around and
+ can continue to call methods on it.
+
+ This idea comes from `Flask`_.
+
+ :param environ:
+ A WSGI environment.
+ :param start_response:
+ A callable accepting a status code, a list of headers and an
+ optional exception context to start the response.
+ """
+ with self.request_context_class(self, environ) as request:
+ try:
+ if request.method not in self.allowed_methods:
+ abort(501)
+
+ rv = self.router.dispatch(request)
+ response = self.make_response(request, rv)
+ except Exception, e:
+ try:
+ rv = self.handle_exception(request, e)
+ response = self.make_response(request, rv)
+ except HTTPException, e:
+ response = self.make_response(request, e)
+ except Exception, e:
+ if self.debug:
+ raise
+
+ logging.exception(e)
+ rv = werkzeug.exceptions.InternalServerError()
+ response = self.make_response(request, rv)
+
+ return response(environ, start_response)
+
+ def handle_exception(self, request, exception):
+ """Handles an exception. To set app-wide error handlers, define
them
+ using the corresponent HTTP status code in the ``error_handlers``
+ dictionary of :class:`App`. For example, to set a custom
+ `Not Found` page::
+
+ class Handle404(RequestHandler):
+ def __call__(self):
+ logging.exception(self.request.exception)
+ return Response('Oops! I could swear this page was
here!',
+ status=404)
+
+ app = App([
+ Rule('/', handler=MyHandler, name='home'),
+ ])
+ app.error_handlers[404] = Handle404
+
+ When an ``HTTPException`` is raised using :func:`abort` or because
the
+ app could not fulfill the request, the error handler defined for
the
+ exception HTTP status code will be called. If it is not set, the
+ exception is reraised.
+
+ .. note::
+ Although being a :class:`RequestHandler`, the error handler will
+ execute the ``handle_exception`` method after instantiation,
instead
+ of the method corresponding to the current request.
+
+ Also, the error handler is responsible for setting the response
+ status code and logging the exception, as shown in the example
+ above.
+
+ :param request:
+ A :attr:`request_class` instance.
+ :param exception:
+ The raised exception.
+ """
+ if isinstance(exception, HTTPException):
+ code = exception.code
+ else:
+ code = 500
+
+ handler = self.error_handlers.get(code)
+ if not handler:
+ raise
+
+ request.exception = exception
+ rv = handler(request)
+ if not isinstance(rv, werkzeug.wrappers.BaseResponse):
+ if hasattr(rv, '__call__'):
+ # If it is a callable but not a response, we call it again.
+ rv = rv()
+
+ return rv
+
+ def make_response(self, request, *rv):
+ """Converts the returned value from a :class:`RequestHandler` to a
+ response object that is an instance of :attr:`response_class`.
+
+ This function is borrowed from `Flask`_.
+
+ :param rv:
+ - If no arguments are passed, returns an empty response.
+ - If a single argument is passed, the returned value varies
+ according to its type:
+
+ - :attr:`response_class`: the response is returned unchanged.
+ - :class:`str`: a response is created with the string as
body.
+ - :class:`unicode`: a response is created with the string
+ encoded to utf-8 as body.
+ - a WSGI function: the function is called as WSGI application
+ and buffered as response object.
+ - None: a ValueError exception is raised.
+
+ - If multiple arguments are passed, a response is created using
+ the arguments.
+
+ :returns:
+ A :attr:`response_class` instance.
+ """
+ if not rv:
+ return self.response_class()
+
+ if len(rv) == 1:
+ rv = rv[0]
+
+ if isinstance(rv, self.response_class):
+ return rv
+
+ if isinstance(rv, basestring):
+ return self.response_class(rv)
+
+ if rv is None:
+ raise ValueError('RequestHandler did not return a
response.')
+
+ return self.response_class.force_type(rv, request.environ)
+
+ return self.response_class(*rv)
+
+ def get_config(self, module, key=None, default=REQUIRED_VALUE):
+ """Returns a configuration value for a module.
+
+ .. seealso:: :meth:`Config.get_config`.
+ """
+ from warnings import warn
+ warn(DeprecationWarning("App.get_config(): this method "
+ "is deprecated. Use App.config['module']['key'] instead."))
+ return self.config.get_config(module, key=key, default=default)
+
+ def get_test_client(self):
+ """Creates a test client for this application.
+
+ :returns:
+ A ``werkzeug.Client`` with the WSGI application wrapped for
tests.
+ """
+ from werkzeug.test import Client
+ return Client(self, self.response_class, use_cookies=True)
+
+ def get_test_context(self, *args, **kwargs):
+ """Creates a test client for this application.
+
+ :param args:
+ Positional arguments to construct a
`werkzeug.test.EnvironBuilder`.
+ :param kwargs:
+ Keyword arguments to construct a
`werkzeug.test.EnvironBuilder`.
+ :returns:
+ A :class:``RequestContext`` instance.
+ """
+ from werkzeug.test import EnvironBuilder
+ builder = EnvironBuilder(*args, **kwargs)
+ return self.request_context_class(self, builder.get_environ())
+
+ def get_test_handler(self, *args, **kwargs):
+ """Returns a handler set as a current handler for testing purposes.
+
+ .. seealso:: :class:`tipfy.testing.CurrentHandlerContext`.
+
+ :returns:
+ A :class:`tipfy.testing.CurrentHandlerContext` instance.
+ """
+ from tipfy.testing import CurrentHandlerContext
+ return CurrentHandlerContext(self, *args, **kwargs)
+
+ def run(self):
+ """Runs the app using ``CGIHandler``. This must be called inside a
+ ``main()`` function in the file defined in *app.yaml* to run the
+ application::
+
+ # ...
+
+ app = App(rules=[
+ Rule('/', name='home', handler=HelloWorldHandler),
+ ])
+
+ def main():
+ app.run()
+
+ if __name__ == '__main__':
+ main()
+
+ """
+ wsgiref.handlers.CGIHandler().run(self)
+
+ @werkzeug.utils.cached_property
+ def _debugged_wsgi_app(self):
+ """Returns the WSGI app wrapped by an interactive debugger."""
+ from tipfy.debugger import DebuggedApplication
+ return DebuggedApplication(self.dispatch, evalex=True)
+
+ @werkzeug.utils.cached_property
+ def auth_store_class(self):
+ """Returns the configured auth store class.
+
+ :returns:
+ An auth store class.
+ """
+ cls = self.config['tipfy']['auth_store_class']
+ return werkzeug.utils.import_string(cls)
+
+ @werkzeug.utils.cached_property
+ def i18n_store_class(self):
+ """Returns the configured i18n store class.
+
+ :returns:
+ An i18n store class.
+ """
+ cls = self.config['tipfy']['i18n_store_class']
+ return werkzeug.utils.import_string(cls)
+
+ @werkzeug.utils.cached_property
+ def session_store_class(self):
+ """Returns the configured session store class.
+
+ :returns:
+ A session store class.
+ """
+ cls = self.config['tipfy']['session_store_class']
+ return werkzeug.utils.import_string(cls)
+
+ # Old names
+ wsgi_app = dispatch
def redirect(location, code=302, response_class=Response, body=None):
- """Returns a response object that redirects to the given location.
-
- Supported codes are 301, 302, 303, 305, and 307. 300 is not supported
- because it's not a real redirect and 304 because it's the answer for a
- request with a request with defined If-Modified-Since headers.
-
- :param location:
- A relative or absolute URI (e.g., '/contact'). If relative, it
- will be merged to the current request URL to form an absolute URL.
- :param code:
- The HTTP status code for the redirect. Default is 302.
- :param response_class:
- The class used to build the response. Default is :class:`Response`.
- :body:
- The response body. If not set uses a body with a standard message.
- :returns:
- A :class:`Response` object with headers set for redirection.
- """
- assert code in (301, 302, 303, 305, 307), 'invalid code'
-
- if location.startswith(('.', '/')):
- # Make it absolute.
- location = urlparse.urljoin(get_request().url, location)
-
- display_location = location
- if isinstance(location, unicode):
- location = werkzeug.urls.iri_to_uri(location)
-
- if body is None:
- body = '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n' \
- '<title>Redirecting...</title>\n<h1>Redirecting...</h1>\n' \
- '<p>You should be redirected automatically to target URL: ' \
- '<a href="%s">%s</a>. If not click the link.' % \
- (location, display_location)
-
- response = response_class(body, code, mimetype='text/html')
- response.headers['Location'] = location
- return response
+ """Returns a response object that redirects to the given location.
+
+ Supported codes are 301, 302, 303, 305, and 307. 300 is not supported
+ because it's not a real redirect and 304 because it's the answer for a
+ request with a request with defined If-Modified-Since headers.
+
+ :param location:
+ A relative or absolute URI (e.g., '/contact'). If relative, it
+ will be merged to the current request URL to form an absolute URL.
+ :param code:
+ The HTTP status code for the redirect. Default is 302.
+ :param response_class:
+ The class used to build the response. Default is :class:`Response`.
+ :body:
+ The response body. If not set uses a body with a standard message.
+ :returns:
+ A :class:`Response` object with headers set for redirection.
+ """
+ assert code in (301, 302, 303, 305, 307), 'invalid code'
+
+ if location.startswith(('.', '/')):
+ # Make it absolute.
+ location = urlparse.urljoin(get_request().url, location)
+
+ display_location = location
+ if isinstance(location, unicode):
+ location = werkzeug.urls.iri_to_uri(location)
+
+ if body is None:
+ body = '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n'
\
+ '<title>Redirecting...</title>\n<h1>Redirecting...</h1>\n' \
+ '<p>You should be redirected automatically to target URL: ' \
+ '<a href="%s">%s</a>. If not click the link.' % \
+ (location, display_location)
+
+ response = response_class(body, code, mimetype='text/html')
+ response.headers['Location'] = location
+ return response
# Old names.
=======================================
--- /tipfy/utils.py Sun Apr 3 07:14:04 2011
+++ /tipfy/utils.py Sun Apr 3 07:55:49 2011
@@ -28,133 +28,133 @@
def xhtml_escape(value):
- """Escapes a string so it is valid within XML or XHTML.
-
- :param value:
- The value to be escaped.
- :returns:
- The escaped value.
- """
- return utf8(xml.sax.saxutils.escape(value, {'"': """}))
+ """Escapes a string so it is valid within XML or XHTML.
+
+ :param value:
+ The value to be escaped.
+ :returns:
+ The escaped value.
+ """
+ return utf8(xml.sax.saxutils.escape(value, {'"': """}))
def xhtml_unescape(value):
- """Un-escapes an XML-escaped string.
-
- :param value:
- The value to be un-escaped.
- :returns:
- The un-escaped value.
- """
- return re.sub(r"&(#?)(\w+?);", _convert_entity, _unicode(value))
+ """Un-escapes an XML-escaped string.
+
+ :param value:
+ The value to be un-escaped.
+ :returns:
+ The un-escaped value.
+ """
+ return re.sub(r"&(#?)(\w+?);", _convert_entity, _unicode(value))
def render_json_response(*args, **kwargs):
- """Renders a JSON response.
-
- :param args:
- Arguments to be passed to json_encode().
- :param kwargs:
- Keyword arguments to be passed to json_encode().
- :returns:
- A :class:`Response` object with a JSON string in the body and
- mimetype set to ``application/json``.
- """
- return get_request().app.response_class(json_encode(*args, **kwargs),
- mimetype='application/json')
+ """Renders a JSON response.
+
+ :param args:
+ Arguments to be passed to json_encode().
+ :param kwargs:
+ Keyword arguments to be passed to json_encode().
+ :returns:
+ A :class:`Response` object with a JSON string in the body and
+ mimetype set to ``application/json``.
+ """
+ return get_request().app.response_class(json_encode(*args, **kwargs),
+ mimetype='application/json')
def squeeze(value):
- """Replace all sequences of whitespace chars with a single space."""
- return re.sub(r"[\x00-\x20]+", " ", value).strip()
+ """Replace all sequences of whitespace chars with a single space."""
+ return re.sub(r"[\x00-\x20]+", " ", value).strip()
def url_escape(value):
- """Returns a valid URL-encoded version of the given value."""
- return urllib.quote_plus(utf8(value))
+ """Returns a valid URL-encoded version of the given value."""
+ return urllib.quote_plus(utf8(value))
def url_unescape(value):
- """Decodes the given value from a URL."""
- return _unicode(urllib.unquote_plus(value))
+ """Decodes the given value from a URL."""
+ return _unicode(urllib.unquote_plus(value))
def utf8(value):
- """Encodes a unicode value to UTF-8 if not yet encoded.
-
- :param value:
- Value to be encoded.
- :returns:
- An encoded string.
- """
- if isinstance(value, unicode):
- return value.encode("utf-8")
-
- assert isinstance(value, str)
- return value
+ """Encodes a unicode value to UTF-8 if not yet encoded.
+
+ :param value:
+ Value to be encoded.
+ :returns:
+ An encoded string.
+ """
+ if isinstance(value, unicode):
+ return value.encode("utf-8")
+
+ assert isinstance(value, str)
+ return value
def _unicode(value):
- """Encodes a string value to unicode if not yet decoded.
-
- :param value:
- Value to be decoded.
- :returns:
- A decoded string.
- """
- if isinstance(value, str):
- return value.decode("utf-8")
-
- assert isinstance(value, unicode)
- return value
+ """Encodes a string value to unicode if not yet decoded.
+
+ :param value:
+ Value to be decoded.
+ :returns:
+ A decoded string.
+ """
+ if isinstance(value, str):
+ return value.decode("utf-8")
+
+ assert isinstance(value, unicode)
+ return value
def _convert_entity(m):
- if m.group(1) == "#":
- try:
- return unichr(int(m.group(2)))
- except ValueError:
- return "&#%s;" % m.group(2)
- try:
- return _HTML_UNICODE_MAP[m.group(2)]
- except KeyError:
- return "&%s;" % m.group(2)
+ if m.group(1) == "#":
+ try:
+ return unichr(int(m.group(2)))
+ except ValueError:
+ return "&#%s;" % m.group(2)
+ try:
+ return _HTML_UNICODE_MAP[m.group(2)]
+ except KeyError:
+ return "&%s;" % m.group(2)
def _build_unicode_map():
- return dict((name, unichr(value)) for \
- name, value in htmlentitydefs.name2codepoint.iteritems())
+ return dict((name, unichr(value)) for \
+ name, value in htmlentitydefs.name2codepoint.iteritems())
def slugify(value, max_length=None, default=None):
- """Converts a string to slug format (all lowercase, words separated by
- dashes).
-
- :param value:
- The string to be slugified.
- :param max_length:
- An integer to restrict the resulting string to a maximum length.
- Words are not broken when restricting length.
- :param default:
- A default value in case the resulting string is empty.
- :returns:
- A slugified string.
- """
- value = _unicode(value)
- s = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').lower()
- s = re.sub('-+', '-', re.sub('[^a-zA-Z0-9-]+', '-', s)).strip('-')
- if not s:
- return default
-
- if max_length:
- # Restrict length without breaking words.
- while len(s) > max_length:
- if s.find('-') == -1:
- s = s[:max_length]
- else:
- s = s.rsplit('-', 1)[0]
-
- return s
+ """Converts a string to slug format (all lowercase, words separated by
+ dashes).
+
+ :param value:
+ The string to be slugified.
+ :param max_length:
+ An integer to restrict the resulting string to a maximum length.
+ Words are not broken when restricting length.
+ :param default:
+ A default value in case the resulting string is empty.
+ :returns:
+ A slugified string.
+ """
+ value = _unicode(value)
+ s = unicodedata.normalize('NFKD',
value).encode('ascii', 'ignore').lower()
+ s = re.sub('-+', '-', re.sub('[^a-zA-Z0-9-]+', '-', s)).strip('-')
+ if not s:
+ return default
+
+ if max_length:
+ # Restrict length without breaking words.
+ while len(s) > max_length:
+ if s.find('-') == -1:
+ s = s[:max_length]
+ else:
+ s = s.rsplit('-', 1)[0]
+
+ return s
_HTML_UNICODE_MAP = _build_unicode_map()