[tipfy] push by rodrigo.moraes - Moved from a handler-centric approach to a request-centric, to accommo... on 2011-04-03 14:16 GMT

1 view
Skip to first unread message

ti...@googlecode.com

unread,
Apr 3, 2011, 10:17:50 AM4/3/11
to tipfy-...@googlegroups.com
Revision: 1e6ed30aa111
Author: Rodrigo Moraes <rodrigo...@gmail.com>
Date: Sun Apr 3 07:14:04 2011
Log: Moved from a handler-centric approach to a request-centric, to
accommodate view functions. High level usage remains the same. Added
run_tests.py.
http://code.google.com/p/tipfy/source/detail?r=1e6ed30aa111

Added:
/run_tests.py
Modified:
/Makefile
/tests/auth_test.py
/tests/ext_jinja2_test.py
/tests/ext_mako_test.py
/tests/gae_acl_test.py
/tests/gae_sharded_counter_test.py
/tests/i18n_test.py
/tests/sessions_test.py
/tests/utils_test.py
/tipfy/app.py
/tipfy/appengine/acl.py
/tipfy/appengine/sharded_counter.py
/tipfy/auth/__init__.py
/tipfy/handler.py
/tipfy/i18n.py
/tipfy/routing.py
/tipfy/sessions.py
/tipfy/utils.py
/tipfyext/jinja2/__init__.py

=======================================
--- /dev/null
+++ /run_tests.py Sun Apr 3 07:14:04 2011
@@ -0,0 +1,40 @@
+import os
+import sys
+import unittest
+
+gae_path = '/usr/local/google_appengine'
+
+current_path = os.path.abspath(os.path.dirname(__file__))
+tests_path = os.path.join(current_path, 'tests')
+sys.path[0:0] = [
+ tests_path,
+ gae_path,
+ os.path.join(gae_path, 'lib', 'django_0_96'),
+ os.path.join(gae_path, 'lib', 'webob'),
+ os.path.join(gae_path, 'lib', 'yaml', 'lib'),
+]
+
+all_tests = [f[:-8] for f in os.listdir(tests_path) if
f.endswith('_test.py')]
+
+def get_suite(tests):
+ tests = sorted(tests)
+ suite = unittest.TestSuite()
+ loader = unittest.TestLoader()
+ for test in tests:
+ suite.addTest(loader.loadTestsFromName(test))
+ return suite
+
+if __name__ == '__main__':
+ # To run all tests:
+ # $ python run_tests.py
+ # To run a single test:
+ # $ python run_tests.py app
+ # To run a couple of tests:
+ # $ python run_tests.py app config sessions
+ tests = sys.argv[1:]
+ if not tests:
+ tests = all_tests
+ tests = ['%s_test' % t for t in tests]
+ suite = get_suite(tests)
+ unittest.TextTestRunner(verbosity=2).run(suite)
+
=======================================
--- /Makefile Sat Apr 2 14:59:44 2011
+++ /Makefile Sun Apr 3 07:14:04 2011
@@ -12,6 +12,7 @@
PORT= 8080
ADDRESS=localhost
PYTHON= python -Wignore
+failed=0

define run_test
PYTHONPATH=$(GAEPATH):. $(PYTHON) -m tests.$1_test $(FLAGS)
@@ -19,10 +20,10 @@

test:
for i in $(TESTS); \
- do \
- echo $$i; \
- PYTHONPATH=$(GAEPATH):. $(PYTHON) -m tests.`basename $$i .py` $(FLAGS);
\
- done
+ do \
+ echo $$i; \
+ PYTHONPATH=$(GAEPATH):. $(PYTHON) -m tests.`basename $$i .py`
$(FLAGS); \
+ done

# 'app', 'auth', 'config', 'dev', 'ext_jinja2', 'ext_mako', 'gae_acl',
# 'gae_blobstore', 'gae_db', 'gae_mail', 'gae_sharded_counter',
=======================================
--- /tests/auth_test.py Sat Apr 2 13:52:07 2011
+++ /tests/auth_test.py Sun Apr 3 07:14:04 2011
@@ -1,3 +1,5 @@
+from __future__ import with_statement
+
import os
import unittest

@@ -60,39 +62,34 @@
app = get_app()
app.router.add(Rule('/', name='home', handler=HomeHandler))

- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
- app.router.match(request)
-
- store = AuthStore(local.current_handler)
- self.assertEqual(store.login_url(),
local.current_handler.url_for('auth/login', redirect='/'))
-
- tipfy.auth.DEV_APPSERVER_APPSERVER = False
- store.config['secure_urls'] = True
- self.assertEqual(store.login_url(),
local.current_handler.url_for('auth/login', redirect='/', _scheme='https'))
- tipfy.auth.DEV_APPSERVER_APPSERVER = True
+ with app.get_test_context() as request:
+ request.app.router.match(request)
+
+ store = AuthStore(request)
+ self.assertEqual(store.login_url(),
request.app.router.url_for(request, 'auth/login', dict(redirect='/')))
+
+ tipfy.auth.DEV_APPSERVER_APPSERVER = False
+ store.config['secure_urls'] = True
+ self.assertEqual(store.login_url(),
request.app.router.url_for(request, 'auth/login', dict(redirect='/',
_scheme='https')))
+ tipfy.auth.DEV_APPSERVER_APPSERVER = True

def test_logout_url(self):
app = get_app()
app.router.add(Rule('/', name='home', handler=HomeHandler))

- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
- app.router.match(request)
-
- store = AuthStore(local.current_handler)
- self.assertEqual(store.logout_url(),
local.current_handler.url_for('auth/logout', redirect='/'))
+ with app.get_test_context() as request:
+ request.app.router.match(request)
+ store = AuthStore(request)
+ self.assertEqual(store.logout_url(),
request.app.router.url_for(request, 'auth/logout', dict(redirect='/')))

def test_signup_url(self):
app = get_app()
app.router.add(Rule('/', name='home', handler=HomeHandler))

- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
- app.router.match(request)
-
- store = AuthStore(local.current_handler)
- self.assertEqual(store.signup_url(),
local.current_handler.url_for('auth/signup', redirect='/'))
+ with app.get_test_context() as request:
+ request.app.router.match(request)
+ store = AuthStore(request)
+ self.assertEqual(store.signup_url(),
request.app.router.url_for(request, 'auth/signup', dict(redirect='/')))


class TestMiddleware(test_utils.BaseTestCase):
@@ -615,122 +612,98 @@
app = Tipfy(config={'tipfy.sessions': {
'secret_key': 'secret',
}})
- local.current_handler = RequestHandler(app, Request.from_values('/'))
return app

def test_login_with_form_invalid(self):
- app = self.get_app()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- store = MultiAuthStore(local.current_handler)
- res = store.login_with_form('foo', 'bar', remember=True)
-
- self.assertEqual(res, False)
+ with self.get_app().get_test_context() as request:
+ store = MultiAuthStore(request)
+ res = store.login_with_form('foo', 'bar', remember=True)
+ self.assertEqual(res, False)

def test_login_with_form(self):
user = User.create('foo', 'foo_id', password='bar')
- app = self.get_app()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- store = MultiAuthStore(local.current_handler)
- res = store.login_with_form('foo', 'bar', remember=True)
- self.assertEqual(res, True)
-
- res = store.login_with_form('foo', 'bar', remember=False)
- self.assertEqual(res, True)
+ with self.get_app().get_test_context() as request:
+ store = MultiAuthStore(request)
+ res = store.login_with_form('foo', 'bar', remember=True)
+ self.assertEqual(res, True)
+
+ res = store.login_with_form('foo', 'bar', remember=False)
+ self.assertEqual(res, True)

def test_login_with_auth_id(self):
- app = self.get_app()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- store = MultiAuthStore(local.current_handler)
- store.login_with_auth_id('foo_id', remember=False)
-
- user = User.create('foo', 'foo_id', password='bar')
- app = self.get_app()
- store.login_with_auth_id('foo_id', remember=True)
+ with self.get_app().get_test_context() as request:
+ store = MultiAuthStore(request)
+ store.login_with_auth_id('foo_id', remember=False)
+
+ user = User.create('foo', 'foo_id', password='bar')
+ app = self.get_app()
+ store.login_with_auth_id('foo_id', remember=True)

def test_real_login(self):
user = User.create('foo', 'foo_id', auth_remember=True)
- app = self.get_app()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- store = MultiAuthStore(local.current_handler)
- store.login_with_auth_id('foo_id', remember=False)
-
- response = Response()
- local.current_handler.session_store.save(response)
-
- request = Request.from_values('/', headers={
+ with self.get_app().get_test_context() as request:
+ store = MultiAuthStore(request)
+ store.login_with_auth_id('foo_id', remember=False)
+
+ response = Response()
+ request.session_store.save(response)
+
+ with self.get_app().get_test_context('/', headers={
'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- local.current_handler = RequestHandler(app, request)
- store = MultiAuthStore(local.current_handler)
- self.assertNotEqual(store.user, None)
- self.assertEqual(store.user.username, 'foo')
- self.assertEqual(store.user.auth_id, 'foo_id')
+ }) as request:
+ store = MultiAuthStore(request)
+ self.assertNotEqual(store.user, None)
+ self.assertEqual(store.user.username, 'foo')
+ self.assertEqual(store.user.auth_id, 'foo_id')

def test_real_logout(self):
user = User.create('foo', 'foo_id', auth_remember=True)
- app = self.get_app()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- store = MultiAuthStore(local.current_handler)
- store.login_with_auth_id('foo_id', remember=False)
-
- response = Response()
- local.current_handler.session_store.save(response)
-
- request = Request.from_values('/', headers={
+ with self.get_app().get_test_context() as request:
+ store = MultiAuthStore(request)
+ store.login_with_auth_id('foo_id', remember=False)
+
+ response = Response()
+ request.session_store.save(response)
+
+ with self.get_app().get_test_context('/', headers={
'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- local.current_handler = RequestHandler(app, request)
- store = MultiAuthStore(local.current_handler)
- self.assertNotEqual(store.user, None)
- self.assertEqual(store.user.username, 'foo')
- store.logout()
-
- response = Response()
- local.current_handler.session_store.save(response)
-
- request = Request.from_values('/', headers={
+ }) as request:
+ store = MultiAuthStore(request)
+ self.assertNotEqual(store.user, None)
+ self.assertEqual(store.user.username, 'foo')
+ store.logout()
+
+ response = Response()
+ request.session_store.save(response)
+
+ with self.get_app().get_test_context('/', headers={
'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- local.current_handler = RequestHandler(app, request)
- store = MultiAuthStore(local.current_handler)
-
- #self.assertEqual(store.user, None)
- self.assertEqual(store.session, None)
+ }) as request:
+ store = MultiAuthStore(request)
+ self.assertEqual(store.session, None)

def test_real_login_no_user(self):
- app = self.get_app()
- store = MultiAuthStore(local.current_handler)
- user = store.create_user('foo', 'foo_id')
- store.login_with_auth_id('foo_id', remember=False)
-
- response = Response()
- local.current_handler.session_store.save(response)
-
- user.delete()
-
- request = Request.from_values('/', headers={
+ with self.get_app().get_test_context() as request:
+ store = MultiAuthStore(request)
+ user = store.create_user('foo', 'foo_id')
+ store.login_with_auth_id('foo_id', remember=False)
+
+ response = Response()
+ request.session_store.save(response)
+ user.delete()
+
+ with self.get_app().get_test_context('/', headers={
'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- local.current_handler = RequestHandler(app, request)
- store = MultiAuthStore(local.current_handler)
- self.assertEqual(store.session['id'], 'foo_id')
- self.assertEqual(store.user, None)
+ }) as request:
+ store = MultiAuthStore(request)
+ self.assertEqual(store.session['id'], 'foo_id')
+ self.assertEqual(store.user, None)

def test_real_login_invalid(self):
- app = self.get_app()
- store = MultiAuthStore(local.current_handler)
- self.assertEqual(store.user, None)
- self.assertEqual(store.session, None)
+ with self.get_app().get_test_context() as request:
+ store = MultiAuthStore(request)
+ self.assertEqual(store.user, None)
+ self.assertEqual(store.session, None)


if __name__ == '__main__':
=======================================
--- /tests/ext_jinja2_test.py Sat Apr 2 13:52:07 2011
+++ /tests/ext_jinja2_test.py Sun Apr 3 07:14:04 2011
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
- Tests for tipfyext.jinja2
+ Tests for tipfyext.jinja2
"""
import os
import sys
@@ -20,199 +20,206 @@


class TestJinja2(test_utils.BaseTestCase):
- def test_render_template(self):
- app = Tipfy(config={'tipfyext.jinja2': {'templates_dir': templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- message = 'Hello, World!'
- res = jinja2.render_template(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(res, message)
-
- def test_render_template_with_i18n(self):
- app = Tipfy(config={
- 'tipfyext.jinja2': {
- 'templates_dir': templates_dir,
- 'environment_args': dict(
- autoescape=True,
-
extensions=['jinja2.ext.autoescape', 'jinja2.ext.with_', 'jinja2.ext.i18n'],
- ),
- },
- 'tipfy.sessions': {
- 'secret_key': 'secret',
- },
- })
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- message = 'Hello, i18n World!'
- res = jinja2.render_template(local.current_handler, 'template2.html',
message=message)
- self.assertEqual(res, message)
-
- def test_render_response(self):
- app = Tipfy(config={'tipfyext.jinja2': {'templates_dir': templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- message = 'Hello, World!'
- response =
jinja2.render_response(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(isinstance(response, Response), True)
- self.assertEqual(response.mimetype, 'text/html')
- self.assertEqual(response.data, message)
-
- def test_render_response_force_compiled(self):
- app = Tipfy(config={
- 'tipfyext.jinja2': {
- 'templates_compiled_target': templates_compiled_target,
- 'force_use_compiled': True,
- }
- }, debug=False)
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- message = 'Hello, World!'
- response =
jinja2.render_response(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(isinstance(response, Response), True)
- self.assertEqual(response.mimetype, 'text/html')
- self.assertEqual(response.data, message)
-
- def test_jinja2_mixin_render_template(self):
- class MyHandler(RequestHandler, Jinja2Mixin):
- def __init__(self, app, request):
- self.app = app
- self.request = request
- self.context = {}
-
- app = Tipfy(config={'tipfyext.jinja2': {'templates_dir': templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = MyHandler(app, request)
- jinja2 = Jinja2(app)
- message = 'Hello, World!'
-
- response = handler.render_template('template1.html', message=message)
- self.assertEqual(response, message)
-
- def test_jinja2_mixin_render_response(self):
- class MyHandler(RequestHandler, Jinja2Mixin):
- def __init__(self, app, request):
- self.app = app
- self.request = request
- self.context = {}
-
- app = Tipfy(config={'tipfyext.jinja2': {'templates_dir': templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = MyHandler(app, request)
- jinja2 = Jinja2(app)
- message = 'Hello, World!'
-
- response = handler.render_response('template1.html', message=message)
- self.assertEqual(isinstance(response, Response), True)
- self.assertEqual(response.mimetype, 'text/html')
- self.assertEqual(response.data, message)
-
- def test_get_template_attribute(self):
- app = Tipfy(config={'tipfyext.jinja2': {'templates_dir': templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- hello = jinja2.get_template_attribute('hello.html', 'hello')
- self.assertEqual(hello('World'), 'Hello, World!')
-
- def test_engine_factory(self):
- def get_jinja2_env():
- app = local.current_handler.app
- cfg = app.get_config('tipfyext.jinja2')
-
- loader = FileSystemLoader(cfg.get( 'templates_dir'))
-
- return Environment(loader=loader)
-
- app = Tipfy(config={'tipfyext.jinja2': {
- 'templates_dir': templates_dir,
- 'engine_factory': get_jinja2_env,
- }})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- message = 'Hello, World!'
- res = jinja2.render_template(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(res, message)
-
- def test_engine_factory2(self):
- old_sys_path = sys.path[:]
- sys.path.insert(0, current_dir)
-
- app = Tipfy(config={'tipfyext.jinja2': {
- 'templates_dir': templates_dir,
- 'engine_factory': 'resources.get_jinja2_env',
- }})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- message = 'Hello, World!'
- res = jinja2.render_template(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(res, message)
-
- sys.path = old_sys_path
-
- def test_engine_factory3(self):
- app = Tipfy()
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- _globals = {'message': 'Hey there!'}
- filters = {'ho': lambda e: e + ' Ho!'}
- jinja2 = Jinja2(app, _globals=_globals, filters=filters)
-
- template = jinja2.environment.from_string("""{{ message|ho }}""")
-
- self.assertEqual(template.render(), 'Hey there! Ho!')
-
- def test_after_environment_created(self):
- def after_creation(environment):
- environment.filters['ho'] = lambda x: x + ', Ho!'
-
- app = Tipfy(config={'tipfyext.jinja2': {'after_environment_created':
after_creation}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- template = jinja2.environment.from_string("""{{ 'Hey'|ho }}""")
- self.assertEqual(template.render(), 'Hey, Ho!')
-
- def test_after_environment_created_using_string(self):
- app = Tipfy(config={'tipfyext.jinja2':
{'after_environment_created': 'resources.jinja2_after_environment_created.after_creation'}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- template = jinja2.environment.from_string("""{{ 'Hey'|ho }}""")
- self.assertEqual(template.render(), 'Hey, Ho!')
-
- def test_translations(self):
- app = Tipfy(config={
- 'tipfyext.jinja2': {
- 'environment_args': {
- 'extensions': ['jinja2.ext.i18n',],
- },
- },
- 'tipfy.sessions': {
- 'secret_key': 'foo',
- },
- })
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- template = jinja2.environment.from_string("""{{ _('foo = %(bar)s',
bar='foo') }}""")
- self.assertEqual(template.render(), 'foo = foo')
+ def test_render_template(self):
+ app = Tipfy(config={'tipfyext.jinja2': {'templates_dir':
templates_dir}})
+ local.request = Request.from_values()
+ local.request.app = app
+ handler = RequestHandler(local.request)
+ jinja2 = Jinja2(app)
+
+ message = 'Hello, World!'
+ res = jinja2.render_template(handler, 'template1.html',
message=message)
+ self.assertEqual(res, message)
+
+ def test_render_template_with_i18n(self):
+ app = Tipfy(config={
+ 'tipfyext.jinja2': {
+ 'templates_dir': templates_dir,
+ 'environment_args': dict(
+ autoescape=True,
+
extensions=['jinja2.ext.autoescape', 'jinja2.ext.with_', 'jinja2.ext.i18n'],
+ ),
+ },
+ 'tipfy.sessions': {
+ 'secret_key': 'secret',
+ },
+ })
+ local.request = Request.from_values()
+ local.request.app = app
+ handler = RequestHandler(local.request)
+ jinja2 = Jinja2(app)
+
+ message = 'Hello, i18n World!'
+ res = jinja2.render_template(handler, 'template2.html',
message=message)
+ self.assertEqual(res, message)
+
+ def test_render_response(self):
+ app = Tipfy(config={'tipfyext.jinja2': {'templates_dir':
templates_dir}})
+ local.request = Request.from_values()
+ local.request.app = app
+ handler = RequestHandler(local.request)
+ jinja2 = Jinja2(app)
+
+ message = 'Hello, World!'
+ response = jinja2.render_response(handler, 'template1.html',
message=message)
+ self.assertEqual(isinstance(response, Response), True)
+ self.assertEqual(response.mimetype, 'text/html')
+ self.assertEqual(response.data, message)
+
+ def test_render_response_force_compiled(self):
+ app = Tipfy(config={
+ 'tipfyext.jinja2': {
+ 'templates_compiled_target': templates_compiled_target,
+ 'force_use_compiled': True,
+ }
+ }, debug=False)
+ local.request = Request.from_values()
+ local.request.app = app
+ handler = RequestHandler(local.request)
+ jinja2 = Jinja2(app)
+
+ message = 'Hello, World!'
+ response = jinja2.render_response(handler, 'template1.html',
message=message)
+ self.assertEqual(isinstance(response, Response), True)
+ self.assertEqual(response.mimetype, 'text/html')
+ self.assertEqual(response.data, message)
+
+ def test_jinja2_mixin_render_template(self):
+ class MyHandler(RequestHandler, Jinja2Mixin):
+ pass
+
+ app = Tipfy(config={'tipfyext.jinja2': {'templates_dir':
templates_dir}})
+ local.request = Request.from_values()
+ local.request.app = app
+ handler = MyHandler(local.request)
+ jinja2 = Jinja2(app)
+ message = 'Hello, World!'
+
+ response = handler.render_template('template1.html',
message=message)
+ self.assertEqual(response, message)
+
+ def test_jinja2_mixin_render_response(self):
+ class MyHandler(RequestHandler, Jinja2Mixin):
+ pass
+
+ app = Tipfy(config={'tipfyext.jinja2': {'templates_dir':
templates_dir}})
+ local.request = Request.from_values()
+ local.request.app = app
+ handler = MyHandler(local.request)
+ jinja2 = Jinja2(app)
+ message = 'Hello, World!'
+
+ response = handler.render_response('template1.html',
message=message)
+ self.assertEqual(isinstance(response, Response), True)
+ self.assertEqual(response.mimetype, 'text/html')
+ self.assertEqual(response.data, message)
+
+ def test_get_template_attribute(self):
+ app = Tipfy(config={'tipfyext.jinja2': {'templates_dir':
templates_dir}})
+ local.request = Request.from_values()
+ local.request.app = app
+ handler = RequestHandler(local.request)
+ jinja2 = Jinja2(app)
+
+ hello = jinja2.get_template_attribute('hello.html', 'hello')
+ self.assertEqual(hello('World'), 'Hello, World!')
+
+ def test_engine_factory(self):
+ def get_jinja2_env():
+ app = handler.app
+ cfg = app.get_config('tipfyext.jinja2')
+
+ loader = FileSystemLoader(cfg.get( 'templates_dir'))
+
+ return Environment(loader=loader)
+
+ app = Tipfy(config={'tipfyext.jinja2': {
+ 'templates_dir': templates_dir,
+ 'engine_factory': get_jinja2_env,
+ }})
+ local.request = Request.from_values()
+ local.request.app = app
+ handler = RequestHandler(local.request)
+ jinja2 = Jinja2(app)
+
+ message = 'Hello, World!'
+ res = jinja2.render_template(handler, 'template1.html',
message=message)
+ self.assertEqual(res, message)
+
+ def test_engine_factory2(self):
+ old_sys_path = sys.path[:]
+ sys.path.insert(0, current_dir)
+
+ app = Tipfy(config={'tipfyext.jinja2': {
+ 'templates_dir': templates_dir,
+ 'engine_factory': 'resources.get_jinja2_env',
+ }})
+ local.request = Request.from_values()
+ local.request.app = app
+ handler = RequestHandler(local.request)
+ jinja2 = Jinja2(app)
+
+ message = 'Hello, World!'
+ res = jinja2.render_template(handler, 'template1.html',
message=message)
+ self.assertEqual(res, message)
+
+ sys.path = old_sys_path
+
+ def test_engine_factory3(self):
+ app = Tipfy()
+ local.request = Request.from_values()
+ local.request.app = app
+ handler = RequestHandler(local.request)
+ _globals = {'message': 'Hey there!'}
+ filters = {'ho': lambda e: e + ' Ho!'}
+ jinja2 = Jinja2(app, _globals=_globals, filters=filters)
+
+ template = jinja2.environment.from_string("""{{ message|ho }}""")
+
+ self.assertEqual(template.render(), 'Hey there! Ho!')
+
+ def test_after_environment_created(self):
+ def after_creation(environment):
+ environment.filters['ho'] = lambda x: x + ', Ho!'
+
+ app = Tipfy(config={'tipfyext.jinja2':
{'after_environment_created': after_creation}})
+ local.request = Request.from_values()
+ local.request.app = app
+ handler = RequestHandler(local.request)
+ jinja2 = Jinja2(app)
+
+ template = jinja2.environment.from_string("""{{ 'Hey'|ho }}""")
+ self.assertEqual(template.render(), 'Hey, Ho!')
+
+ def test_after_environment_created_using_string(self):
+ app = Tipfy(config={'tipfyext.jinja2':
{'after_environment_created': 'resources.jinja2_after_environment_created.after_creation'}})
+ local.request = Request.from_values()
+ local.request.app = app
+ handler = RequestHandler(local.request)
+ jinja2 = Jinja2(app)
+
+ template = jinja2.environment.from_string("""{{ 'Hey'|ho }}""")
+ self.assertEqual(template.render(), 'Hey, Ho!')
+
+ def test_translations(self):
+ app = Tipfy(config={
+ 'tipfyext.jinja2': {
+ 'environment_args': {
+ 'extensions': ['jinja2.ext.i18n',],
+ },
+ },
+ 'tipfy.sessions': {
+ 'secret_key': 'foo',
+ },
+ })
+ local.request = Request.from_values()
+ local.request.app = app
+ handler = RequestHandler(local.request)
+ jinja2 = Jinja2(app)
+
+ template = jinja2.environment.from_string("""{{ _('foo = %(bar)s',
bar='foo') }}""")
+ self.assertEqual(template.render(), 'foo = foo')


if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/ext_mako_test.py Sat Apr 2 13:52:07 2011
+++ /tests/ext_mako_test.py Sun Apr 3 07:14:04 2011
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
- Tests for tipfyext.mako
+ Tests for tipfyext.mako
"""
import os
import sys
@@ -17,62 +17,62 @@


class TestMako(test_utils.BaseTestCase):
- def test_render_template(self):
- app = Tipfy(config={'tipfyext.mako': {'templates_dir': templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- mako = Mako(app)
-
- message = 'Hello, World!'
- res = mako.render_template(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(res, message + '\n')
-
- def test_render_response(self):
- app = Tipfy(config={'tipfyext.mako': {'templates_dir': templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- mako = Mako(app)
-
- message = 'Hello, World!'
- response = mako.render_response(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(isinstance(response, Response), True)
- self.assertEqual(response.mimetype, 'text/html')
- self.assertEqual(response.data, message + '\n')
-
- def test_mako_mixin_render_template(self):
- class MyHandler(RequestHandler, MakoMixin):
- def __init__(self, app, request):
- self.app = app
- self.request = request
- self.context = {}
-
- app = Tipfy(config={'tipfyext.mako': {'templates_dir': templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = MyHandler(app, request)
- mako = Mako(app)
- message = 'Hello, World!'
-
- response = handler.render_template('template1.html', message=message)
- self.assertEqual(response, message + '\n')
-
- def test_mako_mixin_render_response(self):
- class MyHandler(RequestHandler, MakoMixin):
- def __init__(self, app, request):
- self.app = app
- self.request = request
- self.context = {}
-
- app = Tipfy(config={'tipfyext.mako': {'templates_dir': templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = MyHandler(app, request)
- mako = Mako(app)
- message = 'Hello, World!'
-
- response = handler.render_response('template1.html', message=message)
- self.assertEqual(isinstance(response, Response), True)
- self.assertEqual(response.mimetype, 'text/html')
- self.assertEqual(response.data, message + '\n')
+ def test_render_template(self):
+ app = Tipfy(config={'tipfyext.mako': {'templates_dir':
templates_dir}})
+ request = Request.from_values()
+ handler = RequestHandler(app, request)
+ mako = Mako(app)
+
+ message = 'Hello, World!'
+ res = mako.render_template(handler, 'template1.html',
message=message)
+ self.assertEqual(res, message + '\n')
+
+ def test_render_response(self):
+ app = Tipfy(config={'tipfyext.mako': {'templates_dir':
templates_dir}})
+ request = Request.from_values()
+ handler = RequestHandler(app, request)
+ mako = Mako(app)
+
+ message = 'Hello, World!'
+ response = mako.render_response(handler, 'template1.html',
message=message)
+ self.assertEqual(isinstance(response, Response), True)
+ self.assertEqual(response.mimetype, 'text/html')
+ self.assertEqual(response.data, message + '\n')
+
+ def test_mako_mixin_render_template(self):
+ class MyHandler(RequestHandler, MakoMixin):
+ def __init__(self, app, request):
+ self.app = app
+ self.request = request
+ self.context = {}
+
+ app = Tipfy(config={'tipfyext.mako': {'templates_dir':
templates_dir}})
+ request = Request.from_values()
+ handler = MyHandler(app, request)
+ mako = Mako(app)
+ message = 'Hello, World!'
+
+ response = handler.render_template('template1.html',
message=message)
+ self.assertEqual(response, message + '\n')
+
+ def test_mako_mixin_render_response(self):
+ class MyHandler(RequestHandler, MakoMixin):
+ def __init__(self, app, request):
+ self.app = app
+ self.request = request
+ self.context = {}
+
+ app = Tipfy(config={'tipfyext.mako': {'templates_dir':
templates_dir}})
+ request = Request.from_values()
+ handler = MyHandler(app, request)
+ mako = Mako(app)
+ message = 'Hello, World!'
+
+ response = handler.render_response('template1.html',
message=message)
+ self.assertEqual(isinstance(response, Response), True)
+ self.assertEqual(response.mimetype, 'text/html')
+ self.assertEqual(response.data, message + '\n')


if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/gae_acl_test.py Sat Apr 2 13:52:07 2011
+++ /tests/gae_acl_test.py Sun Apr 3 07:14:04 2011
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
- Tests for tipfy.appengine.acl
+ Tests for tipfy.appengine.acl
"""
import unittest

@@ -16,424 +16,425 @@


class TestAcl(test_utils.BaseTestCase):
- def setUp(self):
- # Clean up datastore.
- super(TestAcl, self).setUp()
-
- self.app = Tipfy()
- self.app.config['tipfy']['dev'] = False
- local.current_handler = RequestHandler(self.app, Request.from_values())
-
- Acl.roles_map = {}
- Acl.roles_lock = CURRENT_VERSION_ID
- _rules_map.clear()
- test_utils.BaseTestCase.setUp(self)
-
- def tearDown(self):
- self.app.config['tipfy']['dev'] = True
-
- Acl.roles_map = {}
- Acl.roles_lock = CURRENT_VERSION_ID
- _rules_map.clear()
- test_utils.BaseTestCase.tearDown(self)
-
- def test_test_insert_or_update(self):
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertEqual(user_acl, None)
-
- # Set empty rules.
- user_acl = AclRules.insert_or_update(area='test', user='test')
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertNotEqual(user_acl, None)
- self.assertEqual(user_acl.rules, [])
- self.assertEqual(user_acl.roles, [])
-
- rules = [
- ('topic_1', 'name_1', True),
- ('topic_1', 'name_2', True),
- ('topic_2', 'name_1', False),
- ]
-
- user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertNotEqual(user_acl, None)
- self.assertEqual(user_acl.rules, rules)
- self.assertEqual(user_acl.roles, [])
-
- extra_rule = ('topic_3', 'name_3', True)
- rules.append(extra_rule)
-
- user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules, roles=['foo', 'bar', 'baz'])
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertNotEqual(user_acl, None)
- self.assertEqual(user_acl.rules, rules)
- self.assertEqual(user_acl.roles, ['foo', 'bar', 'baz'])
-
- def test_set_rules(self):
- """Test setting and appending rules."""
- rules = [
- ('topic_1', 'name_1', True),
- ('topic_1', 'name_2', True),
- ('topic_2', 'name_1', False),
- ]
- extra_rule = ('topic_3', 'name_3', True)
-
- # Set empty rules.
- user_acl = AclRules.insert_or_update(area='test', user='test')
-
- # Set rules and save the record.
- user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
-
- # Fetch the record again, and compare.
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertEqual(user_acl.rules, rules)
-
- # Append more rules.
- user_acl.rules.append(extra_rule)
- user_acl.put()
- rules.append(extra_rule)
-
- # Fetch the record again, and compare.
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertEqual(user_acl.rules, rules)
-
- def test_delete_rules(self):
- rules = [
- ('topic_1', 'name_1', True),
- ('topic_1', 'name_2', True),
- ('topic_2', 'name_1', False),
- ]
- user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
-
- # Fetch the record again, and compare.
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertEqual(user_acl.rules, rules)
-
- key_name = AclRules.get_key_name('test', 'test')
- acl = Acl('test', 'test')
-
- cached = memcache.get(key_name, namespace=AclRules.__name__)
- self.assertEqual(key_name in _rules_map, True)
- self.assertEqual(cached, _rules_map[key_name])
-
- user_acl.delete()
- user_acl2 = AclRules.get_by_area_and_user('test', 'test')
-
- cached = memcache.get(key_name, namespace=AclRules.__name__)
- self.assertEqual(user_acl2, None)
- self.assertEqual(key_name not in _rules_map, True)
- self.assertEqual(cached, None)
-
- def test_is_rule_set(self):
- rules = [
- ('topic_1', 'name_1', True),
- ('topic_1', 'name_2', True),
- ('topic_2', 'name_1', False),
- ]
- user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
-
- # Fetch the record again, and compare.
- user_acl = AclRules.get_by_area_and_user('test', 'test')
-
- self.assertEqual(user_acl.is_rule_set(*rules[0]), True)
- self.assertEqual(user_acl.is_rule_set(*rules[1]), True)
- self.assertEqual(user_acl.is_rule_set(*rules[2]), True)
- self.assertEqual(user_acl.is_rule_set('topic_1', 'name_3', True), False)
-
- def test_no_area_or_no_user(self):
- acl1 = Acl('foo', None)
- acl2 = Acl(None, 'foo')
-
- self.assertEqual(acl1.has_any_access(), False)
- self.assertEqual(acl2.has_any_access(), False)
-
- def test_default_roles_lock(self):
- Acl.roles_lock = None
- acl2 = Acl('foo', 'foo')
-
- self.assertEqual(acl2.roles_lock, CURRENT_VERSION_ID)
-
- def test_set_invalid_rules(self):
- rules = {}
- self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
-
- rules = ['foo', 'bar', True]
- self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
-
- rules = [('foo',)]
- self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
-
- rules = [('foo', 'bar')]
- self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
-
- rules = [(1, 2, 3)]
- self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
-
- rules = [('foo', 'bar', True)]
- AclRules.insert_or_update(area='test', user='test', rules=rules)
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- user_acl.rules.append((1, 2, 3))
- self.assertRaises(AssertionError, user_acl.put)
-
- def test_example(self):
- """Tests the example set in the acl module."""
- # Set a dict of roles with an 'admin' role that has full access and
assign
- # users to it. Each role maps to a list of rules. Each rule, a tuple
- # (topic, name, flag), where flag, as bool to allow or disallow access.
- # Wildcard '*' can be used to match all topics and/or names.
- Acl.roles_map = {
- 'admin': [
- ('*', '*', True),
- ],
- }
-
- # Assign users 'user_1' and 'user_2' to the 'admin' role.
- AclRules.insert_or_update(area='my_area', user='user_1', roles=['admin'])
- AclRules.insert_or_update(area='my_area', user='user_2', roles=['admin'])
-
- # Restrict 'user_2' from accessing a specific resource, adding a new rule
- # with flag set to False. Now this user has access to everything except
this
- # resource.
- user_acl = AclRules.get_by_area_and_user('my_area', 'user_2')
- user_acl.rules.append(('UserAdmin', '*', False))
- user_acl.put()
-
- # Check 'user_2' permission.
- acl = Acl(area='my_area', user='user_2')
- self.assertEqual(acl.has_access(topic='UserAdmin', name='save'), False)
- self.assertEqual(acl.has_access(topic='UserAdmin', name='get'), False)
- self.assertEqual(acl.has_access(topic='AnythingElse', name='put'), True)
-
- def test_is_one(self):
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
-
- acl = Acl(area='my_area', user='user_1')
- self.assertEqual(acl.is_one('editor'), True)
- self.assertEqual(acl.is_one('designer'), True)
- self.assertEqual(acl.is_one('admin'), False)
-
- def test_is_any(self):
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
-
- acl = Acl(area='my_area', user='user_1')
- self.assertEqual(acl.is_any(['editor', 'admin']), True)
- self.assertEqual(acl.is_any(['admin', 'designer']), True)
- self.assertEqual(acl.is_any(['admin', 'user']), False)
-
- def test_is_all(self):
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
-
- acl = Acl(area='my_area', user='user_1')
- self.assertEqual(acl.is_all(['editor', 'admin']), False)
- self.assertEqual(acl.is_all(['admin', 'designer']), False)
- self.assertEqual(acl.is_all(['admin', 'user']), False)
- self.assertEqual(acl.is_all(['editor', 'designer']), True)
-
- def test_non_existent_user(self):
- acl = Acl(area='my_area', user='user_3')
- self.assertEqual(acl.has_any_access(), False)
-
- def test_has_any_access(self):
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
- AclRules.insert_or_update(area='my_area', user='user_2',
rules=[('*', '*', True)])
- AclRules.insert_or_update(area='my_area', user='user_3')
-
- acl = Acl(area='my_area', user='user_1')
- self.assertEqual(acl.has_any_access(), True)
-
- acl = Acl(area='my_area', user='user_2')
- self.assertEqual(acl.has_any_access(), True)
-
- acl = Acl(area='my_area', user='user_3')
- self.assertEqual(acl.has_any_access(), False)
- self.assertEqual(acl._rules, [])
- self.assertEqual(acl._roles, [])
-
- def test_has_access_invalid_parameters(self):
- AclRules.insert_or_update(area='my_area', user='user_1',
rules=[('*', '*', True)])
-
- acl1 = Acl(area='my_area', user='user_1')
-
- self.assertRaises(ValueError, acl1.has_access, 'content', '*')
- self.assertRaises(ValueError, acl1.has_access, '*', 'content')
-
- def test_has_access(self):
- AclRules.insert_or_update(area='my_area', user='user_1',
rules=[('*', '*', True)])
- AclRules.insert_or_update(area='my_area', user='user_2',
rules=[('content', '*', True), ('content', 'delete', False)])
- AclRules.insert_or_update(area='my_area', user='user_3',
rules=[('content', 'read', True)])
-
- acl1 = Acl(area='my_area', user='user_1')
- acl2 = Acl(area='my_area', user='user_2')
- acl3 = Acl(area='my_area', user='user_3')
-
- self.assertEqual(acl1.has_access('content', 'read'), True)
- self.assertEqual(acl1.has_access('content', 'update'), True)
- self.assertEqual(acl1.has_access('content', 'delete'), True)
-
- self.assertEqual(acl2.has_access('content', 'read'), True)
- self.assertEqual(acl2.has_access('content', 'update'), True)
- self.assertEqual(acl2.has_access('content', 'delete'), False)
-
- self.assertEqual(acl3.has_access('content', 'read'), True)
- self.assertEqual(acl3.has_access('content', 'update'), False)
- self.assertEqual(acl3.has_access('content', 'delete'), False)
-
- def test_has_access_with_roles(self):
- Acl.roles_map = {
- 'admin': [('*', '*', True),],
- 'editor': [('content', '*', True),],
- 'contributor': [('content', '*', True), ('content', 'delete', False)],
- 'designer': [('design', '*', True),],
- }
-
- AclRules.insert_or_update(area='my_area', user='user_1', roles=['admin'])
- acl1 = Acl(area='my_area', user='user_1')
-
- AclRules.insert_or_update(area='my_area', user='user_2',
roles=['admin'], rules=[('ManageUsers', '*', False)])
- acl2 = Acl(area='my_area', user='user_2')
-
- AclRules.insert_or_update(area='my_area', user='user_3',
roles=['editor'])
- acl3 = Acl(area='my_area', user='user_3')
-
- AclRules.insert_or_update(area='my_area', user='user_4',
roles=['contributor'], rules=[('design', '*', True),])
- acl4 = Acl(area='my_area', user='user_4')
-
- self.assertEqual(acl1.has_access('ApproveUsers', 'save'), True)
- self.assertEqual(acl1.has_access('ManageUsers', 'edit'), True)
- self.assertEqual(acl1.has_access('ManageUsers', 'delete'), True)
-
- self.assertEqual(acl1.has_access('ApproveUsers', 'save'), True)
- self.assertEqual(acl2.has_access('ManageUsers', 'edit'), False)
- self.assertEqual(acl2.has_access('ManageUsers', 'delete'), False)
-
- self.assertEqual(acl3.has_access('ApproveUsers', 'save'), False)
- self.assertEqual(acl3.has_access('ManageUsers', 'edit'), False)
- self.assertEqual(acl3.has_access('ManageUsers', 'delete'), False)
- self.assertEqual(acl3.has_access('content', 'edit'), True)
- self.assertEqual(acl3.has_access('content', 'delete'), True)
- self.assertEqual(acl3.has_access('content', 'save'), True)
- self.assertEqual(acl3.has_access('design', 'edit'), False)
- self.assertEqual(acl3.has_access('design', 'delete'), False)
-
- self.assertEqual(acl4.has_access('ApproveUsers', 'save'), False)
- self.assertEqual(acl4.has_access('ManageUsers', 'edit'), False)
- self.assertEqual(acl4.has_access('ManageUsers', 'delete'), False)
- self.assertEqual(acl4.has_access('content', 'edit'), True)
- self.assertEqual(acl4.has_access('content', 'delete'), False)
- self.assertEqual(acl4.has_access('content', 'save'), True)
- self.assertEqual(acl4.has_access('design', 'edit'), True)
- self.assertEqual(acl4.has_access('design', 'delete'), True)
-
- def test_roles_lock_unchanged(self):
- roles_map1 = {
- 'editor': [('content', '*', True),],
- 'contributor': [('content', '*', True), ('content', 'delete', False)],
- }
- Acl.roles_map = roles_map1
- Acl.roles_lock = 'initial'
-
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor'])
- acl1 = Acl(area='my_area', user='user_1')
-
- AclRules.insert_or_update(area='my_area', user='user_2',
roles=['contributor'])
- acl2 = Acl(area='my_area', user='user_2')
-
- self.assertEqual(acl1.has_access('content', 'add'), True)
- self.assertEqual(acl1.has_access('content', 'edit'), True)
- self.assertEqual(acl1.has_access('content', 'delete'), True)
-
- self.assertEqual(acl2.has_access('content', 'add'), True)
- self.assertEqual(acl2.has_access('content', 'edit'), True)
- self.assertEqual(acl2.has_access('content', 'delete'), False)
-
- roles_map2 = {
- 'editor': [('content', '*', True),],
- 'contributor': [('content', '*', True), ('content', 'delete', False),
('content', 'add', False)],
- }
- Acl.roles_map = roles_map2
- # Don't change the lock to check that the cache will be kept.
- # Acl.roles_lock = 'changed'
-
- acl1 = Acl(area='my_area', user='user_1')
- acl2 = Acl(area='my_area', user='user_2')
-
- self.assertEqual(acl1.has_access('content', 'add'), True)
- self.assertEqual(acl1.has_access('content', 'edit'), True)
- self.assertEqual(acl1.has_access('content', 'delete'), True)
-
- self.assertEqual(acl2.has_access('content', 'add'), True)
- self.assertEqual(acl2.has_access('content', 'edit'), True)
- self.assertEqual(acl2.has_access('content', 'delete'), False)
-
- def test_roles_lock_changed(self):
- roles_map1 = {
- 'editor': [('content', '*', True),],
- 'contributor': [('content', '*', True), ('content', 'delete', False)],
- }
- Acl.roles_map = roles_map1
- Acl.roles_lock = 'initial'
-
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor'])
- acl1 = Acl(area='my_area', user='user_1')
-
- AclRules.insert_or_update(area='my_area', user='user_2',
roles=['contributor'])
- acl2 = Acl(area='my_area', user='user_2')
-
- self.assertEqual(acl1.has_access('content', 'add'), True)
- self.assertEqual(acl1.has_access('content', 'edit'), True)
- self.assertEqual(acl1.has_access('content', 'delete'), True)
-
- self.assertEqual(acl2.has_access('content', 'add'), True)
- self.assertEqual(acl2.has_access('content', 'edit'), True)
- self.assertEqual(acl2.has_access('content', 'delete'), False)
-
- roles_map2 = {
- 'editor': [('content', '*', True),],
- 'contributor': [('content', '*', True), ('content', 'delete', False),
('content', 'add', False)],
- }
- Acl.roles_map = roles_map2
- Acl.roles_lock = 'changed'
-
- acl1 = Acl(area='my_area', user='user_1')
- acl2 = Acl(area='my_area', user='user_2')
-
- self.assertEqual(acl1.has_access('content', 'add'), True)
- self.assertEqual(acl1.has_access('content', 'edit'), True)
- self.assertEqual(acl1.has_access('content', 'delete'), True)
-
- self.assertEqual(acl2.has_access('content', 'add'), False)
- self.assertEqual(acl2.has_access('content', 'edit'), True)
- self.assertEqual(acl2.has_access('content', 'delete'), False)
-
- def test_acl_mixin(self):
- roles_map1 = {
- 'editor': [('content', '*', True),],
- 'contributor': [('content', '*', True), ('content', 'delete', False)],
- }
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor'])
-
- class Area(object):
- def key(self):
- return 'my_area'
-
- class User(object):
- def key(self):
- return 'user_1'
-
- class MyHandler(AclMixin):
- roles_map = roles_map1
- roles_lock = 'foo'
-
- def __init__(self):
- self.area = Area()
- self.current_user = User()
-
- handler = MyHandler()
- self.assertEqual(handler.acl.has_access('content', 'add'), True)
- self.assertEqual(handler.acl.has_access('content', 'edit'), True)
- self.assertEqual(handler.acl.has_access('content', 'delete'), True)
- self.assertEqual(handler.acl.has_access('foo', 'delete'), False)
+ def setUp(self):
+ # Clean up datastore.
+ super(TestAcl, self).setUp()
+
+ self.app = Tipfy()
+ self.app.config['tipfy']['dev'] = False
+ local.request = Request.from_values()
+ local.request.app = self.app
+
+ Acl.roles_map = {}
+ Acl.roles_lock = CURRENT_VERSION_ID
+ _rules_map.clear()
+ test_utils.BaseTestCase.setUp(self)
+
+ def tearDown(self):
+ self.app.config['tipfy']['dev'] = True
+
+ Acl.roles_map = {}
+ Acl.roles_lock = CURRENT_VERSION_ID
+ _rules_map.clear()
+ test_utils.BaseTestCase.tearDown(self)
+
+ def test_test_insert_or_update(self):
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertEqual(user_acl, None)
+
+ # Set empty rules.
+ user_acl = AclRules.insert_or_update(area='test', user='test')
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertNotEqual(user_acl, None)
+ self.assertEqual(user_acl.rules, [])
+ self.assertEqual(user_acl.roles, [])
+
+ rules = [
+ ('topic_1', 'name_1', True),
+ ('topic_1', 'name_2', True),
+ ('topic_2', 'name_1', False),
+ ]
+
+ user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertNotEqual(user_acl, None)
+ self.assertEqual(user_acl.rules, rules)
+ self.assertEqual(user_acl.roles, [])
+
+ extra_rule = ('topic_3', 'name_3', True)
+ rules.append(extra_rule)
+
+ user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules, roles=['foo', 'bar', 'baz'])
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertNotEqual(user_acl, None)
+ self.assertEqual(user_acl.rules, rules)
+ self.assertEqual(user_acl.roles, ['foo', 'bar', 'baz'])
+
+ def test_set_rules(self):
+ """Test setting and appending rules."""
+ rules = [
+ ('topic_1', 'name_1', True),
+ ('topic_1', 'name_2', True),
+ ('topic_2', 'name_1', False),
+ ]
+ extra_rule = ('topic_3', 'name_3', True)
+
+ # Set empty rules.
+ user_acl = AclRules.insert_or_update(area='test', user='test')
+
+ # Set rules and save the record.
+ user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
+
+ # Fetch the record again, and compare.
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertEqual(user_acl.rules, rules)
+
+ # Append more rules.
+ user_acl.rules.append(extra_rule)
+ user_acl.put()
+ rules.append(extra_rule)
+
+ # Fetch the record again, and compare.
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertEqual(user_acl.rules, rules)
+
+ def test_delete_rules(self):
+ rules = [
+ ('topic_1', 'name_1', True),
+ ('topic_1', 'name_2', True),
+ ('topic_2', 'name_1', False),
+ ]
+ user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
+
+ # Fetch the record again, and compare.
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertEqual(user_acl.rules, rules)
+
+ key_name = AclRules.get_key_name('test', 'test')
+ acl = Acl('test', 'test')
+
+ cached = memcache.get(key_name, namespace=AclRules.__name__)
+ self.assertEqual(key_name in _rules_map, True)
+ self.assertEqual(cached, _rules_map[key_name])
+
+ user_acl.delete()
+ user_acl2 = AclRules.get_by_area_and_user('test', 'test')
+
+ cached = memcache.get(key_name, namespace=AclRules.__name__)
+ self.assertEqual(user_acl2, None)
+ self.assertEqual(key_name not in _rules_map, True)
+ self.assertEqual(cached, None)
+
+ def test_is_rule_set(self):
+ rules = [
+ ('topic_1', 'name_1', True),
+ ('topic_1', 'name_2', True),
+ ('topic_2', 'name_1', False),
+ ]
+ user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
+
+ # Fetch the record again, and compare.
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+
+ self.assertEqual(user_acl.is_rule_set(*rules[0]), True)
+ self.assertEqual(user_acl.is_rule_set(*rules[1]), True)
+ self.assertEqual(user_acl.is_rule_set(*rules[2]), True)
+ self.assertEqual(user_acl.is_rule_set('topic_1', 'name_3', True),
False)
+
+ def test_no_area_or_no_user(self):
+ acl1 = Acl('foo', None)
+ acl2 = Acl(None, 'foo')
+
+ self.assertEqual(acl1.has_any_access(), False)
+ self.assertEqual(acl2.has_any_access(), False)
+
+ def test_default_roles_lock(self):
+ Acl.roles_lock = None
+ acl2 = Acl('foo', 'foo')
+
+ self.assertEqual(acl2.roles_lock, CURRENT_VERSION_ID)
+
+ def test_set_invalid_rules(self):
+ rules = {}
+ self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
+
+ rules = ['foo', 'bar', True]
+ self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
+
+ rules = [('foo',)]
+ self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
+
+ rules = [('foo', 'bar')]
+ self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
+
+ rules = [(1, 2, 3)]
+ self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
+
+ rules = [('foo', 'bar', True)]
+ AclRules.insert_or_update(area='test', user='test', rules=rules)
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ user_acl.rules.append((1, 2, 3))
+ self.assertRaises(AssertionError, user_acl.put)
+
+ def test_example(self):
+ """Tests the example set in the acl module."""
+ # Set a dict of roles with an 'admin' role that has full access
and assign
+ # users to it. Each role maps to a list of rules. Each rule, a
tuple
+ # (topic, name, flag), where flag, as bool to allow or disallow
access.
+ # Wildcard '*' can be used to match all topics and/or names.
+ Acl.roles_map = {
+ 'admin': [
+ ('*', '*', True),
+ ],
+ }
+
+ # Assign users 'user_1' and 'user_2' to the 'admin' role.
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['admin'])
+ AclRules.insert_or_update(area='my_area', user='user_2',
roles=['admin'])
+
+ # Restrict 'user_2' from accessing a specific resource, adding a
new rule
+ # with flag set to False. Now this user has access to everything
except this
+ # resource.
+ user_acl = AclRules.get_by_area_and_user('my_area', 'user_2')
+ user_acl.rules.append(('UserAdmin', '*', False))
+ user_acl.put()
+
+ # Check 'user_2' permission.
+ acl = Acl(area='my_area', user='user_2')
+ self.assertEqual(acl.has_access(topic='UserAdmin', name='save'),
False)
+ self.assertEqual(acl.has_access(topic='UserAdmin', name='get'),
False)
+ self.assertEqual(acl.has_access(topic='AnythingElse', name='put'),
True)
+
+ def test_is_one(self):
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
+
+ acl = Acl(area='my_area', user='user_1')
+ self.assertEqual(acl.is_one('editor'), True)
+ self.assertEqual(acl.is_one('designer'), True)
+ self.assertEqual(acl.is_one('admin'), False)
+
+ def test_is_any(self):
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
+
+ acl = Acl(area='my_area', user='user_1')
+ self.assertEqual(acl.is_any(['editor', 'admin']), True)
+ self.assertEqual(acl.is_any(['admin', 'designer']), True)
+ self.assertEqual(acl.is_any(['admin', 'user']), False)
+
+ def test_is_all(self):
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
+
+ acl = Acl(area='my_area', user='user_1')
+ self.assertEqual(acl.is_all(['editor', 'admin']), False)
+ self.assertEqual(acl.is_all(['admin', 'designer']), False)
+ self.assertEqual(acl.is_all(['admin', 'user']), False)
+ self.assertEqual(acl.is_all(['editor', 'designer']), True)
+
+ def test_non_existent_user(self):
+ acl = Acl(area='my_area', user='user_3')
+ self.assertEqual(acl.has_any_access(), False)
+
+ def test_has_any_access(self):
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
+ AclRules.insert_or_update(area='my_area', user='user_2',
rules=[('*', '*', True)])
+ AclRules.insert_or_update(area='my_area', user='user_3')
+
+ acl = Acl(area='my_area', user='user_1')
+ self.assertEqual(acl.has_any_access(), True)
+
+ acl = Acl(area='my_area', user='user_2')
+ self.assertEqual(acl.has_any_access(), True)
+
+ acl = Acl(area='my_area', user='user_3')
+ self.assertEqual(acl.has_any_access(), False)
+ self.assertEqual(acl._rules, [])
+ self.assertEqual(acl._roles, [])
+
+ def test_has_access_invalid_parameters(self):
+ AclRules.insert_or_update(area='my_area', user='user_1',
rules=[('*', '*', True)])
+
+ acl1 = Acl(area='my_area', user='user_1')
+
+ self.assertRaises(ValueError, acl1.has_access, 'content', '*')
+ self.assertRaises(ValueError, acl1.has_access, '*', 'content')
+
+ def test_has_access(self):
+ AclRules.insert_or_update(area='my_area', user='user_1',
rules=[('*', '*', True)])
+ AclRules.insert_or_update(area='my_area', user='user_2',
rules=[('content', '*', True), ('content', 'delete', False)])
+ AclRules.insert_or_update(area='my_area', user='user_3',
rules=[('content', 'read', True)])
+
+ acl1 = Acl(area='my_area', user='user_1')
+ acl2 = Acl(area='my_area', user='user_2')
+ acl3 = Acl(area='my_area', user='user_3')
+
+ self.assertEqual(acl1.has_access('content', 'read'), True)
+ self.assertEqual(acl1.has_access('content', 'update'), True)
+ self.assertEqual(acl1.has_access('content', 'delete'), True)
+
+ self.assertEqual(acl2.has_access('content', 'read'), True)
+ self.assertEqual(acl2.has_access('content', 'update'), True)
+ self.assertEqual(acl2.has_access('content', 'delete'), False)
+
+ self.assertEqual(acl3.has_access('content', 'read'), True)
+ self.assertEqual(acl3.has_access('content', 'update'), False)
+ self.assertEqual(acl3.has_access('content', 'delete'), False)
+
+ def test_has_access_with_roles(self):
+ Acl.roles_map = {
+ 'admin': [('*', '*', True),],
+ 'editor': [('content', '*', True),],
+ 'contributor': [('content', '*', True), ('content', 'delete',
False)],
+ 'designer': [('design', '*', True),],
+ }
+
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['admin'])
+ acl1 = Acl(area='my_area', user='user_1')
+
+ AclRules.insert_or_update(area='my_area', user='user_2',
roles=['admin'], rules=[('ManageUsers', '*', False)])
+ acl2 = Acl(area='my_area', user='user_2')
+
+ AclRules.insert_or_update(area='my_area', user='user_3',
roles=['editor'])
+ acl3 = Acl(area='my_area', user='user_3')
+
+ AclRules.insert_or_update(area='my_area', user='user_4',
roles=['contributor'], rules=[('design', '*', True),])
+ acl4 = Acl(area='my_area', user='user_4')
+
+ self.assertEqual(acl1.has_access('ApproveUsers', 'save'), True)
+ self.assertEqual(acl1.has_access('ManageUsers', 'edit'), True)
+ self.assertEqual(acl1.has_access('ManageUsers', 'delete'), True)
+
+ self.assertEqual(acl1.has_access('ApproveUsers', 'save'), True)
+ self.assertEqual(acl2.has_access('ManageUsers', 'edit'), False)
+ self.assertEqual(acl2.has_access('ManageUsers', 'delete'), False)
+
+ self.assertEqual(acl3.has_access('ApproveUsers', 'save'), False)
+ self.assertEqual(acl3.has_access('ManageUsers', 'edit'), False)
+ self.assertEqual(acl3.has_access('ManageUsers', 'delete'), False)
+ self.assertEqual(acl3.has_access('content', 'edit'), True)
+ self.assertEqual(acl3.has_access('content', 'delete'), True)
+ self.assertEqual(acl3.has_access('content', 'save'), True)
+ self.assertEqual(acl3.has_access('design', 'edit'), False)
+ self.assertEqual(acl3.has_access('design', 'delete'), False)
+
+ self.assertEqual(acl4.has_access('ApproveUsers', 'save'), False)
+ self.assertEqual(acl4.has_access('ManageUsers', 'edit'), False)
+ self.assertEqual(acl4.has_access('ManageUsers', 'delete'), False)
+ self.assertEqual(acl4.has_access('content', 'edit'), True)
+ self.assertEqual(acl4.has_access('content', 'delete'), False)
+ self.assertEqual(acl4.has_access('content', 'save'), True)
+ self.assertEqual(acl4.has_access('design', 'edit'), True)
+ self.assertEqual(acl4.has_access('design', 'delete'), True)
+
+ def test_roles_lock_unchanged(self):
+ roles_map1 = {
+ 'editor': [('content', '*', True),],
+ 'contributor': [('content', '*', True), ('content', 'delete',
False)],
+ }
+ Acl.roles_map = roles_map1
+ Acl.roles_lock = 'initial'
+
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor'])
+ acl1 = Acl(area='my_area', user='user_1')
+
+ AclRules.insert_or_update(area='my_area', user='user_2',
roles=['contributor'])
+ acl2 = Acl(area='my_area', user='user_2')
+
+ self.assertEqual(acl1.has_access('content', 'add'), True)
+ self.assertEqual(acl1.has_access('content', 'edit'), True)
+ self.assertEqual(acl1.has_access('content', 'delete'), True)
+
+ self.assertEqual(acl2.has_access('content', 'add'), True)
+ self.assertEqual(acl2.has_access('content', 'edit'), True)
+ self.assertEqual(acl2.has_access('content', 'delete'), False)
+
+ roles_map2 = {
+ 'editor': [('content', '*', True),],
+ 'contributor': [('content', '*', True), ('content', 'delete',
False), ('content', 'add', False)],
+ }
+ Acl.roles_map = roles_map2
+ # Don't change the lock to check that the cache will be kept.
+ # Acl.roles_lock = 'changed'
+
+ acl1 = Acl(area='my_area', user='user_1')
+ acl2 = Acl(area='my_area', user='user_2')
+
+ self.assertEqual(acl1.has_access('content', 'add'), True)
+ self.assertEqual(acl1.has_access('content', 'edit'), True)
+ self.assertEqual(acl1.has_access('content', 'delete'), True)
+
+ self.assertEqual(acl2.has_access('content', 'add'), True)
+ self.assertEqual(acl2.has_access('content', 'edit'), True)
+ self.assertEqual(acl2.has_access('content', 'delete'), False)
+
+ def test_roles_lock_changed(self):
+ roles_map1 = {
+ 'editor': [('content', '*', True),],
+ 'contributor': [('content', '*', True), ('content', 'delete',
False)],
+ }
+ Acl.roles_map = roles_map1
+ Acl.roles_lock = 'initial'
+
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor'])
+ acl1 = Acl(area='my_area', user='user_1')
+
+ AclRules.insert_or_update(area='my_area', user='user_2',
roles=['contributor'])
+ acl2 = Acl(area='my_area', user='user_2')
+
+ self.assertEqual(acl1.has_access('content', 'add'), True)
+ self.assertEqual(acl1.has_access('content', 'edit'), True)
+ self.assertEqual(acl1.has_access('content', 'delete'), True)
+
+ self.assertEqual(acl2.has_access('content', 'add'), True)
+ self.assertEqual(acl2.has_access('content', 'edit'), True)
+ self.assertEqual(acl2.has_access('content', 'delete'), False)
+
+ roles_map2 = {
+ 'editor': [('content', '*', True),],
+ 'contributor': [('content', '*', True), ('content', 'delete',
False), ('content', 'add', False)],
+ }
+ Acl.roles_map = roles_map2
+ Acl.roles_lock = 'changed'
+
+ acl1 = Acl(area='my_area', user='user_1')
+ acl2 = Acl(area='my_area', user='user_2')
+
+ self.assertEqual(acl1.has_access('content', 'add'), True)
+ self.assertEqual(acl1.has_access('content', 'edit'), True)
+ self.assertEqual(acl1.has_access('content', 'delete'), True)
+
+ self.assertEqual(acl2.has_access('content', 'add'), False)
+ self.assertEqual(acl2.has_access('content', 'edit'), True)
+ self.assertEqual(acl2.has_access('content', 'delete'), False)
+
+ def test_acl_mixin(self):
+ roles_map1 = {
+ 'editor': [('content', '*', True),],
+ 'contributor': [('content', '*', True), ('content', 'delete',
False)],
+ }
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor'])
+
+ class Area(object):
+ def key(self):
+ return 'my_area'
+
+ class User(object):
+ def key(self):
+ return 'user_1'
+
+ class MyHandler(AclMixin):
+ roles_map = roles_map1
+ roles_lock = 'foo'
+
+ def __init__(self):
+ self.area = Area()
+ self.current_user = User()
+
+ handler = MyHandler()
+ self.assertEqual(handler.acl.has_access('content', 'add'), True)
+ self.assertEqual(handler.acl.has_access('content', 'edit'), True)
+ self.assertEqual(handler.acl.has_access('content', 'delete'), True)
+ self.assertEqual(handler.acl.has_access('foo', 'delete'), False)


if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/gae_sharded_counter_test.py Sat Apr 2 13:52:07 2011
+++ /tests/gae_sharded_counter_test.py Sun Apr 3 07:14:04 2011
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
- Tests for tipfy.appengine.sharded_counter
+ Tests for tipfy.appengine.sharded_counter
"""
from datetime import datetime, timedelta
import unittest
@@ -16,62 +16,63 @@


class TestCounter(test_utils.BaseTestCase):
- def setUp(self):
- app = Tipfy()
- local.current_handler = RequestHandler(app, Request.from_values())
- test_utils.BaseTestCase.setUp(self)
-
- def test_counter(self):
- # Build a new counter that uses the unique key name 'hits'.
- hits = Counter('hits')
-
- self.assertEqual(hits.count, 0)
-
- # Increment by 1.
- hits.increment()
- # Increment by 10.
- hits.increment(10)
- # Decrement by 3.
- hits.increment(-3)
- # This is the current count.
- self.assertEqual(hits.count, 8)
-
- # Forces fetching a non-cached count of all shards.
- self.assertEqual(hits.get_count(nocache=True), 8)
-
- # Set the counter to an arbitrary value.
- hits.count = 6
-
- self.assertEqual(hits.get_count(nocache=True), 6)
-
- def test_cache(self):
- # Build a new counter that uses the unique key name 'hits'.
- hits = Counter('hits')
-
- self.assertEqual(hits.count, 0)
-
- # Increment by 1.
- hits.increment()
- # Increment by 10.
- hits.increment(10)
- # Decrement by 3.
- hits.increment(-3)
- # This is the current count.
- self.assertEqual(hits.count, 8)
-
- # Forces fetching a non-cached count of all shards.
- self.assertEqual(hits.get_count(nocache=True), 8)
-
- # Set the counter to an arbitrary value.
- hits.delete()
-
- self.assertEqual(hits.get_count(), 8)
- self.assertEqual(hits.get_count(nocache=True), 0)
-
- hits.memcached.delete_count()
-
- self.assertEqual(hits.get_count(), 0)
+ def setUp(self):
+ app = Tipfy()
+ local.request = Request.from_values()
+ local.request.app = app
+ test_utils.BaseTestCase.setUp(self)
+
+ def test_counter(self):
+ # Build a new counter that uses the unique key name 'hits'.
+ hits = Counter('hits')
+
+ self.assertEqual(hits.count, 0)
+
+ # Increment by 1.
+ hits.increment()
+ # Increment by 10.
+ hits.increment(10)
+ # Decrement by 3.
+ hits.increment(-3)
+ # This is the current count.
+ self.assertEqual(hits.count, 8)
+
+ # Forces fetching a non-cached count of all shards.
+ self.assertEqual(hits.get_count(nocache=True), 8)
+
+ # Set the counter to an arbitrary value.
+ hits.count = 6
+
+ self.assertEqual(hits.get_count(nocache=True), 6)
+
+ def test_cache(self):
+ # Build a new counter that uses the unique key name 'hits'.
+ hits = Counter('hits')
+
+ self.assertEqual(hits.count, 0)
+
+ # Increment by 1.
+ hits.increment()
+ # Increment by 10.
+ hits.increment(10)
+ # Decrement by 3.
+ hits.increment(-3)
+ # This is the current count.
+ self.assertEqual(hits.count, 8)
+
+ # Forces fetching a non-cached count of all shards.
+ self.assertEqual(hits.get_count(nocache=True), 8)
+
+ # Set the counter to an arbitrary value.
+ hits.delete()
+
+ self.assertEqual(hits.get_count(), 8)
+ self.assertEqual(hits.get_count(nocache=True), 0)
+
+ hits.memcached.delete_count()
+
+ self.assertEqual(hits.get_count(), 0)


if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/i18n_test.py Sat Apr 2 13:52:07 2011
+++ /tests/i18n_test.py Sun Apr 3 07:14:04 2011
@@ -13,18 +13,19 @@

from pytz.gae import pytz

-from tipfy import (Tipfy, RequestHandler, Request, Response, Rule,
- current_handler)
-from tipfy.app import local
-import tipfy.i18n as i18n
+from tipfy.app import App, Request, Response
+from tipfy.handler import RequestHandler
+from tipfy.routing import Rule
+from tipfy.local import local, get_request
from tipfy.sessions import SessionMiddleware
+import tipfy.i18n as i18n

import test_utils


class BaseTestCase(test_utils.BaseTestCase):
def setUp(self):
- app = Tipfy(rules=[
+ app = App(rules=[
Rule('/', name='home', handler=RequestHandler)
], config={
'tipfy.sessions': {
@@ -34,14 +35,14 @@
'timezone': 'UTC'
},
})
- local.current_handler = RequestHandler(app,
Request.from_values('/'))
+ local.request = request = Request.from_values('/')
+ request.app = app
test_utils.BaseTestCase.setUp(self)

-
-#==============================================================================
-# I18nMiddleware
-#==============================================================================
-class TestI18nMiddleware(BaseTestCase):
+
#==========================================================================
+ # I18nMiddleware
+
#==========================================================================
+
def test_middleware_multiple_changes(self):
class MyHandler(RequestHandler):
middleware = [SessionMiddleware(), i18n.I18nMiddleware()]
@@ -50,7 +51,7 @@
locale = self.i18n.locale
return Response(locale)

- app = Tipfy(rules=[
+ app = App(rules=[
Rule('/', name='home', handler=MyHandler)
], config={
'tipfy.sessions': {
@@ -77,15 +78,14 @@
response = client.get('/')
self.assertEqual(response.data, 'en_US')

-
-#==============================================================================
-# _(), gettext(), ngettext(), lazy_gettext(), lazy_ngettext()
-#==============================================================================
-class TestGettext(BaseTestCase):
+
#==========================================================================
+ # _(), gettext(), ngettext(), lazy_gettext(), lazy_ngettext()
+
#==========================================================================
+
def test_translations_not_set(self):
- # TODO: shouldn't need to release local here
+ # We release it here because it is set on setUp()
local.__release_local__()
- self.assertRaises(RuntimeError, i18n.gettext, 'foo')
+ self.assertRaises(AttributeError, i18n.gettext, 'foo')

def test_gettext(self):
self.assertEqual(i18n.gettext('foo'), u'foo')
@@ -117,13 +117,12 @@
self.assertEqual(i18n.lazy_ngettext('One foo', 'Many foos', 1),
u'One foo')
self.assertEqual(i18n.lazy_ngettext('One foo', 'Many foos', 2),
u'Many foos')

-
-#==============================================================================
-# I18nStore.get_store_for_request()
-#==============================================================================
-class TestStoreForRequest(BaseTestCase):
+
#==========================================================================
+ # I18nStore.get_store_for_request()
+
#==========================================================================
+
def get_app(self):
- return Tipfy(rules=[
+ return App(rules=[
Rule('/', name='home', handler=RequestHandler)
], config={
'tipfy.sessions': {
@@ -180,11 +179,10 @@
handler.request.rule_args = {'locale': 'es_ES'}
self.assertEqual(handler.i18n.locale, 'es_ES')

-
-#==============================================================================
-# Date formatting
-#==============================================================================
-class TestDates(BaseTestCase):
+
#==========================================================================
+ # Date formatting
+
#==========================================================================
+
def test_format_date(self):
value = datetime.datetime(2009, 11, 10, 16, 36, 05)

@@ -198,7 +196,7 @@
self.assertEqual(i18n.format_date(value), u'Nov 10, 2009')

def test_format_date_no_format_but_configured(self):
- app = Tipfy(config={
+ app = App(config={
'tipfy.sessions': {
'secret_key': 'secret',
},
@@ -223,7 +221,8 @@
}
}
})
- local.current_handler = RequestHandler(app,
Request.from_values('/'))
+ local.request = request = Request.from_values('/')
+ request.app = app

value = datetime.datetime(2009, 11, 10, 16, 36, 05)
self.assertEqual(i18n.format_date(value), u'Tuesday, November 10,
2009')
@@ -333,22 +332,24 @@
self.assertEqual(i18n.format_time(value, format='iso'),
u'16:36:05')
self.assertEqual(i18n.format_datetime(value, format='iso'),
u'2009-11-10T16:36:05+0000')

-#==============================================================================
-# Timezones
-#==============================================================================
-class TestTimezones(BaseTestCase):
+
#==========================================================================
+ # Timezones
+
#==========================================================================
+
def test_set_timezone(self):
- current_handler.i18n.set_timezone('UTC')
- self.assertEqual(current_handler.i18n.tzinfo.zone, 'UTC')
-
- current_handler.i18n.set_timezone('America/Chicago')
-
self.assertEqual(current_handler.i18n.tzinfo.zone, 'America/Chicago')
-
- current_handler.i18n.set_timezone('America/Sao_Paulo')
-
self.assertEqual(current_handler.i18n.tzinfo.zone, 'America/Sao_Paulo')
+ request = get_request()
+ request.i18n.set_timezone('UTC')
+ self.assertEqual(request.i18n.tzinfo.zone, 'UTC')
+
+ request.i18n.set_timezone('America/Chicago')
+ self.assertEqual(request.i18n.tzinfo.zone, 'America/Chicago')
+
+ request.i18n.set_timezone('America/Sao_Paulo')
+ self.assertEqual(request.i18n.tzinfo.zone, 'America/Sao_Paulo')

def test_to_local_timezone(self):
- current_handler.i18n.set_timezone('US/Eastern')
+ request = get_request()
+ request.i18n.set_timezone('US/Eastern')

format = '%Y-%m-%d %H:%M:%S %Z%z'

@@ -365,7 +366,8 @@
self.assertEqual(result, '2002-10-27 01:00:00 EST-0500')

def test_to_utc(self):
- current_handler.i18n.set_timezone('US/Eastern')
+ request = get_request()
+ request.i18n.set_timezone('US/Eastern')

format = '%Y-%m-%d %H:%M:%S'

@@ -390,11 +392,10 @@
i18n.set_locale('de_DE')

self.assertEqual(i18n.get_timezone_location(pytz.timezone('Europe/Berlin')),
u'Deutschland')

-
-#==============================================================================
-# Number formatting
-#==============================================================================
-class TestNumberFormatting(BaseTestCase):
+
#==========================================================================
+ # Number formatting
+
#==========================================================================
+
def test_format_number(self):
i18n.set_locale('en_US')
self.assertEqual(i18n.format_number(1099), u'1,099')
@@ -459,11 +460,10 @@
i18n.set_locale('de')
self.assertRaises(NumberFormatError,
i18n.parse_decimal, '2,109,998')

-
-#==============================================================================
-# Miscelaneous
-#==============================================================================
-class TestMiscelaneous(BaseTestCase):
+
#==========================================================================
+ # Miscelaneous
+
#==========================================================================
+
def test_list_translations(self):
cwd = os.getcwd()

os.chdir(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'resources'))
=======================================
--- /tests/sessions_test.py Sat Apr 2 13:52:07 2011
+++ /tests/sessions_test.py Sun Apr 3 07:14:04 2011
@@ -1,449 +1,450 @@
+from __future__ import with_statement
+
import os
import unittest

from werkzeug import cached_property

-from tipfy import Tipfy, Request, RequestHandler, Response, Rule
-from tipfy.app import local
+from tipfy.app import App, Request, Response
+from tipfy.handler import RequestHandler
+from tipfy.json import json_b64decode
+from tipfy.local import local
+from tipfy.routing import Rule
from tipfy.sessions import (SecureCookieSession, SecureCookieStore,
- SessionMiddleware, SessionStore)
-from tipfy.utils import json_b64decode
+ SessionMiddleware, SessionStore)
from tipfy.appengine.sessions import (DatastoreSession, MemcacheSession,
- SessionModel)
+ SessionModel)

import test_utils


class BaseHandler(RequestHandler):
- middleware = [SessionMiddleware()]
+ middleware = [SessionMiddleware()]


class TestSessionStoreBase(test_utils.BaseTestCase):
- def _get_app(self):
- return Tipfy(config={
- 'tipfy.sessions': {
- 'secret_key': 'secret',
- }
- })
-
- def test_secure_cookie_store(self):
- local.current_handler = handler = RequestHandler(self._get_app(),
Request.from_values())
- store = SessionStore(handler)
-
- self.assertEqual(isinstance(store.secure_cookie_store,
SecureCookieStore), True)
-
- def test_secure_cookie_store_no_secret_key(self):
- local.current_handler = handler = RequestHandler(Tipfy(),
Request.from_values())
- store = SessionStore(handler)
-
- self.assertRaises(KeyError, getattr, store, 'secure_cookie_store')
-
- def test_get_cookie_args(self):
- local.current_handler = handler = RequestHandler(self._get_app(),
Request.from_values())
- store = SessionStore(handler)
-
- self.assertEqual(store.get_cookie_args(), {
- 'max_age': None,
- 'domain': None,
- 'path': '/',
- 'secure': None,
- 'httponly': False,
- })
-
- self.assertEqual(store.get_cookie_args(max_age=86400,
domain='.foo.com'), {
- 'max_age': 86400,
- 'domain': '.foo.com',
- 'path': '/',
- 'secure': None,
- 'httponly': False,
- })
-
- def test_get_save_session(self):
- local.current_handler = handler = RequestHandler(self._get_app(),
Request.from_values())
- store = SessionStore(handler)
-
- session = store.get_session()
- self.assertEqual(isinstance(session, SecureCookieSession), True)
- self.assertEqual(session, {})
-
- session['foo'] = 'bar'
-
- response = Response()
- store.save(response)
-
- request = Request.from_values('/',
headers={'Cookie': '\n'.join(response.headers.getlist('Set-Cookie'))})
- local.current_handler = handler = RequestHandler(self._get_app(),
request)
- store = SessionStore(handler)
-
- session = store.get_session()
- self.assertEqual(isinstance(session, SecureCookieSession), True)
- self.assertEqual(session, {'foo': 'bar'})
-
- def test_set_delete_cookie(self):
- local.current_handler = handler = RequestHandler(self._get_app(),
Request.from_values())
- store = SessionStore(handler)
-
- store.set_cookie('foo', 'bar')
- store.set_cookie('baz', 'ding')
-
- response = Response()
- store.save(response)
-
- headers = {'Cookie': '\n'.join(response.headers.getlist('Set-Cookie'))}
- request = Request.from_values('/', headers=headers)
-
- self.assertEqual(request.cookies.get('foo'), 'bar')
- self.assertEqual(request.cookies.get('baz'), 'ding')
-
- store.delete_cookie('foo')
- store.save(response)
-
- headers = {'Cookie': '\n'.join(response.headers.getlist('Set-Cookie'))}
- request = Request.from_values('/', headers=headers)
-
- self.assertEqual(request.cookies.get('foo', None), '')
- self.assertEqual(request.cookies['baz'], 'ding')
-
- def test_set_cookie_encoded(self):
- local.current_handler = handler = RequestHandler(self._get_app(),
Request.from_values())
- store = SessionStore(handler)
-
- store.set_cookie('foo', 'bar', format='json')
- store.set_cookie('baz', 'ding', format='json')
-
- response = Response()
- store.save(response)
-
- headers = {'Cookie': '\n'.join(response.headers.getlist('Set-Cookie'))}
- request = Request.from_values('/', headers=headers)
-
- self.assertEqual(json_b64decode(request.cookies.get('foo')), 'bar')
- self.assertEqual(json_b64decode(request.cookies.get('baz')), 'ding')
+ def _get_app(self):
+ return App(config={
+ 'tipfy.sessions': {
+ 'secret_key': 'secret',
+ }
+ })
+
+ def test_secure_cookie_store(self):
+ with self._get_app().get_test_context() as request:
+ store = request.session_store
+ self.assertEqual(isinstance(store.secure_cookie_store,
SecureCookieStore), True)
+
+ def test_secure_cookie_store_no_secret_key(self):
+ with App().get_test_context() as request:
+ store = request.session_store
+ self.assertRaises(KeyError, getattr,
store, 'secure_cookie_store')
+
+ def test_get_cookie_args(self):
+ with self._get_app().get_test_context() as request:
+ store = request.session_store
+
+ self.assertEqual(store.get_cookie_args(), {
+ 'max_age': None,
+ 'domain': None,
+ 'path': '/',
+ 'secure': None,
+ 'httponly': False,
+ })
+
+ self.assertEqual(store.get_cookie_args(max_age=86400,
domain='.foo.com'), {
+ 'max_age': 86400,
+ 'domain': '.foo.com',
+ 'path': '/',
+ 'secure': None,
+ 'httponly': False,
+ })
+
+ def test_get_save_session(self):
+ with self._get_app().get_test_context() as request:
+ store = request.session_store
+
+ session = store.get_session()
+ self.assertEqual(isinstance(session, SecureCookieSession),
True)
+ self.assertEqual(session, {})
+
+ session['foo'] = 'bar'
+
+ response = Response()
+ store.save(response)
+
+ with self._get_app().get_test_context('/',
headers={'Cookie': '\n'.join(response.headers.getlist('Set-Cookie'))}) as
request:
+ store = request.session_store
+
+ session = store.get_session()
+ self.assertEqual(isinstance(session, SecureCookieSession),
True)
+ self.assertEqual(session, {'foo': 'bar'})
+
+ def test_set_delete_cookie(self):
+ with self._get_app().get_test_context() as request:
+ store = request.session_store
+
+ store.set_cookie('foo', 'bar')
+ store.set_cookie('baz', 'ding')
+
+ response = Response()
+ store.save(response)
+
+ headers =
{'Cookie': '\n'.join(response.headers.getlist('Set-Cookie'))}
+ with self._get_app().get_test_context('/', headers=headers) as
request:
+ store = request.session_store
+
+ self.assertEqual(request.cookies.get('foo'), 'bar')
+ self.assertEqual(request.cookies.get('baz'), 'ding')
+
+ store.delete_cookie('foo')
+ store.save(response)
+
+ headers =
{'Cookie': '\n'.join(response.headers.getlist('Set-Cookie'))}
+ with self._get_app().get_test_context('/', headers=headers) as
request:
+ self.assertEqual(request.cookies.get('foo', None), '')
+ self.assertEqual(request.cookies['baz'], 'ding')
+
+ def test_set_cookie_encoded(self):
+ with self._get_app().get_test_context() as request:
+ store = request.session_store
+
+ store.set_cookie('foo', 'bar', format='json')
+ store.set_cookie('baz', 'ding', format='json')
+
+ response = Response()
+ store.save(response)
+
+ headers =
{'Cookie': '\n'.join(response.headers.getlist('Set-Cookie'))}
+ with self._get_app().get_test_context('/', headers=headers) as
request:
+ store = request.session_store
+
+
self.assertEqual(json_b64decode(request.cookies.get('foo')), 'bar')
+
self.assertEqual(json_b64decode(request.cookies.get('baz')), 'ding')


class TestSessionStore(test_utils.BaseTestCase):
- def setUp(self):
- SessionStore.default_backends.update({
- 'datastore': DatastoreSession,
- 'memcache': MemcacheSession,
- 'securecookie': SecureCookieSession,
- })
- test_utils.BaseTestCase.setUp(self)
-
- def _get_app(self, *args, **kwargs):
- app = Tipfy(config={
- 'tipfy.sessions': {
- 'secret_key': 'secret',
- },
- })
- local.current_handler = handler = RequestHandler(app,
Request.from_values(*args, **kwargs))
- return app
-
- def test_set_session(self):
- class MyHandler(BaseHandler):
- def get(self):
- res = self.session.get('key')
- if not res:
- res = 'undefined'
- session = SecureCookieSession()
- session['key'] = 'a session value'
-
self.session_store.set_session(self.session_store.config['cookie_name'],
session)
-
- return Response(res)
-
- rules = [Rule('/', name='test', handler=MyHandler)]
-
- app = self._get_app('/')
- app.router.add(rules)
- client = app.get_test_client()
-
- response = client.get('/')
- self.assertEqual(response.data, 'undefined')
-
- response = client.get('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- self.assertEqual(response.data, 'a session value')
-
- def test_set_session_datastore(self):
- class MyHandler(BaseHandler):
- def get(self):
- session = self.session_store.get_session(backend='datastore')
- res = session.get('key')
- if not res:
- res = 'undefined'
- session = DatastoreSession(None, 'a_random_session_id')
- session['key'] = 'a session value'
-
self.session_store.set_session(self.session_store.config['cookie_name'],
session, backend='datastore')
-
- return Response(res)
-
- rules = [Rule('/', name='test', handler=MyHandler)]
-
- app = self._get_app('/')
- app.router.add(rules)
- client = app.get_test_client()
-
- response = client.get('/')
- self.assertEqual(response.data, 'undefined')
-
- response = client.get('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- self.assertEqual(response.data, 'a session value')
-
- def test_get_memcache_session(self):
- class MyHandler(BaseHandler):
- def get(self):
- session = self.session_store.get_session(backend='memcache')
- res = session.get('test')
- if not res:
- res = 'undefined'
- session['test'] = 'a memcache session value'
-
- return Response(res)
-
- rules = [Rule('/', name='test', handler=MyHandler)]
-
- app = self._get_app('/')
- app.router.add(rules)
- client = app.get_test_client()
-
- response = client.get('/')
- self.assertEqual(response.data, 'undefined')
-
- response = client.get('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- self.assertEqual(response.data, 'a memcache session value')
-
- def test_get_datastore_session(self):
- class MyHandler(BaseHandler):
- def get(self):
- session = self.session_store.get_session(backend='datastore')
- res = session.get('test')
- if not res:
- res = 'undefined'
- session['test'] = 'a datastore session value'
-
- return Response(res)
-
- rules = [Rule('/', name='test', handler=MyHandler)]
-
- app = self._get_app('/')
- app.router.add(rules)
- client = app.get_test_client()
-
- response = client.get('/')
- self.assertEqual(response.data, 'undefined')
-
- response = client.get('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- self.assertEqual(response.data, 'a datastore session value')
-
- def test_set_delete_cookie(self):
- class MyHandler(BaseHandler):
- def get(self):
- res = self.request.cookies.get('test')
- if not res:
- res = 'undefined'
- self.session_store.set_cookie('test', 'a cookie value')
- else:
- self.session_store.delete_cookie('test')
-
- return Response(res)
-
- rules = [Rule('/', name='test', handler=MyHandler)]
-
- app = self._get_app('/')
- app.router.add(rules)
- client = app.get_test_client()
-
- response = client.get('/')
- self.assertEqual(response.data, 'undefined')
-
- response = client.get('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- self.assertEqual(response.data, 'a cookie value')
-
- response = client.get('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- self.assertEqual(response.data, 'undefined')
-
- response = client.get('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- self.assertEqual(response.data, 'a cookie value')
-
- def test_set_unset_cookie(self):
- class MyHandler(BaseHandler):
- def get(self):
- res = self.request.cookies.get('test')
- if not res:
- res = 'undefined'
- self.session_store.set_cookie('test', 'a cookie value')
-
- self.session_store.unset_cookie('test')
- return Response(res)
-
- rules = [Rule('/', name='test', handler=MyHandler)]
-
- app = self._get_app('/')
- app.router.add(rules)
- client = app.get_test_client()
-
- response = client.get('/')
- self.assertEqual(response.data, 'undefined')
-
- response = client.get('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- self.assertEqual(response.data, 'undefined')
-
- def test_set_get_secure_cookie(self):
- class MyHandler(BaseHandler):
- def get(self):
- response = Response()
-
- cookie = self.session_store.get_secure_cookie('test') or {}
- res = cookie.get('test')
- if not res:
- res = 'undefined'
- self.session_store.set_secure_cookie(response, 'test', {'test': 'a
secure cookie value'})
-
- response.data = res
- return response
-
- rules = [Rule('/', name='test', handler=MyHandler)]
-
- app = self._get_app('/')
- app.router.add(rules)
- client = app.get_test_client()
-
- response = client.get('/')
- self.assertEqual(response.data, 'undefined')
-
- response = client.get('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- self.assertEqual(response.data, 'a secure cookie value')
-
- def test_set_get_flashes(self):
- class MyHandler(BaseHandler):
- def get(self):
- res = [msg for msg, level in self.session.get_flashes()]
- if not res:
- res = [{'body': 'undefined'}]
- self.session.flash({'body': 'a flash value'})
-
- return Response(res[0]['body'])
-
- rules = [Rule('/', name='test', handler=MyHandler)]
-
- app = self._get_app('/')
- app.router.add(rules)
- client = app.get_test_client()
-
- response = client.get('/')
- self.assertEqual(response.data, 'undefined')
-
- response = client.get('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- self.assertEqual(response.data, 'a flash value')
-
- def test_set_get_messages(self):
- class MyHandler(BaseHandler):
- @cached_property
- def messages(self):
- """A list of status messages to be displayed to the user."""
- messages = []
- flashes = self.session.get_flashes(key='_messages')
- for msg, level in flashes:
- msg['level'] = level
- messages.append(msg)
-
- return messages
-
- def set_message(self, level, body, title=None, life=None, flash=False):
- """Adds a status message.
-
- :param level:
- Message level. Common values are "success", "error", "info" or
- "alert".
- :param body:
- Message contents.
- :param title:
- Optional message title.
- :param life:
- Message life time in seconds. User interface can implement
- a mechanism to make the message disappear after the elapsed time.
- If not set, the message is permanent.
- :returns:
- None.
- """
- message = {'title': title, 'body': body, 'life': life}
- if flash is True:
- self.session.flash(message, level, '_messages')
- else:
- self.messages.append(message)
-
- def get(self):
- self.set_message('success', 'a normal message value')
- self.set_message('success', 'a flash message value', flash=True)
- return Response('|'.join(msg['body'] for msg in self.messages))
-
- rules = [Rule('/', name='test', handler=MyHandler)]
-
- app = self._get_app('/')
- app.router.add(rules)
- client = app.get_test_client()
-
- response = client.get('/')
- self.assertEqual(response.data, 'a normal message value')
-
- response = client.get('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- self.assertEqual(response.data, 'a flash message value|a normal message
value')
+ def setUp(self):
+ SessionStore.default_backends.update({
+ 'datastore': DatastoreSession,
+ 'memcache': MemcacheSession,
+ 'securecookie': SecureCookieSession,
+ })
+ test_utils.BaseTestCase.setUp(self)
+
+ def _get_app(self, *args, **kwargs):
+ app = App(config={
+ 'tipfy.sessions': {
+ 'secret_key': 'secret',
+ },
+ })
+ return app
+
+ def test_set_session(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ res = self.session.get('key')
+ if not res:
+ res = 'undefined'
+ session = SecureCookieSession()
+ session['key'] = 'a session value'
+
self.session_store.set_session(self.session_store.config['cookie_name'],
session)
+
+ return Response(res)
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a session value')
+
+ def test_set_session_datastore(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ session =
self.session_store.get_session(backend='datastore')
+ res = session.get('key')
+ if not res:
+ res = 'undefined'
+ session = DatastoreSession(None, 'a_random_session_id')
+ session['key'] = 'a session value'
+
self.session_store.set_session(self.session_store.config['cookie_name'],
session, backend='datastore')
+
+ return Response(res)
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a session value')
+
+ def test_get_memcache_session(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ session =
self.session_store.get_session(backend='memcache')
+ res = session.get('test')
+ if not res:
+ res = 'undefined'
+ session['test'] = 'a memcache session value'
+
+ return Response(res)
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a memcache session value')
+
+ def test_get_datastore_session(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ session =
self.session_store.get_session(backend='datastore')
+ res = session.get('test')
+ if not res:
+ res = 'undefined'
+ session['test'] = 'a datastore session value'
+
+ return Response(res)
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a datastore session value')
+
+ def test_set_delete_cookie(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ res = self.request.cookies.get('test')
+ if not res:
+ res = 'undefined'
+ self.session_store.set_cookie('test', 'a cookie value')
+ else:
+ self.session_store.delete_cookie('test')
+
+ return Response(res)
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a cookie value')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a cookie value')
+
+ def test_set_unset_cookie(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ res = self.request.cookies.get('test')
+ if not res:
+ res = 'undefined'
+ self.session_store.set_cookie('test', 'a cookie value')
+
+ self.session_store.unset_cookie('test')
+ return Response(res)
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'undefined')
+
+ def test_set_get_secure_cookie(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ response = Response()
+
+ cookie = self.session_store.get_secure_cookie('test') or {}
+ res = cookie.get('test')
+ if not res:
+ res = 'undefined'
+ self.session_store.set_secure_cookie(response, 'test',
{'test': 'a secure cookie value'})
+
+ response.data = res
+ return response
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a secure cookie value')
+
+ def test_set_get_flashes(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ res = [msg for msg, level in self.session.get_flashes()]
+ if not res:
+ res = [{'body': 'undefined'}]
+ self.session.flash({'body': 'a flash value'})
+
+ return Response(res[0]['body'])
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a flash value')
+
+ def test_set_get_messages(self):
+ class MyHandler(BaseHandler):
+ @cached_property
+ def messages(self):
+ """A list of status messages to be displayed to the
user."""
+ messages = []
+ flashes = self.session.get_flashes(key='_messages')
+ for msg, level in flashes:
+ msg['level'] = level
+ messages.append(msg)
+
+ return messages
+
+ def set_message(self, level, body, title=None, life=None,
flash=False):
+ """Adds a status message.
+
+ :param level:
+ Message level. Common values
are "success", "error", "info" or
+ "alert".
+ :param body:
+ Message contents.
+ :param title:
+ Optional message title.
+ :param life:
+ Message life time in seconds. User interface can
implement
+ a mechanism to make the message disappear after the
elapsed time.
+ If not set, the message is permanent.
+ :returns:
+ None.
+ """
+ message = {'title': title, 'body': body, 'life': life}
+ if flash is True:
+ self.session.flash(message, level, '_messages')
+ else:
+ self.messages.append(message)
+
+ def get(self):
+ self.set_message('success', 'a normal message value')
+ self.set_message('success', 'a flash message value',
flash=True)
+ return Response('|'.join(msg['body'] for msg in
self.messages))
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'a normal message value')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a flash message value|a normal
message value')


class TestSessionModel(test_utils.BaseTestCase):
- def setUp(self):
- self.app = Tipfy()
- test_utils.BaseTestCase.setUp(self)
-
- def test_get_by_sid_without_cache(self):
- sid = 'test'
- entity = SessionModel.create(sid, {'foo': 'bar', 'baz': 'ding'})
- entity.put()
-
- cached_data = SessionModel.get_cache(sid)
- self.assertNotEqual(cached_data, None)
-
- entity.delete_cache()
- cached_data = SessionModel.get_cache(sid)
- self.assertEqual(cached_data, None)
-
- entity = SessionModel.get_by_sid(sid)
- self.assertNotEqual(entity, None)
-
- # Now will fetch cache.
- entity = SessionModel.get_by_sid(sid)
- self.assertNotEqual(entity, None)
-
- self.assertEqual('foo' in entity.data, True)
- self.assertEqual('baz' in entity.data, True)
- self.assertEqual(entity.data['foo'], 'bar')
- self.assertEqual(entity.data['baz'], 'ding')
-
- entity.delete()
- entity = SessionModel.get_by_sid(sid)
- self.assertEqual(entity, None)
+ def setUp(self):
+ self.app = App()
+ test_utils.BaseTestCase.setUp(self)
+
+ def test_get_by_sid_without_cache(self):
+ sid = 'test'
+ entity = SessionModel.create(sid, {'foo': 'bar', 'baz': 'ding'})
+ entity.put()
+
+ cached_data = SessionModel.get_cache(sid)
+ self.assertNotEqual(cached_data, None)
+
+ entity.delete_cache()
+ cached_data = SessionModel.get_cache(sid)
+ self.assertEqual(cached_data, None)
+
+ entity = SessionModel.get_by_sid(sid)
+ self.assertNotEqual(entity, None)
+
+ # Now will fetch cache.
+ entity = SessionModel.get_by_sid(sid)
+ self.assertNotEqual(entity, None)
+
+ self.assertEqual('foo' in entity.data, True)
+ self.assertEqual('baz' in entity.data, True)
+ self.assertEqual(entity.data['foo'], 'bar')
+ self.assertEqual(entity.data['baz'], 'ding')
+
+ entity.delete()
+ entity = SessionModel.get_by_sid(sid)
+ self.assertEqual(entity, None)


if __name__ == '__main__':
- test_utils.main()
+ test_utils.main()
=======================================
--- /tests/utils_test.py Sat Apr 2 13:52:07 2011
+++ /tests/utils_test.py Sun Apr 3 07:14:04 2011
@@ -2,6 +2,8 @@
"""
Tests for tipfy utils
"""
+from __future__ import with_statement
+
import unittest

import werkzeug
@@ -152,12 +154,12 @@
# render_json_response()

#===========================================================================
def test_render_json_response(self):
- local.current_handler = HomeHandler(Tipfy(), Request.from_values())
- response = render_json_response({'foo': 'bar'})
-
- self.assertEqual(isinstance(response, Response), True)
- self.assertEqual(response.mimetype, 'application/json')
- self.assertEqual(response.data, '{"foo":"bar"}')
+ with Tipfy().get_test_context() as request:
+ response = render_json_response({'foo': 'bar'})
+
+ self.assertEqual(isinstance(response, Response), True)
+ self.assertEqual(response.mimetype, 'application/json')
+ self.assertEqual(response.data, '{"foo":"bar"}')


class TestUtils(test_utils.BaseTestCase):
=======================================
--- /tipfy/app.py Sat Apr 2 14:59:44 2011
+++ /tipfy/app.py Sun Apr 3 07:14:04 2011
@@ -52,6 +52,16 @@
#: Keyword arguments from the matched rule.
rule_args = None

+ @werkzeug.utils.cached_property
+ def auth(self):
+ """The auth store which provides access to the authenticated user
and
+ auth related functions.
+
+ :returns:
+ An auth store instance.
+ """
+ return self.app.auth_store_class(self)
+
@werkzeug.utils.cached_property
def json(self):
"""If the mimetype is `application/json` this will contain the
@@ -66,6 +76,34 @@
from tipfy.json import json_decode
return json_decode(self.data)

+ @werkzeug.utils.cached_property
+ def i18n(self):
+ """The internationalization store which provides access to several
+ translation and localization utilities.
+
+ :returns:
+ An i18n store instance.
+ """
+ return self.app.i18n_store_class(self)
+
+ @werkzeug.utils.cached_property
+ def session(self):
+ """A session dictionary using the default session configuration.
+
+ :returns:
+ A dictionary-like object with the current session data.
+ """
+ return self.session_store.get_session()
+
+ @werkzeug.utils.cached_property
+ def session_store(self):
+ """The session store, responsible for managing sessions and
flashes.
+
+ :returns:
+ A session store instance.
+ """
+ return self.app.session_store_class(self)
+
def _get_rule_adapter(self):
from warnings import warn
warn(DeprecationWarning("Request.url_adapter: this attribute "
=======================================
--- /tipfy/appengine/acl.py Wed Mar 30 05:57:11 2011
+++ /tipfy/appengine/acl.py Sun Apr 3 07:14:04 2011
@@ -1,400 +1,401 @@
# -*- coding: utf-8 -*-
"""
- tipfy.appengine.acl
- ~~~~~~~~~~~~~~~~~~~
-
- Simple Access Control List
-
- This module provides utilities to manage permissions for anything that
- requires some level of restriction, such as datastore models or
handlers.
-
- Access permissions can be grouped into roles for convenience, so that
a new
- user can be assigned to a role directly instead of having all
- permissions defined manually. Individual access permissions can then
- override or extend the role permissions.
-
- .. note::
-
- Roles are optional, so this module doesn't define a roles model (to
keep
- things simple and fast). Role definitions are set directly in the
Acl
- class. The strategy to load roles is open to the implementation; for
- best performance, define them statically in a module.
-
- Usage example::
-
- # Set a dict of roles with an 'admin' role that has full access and
- # assign users to it. Each role maps to a list of rules. Each rule
is a
- # tuple (topic, name, flag), where flag is as bool to allow or
disallow
- # access. Wildcard '*' can be used to match all topics and/or
names.
- Acl.roles_map = {
- 'admin': [
- ('*', '*', True),
- ],
- }
-
- # Assign users 'user_1' and 'user_2' to the 'admin' role.
- AclRules.insert_or_update(area='my_area', user='user_1',
- roles=['admin'])
- AclRules.insert_or_update(area='my_area', user='user_2',
- roles=['admin'])
-
- # Restrict 'user_2' from accessing a specific resource, adding a
new
- # rule with flag set to False. Now this user has access to
everything
- # except this resource.
- user_acl = AclRules.get_by_area_and_user('my_area', 'user_2')
- user_acl.rules.append(('UserAdmin', '*', False))
- user_acl.put()
-
- # Check that 'user_2' permissions are correct.
- acl = Acl(area='my_area', user='user_2')
- assert acl.has_access(topic='UserAdmin', name='save') is False
- assert acl.has_access(topic='AnythingElse', name='put') is True
-
- The Acl object should be created once after a user is loaded, so that
- it becomes available for the app to do all necessary permissions
checkings.
-
- Based on concept from `Solar <http://solarphp.com>`_ Access and Role
- classes.
-
- :copyright: 2011 by tipfy.org.
- :license: BSD, see LICENSE.txt for more details.
+ tipfy.appengine.acl
+ ~~~~~~~~~~~~~~~~~~~
+
+ Simple Access Control List
+
+ This module provides utilities to manage permissions for anything that
+ requires some level of restriction, such as datastore models or handlers.
+
+ Access permissions can be grouped into roles for convenience, so that a
new
+ user can be assigned to a role directly instead of having all
+ permissions defined manually. Individual access permissions can then
+ override or extend the role permissions.
+
+ .. note::
+
+ Roles are optional, so this module doesn't define a roles model (to
keep
+ things simple and fast). Role definitions are set directly in the Acl
+ class. The strategy to load roles is open to the implementation; for
+ best performance, define them statically in a module.
+
+ Usage example::
+
+ # Set a dict of roles with an 'admin' role that has full access and
+ # assign users to it. Each role maps to a list of rules. Each rule is a
+ # tuple (topic, name, flag), where flag is as bool to allow or disallow
+ # access. Wildcard '*' can be used to match all topics and/or names.
+ Acl.roles_map = {
+ 'admin': [
+ ('*', '*', True),
+ ],
+ }
+
+ # Assign users 'user_1' and 'user_2' to the 'admin' role.
+ AclRules.insert_or_update(area='my_area', user='user_1',
+ roles=['admin'])
+ AclRules.insert_or_update(area='my_area', user='user_2',
+ roles=['admin'])
+
+ # Restrict 'user_2' from accessing a specific resource, adding a new
+ # rule with flag set to False. Now this user has access to everything
+ # except this resource.
+ user_acl = AclRules.get_by_area_and_user('my_area', 'user_2')
+ user_acl.rules.append(('UserAdmin', '*', False))
+ user_acl.put()
+
+ # Check that 'user_2' permissions are correct.
+ acl = Acl(area='my_area', user='user_2')
+ assert acl.has_access(topic='UserAdmin', name='save') is False
+ assert acl.has_access(topic='AnythingElse', name='put') is True
+
+ The Acl object should be created once after a user is loaded, so that
+ it becomes available for the app to do all necessary permissions
checkings.
+
+ Based on concept from `Solar <http://solarphp.com>`_ Access and Role
+ classes.
+
+ :copyright: 2011 by tipfy.org.
+ :license: BSD, see LICENSE.txt for more details.
"""
from google.appengine.ext import db
from google.appengine.api import memcache

from werkzeug import cached_property

-from tipfy import current_handler, CURRENT_VERSION_ID
+from tipfy.appengine import CURRENT_VERSION_ID
from tipfy.appengine.db import PickleProperty
+from tipfy.local import get_request

#: Cache for loaded rules.
_rules_map = {}


class AclMixin(object):
- """A mixin that adds an acl property to a ``tipfy.RequestHandler``.
-
- The handler *must* have the properties area and current_user set for
- it to work.
- """
- roles_map = None
- roles_lock = None
-
- @cached_property
- def acl(self):
- """Loads and returns the access permission for the currently
logged in
- user. This requires the handler to have the area and
- current_user attributes. Casted to a string they must return the
- object identifiers.
- """
- return Acl(str(self.area.key()), str(self.current_user.key()),
- self.roles_map, self.roles_lock)
+ """A mixin that adds an acl property to a ``tipfy.RequestHandler``.
+
+ The handler *must* have the properties area and current_user set for
+ it to work.
+ """
+ roles_map = None
+ roles_lock = None
+
+ @cached_property
+ def acl(self):
+ """Loads and returns the access permission for the currently logged in
+ user. This requires the handler to have the area and
+ current_user attributes. Casted to a string they must return the
+ object identifiers.
+ """
+ return Acl(str(self.area.key()), str(self.current_user.key()),
+ self.roles_map, self.roles_lock)


def validate_rules(rules):
- """Ensures that the list of rule tuples is set correctly."""
- assert isinstance(rules, list), 'Rules must be a list'
-
- for rule in rules:
- assert isinstance(rule, tuple), 'Each rule must be tuple'
- assert len(rule) == 3, 'Each rule must have three elements'
- assert isinstance(rule[0], basestring), 'Rule topic must be a
string'
- assert isinstance(rule[1], basestring), 'Rule name must be a
string'
- assert isinstance(rule[2], bool), 'Rule flag must be a bool'
+ """Ensures that the list of rule tuples is set correctly."""
+ assert isinstance(rules, list), 'Rules must be a list'
+
+ for rule in rules:
+ assert isinstance(rule, tuple), 'Each rule must be tuple'
+ assert len(rule) == 3, 'Each rule must have three elements'
+ assert isinstance(rule[0], basestring), 'Rule topic must be a string'
+ assert isinstance(rule[1], basestring), 'Rule name must be a string'
+ assert isinstance(rule[2], bool), 'Rule flag must be a bool'


class AclRules(db.Model):
- """Stores roles and rules for a user in a given area."""
- #: Creation date.
- created = db.DateTimeProperty(auto_now_add=True)
- #: Modification date.
- updated = db.DateTimeProperty(auto_now=True)
- #: Area to which this role is related.
- area = db.StringProperty(required=True)
- #: User identifier.
- user = db.StringProperty(required=True)
- #: List of role names.
- roles = db.StringListProperty()
- #: Lists of rules. Each rule is a tuple (topic, name, flag).
- rules = PickleProperty(validator=validate_rules)
-
- @classmethod
- def get_key_name(cls, area, user):
- """Returns this entity's key name, also used as memcache key.
-
- :param area:
- Area string identifier.
- :param user:
- User string identifier.
- :returns:
- The key name.
- """
- return '%s:%s' % (str(area), str(user))
-
- @classmethod
- def get_by_area_and_user(cls, area, user):
- """Returns an AclRules entity for a given user in a given area.
-
- :param area:
- Area string identifier.
- :param user:
- User string identifier.
- :returns:
- An AclRules entity.
- """
- return cls.get_by_key_name(cls.get_key_name(area, user))
-
- @classmethod
- def insert_or_update(cls, area, user, roles=None, rules=None):
- """Inserts or updates ACL rules and roles for a given user. This
will
- reset roles and rules if the user exists and the values are not
passed.
-
- :param area:
- Area string identifier.
- :param user:
- User string identifier.
- :param roles:
- List of the roles for the user.
- :param rules:
- List of the rules for the user.
- :returns:
- An AclRules entity.
- """
- if roles is None:
- roles = []
-
- if rules is None:
- rules = []
-
- user_acl = cls(key_name=cls.get_key_name(area, user), area=area,
- user=user, roles=roles, rules=rules)
- user_acl.put()
- return user_acl
-
- @classmethod
- def get_roles_and_rules(cls, area, user, roles_map, roles_lock):
- """Returns a tuple (roles, rules) for a given user in a given area.
-
- :param area:
- Area string identifier.
- :param user:
- User string identifier.
- :param roles_map:
- Dictionary of available role names mapping to list of rules.
- :param roles_lock:
- Lock for the roles map: a unique identifier to track changes.
- :returns:
- A tuple of (roles, rules) for the given user in the given area.
- """
- res = None
- cache_key = cls.get_key_name(area, user)
- if cache_key in _rules_map:
- res = _rules_map[cache_key]
- else:
- res = memcache.get(cache_key, namespace=cls.__name__)
-
- if res is not None:
- lock, roles, rules = res
-
- if res is None or lock != roles_lock or current_handler.app.debug:
- entity = cls.get_by_key_name(cache_key)
- if entity is None:
- res = (roles_lock, [], [])
- else:
- rules = []
- # Apply role rules.
- for role in entity.roles:
- rules.extend(roles_map.get(role, []))
-
- # Extend with rules, eventually overriding some role rules.
- rules.extend(entity.rules)
-
- # Reverse everything, as rules are checked from last to
first.
- rules.reverse()
-
- # Set results for cache, applying current roles_lock.
- res = (roles_lock, entity.roles, rules)
-
- cls.set_cache(cache_key, res)
-
- return (res[1], res[2])
-
- @classmethod
- def set_cache(cls, cache_key, spec):
- """Sets a memcache value.
-
- :param cache_key:
- The Cache key.
- :param spec:
- Value to be saved.
- """
- _rules_map[cache_key] = spec
- memcache.set(cache_key, spec, namespace=cls.__name__)
-
- @classmethod
- def delete_cache(cls, cache_key):
- """Deletes a memcache value.
-
- :param cache_key:
- The Cache key.
- """
- if cache_key in _rules_map:
- del _rules_map[cache_key]
-
- memcache.delete(cache_key, namespace=cls.__name__)
-
- def put(self):
- """Saves the entity and clears the cache."""
- self.delete_cache(self.get_key_name(self.area, self.user))
- super(AclRules, self).put()
-
- def delete(self):
- """Deletes the entity and clears the cache."""
- self.delete_cache(self.get_key_name(self.area, self.user))
- super(AclRules, self).delete()
-
- def is_rule_set(self, topic, name, flag):
- """Checks if a given rule is set.
-
- :param topic:
- A rule topic, as a string.
- :param roles:
- A rule name, as a string.
- :param flag:
- A rule flag, a boolean.
- :returns:
- True if the rule already exists, False otherwise.
- """
- for rule_topic, rule_name, rule_flag in self.rules:
- if rule_topic == topic and rule_name == name and rule_flag ==
flag:
- return True
-
- return False
+ """Stores roles and rules for a user in a given area."""
+ #: Creation date.
+ created = db.DateTimeProperty(auto_now_add=True)
+ #: Modification date.
+ updated = db.DateTimeProperty(auto_now=True)
+ #: Area to which this role is related.
+ area = db.StringProperty(required=True)
+ #: User identifier.
+ user = db.StringProperty(required=True)
+ #: List of role names.
+ roles = db.StringListProperty()
+ #: Lists of rules. Each rule is a tuple (topic, name, flag).
+ rules = PickleProperty(validator=validate_rules)
+
+ @classmethod
+ def get_key_name(cls, area, user):
+ """Returns this entity's key name, also used as memcache key.
+
+ :param area:
+ Area string identifier.
+ :param user:
+ User string identifier.
+ :returns:
+ The key name.
+ """
+ return '%s:%s' % (str(area), str(user))
+
+ @classmethod
+ def get_by_area_and_user(cls, area, user):
+ """Returns an AclRules entity for a given user in a given area.
+
+ :param area:
+ Area string identifier.
+ :param user:
+ User string identifier.
+ :returns:
+ An AclRules entity.
+ """
+ return cls.get_by_key_name(cls.get_key_name(area, user))
+
+ @classmethod
+ def insert_or_update(cls, area, user, roles=None, rules=None):
+ """Inserts or updates ACL rules and roles for a given user. This will
+ reset roles and rules if the user exists and the values are not passed.
+
+ :param area:
+ Area string identifier.
+ :param user:
+ User string identifier.
+ :param roles:
+ List of the roles for the user.
+ :param rules:
+ List of the rules for the user.
+ :returns:
+ An AclRules entity.
+ """
+ if roles is None:
+ roles = []
+
+ if rules is None:
+ rules = []
+
+ user_acl = cls(key_name=cls.get_key_name(area, user), area=area,
+ user=user, roles=roles, rules=rules)
+ user_acl.put()
+ return user_acl
+
+ @classmethod
+ def get_roles_and_rules(cls, area, user, roles_map, roles_lock):
+ """Returns a tuple (roles, rules) for a given user in a given area.
+
+ :param area:
+ Area string identifier.
+ :param user:
+ User string identifier.
+ :param roles_map:
+ Dictionary of available role names mapping to list of rules.
+ :param roles_lock:
+ Lock for the roles map: a unique identifier to track changes.
+ :returns:
+ A tuple of (roles, rules) for the given user in the given area.
+ """
+ res = None
+ cache_key = cls.get_key_name(area, user)
+ if cache_key in _rules_map:
+ res = _rules_map[cache_key]
+ else:
+ res = memcache.get(cache_key, namespace=cls.__name__)
+
+ if res is not None:
+ lock, roles, rules = res
+
+ if res is None or lock != roles_lock or get_request().app.debug:
+ entity = cls.get_by_key_name(cache_key)
+ if entity is None:
+ res = (roles_lock, [], [])
+ else:
+ rules = []
+ # Apply role rules.
+ for role in entity.roles:
+ rules.extend(roles_map.get(role, []))
+
+ # Extend with rules, eventually overriding some role rules.
+ rules.extend(entity.rules)
+
+ # Reverse everything, as rules are checked from last to first.
+ rules.reverse()
+
+ # Set results for cache, applying current roles_lock.
+ res = (roles_lock, entity.roles, rules)
+
+ cls.set_cache(cache_key, res)
+
+ return (res[1], res[2])
+
+ @classmethod
+ def set_cache(cls, cache_key, spec):
+ """Sets a memcache value.
+
+ :param cache_key:
+ The Cache key.
+ :param spec:
+ Value to be saved.
+ """
+ _rules_map[cache_key] = spec
+ memcache.set(cache_key, spec, namespace=cls.__name__)
+
+ @classmethod
+ def delete_cache(cls, cache_key):
+ """Deletes a memcache value.
+
+ :param cache_key:
+ The Cache key.
+ """
+ if cache_key in _rules_map:
+ del _rules_map[cache_key]
+
+ memcache.delete(cache_key, namespace=cls.__name__)
+
+ def put(self):
+ """Saves the entity and clears the cache."""
+ self.delete_cache(self.get_key_name(self.area, self.user))
+ super(AclRules, self).put()
+
+ def delete(self):
+ """Deletes the entity and clears the cache."""
+ self.delete_cache(self.get_key_name(self.area, self.user))
+ super(AclRules, self).delete()
+
+ def is_rule_set(self, topic, name, flag):
+ """Checks if a given rule is set.
+
+ :param topic:
+ A rule topic, as a string.
+ :param roles:
+ A rule name, as a string.
+ :param flag:
+ A rule flag, a boolean.
+ :returns:
+ True if the rule already exists, False otherwise.
+ """
+ for rule_topic, rule_name, rule_flag in self.rules:
+ if rule_topic == topic and rule_name == name and rule_flag == flag:
+ return True
+
+ return False


class Acl(object):
- """Loads access rules and roles for a given user in a given area and
- provides a centralized interface to check permissions. Each Acl object
- checks the permissions for a single user. For example::
-
- from tipfy.appengine.acl import Acl
-
- # Build an Acl object for user 'John' in the 'code-reviews' area.
- acl = Acl('code-reviews', 'John')
-
- # Check if 'John' is 'admin' in the 'code-reviews' area.
- is_admin = acl.is_one('admin')
-
- # Check if 'John' can approve new reviews.
- can_edit = acl.has_access('EditReview', 'approve')
- """
- #: Dictionary of available role names mapping to list of rules.
- roles_map = {}
-
- #: Lock for role changes. This is needed because if role definitions
change
- #: we must invalidate existing cache that applied the previous
definitions.
- roles_lock = None
-
- def __init__(self, area, user, roles_map=None, roles_lock=None):
- """Loads access privileges and roles for a given user in a given
area.
-
- :param area:
- An area identifier, as a string.
- :param user:
- A user identifier, as a string.
- :param roles_map:
- A dictionary of roles mapping to a list of rule tuples.
- :param roles_lock:
- Roles lock string to validate cache. If not set, uses
- the application version id.
- """
- if roles_map is not None:
- self.roles_map = roles_map
-
- if roles_lock is not None:
- self.roles_lock = roles_lock
- elif self.roles_lock is None:
- # Set roles_lock default.
- self.roles_lock = CURRENT_VERSION_ID
-
- if area and user:
- self._roles, self._rules = AclRules.get_roles_and_rules(area,
user,
- self.roles_map, self.roles_lock)
- else:
- self.reset()
-
- def reset(self):
- """Resets the currently loaded access rules and user roles."""
- self._rules = []
- self._roles = []
-
- def is_one(self, role):
- """Check to see if a user is in a role group.
-
- :param role:
- A role name, as a string.
- :returns:
- True if the user is in this role group, False otherwise.
- """
- return role in self._roles
-
- def is_any(self, roles):
- """Check to see if a user is in any of the listed role groups.
-
- :param roles:
- An iterable of role names.
- :returns:
- True if the user is in any of the role groups, False otherwise.
- """
- for role in roles:
- if role in self._roles:
- return True
-
- return False
-
- def is_all(self, roles):
- """Check to see if a user is in all of the listed role groups.
-
- :param roles:
- An iterable of role names.
- :returns:
- True if the user is in all of the role groups, False otherwise.
- """
- for role in roles:
- if role not in self._roles:
- return False
-
- return True
-
- def has_any_access(self):
- """Checks if the user has any access or roles.
-
- :returns:
- True if the user has any access rule or role set, False
otherwise.
- """
- if self._rules or self._roles:
- return True
-
- return False
-
- def has_access(self, topic, name):
- """Checks if the user has access to a topic/name combination.
-
- :param topic:
- A rule topic, as a string.
- :param roles:
- A rule name, as a string.
- :returns:
- True if the user has access to this rule, False otherwise.
- """
- if topic == '*' or name == '*':
- raise ValueError("has_access() can't be called passing '*'")
-
- for rule_topic, rule_name, rule_flag in self._rules:
- if (rule_topic == topic or rule_topic == '*') and \
- (rule_name == name or rule_name == '*'):
- # Topic and name matched, so return the flag.
- return rule_flag
-
- # No match.
- return False
+ """Loads access rules and roles for a given user in a given area and
+ provides a centralized interface to check permissions. Each Acl object
+ checks the permissions for a single user. For example::
+
+ from tipfy.appengine.acl import Acl
+
+ # Build an Acl object for user 'John' in the 'code-reviews' area.
+ acl = Acl('code-reviews', 'John')
+
+ # Check if 'John' is 'admin' in the 'code-reviews' area.
+ is_admin = acl.is_one('admin')
+
+ # Check if 'John' can approve new reviews.
+ can_edit = acl.has_access('EditReview', 'approve')
+ """
+ #: Dictionary of available role names mapping to list of rules.
+ roles_map = {}
+
+ #: Lock for role changes. This is needed because if role definitions
change
+ #: we must invalidate existing cache that applied the previous
definitions.
+ roles_lock = None
+
+ def __init__(self, area, user, roles_map=None, roles_lock=None):
+ """Loads access privileges and roles for a given user in a given area.
+
+ :param area:
+ An area identifier, as a string.
+ :param user:
+ A user identifier, as a string.
+ :param roles_map:
+ A dictionary of roles mapping to a list of rule tuples.
+ :param roles_lock:
+ Roles lock string to validate cache. If not set, uses
+ the application version id.
+ """
+ if roles_map is not None:
+ self.roles_map = roles_map
+
+ if roles_lock is not None:
+ self.roles_lock = roles_lock
+ elif self.roles_lock is None:
+ # Set roles_lock default.
+ self.roles_lock = CURRENT_VERSION_ID
+
+ if area and user:
+ self._roles, self._rules = AclRules.get_roles_and_rules(area, user,
+ self.roles_map, self.roles_lock)
+ else:
+ self.reset()
+
+ def reset(self):
+ """Resets the currently loaded access rules and user roles."""
+ self._rules = []
+ self._roles = []
+
+ def is_one(self, role):
+ """Check to see if a user is in a role group.
+
+ :param role:
+ A role name, as a string.
+ :returns:
+ True if the user is in this role group, False otherwise.
+ """
+ return role in self._roles
+
+ def is_any(self, roles):
+ """Check to see if a user is in any of the listed role groups.
+
+ :param roles:
+ An iterable of role names.
+ :returns:
+ True if the user is in any of the role groups, False otherwise.
+ """
+ for role in roles:
+ if role in self._roles:
+ return True
+
+ return False
+
+ def is_all(self, roles):
+ """Check to see if a user is in all of the listed role groups.
+
+ :param roles:
+ An iterable of role names.
+ :returns:
+ True if the user is in all of the role groups, False otherwise.
+ """
+ for role in roles:
+ if role not in self._roles:
+ return False
+
+ return True
+
+ def has_any_access(self):
+ """Checks if the user has any access or roles.
+
+ :returns:
+ True if the user has any access rule or role set, False otherwise.
+ """
+ if self._rules or self._roles:
+ return True
+
+ return False
+
+ def has_access(self, topic, name):
+ """Checks if the user has access to a topic/name combination.
+
+ :param topic:
+ A rule topic, as a string.
+ :param roles:
+ A rule name, as a string.
+ :returns:
+ True if the user has access to this rule, False otherwise.
+ """
+ if topic == '*' or name == '*':
+ raise ValueError("has_access() can't be called passing '*'")
+
+ for rule_topic, rule_name, rule_flag in self._rules:
+ if (rule_topic == topic or rule_topic == '*') and \
+ (rule_name == name or rule_name == '*'):
+ # Topic and name matched, so return the flag.
+ return rule_flag
+
+ # No match.
+ return False
=======================================
--- /tipfy/appengine/sharded_counter.py Wed Mar 30 05:57:11 2011
+++ /tipfy/appengine/sharded_counter.py Sun Apr 3 07:14:04 2011
@@ -1,12 +1,12 @@
# -*- coding: utf-8 -*-
"""
- tipfy.appengine.sharded_counter
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
- A general purpose sharded counter implementation for the datastore.
-
- :copyright: 2011 by tipfy.org.
- :license: Apache Software License, see LICENSE.txt for more details.
+ tipfy.appengine.sharded_counter
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ A general purpose sharded counter implementation for the datastore.
+
+ :copyright: 2011 by tipfy.org.
+ :license: Apache Software License, see LICENSE.txt for more details.
"""
import string
import random
@@ -16,160 +16,160 @@
from google.appengine.ext import db
from google.appengine.runtime import apiproxy_errors

-from tipfy import current_handler
+from tipfy.local import get_request

#: Default configuration values for this module. Keys are:
#:
#: shards
#: The amount of shards to use.
default_config = {
- 'shards': 10,
+ 'shards': 10,
}


class MemcachedCount(object):
- # Allows negative numbers in unsigned memcache
- DELTA_ZERO = 500000
-
- @property
- def namespace(self):
- return __name__ + '.' + self.__class__.__name__
-
- def __init__(self, name):
- self.key = 'MemcachedCount' + name
-
- def get_count(self):
- value = memcache.get(self.key, namespace=self.namespace)
- if value is None:
- return 0
- else:
- return string.atoi(value) - MemcachedCount.DELTA_ZERO
-
- def set_count(self, value):
- memcache.set(self.key, str(MemcachedCount.DELTA_ZERO + value),
- namespace=self.namespace)
-
- def delete_count(self):
- memcache.delete(self.key)
-
- count = property(get_count, set_count, delete_count)
-
- def increment(self, incr=1):
- value = memcache.get(self.key, namespace=self.namespace)
- if value is None:
- self.count = incr
- elif incr > 0:
- memcache.incr(self.key, incr, namespace=self.namespace)
- elif incr < 0:
- memcache.decr(self.key, -incr, namespace=self.namespace)
+ # Allows negative numbers in unsigned memcache
+ DELTA_ZERO = 500000
+
+ @property
+ def namespace(self):
+ return __name__ + '.' + self.__class__.__name__
+
+ def __init__(self, name):
+ self.key = 'MemcachedCount' + name
+
+ def get_count(self):
+ value = memcache.get(self.key, namespace=self.namespace)
+ if value is None:
+ return 0
+ else:
+ return string.atoi(value) - MemcachedCount.DELTA_ZERO
+
+ def set_count(self, value):
+ memcache.set(self.key, str(MemcachedCount.DELTA_ZERO + value),
+ namespace=self.namespace)
+
+ def delete_count(self):
+ memcache.delete(self.key)
+
+ count = property(get_count, set_count, delete_count)
+
+ def increment(self, incr=1):
+ value = memcache.get(self.key, namespace=self.namespace)
+ if value is None:
+ self.count = incr
+ elif incr > 0:
+ memcache.incr(self.key, incr, namespace=self.namespace)
+ elif incr < 0:
+ memcache.decr(self.key, -incr, namespace=self.namespace)


class Counter(object):
- """A counter using sharded writes to prevent contentions.
-
- Should be used for counters that handle a lot of concurrent use.
- Follows the pattern described in the Google I/O talk:
-
http://sites.google.com/site/io/building-scalable-web-applications-with-google-app-engine
-
- Memcache is used for caching counts and if a cached count is available,
- it is the most correct. If there are datastore put issues, we store the
- un-put values into a delayed_incr memcache that will be applied as soon
- as the next shard put is successful. Changes will only be lost if we
lose
- memcache before a successful datastore shard put or there's a
- failure/error in memcache.
-
- Example::
-
- # Build a new counter that uses the unique key name 'hits'.
- hits = Counter('hits')
- # Increment by 1.
- hits.increment()
- # Increment by 10.
- hits.increment(10)
- # Decrement by 3.
- hits.increment(-3)
- # This is the current count.
- my_hits = hits.count
- # Forces fetching a non-cached count of all shards.
- hits.get_count(nocache=True)
- # Set the counter to an arbitrary value.
- hits.count = 6
- """
- #: Number of shards to use.
- shards = None
-
- def __init__(self, name):
- self.name = name
- self.memcached = MemcachedCount('counter:' + name)
- self.delayed_incr = MemcachedCount('delayed:' + name)
-
- @property
- def number_of_shards(self):
- return self.shards or
current_handler.app.config[__name__]['shards']
-
- def delete(self):
- q = db.Query(CounterShard).filter('name =', self.name)
- shards = q.fetch(limit=self.number_of_shards)
- db.delete(shards)
-
- def get_count_and_cache(self):
- q = db.Query(CounterShard).filter('name =', self.name)
- shards = q.fetch(limit=self.number_of_shards)
- datastore_count = 0
- for shard in shards:
- datastore_count += shard.count
-
- count = datastore_count + self.delayed_incr.count
- self.memcached.count = count
- return count
-
- def get_count(self, nocache=False):
- total = self.memcached.count
- if nocache or total is None:
- return self.get_count_and_cache()
- else:
- return int(total)
-
- def set_count(self, value):
- cur_value = self.get_count()
- self.memcached.count = value
- delta = value - cur_value
- if delta != 0:
- CounterShard.increment(self, incr=delta)
-
- count = property(get_count, set_count)
-
- def increment(self, incr=1, refresh=False):
- CounterShard.increment(self, incr)
- self.memcached.increment(incr)
+ """A counter using sharded writes to prevent contentions.
+
+ Should be used for counters that handle a lot of concurrent use.
+ Follows the pattern described in the Google I/O talk:
+
http://sites.google.com/site/io/building-scalable-web-applications-with-google-app-engine
+
+ Memcache is used for caching counts and if a cached count is available,
+ it is the most correct. If there are datastore put issues, we store the
+ un-put values into a delayed_incr memcache that will be applied as soon
+ as the next shard put is successful. Changes will only be lost if we lose
+ memcache before a successful datastore shard put or there's a
+ failure/error in memcache.
+
+ Example::
+
+ # Build a new counter that uses the unique key name 'hits'.
+ hits = Counter('hits')
+ # Increment by 1.
+ hits.increment()
+ # Increment by 10.
+ hits.increment(10)
+ # Decrement by 3.
+ hits.increment(-3)
+ # This is the current count.
+ my_hits = hits.count
+ # Forces fetching a non-cached count of all shards.
+ hits.get_count(nocache=True)
+ # Set the counter to an arbitrary value.
+ hits.count = 6
+ """
+ #: Number of shards to use.
+ shards = None
+
+ def __init__(self, name):
+ self.name = name
+ self.memcached = MemcachedCount('counter:' + name)
+ self.delayed_incr = MemcachedCount('delayed:' + name)
+
+ @property
+ def number_of_shards(self):
+ return self.shards or get_request().app.config[__name__]['shards']
+
+ def delete(self):
+ q = db.Query(CounterShard).filter('name =', self.name)
+ shards = q.fetch(limit=self.number_of_shards)
+ db.delete(shards)
+
+ def get_count_and_cache(self):
+ q = db.Query(CounterShard).filter('name =', self.name)
+ shards = q.fetch(limit=self.number_of_shards)
+ datastore_count = 0
+ for shard in shards:
+ datastore_count += shard.count
+
+ count = datastore_count + self.delayed_incr.count
+ self.memcached.count = count
+ return count
+
+ def get_count(self, nocache=False):
+ total = self.memcached.count
+ if nocache or total is None:
+ return self.get_count_and_cache()
+ else:
+ return int(total)
+
+ def set_count(self, value):
+ cur_value = self.get_count()
+ self.memcached.count = value
+ delta = value - cur_value
+ if delta != 0:
+ CounterShard.increment(self, incr=delta)
+
+ count = property(get_count, set_count)
+
+ def increment(self, incr=1, refresh=False):
+ CounterShard.increment(self, incr)
+ self.memcached.increment(incr)


class CounterShard(db.Model):
- name = db.StringProperty(required=True)
- count = db.IntegerProperty(default=0)
-
- @classmethod
- def increment(cls, counter, incr=1):
- index = random.randint(1, counter.number_of_shards)
- counter_name = counter.name
- delayed_incr = counter.delayed_incr.count
- shard_key_name = 'Shard' + counter_name + str(index)
- def get_or_create_shard():
- shard = CounterShard.get_by_key_name(shard_key_name)
- if shard is None:
- shard = CounterShard(key_name=shard_key_name,
name=counter_name)
- shard.count += incr + delayed_incr
- key = shard.put()
-
- try:
- db.run_in_transaction(get_or_create_shard)
- except (db.Error, apiproxy_errors.Error), e:
- counter.delayed_incr.increment(incr)
- logging.error('CounterShard (%s) delayed increment %d: %s',
- counter_name, incr, e)
- return False
-
- if delayed_incr:
- counter.delayed_incr.count = 0
-
- return True
+ name = db.StringProperty(required=True)
+ count = db.IntegerProperty(default=0)
+
+ @classmethod
+ def increment(cls, counter, incr=1):
+ index = random.randint(1, counter.number_of_shards)
+ counter_name = counter.name
+ delayed_incr = counter.delayed_incr.count
+ shard_key_name = 'Shard' + counter_name + str(index)
+ def get_or_create_shard():
+ shard = CounterShard.get_by_key_name(shard_key_name)
+ if shard is None:
+ shard = CounterShard(key_name=shard_key_name, name=counter_name)
+ shard.count += incr + delayed_incr
+ key = shard.put()
+
+ try:
+ db.run_in_transaction(get_or_create_shard)
+ except (db.Error, apiproxy_errors.Error), e:
+ counter.delayed_incr.increment(incr)
+ logging.error('CounterShard (%s) delayed increment %d: %s',
+ counter_name, incr, e)
+ return False
+
+ if delayed_incr:
+ counter.delayed_incr.count = 0
+
+ return True
=======================================
--- /tipfy/auth/__init__.py Wed Mar 30 05:57:11 2011
+++ /tipfy/auth/__init__.py Sun Apr 3 07:14:04 2011
@@ -45,11 +45,10 @@


class BaseAuthStore(object):
- def __init__(self, handler):
- self.handler = handler
- self.app = handler.app
- self.request = handler.request
- self.config = handler.app.config[__name__]
+ def __init__(self, request):
+ self.request = request
+ self.app = request.app
+ self.config = request.app.config[__name__]

@cached_property
def user_model(self):
@@ -68,14 +67,14 @@
@cached_property
def _session_base(self):
cookie_name = self.config['cookie_name']
- return self.handler.session_store.get_session(cookie_name)
+ return self.request.session_store.get_session(cookie_name)

def _url(self, _name, **kwargs):
kwargs.setdefault('redirect', self.request.path)
if not DEV_APPSERVER and self.config['secure_urls']:
kwargs['_scheme'] = 'https'

- return self.handler.url_for(_name, **kwargs)
+ return self.app.router.url_for(self.request, _name, kwargs)

def login_url(self, **kwargs):
"""Returns a URL that, when visited, prompts the user to sign in.
@@ -206,7 +205,7 @@
kwargs['max_age'] = None

self._session_base['_auth'] = self._session = session
- self.handler.session_store.update_session_args(
+ self.request.session_store.update_session_args(
self.config['cookie_name'], **kwargs)


=======================================
--- /tipfy/handler.py Sat Apr 2 15:00:06 2011
+++ /tipfy/handler.py Sun Apr 3 07:14:04 2011
@@ -105,7 +105,7 @@
:returns:
An auth store instance.
"""
- return self.app.auth_store_class(self)
+ return self.request.auth

@werkzeug.utils.cached_property
def i18n(self):
@@ -115,7 +115,7 @@
:returns:
An i18n store instance.
"""
- return self.app.i18n_store_class(self)
+ return self.request.i18n

@werkzeug.utils.cached_property
def session(self):
@@ -133,7 +133,7 @@
:returns:
A session store instance.
"""
- return self.app.session_store_class(self)
+ return self.request.session_store

def abort(self, code, *args, **kwargs):
"""Raises an :class:`HTTPException`. This stops code execution,
=======================================
--- /tipfy/i18n.py Sat Apr 2 14:59:44 2011
+++ /tipfy/i18n.py Sun Apr 3 07:14:04 2011
@@ -33,7 +33,7 @@
except ImportError:
raise RuntimeError('gaepytz or pytz are required.')

-from tipfy.app import current_handler
+from tipfy.local import get_request

#: Default configuration values for this module. Keys are:
#:
@@ -142,20 +142,20 @@
#: Current tzinfo.
tzinfo = None

- def __init__(self, handler):
- self.config = handler.app.config[__name__]
- self.loaded_translations = handler.app.registry.setdefault(
+ def __init__(self, request):
+ self.config = request.app.config[__name__]
+ self.loaded_translations = request.app.registry.setdefault(
'i18n.translations', {})
- self.set_locale_for_request(handler)
- self.set_timezone_for_request(handler)
-
- def set_locale_for_request(self, handler):
- locale = _get_request_value(handler,
+ self.set_locale_for_request(request)
+ self.set_timezone_for_request(request)
+
+ def set_locale_for_request(self, request):
+ locale = _get_request_value(request,
self.config['locale_request_lookup'], self.config['locale'])
self.set_locale(locale)

- def set_timezone_for_request(self, handler):
- timezone = _get_request_value(handler,
+ def set_timezone_for_request(self, request):
+ timezone = _get_request_value(request,
self.config['timezone_request_lookup'],
self.config['timezone'])
self.set_timezone(timezone)

@@ -635,109 +635,109 @@

def set_locale(locale):
"""See :meth:`I18nStore.set_locale`."""
- return current_handler.i18n.set_locale(locale)
+ return get_request().i18n.set_locale(locale)


def set_timezone(timezone):
"""See :meth:`I18nStore.set_timezone`."""
- return current_handler.i18n.set_timezone(timezone)
+ return get_request().i18n.set_timezone(timezone)


def gettext(string, **variables):
"""See :meth:`I18nStore.gettext`."""
- return current_handler.i18n.gettext(string, **variables)
+ return get_request().i18n.gettext(string, **variables)


def ngettext(singular, plural, n, **variables):
"""See :meth:`I18nStore.ngettext`."""
- return current_handler.i18n.ngettext(singular, plural, n, **variables)
+ return get_request().i18n.ngettext(singular, plural, n, **variables)


def to_local_timezone(datetime):
"""See :meth:`I18nStore.to_local_timezone`."""
- return current_handler.i18n.to_local_timezone(datetime)
+ return get_request().i18n.to_local_timezone(datetime)


def to_utc(datetime):
"""See :meth:`I18nStore.to_utc`."""
- return current_handler.i18n.to_utc(datetime)
+ return get_request().i18n.to_utc(datetime)


def format_date(date=None, format=None, rebase=True):
"""See :meth:`I18nStore.format_date`."""
- return current_handler.i18n.format_date(date, format, rebase)
+ return get_request().i18n.format_date(date, format, rebase)


def format_datetime(datetime=None, format=None, rebase=True):
"""See :meth:`I18nStore.format_datetime`."""
- return current_handler.i18n.format_datetime(datetime, format, rebase)
+ return get_request().i18n.format_datetime(datetime, format, rebase)


def format_time(time=None, format=None, rebase=True):
"""See :meth:`I18nStore.format_time`."""
- return current_handler.i18n.format_time(time, format, rebase)
+ return get_request().i18n.format_time(time, format, rebase)


def format_timedelta(datetime_or_timedelta, granularity='second',
threshold=.85):
"""See :meth:`I18nStore.format_timedelta`."""
- return current_handler.i18n.format_timedelta(datetime_or_timedelta,
+ return get_request().i18n.format_timedelta(datetime_or_timedelta,
granularity, threshold)


def format_number(number):
"""See :meth:`I18nStore.format_number`."""
- return current_handler.i18n.format_number(number)
+ return get_request().i18n.format_number(number)


def format_decimal(number, format=None):
"""See :meth:`I18nStore.format_decimal`."""
- return current_handler.i18n.format_decimal(number, format)
+ return get_request().i18n.format_decimal(number, format)


def format_currency(number, currency, format=None):
"""See :meth:`I18nStore.format_currency`."""
- return current_handler.i18n.format_currency(number, currency, format)
+ return get_request().i18n.format_currency(number, currency, format)


def format_percent(number, format=None):
"""See :meth:`I18nStore.format_percent`."""
- return current_handler.i18n.format_percent(number, format)
+ return get_request().i18n.format_percent(number, format)


def format_scientific(number, format=None):
"""See :meth:`I18nStore.format_scientific`."""
- return current_handler.i18n.format_scientific(number, format)
+ return get_request().i18n.format_scientific(number, format)


def parse_date(string):
"""See :meth:`I18nStore.parse_date`"""
- return current_handler.i18n.parse_date(string)
+ return get_request().i18n.parse_date(string)


def parse_datetime(string):
"""See :meth:`I18nStore.parse_datetime`."""
- return current_handler.i18n.parse_datetime(string)
+ return get_request().i18n.parse_datetime(string)


def parse_time(string):
"""See :meth:`I18nStore.parse_time`."""
- return current_handler.i18n.parse_time(string)
+ return get_request().i18n.parse_time(string)


def parse_number(string):
"""See :meth:`I18nStore.parse_number`."""
- return current_handler.i18n.parse_number(string)
+ return get_request().i18n.parse_number(string)


def parse_decimal(string):
"""See :meth:`I18nStore.parse_decimal`."""
- return current_handler.i18n.parse_decimal(string)
+ return get_request().i18n.parse_decimal(string)


def get_timezone_location(dt_or_tzinfo):
"""See :meth:`I18nStore.get_timezone_location`."""
- return current_handler.i18n.get_timezone_location(dt_or_tzinfo)
+ return get_request().i18n.get_timezone_location(dt_or_tzinfo)


def list_translations(dirname='locale'):
@@ -793,7 +793,7 @@
return support.LazyProxy(ngettext, singular, plural, n, **variables)


-def _get_request_value(handler, lookup_list, default=None):
+def _get_request_value(request, lookup_list, default=None):
"""Returns a locale code or timezone for the current request.

It will use the configuration for ``locale_request_lookup`` or
@@ -811,18 +811,19 @@
:returns:
A locale code or timezone setting.
"""
- request = handler.request
+ value = None
+ attrs = ('args', 'form', 'cookies', 'session', 'rule_args')
for method, key in lookup_list:
- if method in ('session', 'context'):
- # Get from session or handler context.
- obj = getattr(handler, method)
- else:
+ if method in attrs:
# Get from GET, POST, cookies or rule_args.
obj = getattr(request, method)
-
- value = obj.get(key, None)
-
- if value is not None:
+ else:
+ obj = None
+
+ if obj:
+ value = obj.get(key, None)
+
+ if value:
break
else:
value = default
=======================================
--- /tipfy/routing.py Sat Apr 2 14:59:44 2011
+++ /tipfy/routing.py Sun Apr 3 07:14:04 2011
@@ -407,7 +407,7 @@
.. seealso:: :meth:`Router.url_for`.
"""
request = get_request()
- return request.rule_adapter.url_for(request, _name, kwargs)
+ return request.app.router.url_for(request, _name, kwargs)


# Add regex converter to the list of converters.
=======================================
--- /tipfy/sessions.py Sat Apr 2 14:59:44 2011
+++ /tipfy/sessions.py Sun Apr 3 07:14:04 2011
@@ -235,10 +235,10 @@
'securecookie': SecureCookieSession,
}

- def __init__(self, handler, backends=None):
- self.request = handler.request
+ def __init__(self, request, backends=None):
+ self.request = request
# Base configuration.
- self.config = handler.app.config[__name__]
+ self.config = request.app.config[__name__]
# A dictionary of support backend classes.
self.backends = backends or self.default_backends
# The default backend to use when none is provided.
=======================================
--- /tipfy/utils.py Wed Mar 30 04:47:00 2011
+++ /tipfy/utils.py Sun Apr 3 07:14:04 2011
@@ -21,147 +21,140 @@
import urllib
import xml.sax.saxutils

-from .app import current_handler
# Imported here for compatibility.
from .json import json_encode, json_decode, json_b64encode, json_b64decode
+from .local import get_request
+from .routing import url_for


def xhtml_escape(value):
- """Escapes a string so it is valid within XML or XHTML.
-
- :param value:
- The value to be escaped.
- :returns:
- The escaped value.
- """
- return utf8(xml.sax.saxutils.escape(value, {'"': "&quot;"}))
+ """Escapes a string so it is valid within XML or XHTML.
+
+ :param value:
+ The value to be escaped.
+ :returns:
+ The escaped value.
+ """
+ return utf8(xml.sax.saxutils.escape(value, {'"': "&quot;"}))


def xhtml_unescape(value):
- """Un-escapes an XML-escaped string.
-
- :param value:
- The value to be un-escaped.
- :returns:
- The un-escaped value.
- """
- return re.sub(r"&(#?)(\w+?);", _convert_entity, _unicode(value))
+ """Un-escapes an XML-escaped string.
+
+ :param value:
+ The value to be un-escaped.
+ :returns:
+ The un-escaped value.
+ """
+ return re.sub(r"&(#?)(\w+?);", _convert_entity, _unicode(value))


def render_json_response(*args, **kwargs):
- """Renders a JSON response.
-
- :param args:
- Arguments to be passed to json_encode().
- :param kwargs:
- Keyword arguments to be passed to json_encode().
- :returns:
- A :class:`Response` object with a JSON string in the body and
- mimetype set to ``application/json``.
- """
- return current_handler.app.response_class(json_encode(*args, **kwargs),
- mimetype='application/json')
+ """Renders a JSON response.
+
+ :param args:
+ Arguments to be passed to json_encode().
+ :param kwargs:
+ Keyword arguments to be passed to json_encode().
+ :returns:
+ A :class:`Response` object with a JSON string in the body and
+ mimetype set to ``application/json``.
+ """
+ return get_request().app.response_class(json_encode(*args, **kwargs),
+ mimetype='application/json')


def squeeze(value):
- """Replace all sequences of whitespace chars with a single space."""
- return re.sub(r"[\x00-\x20]+", " ", value).strip()
+ """Replace all sequences of whitespace chars with a single space."""
+ return re.sub(r"[\x00-\x20]+", " ", value).strip()


def url_escape(value):
- """Returns a valid URL-encoded version of the given value."""
- return urllib.quote_plus(utf8(value))
+ """Returns a valid URL-encoded version of the given value."""
+ return urllib.quote_plus(utf8(value))


def url_unescape(value):
- """Decodes the given value from a URL."""
- return _unicode(urllib.unquote_plus(value))
+ """Decodes the given value from a URL."""
+ return _unicode(urllib.unquote_plus(value))


def utf8(value):
- """Encodes a unicode value to UTF-8 if not yet encoded.
-
- :param value:
- Value to be encoded.
- :returns:
- An encoded string.
- """
- if isinstance(value, unicode):
- return value.encode("utf-8")
-
- assert isinstance(value, str)
- return value
+ """Encodes a unicode value to UTF-8 if not yet encoded.
+
+ :param value:
+ Value to be encoded.
+ :returns:
+ An encoded string.
+ """
+ if isinstance(value, unicode):
+ return value.encode("utf-8")
+
+ assert isinstance(value, str)
+ return value


def _unicode(value):
- """Encodes a string value to unicode if not yet decoded.
-
- :param value:
- Value to be decoded.
- :returns:
- A decoded string.
- """
- if isinstance(value, str):
- return value.decode("utf-8")
-
- assert isinstance(value, unicode)
- return value
+ """Encodes a string value to unicode if not yet decoded.
+
+ :param value:
+ Value to be decoded.
+ :returns:
+ A decoded string.
+ """
+ if isinstance(value, str):
+ return value.decode("utf-8")
+
+ assert isinstance(value, unicode)
+ return value


def _convert_entity(m):
- if m.group(1) == "#":
- try:
- return unichr(int(m.group(2)))
- except ValueError:
- return "&#%s;" % m.group(2)
- try:
- return _HTML_UNICODE_MAP[m.group(2)]
- except KeyError:
- return "&%s;" % m.group(2)
+ if m.group(1) == "#":
+ try:
+ return unichr(int(m.group(2)))
+ except ValueError:
+ return "&#%s;" % m.group(2)
+ try:
+ return _HTML_UNICODE_MAP[m.group(2)]
+ except KeyError:
+ return "&%s;" % m.group(2)


def _build_unicode_map():
- return dict((name, unichr(value)) for \
- name, value in htmlentitydefs.name2codepoint.iteritems())
-
-
-def url_for(_name, **kwargs):
- """A proxy to :meth:`RequestHandler.url_for`.
-
- .. seealso:: :meth:`Router.url_for`.
- """
- return current_handler.url_for(_name, **kwargs)
+ return dict((name, unichr(value)) for \
+ name, value in htmlentitydefs.name2codepoint.iteritems())


def slugify(value, max_length=None, default=None):
- """Converts a string to slug format (all lowercase, words separated by
- dashes).
-
- :param value:
- The string to be slugified.
- :param max_length:
- An integer to restrict the resulting string to a maximum length.
- Words are not broken when restricting length.
- :param default:
- A default value in case the resulting string is empty.
- :returns:
- A slugified string.
- """
- value = _unicode(value)
- s = unicodedata.normalize('NFKD',
value).encode('ascii', 'ignore').lower()
- s = re.sub('-+', '-', re.sub('[^a-zA-Z0-9-]+', '-', s)).strip('-')
- if not s:
- return default
-
- if max_length:
- # Restrict length without breaking words.
- while len(s) > max_length:
- if s.find('-') == -1:
- s = s[:max_length]
- else:
- s = s.rsplit('-', 1)[0]
-
- return s
+ """Converts a string to slug format (all lowercase, words separated by
+ dashes).
+
+ :param value:
+ The string to be slugified.
+ :param max_length:
+ An integer to restrict the resulting string to a maximum length.
+ Words are not broken when restricting length.
+ :param default:
+ A default value in case the resulting string is empty.
+ :returns:
+ A slugified string.
+ """
+ value = _unicode(value)
+ s = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').lower()
+ s = re.sub('-+', '-', re.sub('[^a-zA-Z0-9-]+', '-', s)).strip('-')
+ if not s:
+ return default
+
+ if max_length:
+ # Restrict length without breaking words.
+ while len(s) > max_length:
+ if s.find('-') == -1:
+ s = s[:max_length]
+ else:
+ s = s.rsplit('-', 1)[0]
+
+ return s


_HTML_UNICODE_MAP = _build_unicode_map()
=======================================
--- /tipfyext/jinja2/__init__.py Wed Mar 30 05:57:11 2011
+++ /tipfyext/jinja2/__init__.py Sun Apr 3 07:14:04 2011
@@ -16,8 +16,8 @@

from werkzeug import cached_property, import_string

-from tipfy import current_handler
-from tipfy.utils import url_for
+from tipfy.local import get_request
+from tipfy.routing import url_for

#: Default configuration values for this module. Keys are:
#:
@@ -88,8 +88,8 @@
# Install i18n.
from tipfy import i18n
env.install_gettext_callables(
- lambda x: current_handler.i18n.translations.ugettext(x),
- lambda s, p, n:
current_handler.i18n.translations.ungettext(s,
+ lambda x: get_request().i18n.translations.ugettext(x),
+ lambda s, p, n:
get_request().i18n.translations.ungettext(s,
p, n),
newstyle=True)
format_functions = {

Reply all
Reply to author
Forward
0 new messages