Revision: 9daf269f091b
Author: Rodrigo Moraes <
rodrigo...@gmail.com>
Date: Sat Apr 2 13:52:07 2011
Log: Refactoring tests, adding a Makefile to run them. Removed
tipfy/dev.py and related tests (not used since the issue was fixed).
http://code.google.com/p/tipfy/source/detail?r=9daf269f091b
Added:
/Makefile
/tests/README.txt
/tests/app_test.py
/tests/auth_test.py
/tests/config_test.py
/tests/ext_jinja2_test.py
/tests/ext_mako_test.py
/tests/gae_acl_test.py
/tests/gae_blobstore_test.py
/tests/gae_db_test.py
/tests/gae_mail_test.py
/tests/gae_sharded_counter_test.py
/tests/gae_taskqueue_test.py
/tests/gae_xmpp_test.py
/tests/i18n_test.py
/tests/manage_test.py
/tests/routing_test.py
/tests/secure_cookie_test.py
/tests/sessions_test.py
/tests/template_test.py
/tests/utils_test.py
Deleted:
/tests/project/README.txt
/tests/project/app/app.yaml
/tests/project/app/config.py
/tests/project/app/hello_world/__init__.py
/tests/project/app/hello_world/handlers.py
/tests/project/app/index.yaml
/tests/project/app/lib/README.txt
/tests/project/app/locale/README.txt
/tests/project/app/main.py
/tests/project/app/static/favicon.ico
/tests/project/app/static/robots.txt
/tests/project/app/urls.py
/tests/project/babel.cfg
/tests/project/bin/README.txt
/tests/project/bootstrap.py
/tests/project/buildout.cfg
/tests/project/gaetools.cfg
/tests/project/var/README.txt
/tests/project/var/downloads/README.txt
/tests/project/versions.cfg
/tests/run_tests.py
/tests/test_app.py
/tests/test_auth.py
/tests/test_config.py
/tests/test_dev.py
/tests/test_ext_jinja2.py
/tests/test_ext_mako.py
/tests/test_gae_acl.py
/tests/test_gae_blobstore.py
/tests/test_gae_db.py
/tests/test_gae_mail.py
/tests/test_gae_sharded_counter.py
/tests/test_gae_taskqueue.py
/tests/test_gae_xmpp.py
/tests/test_i18n.py
/tests/test_manage.py
/tests/test_routing.py
/tests/test_secure_cookie.py
/tests/test_sessions.py
/tests/test_template.py
/tipfy/dev.py
Modified:
/.hgignore
/tests/test_utils.py
/tipfy/app.py
=======================================
--- /dev/null
+++ /Makefile Sat Apr 2 13:52:07 2011
@@ -0,0 +1,124 @@
+# Convenience to run tests and coverage.
+# You must have installed the App Engine SDK toolkit, version 1.4.0 or
+# later, and it must be installed in /usr/local/google_appengine.
+# This probably won't work on Windows.
+# Borrowed from
http://code.google.com/p/appengine-ndb-experiment/
+
+FLAGS=
+GAE= /usr/local/google_appengine
+GAEPATH=$(GAE):$(GAE)/lib/django_0_96:$(GAE)/lib/webob:$(GAE)/lib/yaml/lib:tests
+TESTS= `find tests -name [a-z]\*_test.py`
+NONTESTS=`find tipfy -name [a-z]\*.py
+PORT= 8080
+ADDRESS=localhost
+PYTHON= python -Wignore
+
+define run_test
+ PYTHONPATH=$(GAEPATH):. $(PYTHON) -m tests.$1_test $(FLAGS)
+endef
+
+test:
+ for i in $(TESTS); \
+ do \
+ echo $$i; \
+ PYTHONPATH=$(GAEPATH):. $(PYTHON) -m tests.`basename $$i .py` $(FLAGS);
\
+ done
+
+# 'app', 'auth', 'config', 'dev', 'ext_jinja2', 'ext_mako', 'gae_acl',
+# 'gae_blobstore', 'gae_db', 'gae_mail', 'gae_sharded_counter',
+# 'gae_taskqueue', 'gae_xmpp', 'i18n', 'manage', 'routing', 'secure_cookie',
+# 'sessions', 'template', 'utils'
+
+app_test:
+ $(call run_test,app)
+
+auth_test:
+ $(call run_test,auth)
+
+config_test:
+ $(call run_test,config)
+
+dev_test:
+ $(call run_test,dev)
+
+ext_jinja2_test:
+ $(call run_test,ext_jinja2)
+
+ext_mako_test:
+ $(call run_test,ext_mako)
+
+gae_acl_test:
+ $(call run_test,gae_acl)
+
+gae_blobstore_test:
+ $(call run_test,gae_blobstore)
+
+gae_db_test:
+ $(call run_test,gae_db)
+
+gae_mail_test:
+ $(call run_test,gae_mail)
+
+gae_sharded_counter_test:
+ $(call run_test,gae_sharded_counter)
+
+gae_taskqueue_test:
+ $(call run_test,gae_taskqueue)
+
+gae_xmpp_test:
+ $(call run_test,gae_xmpp)
+
+i18n_test:
+ $(call run_test,i18n)
+
+manage_test:
+ $(call run_test,manage)
+
+routing_test:
+ $(call run_test,routing)
+
+secure_cookie_test:
+ $(call run_test,secure_cookie)
+
+sessions_test:
+ $(call run_test,sessions)
+
+template_test:
+ $(call run_test,template)
+
+utils_test:
+ $(call run_test,utils)
+
+c cov cove cover coverage:
+ coverage erase
+ for i in $(TESTS); \
+ do \
+ echo $$i; \
+ PYTHONPATH=$(GAEPATH):. coverage run -p $$i; \
+ done
+ coverage combine
+ coverage html $(NONTESTS)
+ coverage report -m $(NONTESTS)
+ echo "open file://`pwd`/htmlcov/index.html"
+
+serve:
+ $(GAE)/dev_appserver.py . --port $(PORT) --address $(ADDRESS)
+
+debug:
+ $(GAE)/dev_appserver.py . --port $(PORT) --address $(ADDRESS) --debug
+
+deploy:
+ appcfg.py update .
+
+python:
+ PYTHONPATH=$(GAEPATH):. $(PYTHON) -i startup.py
+
+python_raw:
+ PYTHONPATH=$(GAEPATH):. $(PYTHON)
+
+zip:
+ D=`pwd`; D=`basename $$D`; cd ..; rm $$D.zip; zip $$D.zip `hg st -c -m -a
-n -X $$D/.idea $$D`
+
+clean:
+ rm -rf htmlcov
+ rm -f `find . -name \*.pyc -o -name \*~ -o -name @* -o -name \*.orig`
=======================================
--- /dev/null
+++ /tests/README.txt Sat Apr 2 13:52:07 2011
@@ -0,0 +1,52 @@
+Unit test setup
+---------------
+Run the tests inside a virtualenv. If you don't have it installed:
+
+ $ pip install virtualenv
+
+Then create a virtualenv inside the tests directory:
+
+ $ cd path/to/tipfy/tests
+ $ virtualenv env
+
+Activate the virtualenv. On *nix:
+
+ $ . env/bin/activate
+
+...or on Windows:
+
+ $ env\scripts\activate
+
+Then install dependencies and libraries ued by the tests:
+
+ $ pip install werkzeug
+ $ pip install jinja2
+ $ pip install blinker
+ $ pip install babel
+ $ pip install gaepytz
+ $ pip install mako
+ $ pip install coverage
+
+Alternatively, install this compiled babel trunk for format_timedelta
support:
+
http://tipfy.googlecode.com/files/babel_trunk_with_format_timedelta.tar.bz2
+
+Uncompress, access the uncompressed dir, then:
+
+ $ python setup.py install
+
+Running the tests
+-----------------
+Activate the virtualenv. Then use the Makefile to run all tests (only
tested
+on Linux):
+
+ $ make test
+
+Or to run single tests:
+
+ $ make app_test
+ $ make config_test
+ $ ...
+
+Or to run the coverage:
+
+ $ make coverage
=======================================
--- /dev/null
+++ /tests/app_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,539 @@
+# -*- coding: utf-8 -*-
+"""
+ Tests for
tipfy.app
+"""
+from __future__ import with_statement
+
+import os
+import sys
+import StringIO
+import unittest
+
+import tipfy
+from tipfy import Request, RequestHandler, Response, Rule, Tipfy
+from tipfy.utils import json_encode
+from tipfy.local import local
+
+#from
tipfy.app import Request, Response, Tipfy
+#from tipfy.handler import RequestHandler
+#from tipfy.json import json_encode
+#from tipfy.routing import Rule
+
+import test_utils
+
+class BaseTestCase(test_utils.BaseTestCase):
+ def setUp(self):
+ self.appengine = tipfy.app.APPENGINE
+ self.dev_appserver = tipfy.app.DEV_APPSERVER
+ test_utils.BaseTestCase.setUp(self)
+
+ def tearDown(self):
+ tipfy.app.APPENGINE = self.appengine
+ tipfy.app.DEV_APPSERVER = self.dev_appserver
+ test_utils.BaseTestCase.tearDown(self)
+
+ def _set_dev_server_flag(self, flag):
+ tipfy.app.APPENGINE = flag
+ tipfy.app.DEV_APPSERVER = flag
+
+
+class AllMethodsHandler(RequestHandler):
+ def get(self, **kwargs):
+ return Response('Method: %s' % self.request.method)
+
+ delete = head = options = post = put = trace = get
+
+
+class BrokenHandler(RequestHandler):
+ def get(self, **kwargs):
+ raise ValueError('booo!')
+
+
+class BrokenButFixedHandler(BrokenHandler):
+ def handle_exception(self, exception=None):
+ # Let's fix it.
+ return Response('That was close!', status=200)
+
+
+class Handle404(RequestHandler):
+ def handle_exception(self, exception=None):
+ return Response('404 custom handler', status=404)
+
+
+class Handle405(RequestHandler):
+ def handle_exception(self, exception=None):
+ response = Response('405 custom handler', status=405)
+ response.headers['Allow'] = 'GET'
+ return response
+
+
+class Handle500(RequestHandler):
+ def handle_exception(self, exception=None):
+ return Response('500 custom handler', status=500)
+
+
+class TestRequestHandler(BaseTestCase):
+ def test_200(self):
+ app = Tipfy(rules=[Rule('/', name='home', handler=AllMethodsHandler)])
+ client = app.get_test_client()
+
+ for method in app.allowed_methods:
+ response = client.open('/', method=method)
+ self.assertEqual(response.status_code, 200, method)
+ if method == 'HEAD':
+ self.assertEqual(response.data, '')
+ else:
+ self.assertEqual(response.data, 'Method: %s' % method)
+
+ # App Engine mode.
+ self._set_dev_server_flag(True)
+ response = client.get('/')
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'Method: GET')
+
+ def test_404(self):
+ """No URL rules defined."""
+ app = Tipfy()
+ client = app.get_test_client()
+
+ # Normal mode.
+ response = client.get('/')
+ self.assertEqual(response.status_code, 404)
+
+ # Debug mode.
+ app.debug = True
+ response = client.get('/')
+ self.assertEqual(response.status_code, 404)
+
+ def test_500(self):
+ """Handler import will fail."""
+ app = Tipfy(rules=[Rule('/', name='home',
handler='non.existent.handler')])
+ client = app.get_test_client()
+
+ # Normal mode.
+ response = client.get('/')
+ self.assertEqual(response.status_code, 500)
+
+ # Debug mode.
+ app.debug = True
+ app.config['tipfy']['enable_debugger'] = False
+ self.assertRaises(ImportError, client.get, '/')
+
+ def test_501(self):
+ """Method is not in app.allowed_methods."""
+ app = Tipfy()
+ client = app.get_test_client()
+
+ # Normal mode.
+ response = client.open('/', method='CONNECT')
+ self.assertEqual(response.status_code, 501)
+
+ # Debug mode.
+ app.debug = True
+ response = client.open('/', method='CONNECT')
+ self.assertEqual(response.status_code, 501)
+
+ def test_abort(self):
+ class HandlerWithAbort(RequestHandler):
+ def get(self, **kwargs):
+ self.abort(kwargs.get('status_code'))
+
+ app = Tipfy(rules=[
+ Rule('/<int:status_code>', name='abort-me', handler=HandlerWithAbort),
+ ])
+ client = app.get_test_client()
+
+ response = client.get('/400')
+ self.assertEqual(response.status_code, 400)
+
+ response = client.get('/403')
+ self.assertEqual(response.status_code, 403)
+
+ response = client.get('/404')
+ self.assertEqual(response.status_code, 404)
+
+ def test_get_config(self):
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ ], config = {
+ 'foo': {
+ 'bar': 'baz',
+ }
+ })
+ with app.get_test_handler('/') as handler:
+ self.assertEqual(handler.get_config('foo', 'bar'), 'baz')
+
+ def test_handle_exception(self):
+ app = Tipfy([
+ Rule('/', handler=AllMethodsHandler, name='home'),
+ Rule('/broken', handler=BrokenHandler, name='broken'),
+ Rule('/broken-but-fixed', handler=BrokenButFixedHandler,
name='broken-but-fixed'),
+ ], debug=False)
+ client = app.get_test_client()
+
+ response = client.get('/broken')
+ self.assertEqual(response.status_code, 500)
+
+ response = client.get('/broken-but-fixed')
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'That was close!')
+
+ def test_redirect(self):
+ class HandlerWithRedirect(RequestHandler):
+ def get(self, **kwargs):
+ return self.redirect('/')
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ Rule('/redirect-me', name='redirect', handler=HandlerWithRedirect),
+ ])
+
+ client = app.get_test_client()
+ response = client.get('/redirect-me', follow_redirects=True)
+ self.assertEqual(response.data, 'Method: GET')
+
+ def test_redirect_empty(self):
+ class HandlerWithRedirect(RequestHandler):
+ def get(self, **kwargs):
+ return self.redirect('/', empty=True)
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ Rule('/redirect-me', name='redirect', handler=HandlerWithRedirect),
+ ])
+
+ client = app.get_test_client()
+ response = client.get('/redirect-me', follow_redirects=False)
+ self.assertEqual(response.data, '')
+
+ def test_redirect_to(self):
+ class HandlerWithRedirectTo(RequestHandler):
+ def get(self, **kwargs):
+ return self.redirect_to('home')
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ Rule('/redirect-me', name='redirect', handler=HandlerWithRedirectTo),
+ ])
+
+ client = app.get_test_client()
+ response = client.get('/redirect-me', follow_redirects=True)
+ self.assertEqual(response.data, 'Method: GET')
+
+ def test_redirect_to_empty(self):
+ class HandlerWithRedirectTo(RequestHandler):
+ def get(self, **kwargs):
+ return self.redirect_to('home', _empty=True)
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ Rule('/redirect-me', name='redirect', handler=HandlerWithRedirectTo),
+ ])
+
+ client = app.get_test_client()
+ response = client.get('/redirect-me', follow_redirects=False)
+ self.assertEqual(response.data, '')
+
+ def test_redirect_relative_uris(self):
+ class MyHandler(RequestHandler):
+ def get(self):
+ return self.redirect(self.request.args.get('redirect'))
+
+ app = Tipfy(rules=[
+ Rule('/foo/bar', name='test1', handler=MyHandler),
+ Rule('/foo/bar/', name='test2', handler=MyHandler),
+ ])
+ client = app.get_test_client()
+
+ response = client.get('/foo/bar/', query_string={'redirect': '/baz'})
+ self.assertEqual(response.headers['Location'], '
http://localhost/baz')
+
+ response = client.get('/foo/bar/', query_string={'redirect': './baz'})
+
self.assertEqual(response.headers['Location'], '
http://localhost/foo/bar/baz')
+
+ response = client.get('/foo/bar/', query_string={'redirect': '../baz'})
+
self.assertEqual(response.headers['Location'], '
http://localhost/foo/baz')
+
+ response = client.get('/foo/bar', query_string={'redirect': '/baz'})
+ self.assertEqual(response.headers['Location'], '
http://localhost/baz')
+
+ response = client.get('/foo/bar', query_string={'redirect': './baz'})
+
self.assertEqual(response.headers['Location'], '
http://localhost/foo/baz')
+
+ response = client.get('/foo/bar', query_string={'redirect': '../baz'})
+ self.assertEqual(response.headers['Location'], '
http://localhost/baz')
+
+ def test_url_for(self):
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ Rule('/about', name='about', handler='handlers.About'),
+ Rule('/contact', name='contact', handler='handlers.Contact'),
+ ])
+ with app.get_test_handler('/') as handler:
+ self.assertEqual(handler.url_for('home'), '/')
+ self.assertEqual(handler.url_for('about'), '/about')
+ self.assertEqual(handler.url_for('contact'), '/contact')
+
+ # Extras
+ self.assertEqual(handler.url_for('about',
_anchor='history'), '/about#history')
+ self.assertEqual(handler.url_for('about',
_full=True), '
http://localhost/about')
+ self.assertEqual(handler.url_for('about',
_netloc='
www.google.com'), '
http://www.google.com/about')
+ self.assertEqual(handler.url_for('about',
_scheme='https'), '
https://localhost/about')
+
+ def test_store_instances(self):
+ from tipfy.appengine.auth import AuthStore
+ from tipfy.i18n import I18nStore
+ from tipfy.sessions import SecureCookieSession
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ ], config={
+ 'tipfy.sessions': {
+ 'secret_key': 'secret',
+ },
+ })
+ with app.get_test_handler('/') as handler:
+ self.assertEqual(isinstance(handler.session, SecureCookieSession), True)
+ self.assertEqual(isinstance(handler.auth, AuthStore), True)
+ self.assertEqual(isinstance(handler.i18n, I18nStore), True)
+
+
+class TestHandlerMiddleware(BaseTestCase):
+ def test_before_dispatch(self):
+ res = 'Intercepted!'
+
+ class MyMiddleware(object):
+ def before_dispatch(self, handler):
+ return Response(res)
+
+ class MyHandler(RequestHandler):
+ middleware = [MyMiddleware()]
+
+ def get(self, **kwargs):
+ return Response('default')
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=MyHandler),
+ ])
+ client = app.get_test_client()
+ response = client.get('/')
+ self.assertEqual(response.data, res)
+
+ def test_after_dispatch(self):
+ res = 'Intercepted!'
+
+ class MyMiddleware(object):
+ def after_dispatch(self, handler, response):
+ response.data += res
+ return response
+
+ class MyHandler(RequestHandler):
+ middleware = [MyMiddleware()]
+
+ def get(self, **kwargs):
+ return Response('default')
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=MyHandler),
+ ])
+ client = app.get_test_client()
+ response = client.get('/')
+ self.assertEqual(response.data, 'default' + res)
+
+ def test_handle_exception(self):
+ res = 'Catched!'
+
+ class MyMiddleware(object):
+ def handle_exception(self, handler, exception):
+ return Response(res)
+
+ class MyHandler(RequestHandler):
+ middleware = [MyMiddleware()]
+
+ def get(self, **kwargs):
+ raise ValueError()
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=MyHandler),
+ ])
+ client = app.get_test_client()
+ response = client.get('/')
+ self.assertEqual(response.data, res)
+
+ def test_handle_exception_2(self):
+ res = 'I fixed it!'
+
+ class MyMiddleware(object):
+ def handle_exception(self, handler, exception):
+ raise ValueError()
+
+ class MyHandler(RequestHandler):
+ middleware = [MyMiddleware()]
+
+ def get(self, **kwargs):
+ raise ValueError()
+
+ class ErrorHandler(RequestHandler):
+ def handle_exception(self, exception):
+ return Response(res)
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=MyHandler),
+ ], debug=False)
+ app.error_handlers[500] = ErrorHandler
+
+ client = app.get_test_client()
+ response = client.get('/')
+ self.assertEqual(response.data, res)
+
+ def test_handle_exception_3(self):
+ class MyMiddleware(object):
+ def handle_exception(self, handler, exception):
+ pass
+
+ class MyHandler(RequestHandler):
+ middleware = [MyMiddleware()]
+
+ def get(self, **kwargs):
+ raise ValueError()
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=MyHandler),
+ ])
+ client = app.get_test_client()
+ response = client.get('/')
+ self.assertEqual(response.status_code, 500)
+
+ def test_handle_exception_with_app_error_handler(self):
+ class MyMiddleware(object):
+ def handle_exception(self, handler, exception):
+ raise ValueError()
+
+ class MyHandler(RequestHandler):
+ middleware = [MyMiddleware()]
+
+ def get(self, **kwargs):
+ raise ValueError()
+
+ class ErrorHandler(RequestHandler):
+ def handle_exception(self, exception):
+ raise ValueError()
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=MyHandler),
+ ], debug=False)
+ app.error_handlers[500] = ErrorHandler
+
+ client = app.get_test_client()
+ response = client.get('/')
+ self.assertEqual(response.status_code, 500)
+
+
+class TestTipfy(BaseTestCase):
+ def test_custom_error_handlers(self):
+ app = Tipfy([
+ Rule('/', handler=AllMethodsHandler, name='home'),
+ Rule('/broken', handler=BrokenHandler, name='broken'),
+ ], debug=False)
+ app.error_handlers = {
+ 404: Handle404,
+ 405: Handle405,
+ 500: Handle500,
+ }
+ client = app.get_test_client()
+
+ res = client.get('/nowhere')
+ self.assertEqual(res.status_code, 404)
+ self.assertEqual(res.data, '404 custom handler')
+
+ res = client.put('/broken')
+ self.assertEqual(res.status_code, 405)
+ self.assertEqual(res.data, '405 custom handler')
+ self.assertEqual(res.headers.get('Allow'), 'GET')
+
+ res = client.get('/broken')
+ self.assertEqual(res.status_code, 500)
+ self.assertEqual(res.data, '500 custom handler')
+
+ def test_store_classes(self):
+ from tipfy.appengine.auth import AuthStore
+ from tipfy.i18n import I18nStore
+ from tipfy.sessions import SessionStore
+
+ app = Tipfy()
+ self.assertEqual(app.auth_store_class, AuthStore)
+ self.assertEqual(app.i18n_store_class, I18nStore)
+ self.assertEqual(app.session_store_class, SessionStore)
+
+ def test_make_response(self):
+ app = Tipfy()
+ request = Request.from_values()
+
+ # Empty.
+ response = app.make_response(request)
+ self.assertEqual(isinstance(response, app.response_class), True)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, '')
+
+ # From Response.
+ response = app.make_response(request, Response('Hello, World!'))
+ self.assertEqual(isinstance(response, app.response_class), True)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'Hello, World!')
+
+ # From string.
+ response = app.make_response(request, 'Hello, World!')
+ self.assertEqual(isinstance(response, app.response_class), True)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'Hello, World!')
+
+ # From tuple.
+ response = app.make_response(request, 'Hello, World!', 404)
+ self.assertEqual(isinstance(response, app.response_class), True)
+ self.assertEqual(response.status_code, 404)
+ self.assertEqual(response.data, 'Hello, World!')
+
+ # From None.
+ self.assertRaises(ValueError, app.make_response, request, None)
+
+ def test_dev_run(self):
+ self._set_dev_server_flag(True)
+
+ sys.stdout = StringIO.StringIO()
+
+ os.environ['APPLICATION_ID'] = 'my-app'
+ os.environ['SERVER_SOFTWARE'] = 'Development'
+ os.environ['SERVER_NAME'] = 'localhost'
+ os.environ['SERVER_PORT'] = '8080'
+ os.environ['REQUEST_METHOD'] = 'GET'
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=AllMethodsHandler),
+ ], debug=True)
+
+ app.run()
+ self.assertEqual(sys.stdout.getvalue(), 'Status: 200 OK\r\nContent-Type:
text/html; charset=utf-8\r\nContent-Length: 11\r\n\r\nMethod: GET')
+
+ def test_get_config(self):
+ app = Tipfy(config={'tipfy': {'foo': 'bar'}})
+ self.assertEqual(app.get_config('tipfy', 'foo'), 'bar')
+
+
+class TestRequest(BaseTestCase):
+ def test_json(self):
+ class JsonHandler(RequestHandler):
+ def get(self, **kwargs):
+ return Response(self.request.json['foo'])
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=JsonHandler),
+ ], debug=True)
+
+ data = json_encode({'foo': 'bar'})
+ client = app.get_test_client()
+ response = client.get('/', content_type='application/json', data=data)
+ self.assertEqual(response.data, 'bar')
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/auth_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,737 @@
+import os
+import unittest
+
+from tipfy import Request, RequestHandler, Response, Rule, Tipfy
+from
tipfy.app import local
+
+import tipfy.auth
+from tipfy.auth import (AdminRequiredMiddleware, LoginRequiredMiddleware,
+ UserRequiredMiddleware, UserRequiredIfAuthenticatedMiddleware,
+ admin_required, login_required, user_required,
+ user_required_if_authenticated, check_password_hash,
generate_password_hash,
+ create_session_id, MultiAuthStore)
+from tipfy.appengine.auth import AuthStore, MixedAuthStore
+from tipfy.appengine.auth.model import User
+
+import test_utils
+
+
+class LoginHandler(RequestHandler):
+ def get(self, **kwargs):
+ return Response('login')
+
+
+class LogoutHandler(RequestHandler):
+ def get(self, **kwargs):
+ return Response('logout')
+
+
+class SignupHandler(RequestHandler):
+ def get(self, **kwargs):
+ return Response('signup')
+
+
+class HomeHandler(RequestHandler):
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+
+def get_app():
+ app = Tipfy(rules=[
+ Rule('/login', name='auth/login', handler=LoginHandler),
+ Rule('/logout', name='auth/logout', handler=LogoutHandler),
+ Rule('/signup', name='auth/signup', handler=SignupHandler),
+ ])
+ return app
+
+
+class TestAuthStore(test_utils.BaseTestCase):
+ def test_user_model(self):
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=HomeHandler))
+
+ request = Request.from_values('/')
+ local.current_handler = RequestHandler(app, request)
+
+ store = AuthStore(local.current_handler)
+ self.assertEqual(store.user_model, User)
+
+ def test_login_url(self):
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=HomeHandler))
+
+ request = Request.from_values('/')
+ local.current_handler = RequestHandler(app, request)
+ app.router.match(request)
+
+ store = AuthStore(local.current_handler)
+ self.assertEqual(store.login_url(),
local.current_handler.url_for('auth/login', redirect='/'))
+
+ tipfy.auth.DEV_APPSERVER_APPSERVER = False
+ store.config['secure_urls'] = True
+ self.assertEqual(store.login_url(),
local.current_handler.url_for('auth/login', redirect='/', _scheme='https'))
+ tipfy.auth.DEV_APPSERVER_APPSERVER = True
+
+ def test_logout_url(self):
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=HomeHandler))
+
+ request = Request.from_values('/')
+ local.current_handler = RequestHandler(app, request)
+ app.router.match(request)
+
+ store = AuthStore(local.current_handler)
+ self.assertEqual(store.logout_url(),
local.current_handler.url_for('auth/logout', redirect='/'))
+
+ def test_signup_url(self):
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=HomeHandler))
+
+ request = Request.from_values('/')
+ local.current_handler = RequestHandler(app, request)
+ app.router.match(request)
+
+ store = AuthStore(local.current_handler)
+ self.assertEqual(store.signup_url(),
local.current_handler.url_for('auth/signup', redirect='/'))
+
+
+class TestMiddleware(test_utils.BaseTestCase):
+ def tearDown(self):
+ os.environ.pop('USER_EMAIL', None)
+ os.environ.pop('USER_ID', None)
+ os.environ.pop('USER_IS_ADMIN', None)
+ test_utils.BaseTestCase.tearDown(self)
+
+ def test_login_required_middleware_invalid(self):
+ class MyHandler(HomeHandler):
+ middleware = [LoginRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
+
+ def test_login_required_decorator_invalid(self):
+ class MyHandler(HomeHandler):
+ @login_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
+
+ def test_login_required_middleware(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ middleware = [LoginRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_login_required_decorator(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ @login_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_user_required_middleware_invalid(self):
+ class MyHandler(HomeHandler):
+ middleware = [UserRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
+
+ def test_user_required_decorator_invalid(self):
+ class MyHandler(HomeHandler):
+ @user_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
+
+ def test_user_required_middleware_logged_in(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ middleware = [UserRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
+
+ def test_user_required_decorator_logged_in(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ @user_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
+
+ def test_user_required_middleware_with_user(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me')
+
+ class MyHandler(HomeHandler):
+ middleware = [UserRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_user_required_decorator_with_user(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me')
+
+ class MyHandler(HomeHandler):
+ @user_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_user_required_if_authenticated_middleware(self):
+ class MyHandler(HomeHandler):
+ middleware = [UserRequiredIfAuthenticatedMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_user_required_if_authenticate_decorator(self):
+ class MyHandler(HomeHandler):
+ @user_required_if_authenticated
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_user_required_if_authenticated_middleware_logged_in(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ middleware = [UserRequiredIfAuthenticatedMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
+
+ def test_user_required_if_authenticate_decorator_logged_in(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ @user_required_if_authenticated
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
+
+ def test_user_required_if_authenticated_middleware_with_user(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me')
+
+ class MyHandler(HomeHandler):
+ middleware = [UserRequiredIfAuthenticatedMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_user_required_if_authenticate_decorator_with_user(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me')
+
+ class MyHandler(HomeHandler):
+ @user_required_if_authenticated
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_admin_required_middleware(self):
+ class MyHandler(HomeHandler):
+ middleware = [AdminRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
+
+ def test_admin_required_decorator(self):
+ class MyHandler(HomeHandler):
+ @admin_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 302)
+
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
+
+ def test_admin_required_middleware_logged_in(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ middleware = [AdminRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 403)
+
+ def test_admin_required_decorator_logged_in(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ class MyHandler(HomeHandler):
+ @admin_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 403)
+
+ def test_admin_required_middleware_with_user(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me')
+
+ class MyHandler(HomeHandler):
+ middleware = [AdminRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 403)
+
+ def test_admin_required_decorator_withd_user(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me')
+
+ class MyHandler(HomeHandler):
+ @admin_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 403)
+
+ def test_admin_required_middleware_with_admin(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me', is_admin=True)
+
+ class MyHandler(HomeHandler):
+ middleware = [AdminRequiredMiddleware()]
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+ def test_admin_required_decorator_with_admin(self):
+ os.environ['USER_EMAIL'] = '
m...@myself.com'
+ os.environ['USER_ID'] = 'me'
+
+ User.create('me', 'gae|me', is_admin=True)
+
+ class MyHandler(HomeHandler):
+ @admin_required
+ def get(self, **kwargs):
+ return Response('home sweet home')
+
+ app = get_app()
+ app.router.add(Rule('/', name='home', handler=MyHandler))
+ client = app.get_test_client()
+
+ response = client.get('/')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'home sweet home')
+
+
+class TestUserModel(test_utils.BaseTestCase):
+ def test_create(self):
+ user = User.create('my_username', 'my_id')
+ self.assertEqual(isinstance(user, User), True)
+
+ # Second one will fail to be created.
+ user = User.create('my_username', 'my_id')
+ self.assertEqual(user, None)
+
+ def test_create_with_password_hash(self):
+ user = User.create('my_username', 'my_id', password_hash='foo')
+
+ self.assertEqual(isinstance(user, User), True)
+ self.assertEqual(user.password, 'foo')
+
+ def test_create_with_password(self):
+ user = User.create('my_username', 'my_id', password='foo')
+
+ self.assertEqual(isinstance(user, User), True)
+ self.assertNotEqual(user.password, 'foo')
+ self.assertEqual(len(user.password.split('$')), 3)
+
+ def test_set_password(self):
+ user = User.create('my_username', 'my_id', password='foo')
+ self.assertEqual(isinstance(user, User), True)
+
+ password = user.password
+
+ user.set_password('bar')
+ self.assertNotEqual(user.password, password)
+
+ self.assertNotEqual(user.password, 'bar')
+ self.assertEqual(len(user.password.split('$')), 3)
+
+ def test_check_password(self):
+ app = Tipfy()
+ request = Request.from_values('/')
+ local.current_handler = RequestHandler(app, request)
+
+ user = User.create('my_username', 'my_id', password='foo')
+
+ self.assertEqual(user.check_password('foo'), True)
+ self.assertEqual(user.check_password('bar'), False)
+
+ def test_check_session(self):
+ app = Tipfy()
+ request = Request.from_values('/')
+ local.current_handler = RequestHandler(app, request)
+
+ user = User.create('my_username', 'my_id', password='foo')
+
+ session_id = user.session_id
+ self.assertEqual(user.check_session(session_id), True)
+ self.assertEqual(user.check_session('bar'), False)
+
+ def test_get_by_username(self):
+ user = User.create('my_username', 'my_id')
+ user_1 = User.get_by_username('my_username')
+
+ self.assertEqual(isinstance(user, User), True)
+ self.assertEqual(isinstance(user_1, User), True)
+ self.assertEqual(str(user.key()), str(user_1.key()))
+
+ def test_get_by_auth_id(self):
+ user = User.create('my_username', 'my_id')
+ user_1 = User.get_by_auth_id('my_id')
+
+ self.assertEqual(isinstance(user, User), True)
+ self.assertEqual(isinstance(user_1, User), True)
+ self.assertEqual(str(user.key()), str(user_1.key()))
+
+ def test_unicode(self):
+ user_1 = User(username='Calvin', auth_id='test', session_id='test')
+ self.assertEqual(unicode(user_1), u'Calvin')
+
+ def test_str(self):
+ user_1 = User(username='Hobbes', auth_id='test', session_id='test')
+ self.assertEqual(str(user_1), u'Hobbes')
+
+ def test_eq(self):
+ user_1 = User(key_name='test', username='Calvin', auth_id='test',
session_id='test')
+ user_2 = User(key_name='test', username='Calvin', auth_id='test',
session_id='test')
+
+ self.assertEqual(user_1, user_2)
+ self.assertNotEqual(user_1, '')
+
+ def test_ne(self):
+ user_1 = User(key_name='test', username='Calvin', auth_id='test',
session_id='test')
+ user_2 = User(key_name='test_2', username='Calvin', auth_id='test',
session_id='test')
+
+ self.assertEqual((user_1 != user_2), True)
+
+ def test_renew_session(self):
+ app = Tipfy()
+ request = Request.from_values('/')
+ local.current_handler = RequestHandler(app, request)
+
+ user = User.create('my_username', 'my_id')
+ user.renew_session(max_age=86400)
+
+ def test_renew_session_force(self):
+ app = Tipfy()
+ user = User.create('my_username', 'my_id')
+ user.renew_session(force=True, max_age=86400)
+
+
+class TestMiscelaneous(test_utils.BaseTestCase):
+ def test_create_session_id(self):
+ self.assertEqual(len(create_session_id()), 32)
+
+
+class TestMultiAuthStore(test_utils.BaseTestCase):
+ def get_app(self):
+ app = Tipfy(config={'tipfy.sessions': {
+ 'secret_key': 'secret',
+ }})
+ local.current_handler = RequestHandler(app, Request.from_values('/'))
+ return app
+
+ def test_login_with_form_invalid(self):
+ app = self.get_app()
+ request = Request.from_values('/')
+ local.current_handler = RequestHandler(app, request)
+
+ store = MultiAuthStore(local.current_handler)
+ res = store.login_with_form('foo', 'bar', remember=True)
+
+ self.assertEqual(res, False)
+
+ def test_login_with_form(self):
+ user = User.create('foo', 'foo_id', password='bar')
+ app = self.get_app()
+ request = Request.from_values('/')
+ local.current_handler = RequestHandler(app, request)
+
+ store = MultiAuthStore(local.current_handler)
+ res = store.login_with_form('foo', 'bar', remember=True)
+ self.assertEqual(res, True)
+
+ res = store.login_with_form('foo', 'bar', remember=False)
+ self.assertEqual(res, True)
+
+ def test_login_with_auth_id(self):
+ app = self.get_app()
+ request = Request.from_values('/')
+ local.current_handler = RequestHandler(app, request)
+
+ store = MultiAuthStore(local.current_handler)
+ store.login_with_auth_id('foo_id', remember=False)
+
+ user = User.create('foo', 'foo_id', password='bar')
+ app = self.get_app()
+ store.login_with_auth_id('foo_id', remember=True)
+
+ def test_real_login(self):
+ user = User.create('foo', 'foo_id', auth_remember=True)
+ app = self.get_app()
+ request = Request.from_values('/')
+ local.current_handler = RequestHandler(app, request)
+
+ store = MultiAuthStore(local.current_handler)
+ store.login_with_auth_id('foo_id', remember=False)
+
+ response = Response()
+ local.current_handler.session_store.save(response)
+
+ request = Request.from_values('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ local.current_handler = RequestHandler(app, request)
+ store = MultiAuthStore(local.current_handler)
+ self.assertNotEqual(store.user, None)
+ self.assertEqual(store.user.username, 'foo')
+ self.assertEqual(store.user.auth_id, 'foo_id')
+
+ def test_real_logout(self):
+ user = User.create('foo', 'foo_id', auth_remember=True)
+ app = self.get_app()
+ request = Request.from_values('/')
+ local.current_handler = RequestHandler(app, request)
+
+ store = MultiAuthStore(local.current_handler)
+ store.login_with_auth_id('foo_id', remember=False)
+
+ response = Response()
+ local.current_handler.session_store.save(response)
+
+ request = Request.from_values('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ local.current_handler = RequestHandler(app, request)
+ store = MultiAuthStore(local.current_handler)
+ self.assertNotEqual(store.user, None)
+ self.assertEqual(store.user.username, 'foo')
+ store.logout()
+
+ response = Response()
+ local.current_handler.session_store.save(response)
+
+ request = Request.from_values('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ local.current_handler = RequestHandler(app, request)
+ store = MultiAuthStore(local.current_handler)
+
+ #self.assertEqual(store.user, None)
+ self.assertEqual(store.session, None)
+
+ def test_real_login_no_user(self):
+ app = self.get_app()
+ store = MultiAuthStore(local.current_handler)
+ user = store.create_user('foo', 'foo_id')
+ store.login_with_auth_id('foo_id', remember=False)
+
+ response = Response()
+ local.current_handler.session_store.save(response)
+
+ user.delete()
+
+ request = Request.from_values('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ local.current_handler = RequestHandler(app, request)
+ store = MultiAuthStore(local.current_handler)
+ self.assertEqual(store.session['id'], 'foo_id')
+ self.assertEqual(store.user, None)
+
+ def test_real_login_invalid(self):
+ app = self.get_app()
+ store = MultiAuthStore(local.current_handler)
+ self.assertEqual(store.user, None)
+ self.assertEqual(store.session, None)
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/config_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,370 @@
+# -*- coding: utf-8 -*-
+"""
+Tests for tipfy config
+"""
+from __future__ import with_statement
+
+import unittest
+
+from tipfy import Tipfy, RequestHandler, REQUIRED_VALUE
+from
tipfy.app import local
+from tipfy.config import Config
+
+import test_utils
+
+
+class TestConfig(test_utils.BaseTestCase):
+ def test_get(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ }})
+
+ self.assertEqual(config.get('foo'), {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ })
+
+ self.assertEqual(config.get('bar'), {})
+
+ def test_get_existing_keys(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ }})
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'baz')
+ self.assertEqual(config.get_config('foo', 'doo'), 'ding')
+
+ def test_get_existing_keys_from_default(self):
+ config = Config({}, {'foo': {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ }})
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'baz')
+ self.assertEqual(config.get_config('foo', 'doo'), 'ding')
+
+ def test_get_non_existing_keys(self):
+ config = Config()
+
+ self.assertRaises(KeyError, config.get_config, 'foo', 'bar')
+
+ def test_get_dict_existing_keys(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ }})
+
+ self.assertEqual(config.get_config('foo'), {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ })
+
+ def test_get_dict_non_existing_keys(self):
+ config = Config()
+
+ self.assertRaises(KeyError, config.get_config, 'bar')
+
+ def test_get_with_default(self):
+ config = Config()
+
+ self.assertRaises(KeyError, config.get_config, 'foo', 'bar', 'ooops')
+ self.assertRaises(KeyError, config.get_config, 'foo', 'doo', 'wooo')
+
+ def test_get_with_default_and_none(self):
+ config = Config({'foo': {
+ 'bar': None,
+ }})
+
+ self.assertEqual(config.get_config('foo', 'bar', 'ooops'), None)
+
+ def test_update(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ }})
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'baz')
+ self.assertEqual(config.get_config('foo', 'doo'), 'ding')
+
+ config.update('foo', {'bar': 'other'})
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'other')
+ self.assertEqual(config.get_config('foo', 'doo'), 'ding')
+
+ def test_setdefault(self):
+ config = Config()
+
+ self.assertRaises(KeyError, config.get_config, 'foo')
+
+ config.setdefault('foo', {
+ 'bar': 'baz',
+ 'doo': 'ding',
+ })
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'baz')
+ self.assertEqual(config.get_config('foo', 'doo'), 'ding')
+
+ def test_setdefault2(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ }})
+
+ self.assertEqual(config.get_config('foo'), {
+ 'bar': 'baz',
+ })
+
+ config.setdefault('foo', {
+ 'bar': 'wooo',
+ 'doo': 'ding',
+ })
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'baz')
+ self.assertEqual(config.get_config('foo', 'doo'), 'ding')
+
+ def test_setitem(self):
+ config = Config()
+ config['foo'] = {'bar': 'baz'}
+
+ self.assertEqual(config, {'foo': {'bar': 'baz'}})
+ self.assertEqual(config['foo'], {'bar': 'baz'})
+
+ def test_init_no_dict_values(self):
+ self.assertRaises(AssertionError, Config, {'foo': 'bar'})
+ self.assertRaises(AssertionError, Config, {'foo': None})
+ self.assertRaises(AssertionError, Config, 'foo')
+
+ def test_init_no_dict_default(self):
+ self.assertRaises(AssertionError, Config, {}, {'foo': 'bar'})
+ self.assertRaises(AssertionError, Config, {}, {'foo': None})
+ self.assertRaises(AssertionError, Config, {}, 'foo')
+
+ def test_update_no_dict_values(self):
+ config = Config()
+
+ self.assertRaises(AssertionError, config.update, {'foo': 'bar'}, 'baz')
+ self.assertRaises(AssertionError, config.update, {'foo': None}, 'baz')
+ self.assertRaises(AssertionError, config.update, 'foo', 'bar')
+
+ def test_setdefault_no_dict_values(self):
+ config = Config()
+
+ self.assertRaises(AssertionError, config.setdefault, 'foo', 'bar')
+ self.assertRaises(AssertionError, config.setdefault, 'foo', None)
+
+ def test_setitem_no_dict_values(self):
+ config = Config()
+
+ def setitem(key, value):
+ config[key] = value
+ return config
+
+ self.assertRaises(AssertionError, setitem, 'foo', 'bar')
+ self.assertRaises(AssertionError, setitem, 'foo', None)
+
+
+class TestLoadConfig(test_utils.BaseTestCase):
+ def test_default_config(self):
+ config = Config()
+
+ from resources.template import default_config as template_config
+ from resources.i18n import default_config as i18n_config
+
+
self.assertEqual(config.get_config('resources.template', 'templates_dir'),
template_config['templates_dir'])
+ self.assertEqual(config.get_config('resources.i18n', 'locale'),
i18n_config['locale'])
+ self.assertEqual(config.get_config('resources.i18n', 'timezone'),
i18n_config['timezone'])
+
+ def test_default_config_with_non_existing_key(self):
+ config = Config()
+
+ from resources.i18n import default_config as i18n_config
+
+ # In the first time the module config will be loaded normally.
+ self.assertEqual(config.get_config('resources.i18n', 'locale'),
i18n_config['locale'])
+
+ # In the second time it won't be loaded, but won't find the value and
then use the default.
+
self.assertEqual(config.get_config('resources.i18n', 'i_dont_exist', 'foo'), 'foo')
+
+ def test_override_config(self):
+ config = Config({
+ 'resources.template': {
+ 'templates_dir': 'apps/templates'
+ },
+ 'resources.i18n': {
+ 'locale': 'pt_BR',
+ 'timezone': 'America/Sao_Paulo',
+ },
+ })
+
+
self.assertEqual(config.get_config('resources.template', 'templates_dir'), 'apps/templates')
+ self.assertEqual(config.get_config('resources.i18n', 'locale'), 'pt_BR')
+
self.assertEqual(config.get_config('resources.i18n', 'timezone'), 'America/Sao_Paulo')
+
+ def test_override_config2(self):
+ config = Config({
+ 'resources.i18n': {
+ 'timezone': 'America/Sao_Paulo',
+ },
+ })
+
+ self.assertEqual(config.get_config('resources.i18n', 'locale'), 'en_US')
+
self.assertEqual(config.get_config('resources.i18n', 'timezone'), 'America/Sao_Paulo')
+
+ def test_get(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ }})
+
+ self.assertEqual(config.get_config('foo', 'bar'), 'baz')
+
+ def test_get_with_default(self):
+ config = Config()
+
+
self.assertEqual(config.get_config('resources.i18n', 'bar', 'baz'), 'baz')
+
+ def test_get_with_default_and_none(self):
+ config = Config({'foo': {
+ 'bar': None,
+ }})
+
+ self.assertEqual(config.get_config('foo', 'bar', 'ooops'), None)
+
+ def test_get_with_default_and_module_load(self):
+ config = Config()
+ self.assertEqual(config.get_config('resources.i18n', 'locale'), 'en_US')
+
self.assertEqual(config.get_config('resources.i18n', 'locale', 'foo'), 'en_US')
+
+ def test_required_config(self):
+ config = Config()
+ self.assertRaises(KeyError, config.get_config, 'resources.i18n', 'foo')
+
+ def test_missing_module(self):
+ config = Config()
+ self.assertRaises(KeyError,
config.get_config, 'i_dont_exist', 'i_dont_exist')
+
+ def test_missing_module2(self):
+ config = Config()
+ self.assertRaises(KeyError, config.get_config, 'i_dont_exist')
+
+ def test_missing_key(self):
+ config = Config()
+ self.assertRaises(KeyError,
config.get_config, 'resources.i18n', 'i_dont_exist')
+
+ def test_missing_default_config(self):
+ config = Config()
+ self.assertRaises(KeyError, config.get_config, 'tipfy', 'foo')
+
+ def test_request_handler_get_config(self):
+ app = Tipfy()
+ with app.get_test_context() as request:
+ handler = RequestHandler(request)
+
+
self.assertEqual(handler.get_config('resources.i18n', 'locale'), 'en_US')
+
self.assertEqual(handler.get_config('resources.i18n', 'locale', 'foo'), 'en_US')
+ self.assertEqual(handler.get_config('resources.i18n'), {
+ 'locale': 'en_US',
+ 'timezone': 'America/Chicago',
+ 'required': REQUIRED_VALUE,
+ })
+
+
+class TestLoadConfigGetItem(test_utils.BaseTestCase):
+ def test_default_config(self):
+ config = Config()
+
+ from resources.template import default_config as template_config
+ from resources.i18n import default_config as i18n_config
+
+ self.assertEqual(config['resources.template']['templates_dir'],
template_config['templates_dir'])
+ self.assertEqual(config['resources.i18n']['locale'],
i18n_config['locale'])
+ self.assertEqual(config['resources.i18n']['timezone'],
i18n_config['timezone'])
+
+ def test_default_config_with_non_existing_key(self):
+ config = Config()
+
+ from resources.i18n import default_config as i18n_config
+
+ # In the first time the module config will be loaded normally.
+ self.assertEqual(config['resources.i18n']['locale'],
i18n_config['locale'])
+
+ # In the second time it won't be loaded, but won't find the value and
then use the default.
+
self.assertEqual(config['resources.i18n'].get('i_dont_exist', 'foo'), 'foo')
+
+ def test_override_config(self):
+ config = Config({
+ 'resources.template': {
+ 'templates_dir': 'apps/templates'
+ },
+ 'resources.i18n': {
+ 'locale': 'pt_BR',
+ 'timezone': 'America/Sao_Paulo',
+ },
+ })
+
+
self.assertEqual(config['resources.template']['templates_dir'], 'apps/templates')
+ self.assertEqual(config['resources.i18n']['locale'], 'pt_BR')
+
self.assertEqual(config['resources.i18n']['timezone'], 'America/Sao_Paulo')
+
+ def test_override_config2(self):
+ config = Config({
+ 'resources.i18n': {
+ 'timezone': 'America/Sao_Paulo',
+ },
+ })
+
+ self.assertEqual(config['resources.i18n']['locale'], 'en_US')
+
self.assertEqual(config['resources.i18n']['timezone'], 'America/Sao_Paulo')
+
+ def test_get(self):
+ config = Config({'foo': {
+ 'bar': 'baz',
+ }})
+
+ self.assertEqual(config['foo']['bar'], 'baz')
+
+ def test_get_with_default(self):
+ config = Config()
+
+ self.assertEqual(config['resources.i18n'].get('bar', 'baz'), 'baz')
+
+ def test_get_with_default_and_none(self):
+ config = Config({'foo': {
+ 'bar': None,
+ }})
+
+ self.assertEqual(config['foo'].get('bar', 'ooops'), None)
+
+ def test_get_with_default_and_module_load(self):
+ config = Config()
+ self.assertEqual(config['resources.i18n']['locale'], 'en_US')
+ self.assertEqual(config['resources.i18n'].get('locale', 'foo'), 'en_US')
+
+ def test_required_config(self):
+ config = Config()
+ self.assertRaises(KeyError, config['resources.i18n'].__getitem__, 'foo')
+ self.assertRaises(KeyError,
config['resources.i18n'].__getitem__, 'required')
+
+ def test_missing_module(self):
+ config = Config()
+ self.assertRaises(KeyError, config.__getitem__, 'i_dont_exist')
+
+ def test_missing_key(self):
+ config = Config()
+ self.assertRaises(KeyError,
config['resources.i18n'].__getitem__, 'i_dont_exist')
+
+ def test_missing_default_config(self):
+ config = Config()
+ self.assertRaises(KeyError, config['tipfy'].__getitem__, 'foo')
+
+
+class TestGetConfig(test_utils.BaseTestCase):
+ '''
+ def test_get_config(self):
+ app = Tipfy()
+ self.assertEqual(get_config('resources.i18n', 'locale'), 'en_US')
+ '''
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/ext_jinja2_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,218 @@
+# -*- coding: utf-8 -*-
+"""
+ Tests for tipfyext.jinja2
+"""
+import os
+import sys
+import unittest
+
+from jinja2 import FileSystemLoader, Environment
+
+from tipfy import RequestHandler, Request, Response, Tipfy
+from
tipfy.app import local
+from tipfyext.jinja2 import Jinja2, Jinja2Mixin
+
+import test_utils
+
+current_dir = os.path.abspath(os.path.dirname(__file__))
+templates_dir = os.path.join(current_dir, 'resources', 'templates')
+templates_compiled_target =
os.path.join(current_dir, 'resources', 'templates_compiled')
+
+
+class TestJinja2(test_utils.BaseTestCase):
+ def test_render_template(self):
+ app = Tipfy(config={'tipfyext.jinja2': {'templates_dir': templates_dir}})
+ request = Request.from_values()
+ local.current_handler = handler = RequestHandler(app, request)
+ jinja2 = Jinja2(app)
+
+ message = 'Hello, World!'
+ res = jinja2.render_template(local.current_handler, 'template1.html',
message=message)
+ self.assertEqual(res, message)
+
+ def test_render_template_with_i18n(self):
+ app = Tipfy(config={
+ 'tipfyext.jinja2': {
+ 'templates_dir': templates_dir,
+ 'environment_args': dict(
+ autoescape=True,
+
extensions=['jinja2.ext.autoescape', 'jinja2.ext.with_', 'jinja2.ext.i18n'],
+ ),
+ },
+ 'tipfy.sessions': {
+ 'secret_key': 'secret',
+ },
+ })
+ request = Request.from_values()
+ local.current_handler = handler = RequestHandler(app, request)
+ jinja2 = Jinja2(app)
+
+ message = 'Hello, i18n World!'
+ res = jinja2.render_template(local.current_handler, 'template2.html',
message=message)
+ self.assertEqual(res, message)
+
+ def test_render_response(self):
+ app = Tipfy(config={'tipfyext.jinja2': {'templates_dir': templates_dir}})
+ request = Request.from_values()
+ local.current_handler = handler = RequestHandler(app, request)
+ jinja2 = Jinja2(app)
+
+ message = 'Hello, World!'
+ response =
jinja2.render_response(local.current_handler, 'template1.html',
message=message)
+ self.assertEqual(isinstance(response, Response), True)
+ self.assertEqual(response.mimetype, 'text/html')
+ self.assertEqual(response.data, message)
+
+ def test_render_response_force_compiled(self):
+ app = Tipfy(config={
+ 'tipfyext.jinja2': {
+ 'templates_compiled_target': templates_compiled_target,
+ 'force_use_compiled': True,
+ }
+ }, debug=False)
+ request = Request.from_values()
+ local.current_handler = handler = RequestHandler(app, request)
+ jinja2 = Jinja2(app)
+
+ message = 'Hello, World!'
+ response =
jinja2.render_response(local.current_handler, 'template1.html',
message=message)
+ self.assertEqual(isinstance(response, Response), True)
+ self.assertEqual(response.mimetype, 'text/html')
+ self.assertEqual(response.data, message)
+
+ def test_jinja2_mixin_render_template(self):
+ class MyHandler(RequestHandler, Jinja2Mixin):
+ def __init__(self, app, request):
+
self.app = app
+ self.request = request
+ self.context = {}
+
+ app = Tipfy(config={'tipfyext.jinja2': {'templates_dir': templates_dir}})
+ request = Request.from_values()
+ local.current_handler = handler = MyHandler(app, request)
+ jinja2 = Jinja2(app)
+ message = 'Hello, World!'
+
+ response = handler.render_template('template1.html', message=message)
+ self.assertEqual(response, message)
+
+ def test_jinja2_mixin_render_response(self):
+ class MyHandler(RequestHandler, Jinja2Mixin):
+ def __init__(self, app, request):
+
self.app = app
+ self.request = request
+ self.context = {}
+
+ app = Tipfy(config={'tipfyext.jinja2': {'templates_dir': templates_dir}})
+ request = Request.from_values()
+ local.current_handler = handler = MyHandler(app, request)
+ jinja2 = Jinja2(app)
+ message = 'Hello, World!'
+
+ response = handler.render_response('template1.html', message=message)
+ self.assertEqual(isinstance(response, Response), True)
+ self.assertEqual(response.mimetype, 'text/html')
+ self.assertEqual(response.data, message)
+
+ def test_get_template_attribute(self):
+ app = Tipfy(config={'tipfyext.jinja2': {'templates_dir': templates_dir}})
+ request = Request.from_values()
+ local.current_handler = handler = RequestHandler(app, request)
+ jinja2 = Jinja2(app)
+
+ hello = jinja2.get_template_attribute('hello.html', 'hello')
+ self.assertEqual(hello('World'), 'Hello, World!')
+
+ def test_engine_factory(self):
+ def get_jinja2_env():
+ app =
local.current_handler.app
+ cfg = app.get_config('tipfyext.jinja2')
+
+ loader = FileSystemLoader(cfg.get( 'templates_dir'))
+
+ return Environment(loader=loader)
+
+ app = Tipfy(config={'tipfyext.jinja2': {
+ 'templates_dir': templates_dir,
+ 'engine_factory': get_jinja2_env,
+ }})
+ request = Request.from_values()
+ local.current_handler = handler = RequestHandler(app, request)
+ jinja2 = Jinja2(app)
+
+ message = 'Hello, World!'
+ res = jinja2.render_template(local.current_handler, 'template1.html',
message=message)
+ self.assertEqual(res, message)
+
+ def test_engine_factory2(self):
+ old_sys_path = sys.path[:]
+ sys.path.insert(0, current_dir)
+
+ app = Tipfy(config={'tipfyext.jinja2': {
+ 'templates_dir': templates_dir,
+ 'engine_factory': 'resources.get_jinja2_env',
+ }})
+ request = Request.from_values()
+ local.current_handler = handler = RequestHandler(app, request)
+ jinja2 = Jinja2(app)
+
+ message = 'Hello, World!'
+ res = jinja2.render_template(local.current_handler, 'template1.html',
message=message)
+ self.assertEqual(res, message)
+
+ sys.path = old_sys_path
+
+ def test_engine_factory3(self):
+ app = Tipfy()
+ request = Request.from_values()
+ local.current_handler = handler = RequestHandler(app, request)
+ _globals = {'message': 'Hey there!'}
+ filters = {'ho': lambda e: e + ' Ho!'}
+ jinja2 = Jinja2(app, _globals=_globals, filters=filters)
+
+ template = jinja2.environment.from_string("""{{ message|ho }}""")
+
+ self.assertEqual(template.render(), 'Hey there! Ho!')
+
+ def test_after_environment_created(self):
+ def after_creation(environment):
+ environment.filters['ho'] = lambda x: x + ', Ho!'
+
+ app = Tipfy(config={'tipfyext.jinja2': {'after_environment_created':
after_creation}})
+ request = Request.from_values()
+ local.current_handler = handler = RequestHandler(app, request)
+ jinja2 = Jinja2(app)
+
+ template = jinja2.environment.from_string("""{{ 'Hey'|ho }}""")
+ self.assertEqual(template.render(), 'Hey, Ho!')
+
+ def test_after_environment_created_using_string(self):
+ app = Tipfy(config={'tipfyext.jinja2':
{'after_environment_created': 'resources.jinja2_after_environment_created.after_creation'}})
+ request = Request.from_values()
+ local.current_handler = handler = RequestHandler(app, request)
+ jinja2 = Jinja2(app)
+
+ template = jinja2.environment.from_string("""{{ 'Hey'|ho }}""")
+ self.assertEqual(template.render(), 'Hey, Ho!')
+
+ def test_translations(self):
+ app = Tipfy(config={
+ 'tipfyext.jinja2': {
+ 'environment_args': {
+ 'extensions': ['jinja2.ext.i18n',],
+ },
+ },
+ 'tipfy.sessions': {
+ 'secret_key': 'foo',
+ },
+ })
+ request = Request.from_values()
+ local.current_handler = handler = RequestHandler(app, request)
+ jinja2 = Jinja2(app)
+
+ template = jinja2.environment.from_string("""{{ _('foo = %(bar)s',
bar='foo') }}""")
+ self.assertEqual(template.render(), 'foo = foo')
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/ext_mako_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+"""
+ Tests for tipfyext.mako
+"""
+import os
+import sys
+import unittest
+
+from tipfy import RequestHandler, Request, Response, Tipfy
+from
tipfy.app import local
+from tipfyext.mako import Mako, MakoMixin
+
+import test_utils
+
+current_dir = os.path.abspath(os.path.dirname(__file__))
+templates_dir = os.path.join(current_dir, 'resources', 'mako_templates')
+
+
+class TestMako(test_utils.BaseTestCase):
+ def test_render_template(self):
+ app = Tipfy(config={'tipfyext.mako': {'templates_dir': templates_dir}})
+ request = Request.from_values()
+ local.current_handler = handler = RequestHandler(app, request)
+ mako = Mako(app)
+
+ message = 'Hello, World!'
+ res = mako.render_template(local.current_handler, 'template1.html',
message=message)
+ self.assertEqual(res, message + '\n')
+
+ def test_render_response(self):
+ app = Tipfy(config={'tipfyext.mako': {'templates_dir': templates_dir}})
+ request = Request.from_values()
+ local.current_handler = handler = RequestHandler(app, request)
+ mako = Mako(app)
+
+ message = 'Hello, World!'
+ response = mako.render_response(local.current_handler, 'template1.html',
message=message)
+ self.assertEqual(isinstance(response, Response), True)
+ self.assertEqual(response.mimetype, 'text/html')
+ self.assertEqual(response.data, message + '\n')
+
+ def test_mako_mixin_render_template(self):
+ class MyHandler(RequestHandler, MakoMixin):
+ def __init__(self, app, request):
+
self.app = app
+ self.request = request
+ self.context = {}
+
+ app = Tipfy(config={'tipfyext.mako': {'templates_dir': templates_dir}})
+ request = Request.from_values()
+ local.current_handler = handler = MyHandler(app, request)
+ mako = Mako(app)
+ message = 'Hello, World!'
+
+ response = handler.render_template('template1.html', message=message)
+ self.assertEqual(response, message + '\n')
+
+ def test_mako_mixin_render_response(self):
+ class MyHandler(RequestHandler, MakoMixin):
+ def __init__(self, app, request):
+
self.app = app
+ self.request = request
+ self.context = {}
+
+ app = Tipfy(config={'tipfyext.mako': {'templates_dir': templates_dir}})
+ request = Request.from_values()
+ local.current_handler = handler = MyHandler(app, request)
+ mako = Mako(app)
+ message = 'Hello, World!'
+
+ response = handler.render_response('template1.html', message=message)
+ self.assertEqual(isinstance(response, Response), True)
+ self.assertEqual(response.mimetype, 'text/html')
+ self.assertEqual(response.data, message + '\n')
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/gae_acl_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,439 @@
+# -*- coding: utf-8 -*-
+"""
+ Tests for tipfy.appengine.acl
+"""
+import unittest
+
+from google.appengine.api import memcache
+
+from tipfy import Tipfy, Request, RequestHandler, CURRENT_VERSION_ID
+from
tipfy.app import local
+from tipfy.appengine.acl import Acl, AclRules, _rules_map, AclMixin
+
+from
tipfy.app import local
+
+import test_utils
+
+
+class TestAcl(test_utils.BaseTestCase):
+ def setUp(self):
+ # Clean up datastore.
+ super(TestAcl, self).setUp()
+
+
self.app = Tipfy()
+ self.app.config['tipfy']['dev'] = False
+ local.current_handler = RequestHandler(
self.app, Request.from_values())
+
+ Acl.roles_map = {}
+ Acl.roles_lock = CURRENT_VERSION_ID
+ _rules_map.clear()
+ test_utils.BaseTestCase.setUp(self)
+
+ def tearDown(self):
+ self.app.config['tipfy']['dev'] = True
+
+ Acl.roles_map = {}
+ Acl.roles_lock = CURRENT_VERSION_ID
+ _rules_map.clear()
+ test_utils.BaseTestCase.tearDown(self)
+
+ def test_test_insert_or_update(self):
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertEqual(user_acl, None)
+
+ # Set empty rules.
+ user_acl = AclRules.insert_or_update(area='test', user='test')
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertNotEqual(user_acl, None)
+ self.assertEqual(user_acl.rules, [])
+ self.assertEqual(user_acl.roles, [])
+
+ rules = [
+ ('topic_1', 'name_1', True),
+ ('topic_1', 'name_2', True),
+ ('topic_2', 'name_1', False),
+ ]
+
+ user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertNotEqual(user_acl, None)
+ self.assertEqual(user_acl.rules, rules)
+ self.assertEqual(user_acl.roles, [])
+
+ extra_rule = ('topic_3', 'name_3', True)
+ rules.append(extra_rule)
+
+ user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules, roles=['foo', 'bar', 'baz'])
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertNotEqual(user_acl, None)
+ self.assertEqual(user_acl.rules, rules)
+ self.assertEqual(user_acl.roles, ['foo', 'bar', 'baz'])
+
+ def test_set_rules(self):
+ """Test setting and appending rules."""
+ rules = [
+ ('topic_1', 'name_1', True),
+ ('topic_1', 'name_2', True),
+ ('topic_2', 'name_1', False),
+ ]
+ extra_rule = ('topic_3', 'name_3', True)
+
+ # Set empty rules.
+ user_acl = AclRules.insert_or_update(area='test', user='test')
+
+ # Set rules and save the record.
+ user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
+
+ # Fetch the record again, and compare.
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertEqual(user_acl.rules, rules)
+
+ # Append more rules.
+ user_acl.rules.append(extra_rule)
+ user_acl.put()
+ rules.append(extra_rule)
+
+ # Fetch the record again, and compare.
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertEqual(user_acl.rules, rules)
+
+ def test_delete_rules(self):
+ rules = [
+ ('topic_1', 'name_1', True),
+ ('topic_1', 'name_2', True),
+ ('topic_2', 'name_1', False),
+ ]
+ user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
+
+ # Fetch the record again, and compare.
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ self.assertEqual(user_acl.rules, rules)
+
+ key_name = AclRules.get_key_name('test', 'test')
+ acl = Acl('test', 'test')
+
+ cached = memcache.get(key_name, namespace=AclRules.__name__)
+ self.assertEqual(key_name in _rules_map, True)
+ self.assertEqual(cached, _rules_map[key_name])
+
+ user_acl.delete()
+ user_acl2 = AclRules.get_by_area_and_user('test', 'test')
+
+ cached = memcache.get(key_name, namespace=AclRules.__name__)
+ self.assertEqual(user_acl2, None)
+ self.assertEqual(key_name not in _rules_map, True)
+ self.assertEqual(cached, None)
+
+ def test_is_rule_set(self):
+ rules = [
+ ('topic_1', 'name_1', True),
+ ('topic_1', 'name_2', True),
+ ('topic_2', 'name_1', False),
+ ]
+ user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
+
+ # Fetch the record again, and compare.
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+
+ self.assertEqual(user_acl.is_rule_set(*rules[0]), True)
+ self.assertEqual(user_acl.is_rule_set(*rules[1]), True)
+ self.assertEqual(user_acl.is_rule_set(*rules[2]), True)
+ self.assertEqual(user_acl.is_rule_set('topic_1', 'name_3', True), False)
+
+ def test_no_area_or_no_user(self):
+ acl1 = Acl('foo', None)
+ acl2 = Acl(None, 'foo')
+
+ self.assertEqual(acl1.has_any_access(), False)
+ self.assertEqual(acl2.has_any_access(), False)
+
+ def test_default_roles_lock(self):
+ Acl.roles_lock = None
+ acl2 = Acl('foo', 'foo')
+
+ self.assertEqual(acl2.roles_lock, CURRENT_VERSION_ID)
+
+ def test_set_invalid_rules(self):
+ rules = {}
+ self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
+
+ rules = ['foo', 'bar', True]
+ self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
+
+ rules = [('foo',)]
+ self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
+
+ rules = [('foo', 'bar')]
+ self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
+
+ rules = [(1, 2, 3)]
+ self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
+
+ rules = [('foo', 'bar', True)]
+ AclRules.insert_or_update(area='test', user='test', rules=rules)
+ user_acl = AclRules.get_by_area_and_user('test', 'test')
+ user_acl.rules.append((1, 2, 3))
+ self.assertRaises(AssertionError, user_acl.put)
+
+ def test_example(self):
+ """Tests the example set in the acl module."""
+ # Set a dict of roles with an 'admin' role that has full access and
assign
+ # users to it. Each role maps to a list of rules. Each rule, a tuple
+ # (topic, name, flag), where flag, as bool to allow or disallow access.
+ # Wildcard '*' can be used to match all topics and/or names.
+ Acl.roles_map = {
+ 'admin': [
+ ('*', '*', True),
+ ],
+ }
+
+ # Assign users 'user_1' and 'user_2' to the 'admin' role.
+ AclRules.insert_or_update(area='my_area', user='user_1', roles=['admin'])
+ AclRules.insert_or_update(area='my_area', user='user_2', roles=['admin'])
+
+ # Restrict 'user_2' from accessing a specific resource, adding a new rule
+ # with flag set to False. Now this user has access to everything except
this
+ # resource.
+ user_acl = AclRules.get_by_area_and_user('my_area', 'user_2')
+ user_acl.rules.append(('UserAdmin', '*', False))
+ user_acl.put()
+
+ # Check 'user_2' permission.
+ acl = Acl(area='my_area', user='user_2')
+ self.assertEqual(acl.has_access(topic='UserAdmin', name='save'), False)
+ self.assertEqual(acl.has_access(topic='UserAdmin', name='get'), False)
+ self.assertEqual(acl.has_access(topic='AnythingElse', name='put'), True)
+
+ def test_is_one(self):
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
+
+ acl = Acl(area='my_area', user='user_1')
+ self.assertEqual(acl.is_one('editor'), True)
+ self.assertEqual(acl.is_one('designer'), True)
+ self.assertEqual(acl.is_one('admin'), False)
+
+ def test_is_any(self):
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
+
+ acl = Acl(area='my_area', user='user_1')
+ self.assertEqual(acl.is_any(['editor', 'admin']), True)
+ self.assertEqual(acl.is_any(['admin', 'designer']), True)
+ self.assertEqual(acl.is_any(['admin', 'user']), False)
+
+ def test_is_all(self):
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
+
+ acl = Acl(area='my_area', user='user_1')
+ self.assertEqual(acl.is_all(['editor', 'admin']), False)
+ self.assertEqual(acl.is_all(['admin', 'designer']), False)
+ self.assertEqual(acl.is_all(['admin', 'user']), False)
+ self.assertEqual(acl.is_all(['editor', 'designer']), True)
+
+ def test_non_existent_user(self):
+ acl = Acl(area='my_area', user='user_3')
+ self.assertEqual(acl.has_any_access(), False)
+
+ def test_has_any_access(self):
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
+ AclRules.insert_or_update(area='my_area', user='user_2',
rules=[('*', '*', True)])
+ AclRules.insert_or_update(area='my_area', user='user_3')
+
+ acl = Acl(area='my_area', user='user_1')
+ self.assertEqual(acl.has_any_access(), True)
+
+ acl = Acl(area='my_area', user='user_2')
+ self.assertEqual(acl.has_any_access(), True)
+
+ acl = Acl(area='my_area', user='user_3')
+ self.assertEqual(acl.has_any_access(), False)
+ self.assertEqual(acl._rules, [])
+ self.assertEqual(acl._roles, [])
+
+ def test_has_access_invalid_parameters(self):
+ AclRules.insert_or_update(area='my_area', user='user_1',
rules=[('*', '*', True)])
+
+ acl1 = Acl(area='my_area', user='user_1')
+
+ self.assertRaises(ValueError, acl1.has_access, 'content', '*')
+ self.assertRaises(ValueError, acl1.has_access, '*', 'content')
+
+ def test_has_access(self):
+ AclRules.insert_or_update(area='my_area', user='user_1',
rules=[('*', '*', True)])
+ AclRules.insert_or_update(area='my_area', user='user_2',
rules=[('content', '*', True), ('content', 'delete', False)])
+ AclRules.insert_or_update(area='my_area', user='user_3',
rules=[('content', 'read', True)])
+
+ acl1 = Acl(area='my_area', user='user_1')
+ acl2 = Acl(area='my_area', user='user_2')
+ acl3 = Acl(area='my_area', user='user_3')
+
+ self.assertEqual(acl1.has_access('content', 'read'), True)
+ self.assertEqual(acl1.has_access('content', 'update'), True)
+ self.assertEqual(acl1.has_access('content', 'delete'), True)
+
+ self.assertEqual(acl2.has_access('content', 'read'), True)
+ self.assertEqual(acl2.has_access('content', 'update'), True)
+ self.assertEqual(acl2.has_access('content', 'delete'), False)
+
+ self.assertEqual(acl3.has_access('content', 'read'), True)
+ self.assertEqual(acl3.has_access('content', 'update'), False)
+ self.assertEqual(acl3.has_access('content', 'delete'), False)
+
+ def test_has_access_with_roles(self):
+ Acl.roles_map = {
+ 'admin': [('*', '*', True),],
+ 'editor': [('content', '*', True),],
+ 'contributor': [('content', '*', True), ('content', 'delete', False)],
+ 'designer': [('design', '*', True),],
+ }
+
+ AclRules.insert_or_update(area='my_area', user='user_1', roles=['admin'])
+ acl1 = Acl(area='my_area', user='user_1')
+
+ AclRules.insert_or_update(area='my_area', user='user_2',
roles=['admin'], rules=[('ManageUsers', '*', False)])
+ acl2 = Acl(area='my_area', user='user_2')
+
+ AclRules.insert_or_update(area='my_area', user='user_3',
roles=['editor'])
+ acl3 = Acl(area='my_area', user='user_3')
+
+ AclRules.insert_or_update(area='my_area', user='user_4',
roles=['contributor'], rules=[('design', '*', True),])
+ acl4 = Acl(area='my_area', user='user_4')
+
+ self.assertEqual(acl1.has_access('ApproveUsers', 'save'), True)
+ self.assertEqual(acl1.has_access('ManageUsers', 'edit'), True)
+ self.assertEqual(acl1.has_access('ManageUsers', 'delete'), True)
+
+ self.assertEqual(acl1.has_access('ApproveUsers', 'save'), True)
+ self.assertEqual(acl2.has_access('ManageUsers', 'edit'), False)
+ self.assertEqual(acl2.has_access('ManageUsers', 'delete'), False)
+
+ self.assertEqual(acl3.has_access('ApproveUsers', 'save'), False)
+ self.assertEqual(acl3.has_access('ManageUsers', 'edit'), False)
+ self.assertEqual(acl3.has_access('ManageUsers', 'delete'), False)
+ self.assertEqual(acl3.has_access('content', 'edit'), True)
+ self.assertEqual(acl3.has_access('content', 'delete'), True)
+ self.assertEqual(acl3.has_access('content', 'save'), True)
+ self.assertEqual(acl3.has_access('design', 'edit'), False)
+ self.assertEqual(acl3.has_access('design', 'delete'), False)
+
+ self.assertEqual(acl4.has_access('ApproveUsers', 'save'), False)
+ self.assertEqual(acl4.has_access('ManageUsers', 'edit'), False)
+ self.assertEqual(acl4.has_access('ManageUsers', 'delete'), False)
+ self.assertEqual(acl4.has_access('content', 'edit'), True)
+ self.assertEqual(acl4.has_access('content', 'delete'), False)
+ self.assertEqual(acl4.has_access('content', 'save'), True)
+ self.assertEqual(acl4.has_access('design', 'edit'), True)
+ self.assertEqual(acl4.has_access('design', 'delete'), True)
+
+ def test_roles_lock_unchanged(self):
+ roles_map1 = {
+ 'editor': [('content', '*', True),],
+ 'contributor': [('content', '*', True), ('content', 'delete', False)],
+ }
+ Acl.roles_map = roles_map1
+ Acl.roles_lock = 'initial'
+
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor'])
+ acl1 = Acl(area='my_area', user='user_1')
+
+ AclRules.insert_or_update(area='my_area', user='user_2',
roles=['contributor'])
+ acl2 = Acl(area='my_area', user='user_2')
+
+ self.assertEqual(acl1.has_access('content', 'add'), True)
+ self.assertEqual(acl1.has_access('content', 'edit'), True)
+ self.assertEqual(acl1.has_access('content', 'delete'), True)
+
+ self.assertEqual(acl2.has_access('content', 'add'), True)
+ self.assertEqual(acl2.has_access('content', 'edit'), True)
+ self.assertEqual(acl2.has_access('content', 'delete'), False)
+
+ roles_map2 = {
+ 'editor': [('content', '*', True),],
+ 'contributor': [('content', '*', True), ('content', 'delete', False),
('content', 'add', False)],
+ }
+ Acl.roles_map = roles_map2
+ # Don't change the lock to check that the cache will be kept.
+ # Acl.roles_lock = 'changed'
+
+ acl1 = Acl(area='my_area', user='user_1')
+ acl2 = Acl(area='my_area', user='user_2')
+
+ self.assertEqual(acl1.has_access('content', 'add'), True)
+ self.assertEqual(acl1.has_access('content', 'edit'), True)
+ self.assertEqual(acl1.has_access('content', 'delete'), True)
+
+ self.assertEqual(acl2.has_access('content', 'add'), True)
+ self.assertEqual(acl2.has_access('content', 'edit'), True)
+ self.assertEqual(acl2.has_access('content', 'delete'), False)
+
+ def test_roles_lock_changed(self):
+ roles_map1 = {
+ 'editor': [('content', '*', True),],
+ 'contributor': [('content', '*', True), ('content', 'delete', False)],
+ }
+ Acl.roles_map = roles_map1
+ Acl.roles_lock = 'initial'
+
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor'])
+ acl1 = Acl(area='my_area', user='user_1')
+
+ AclRules.insert_or_update(area='my_area', user='user_2',
roles=['contributor'])
+ acl2 = Acl(area='my_area', user='user_2')
+
+ self.assertEqual(acl1.has_access('content', 'add'), True)
+ self.assertEqual(acl1.has_access('content', 'edit'), True)
+ self.assertEqual(acl1.has_access('content', 'delete'), True)
+
+ self.assertEqual(acl2.has_access('content', 'add'), True)
+ self.assertEqual(acl2.has_access('content', 'edit'), True)
+ self.assertEqual(acl2.has_access('content', 'delete'), False)
+
+ roles_map2 = {
+ 'editor': [('content', '*', True),],
+ 'contributor': [('content', '*', True), ('content', 'delete', False),
('content', 'add', False)],
+ }
+ Acl.roles_map = roles_map2
+ Acl.roles_lock = 'changed'
+
+ acl1 = Acl(area='my_area', user='user_1')
+ acl2 = Acl(area='my_area', user='user_2')
+
+ self.assertEqual(acl1.has_access('content', 'add'), True)
+ self.assertEqual(acl1.has_access('content', 'edit'), True)
+ self.assertEqual(acl1.has_access('content', 'delete'), True)
+
+ self.assertEqual(acl2.has_access('content', 'add'), False)
+ self.assertEqual(acl2.has_access('content', 'edit'), True)
+ self.assertEqual(acl2.has_access('content', 'delete'), False)
+
+ def test_acl_mixin(self):
+ roles_map1 = {
+ 'editor': [('content', '*', True),],
+ 'contributor': [('content', '*', True), ('content', 'delete', False)],
+ }
+ AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor'])
+
+ class Area(object):
+ def key(self):
+ return 'my_area'
+
+ class User(object):
+ def key(self):
+ return 'user_1'
+
+ class MyHandler(AclMixin):
+ roles_map = roles_map1
+ roles_lock = 'foo'
+
+ def __init__(self):
+ self.area = Area()
+ self.current_user = User()
+
+ handler = MyHandler()
+ self.assertEqual(handler.acl.has_access('content', 'add'), True)
+ self.assertEqual(handler.acl.has_access('content', 'edit'), True)
+ self.assertEqual(handler.acl.has_access('content', 'delete'), True)
+ self.assertEqual(handler.acl.has_access('foo', 'delete'), False)
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/gae_blobstore_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+"""
+ Tests for tipfy.appengine.blobstore
+"""
+import datetime
+import decimal
+import email
+import StringIO
+import time
+import unittest
+
+from google.appengine.ext import blobstore
+
+from tipfy.appengine.blobstore import (CreationFormatError,
parse_blob_info,
+ parse_creation)
+
+from werkzeug import FileStorage
+
+import test_utils
+
+
+class TestParseCreation(test_utils.BaseTestCase):
+ """YYYY-mm-dd HH:MM:SS.ffffff"""
+ def test_invalid_format(self):
+ self.assertRaises(CreationFormatError, parse_creation, '2010-05-20
6:20:35', 'my_field_name')
+
+ def test_invalid_format2(self):
+ self.assertRaises(CreationFormatError, parse_creation, '2010-05-20
6:20:35.1234.5678', 'my_field_name')
+
+ def test_invalid_format3(self):
+ self.assertRaises(CreationFormatError,
parse_creation, 'youcannot.parseme', 'my_field_name')
+
+ def test_parse(self):
+ timestamp = time.time()
+ parts = str(timestamp).split('.', 1)
+ ms = parts[1][:4]
+ timestamp = decimal.Decimal(parts[0] + '.' + ms)
+ curr_date = datetime.datetime.fromtimestamp(timestamp)
+
+ to_convert = '%s.%s' % (curr_date.strftime('%Y-%m-%d %H:%M:%S'), ms)
+
+ res = parse_creation(to_convert, 'my_field_name')
+
+ self.assertEqual(res.timetuple(), curr_date.timetuple())
+
+
+
+class TestParseBlobInfo(test_utils.BaseTestCase):
+ def test_none(self):
+ self.assertEqual(parse_blob_info(None, 'my_field_name'), None)
+
+ def test_file(self):
+ stream = StringIO.StringIO()
+ stream.write("""\
+Content-Type: application/octet-stream
+Content-Length: 1
+X-AppEngine-Upload-Creation: 2010-10-01 05:34:00.000000
+""")
+ stream.seek(0)
+ headers = {}
+ headers['Content-Type'] = 'image/png; blob-key=foo'
+
+ f = FileStorage(stream=stream, headers=headers)
+ self.assertNotEqual(parse_blob_info(f, 'my_field_name'), None)
+
+ def test_invalid_size(self):
+ stream = StringIO.StringIO()
+ stream.write("""\
+Content-Type: application/octet-stream
+Content-Length: zzz
+X-AppEngine-Upload-Creation: 2010-10-01 05:34:00.000000
+""")
+ stream.seek(0)
+ headers = {}
+ headers['Content-Type'] = 'image/png; blob-key=foo'
+
+ f = FileStorage(stream=stream, headers=headers)
+ self.assertRaises(blobstore.BlobInfoParseError,
parse_blob_info,f, 'my_field_name')
+
+ def test_invalid_CREATION(self):
+ stream = StringIO.StringIO()
+ stream.write("""\
+Content-Type: application/octet-stream
+Content-Length: 1
+X-AppEngine-Upload-Creation: XXX
+""")
+ stream.seek(0)
+ headers = {}
+ headers['Content-Type'] = 'image/png; blob-key=foo'
+
+ f = FileStorage(stream=stream, headers=headers)
+ self.assertRaises(blobstore.BlobInfoParseError,
parse_blob_info,f, 'my_field_name')
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/gae_db_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,662 @@
+# -*- coding: utf-8 -*-
+"""
+ Tests for tipfy.appengine.db
+"""
+import unittest
+import hashlib
+
+from google.appengine.ext import db
+from google.appengine.api import datastore_errors
+
+from werkzeug.exceptions import NotFound
+
+from tipfy.appengine import db as ext_db
+
+import test_utils
+
+
+class FooModel(db.Model):
+ name = db.StringProperty(required=True)
+ name2 = db.StringProperty()
+ age = db.IntegerProperty()
+ married = db.BooleanProperty()
+ data = ext_db.PickleProperty()
+ slug = ext_db.SlugProperty(name)
+ slug2 = ext_db.SlugProperty(name2, default='some-default-value',
max_length=20)
+ etag = ext_db.EtagProperty(name)
+ etag2 = ext_db.EtagProperty(name2)
+ somekey = ext_db.KeyProperty()
+
+
+class FooExpandoModel(db.Expando):
+ pass
+
+
+class BarModel(db.Model):
+ foo = db.ReferenceProperty(FooModel)
+
+
+class JsonModel(db.Model):
+ data = ext_db.JsonProperty()
+
+
+class TimezoneModel(db.Model):
+ data = ext_db.TimezoneProperty()
+
+
+@ext_db.retry_on_timeout(retries=3, interval=0.1)
+def test_timeout_1(**kwargs):
+ counter = kwargs.get('counter')
+
+ # Let it pass only in the last attempt
+ if counter[0] < 3:
+ counter[0] += 1
+ raise db.Timeout()
+
+
+@ext_db.retry_on_timeout(retries=5, interval=0.1)
+def test_timeout_2(**kwargs):
+ counter = kwargs.get('counter')
+
+ # Let it pass only in the last attempt
+ if counter[0] < 5:
+ counter[0] += 1
+ raise db.Timeout()
+
+ raise ValueError()
+
+
+@ext_db.retry_on_timeout(retries=2, interval=0.1)
+def test_timeout_3(**kwargs):
+ # Never let it pass.
+ counter = kwargs.get('counter')
+ counter[0] += 1
+ raise db.Timeout()
+
+
+class TestModel(test_utils.BaseTestCase):
+ def test_no_protobuf_from_entity(self):
+ res_1 = ext_db.get_entity_from_protobuf([])
+ self.assertEqual(res_1, None)
+ res_2 = ext_db.get_protobuf_from_entity(None)
+ self.assertEqual(res_2, None)
+
+ def test_no_entity_from_protobuf(self):
+ res_1 = ext_db.get_entity_from_protobuf([])
+ self.assertEqual(res_1, None)
+
+ def test_one_model_to_and_from_protobuf(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ pb_1 = ext_db.get_protobuf_from_entity(entity_1)
+
+ entity_1 = ext_db.get_entity_from_protobuf(pb_1)
+ self.assertEqual(isinstance(entity_1, FooModel), True)
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ def test_many_models_to_and_from_protobuf(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+ entity_2 = FooModel(name='bar', age=30, married=True)
+ entity_2.put()
+ entity_3 = FooModel(name='baz', age=45, married=False)
+ entity_3.put()
+
+ pbs = ext_db.get_protobuf_from_entity([entity_1, entity_2, entity_3])
+ self.assertEqual(len(pbs), 3)
+
+ entity_1, entity_2, entity_3 = ext_db.get_entity_from_protobuf(pbs)
+ self.assertEqual(isinstance(entity_1, FooModel), True)
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ self.assertEqual(isinstance(entity_2, FooModel), True)
+ self.assertEqual(
entity_2.name, 'bar')
+ self.assertEqual(entity_2.age, 30)
+ self.assertEqual(entity_2.married, True)
+
+ self.assertEqual(isinstance(entity_3, FooModel), True)
+ self.assertEqual(
entity_3.name, 'baz')
+ self.assertEqual(entity_3.age, 45)
+ self.assertEqual(entity_3.married, False)
+
+ def test_get_protobuf_from_entity_using_dict(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+ entity_2 = FooModel(name='bar', age=30, married=True)
+ entity_2.put()
+ entity_3 = FooModel(name='baz', age=45, married=False)
+ entity_3.put()
+
+ entity_dict = {'entity_1': entity_1, 'entity_2': entity_2, 'entity_3':
entity_3,}
+
+ pbs = ext_db.get_protobuf_from_entity(entity_dict)
+
+ entities = ext_db.get_entity_from_protobuf(pbs)
+ entity_1 = entities['entity_1']
+ entity_2 = entities['entity_2']
+ entity_3 = entities['entity_3']
+
+ self.assertEqual(isinstance(entity_1, FooModel), True)
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ self.assertEqual(isinstance(entity_2, FooModel), True)
+ self.assertEqual(
entity_2.name, 'bar')
+ self.assertEqual(entity_2.age, 30)
+ self.assertEqual(entity_2.married, True)
+
+ self.assertEqual(isinstance(entity_3, FooModel), True)
+ self.assertEqual(
entity_3.name, 'baz')
+ self.assertEqual(entity_3.age, 45)
+ self.assertEqual(entity_3.married, False)
+
+ def test_get_or_insert_with_flag(self):
+ entity, flag = ext_db.get_or_insert_with_flag(FooModel, 'foo',
name='foo', age=15, married=False)
+ self.assertEqual(flag, True)
+ self.assertEqual(
entity.name, 'foo')
+ self.assertEqual(entity.age, 15)
+ self.assertEqual(entity.married, False)
+
+ entity, flag = ext_db.get_or_insert_with_flag(FooModel, 'foo',
name='bar', age=30, married=True)
+ self.assertEqual(flag, False)
+ self.assertEqual(
entity.name, 'foo')
+ self.assertEqual(entity.age, 15)
+ self.assertEqual(entity.married, False)
+
+ def test_get_reference_key(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+ entity_1_key = str(entity_1.key())
+
+ entity_2 = BarModel(key_name='first_bar', foo=entity_1)
+ entity_2.put()
+
+ entity_1.delete()
+ entity_3 = BarModel.get_by_key_name('first_bar')
+ # Won't resolve, but we can still get the key value.
+ self.assertRaises(db.Error, getattr, entity_3, 'foo')
+ self.assertEqual(str(ext_db.get_reference_key(entity_3, 'foo')),
entity_1_key)
+
+ def test_get_reference_key_2(self):
+ # Set a book entity with an author reference.
+ class Author(db.Model):
+ name = db.StringProperty()
+
+ class Book(db.Model):
+ title = db.StringProperty()
+ author = db.ReferenceProperty(Author)
+
+ author = Author(name='Stephen King')
+ author.put()
+
+ book = Book(key_name='the-shining', title='The Shining', author=author)
+ book.put()
+
+ # Now let's fetch the book and get the author key without fetching it.
+ fetched_book = Book.get_by_key_name('the-shining')
+ self.assertEqual(str(ext_db.get_reference_key(fetched_book, 'author')),
str(author.key()))
+
+
#===========================================================================
+ # db.populate_entity
+
#===========================================================================
+ def test_populate_entity(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ ext_db.populate_entity(entity_1, name='bar', age=20, married=True,
city='Yukon')
+ entity_1.put()
+
+ self.assertEqual(
entity_1.name, 'bar')
+ self.assertEqual(entity_1.age, 20)
+ self.assertEqual(entity_1.married, True)
+
+ def test_populate_entity_2(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ ext_db.populate_entity(entity_1, name='bar', age=20, married=True,
city='Yukon')
+ entity_1.put()
+
+ self.assertRaises(AttributeError, getattr, entity_1, 'city')
+
+ def test_populate_expando_entity(self):
+ entity_1 = FooExpandoModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ ext_db.populate_entity(entity_1, name='bar', age=20, married=True,
city='Yukon')
+ entity_1.put()
+
+ self.assertEqual(
entity_1.name, 'bar')
+ self.assertEqual(entity_1.age, 20)
+ self.assertEqual(entity_1.married, True)
+
+ def test_populate_expando_entity_2(self):
+ entity_1 = FooExpandoModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ self.assertEqual(
entity_1.name, 'foo')
+ self.assertEqual(entity_1.age, 15)
+ self.assertEqual(entity_1.married, False)
+
+ ext_db.populate_entity(entity_1, name='bar', age=20, married=True,
city='Yukon')
+ entity_1.put()
+
+ self.assertRaises(AttributeError, getattr, entity_1, 'city')
+
+
+
#===========================================================================
+ # db.get_entity_dict
+
#===========================================================================
+ def test_get_entity_dict(self):
+ class MyModel(db.Model):
+ animal = db.StringProperty()
+ species = db.IntegerProperty()
+ description = db.TextProperty()
+
+ entity = MyModel(animal='duck', species=12,
+ description='A duck, a bird that swims well.')
+ values = ext_db.get_entity_dict(entity)
+
+ self.assertEqual(values, {
+ 'animal': 'duck',
+ 'species': 12,
+ 'description': 'A duck, a bird that swims well.',
+ })
+
+ def test_get_entity_dict_multiple(self):
+ class MyModel(db.Model):
+ animal = db.StringProperty()
+ species = db.IntegerProperty()
+ description = db.TextProperty()
+
+ entity = MyModel(animal='duck', species=12,
+ description='A duck, a bird that swims well.')
+ entity2 = MyModel(animal='bird', species=7,
+ description='A bird, an animal that flies well.')
+ values = ext_db.get_entity_dict([entity, entity2])
+
+ self.assertEqual(values, [
+ {
+ 'animal': 'duck',
+ 'species': 12,
+ 'description': 'A duck, a bird that swims well.',
+ },
+ {
+ 'animal': 'bird',
+ 'species': 7,
+ 'description': 'A bird, an animal that flies well.',
+ }
+ ])
+
+ def test_get_entity_dict_with_expando(self):
+ class MyModel(db.Expando):
+ animal = db.StringProperty()
+ species = db.IntegerProperty()
+ description = db.TextProperty()
+
+ entity = MyModel(animal='duck', species=12,
+ description='A duck, a bird that swims well.',
+ most_famous='Daffy Duck')
+ values = ext_db.get_entity_dict(entity)
+
+ self.assertEqual(values, {
+ 'animal': 'duck',
+ 'species': 12,
+ 'description': 'A duck, a bird that swims well.',
+ 'most_famous': 'Daffy Duck',
+ })
+
+
#===========================================================================
+ # get..._or_404
+
#===========================================================================
+ def test_get_by_key_name_or_404(self):
+ entity_1 = FooModel(key_name='foo', name='foo', age=15, married=False)
+ entity_1.put()
+
+ entity = ext_db.get_by_key_name_or_404(FooModel, 'foo')
+ self.assertEqual(str(entity.key()), str(entity_1.key()))
+
+ def test_get_by_key_name_or_404_2(self):
+ self.assertRaises(NotFound, ext_db.get_by_key_name_or_404,
FooModel, 'bar')
+
+ def test_get_by_id_or_404(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ entity = ext_db.get_by_id_or_404(FooModel, entity_1.key().id())
+ self.assertEqual(str(entity.key()), str(entity_1.key()))
+
+ def test_get_by_id_or_404_2(self):
+ self.assertRaises(NotFound, ext_db.get_by_id_or_404, FooModel, -1)
+
+ def test_get_or_404(self):
+ entity_1 = FooModel(name='foo', age=15, married=False)
+ entity_1.put()
+
+ entity = ext_db.get_or_404(entity_1.key())
+ self.assertEqual(str(entity.key()), str(entity_1.key()))
+
+ def test_get_or_404_2(self):
+ self.assertRaises(NotFound, ext_db.get_or_404,
db.Key.from_path('FooModel', 'bar'))
+
+ def test_get_or_404_3(self):
+ self.assertRaises(NotFound, ext_db.get_or_404, 'this, not a valid key')
+
+
#===========================================================================
+ # db.Property
+
#===========================================================================
+ def test_pickle_property(self):
+ data_1 = {'foo': 'bar'}
+ entity_1 = FooModel(key_name='foo', name='foo', data=data_1)
+ entity_1.put()
+
+ data_2 = [1, 2, 3, 'baz']
+ entity_2 = FooModel(key_name='bar', name='bar', data=data_2)
+ entity_2.put()
+
+ entity_1 = FooModel.get_by_key_name('foo')
+ self.assertEqual(entity_1.data, data_1)
+
+ entity_2 = FooModel.get_by_key_name('bar')
+ self.assertEqual(entity_2.data, data_2)
+
+ def test_slug_property(self):
+ entity_1 = FooModel(key_name='foo', name=u'Mary Björk')
+ entity_1.put()
+
+ entity_2 = FooModel(key_name='bar', name=u'Tião Macalé')
+ entity_2.put()
+
+ entity_1 = FooModel.get_by_key_name('foo')
+ entity_2 = FooModel.get_by_key_name('bar')
+ self.assertEqual(entity_1.slug, 'mary-bjork')
+ self.assertEqual(entity_2.slug, 'tiao-macale')
+
+ def test_slug_property2(self):
+ entity_1 = FooModel(key_name='foo', name=u'---')
+ entity_1.put()
+
+ entity_2 = FooModel(key_name='bar', name=u'___')
+ entity_2.put()
+
+ entity_1 = FooModel.get_by_key_name('foo')
+ entity_2 = FooModel.get_by_key_name('bar')
+ self.assertEqual(entity_1.slug, None)
+ self.assertEqual(entity_2.slug, None)
+
+ def test_slug_property3(self):
+ entity_1 = FooModel(key_name='foo', name=u'---', name2=u'---')
+ entity_1.put()
+
+ entity_2 = FooModel(key_name='bar', name=u'___', name2=u'___')
+ entity_2.put()
+
+ entity_1 = FooModel.get_by_key_name('foo')
+ entity_2 = FooModel.get_by_key_name('bar')
+ self.assertEqual(entity_1.slug2, 'some-default-value')
+ self.assertEqual(entity_2.slug2, 'some-default-value')
+
+ def test_slug_property4(self):
+ entity_1 = FooModel(key_name='foo', name=u'---', name2=u'Some really
very big and maybe enormous string')
+ entity_1.put()
+
+ entity_2 = FooModel(key_name='bar', name=u'___',
name2=u'abcdefghijklmnopqrstuwxyz')
+ entity_2.put()
+
+ entity_1 = FooModel.get_by_key_name('foo')
+ entity_2 = FooModel.get_by_key_name('bar')
+ self.assertEqual(entity_1.slug2, 'some-really-very-big')
+ self.assertEqual(entity_2.slug2, 'abcdefghijklmnopqrst')
+
+ def test_etag_property(self):
+ entity_1 = FooModel(key_name='foo', name=u'Mary Björk')
+ entity_1.put()
+
+ entity_2 = FooModel(key_name='bar', name=u'Tião Macalé')
+ entity_2.put()
+
+ entity_1 = FooModel.get_by_key_name('foo')
+ entity_2 = FooModel.get_by_key_name('bar')
+ self.assertEqual(entity_1.etag,
hashlib.sha1(entity_1.name.encode('utf8')).hexdigest())
+ self.assertEqual(entity_2.etag,
hashlib.sha1(entity_2.name.encode('utf8')).hexdigest())
+
+ def test_etag_property2(self):
+ entity_1 = FooModel(key_name='foo', name=u'Mary Björk')
+ entity_1.put()
+
+ entity_2 = FooModel(key_name='bar', name=u'Tião Macalé')
+ entity_2.put()
+
+ entity_1 = FooModel.get_by_key_name('foo')
+ entity_2 = FooModel.get_by_key_name('bar')
+ self.assertEqual(entity_1.etag2, None)
+ self.assertEqual(entity_2.etag2, None)
+
+ def test_json_property(self):
+ entity_1 = JsonModel(key_name='foo', data={'foo': 'bar'})
+ entity_1.put()
+
+ entity_1 = JsonModel.get_by_key_name('foo')
+ self.assertEqual(entity_1.data, {'foo': 'bar'})
+
+ def test_json_property2(self):
+ self.assertRaises(db.BadValueError, JsonModel, key_name='foo',
data='foo')
+
+ def test_timezone_property(self):
+ zone = 'America/Chicago'
+ entity_1 = TimezoneModel(key_name='foo', data=zone)
+ entity_1.put()
+
+ entity_1 = TimezoneModel.get_by_key_name('foo')
+ self.assertEqual(entity_1.data, ext_db.pytz.timezone(zone))
+
+ def test_timezone_property2(self):
+ self.assertRaises(db.BadValueError, TimezoneModel, key_name='foo',
data=[])
+
+ def test_timezone_property3(self):
+ self.assertRaises(ext_db.pytz.UnknownTimeZoneError, TimezoneModel,
key_name='foo', data='foo')
+
+ def test_key_property(self):
+ key = db.Key.from_path('Bar', 'bar-key')
+ entity_1 = FooModel(name='foo', key_name='foo', somekey=key)
+ entity_1.put()
+
+ entity_1 = FooModel.get_by_key_name('foo')
+ self.assertEqual(entity_1.somekey, key)
+
+ def test_key_property2(self):
+ key = db.Key.from_path('Bar', 'bar-key')
+ entity_1 = FooModel(name='foo', key_name='foo', somekey=str(key))
+ entity_1.put()
+
+ entity_1 = FooModel.get_by_key_name('foo')
+ self.assertEqual(entity_1.somekey, key)
+
+ def test_key_property3(self):
+ key = db.Key.from_path('Bar', 'bar-key')
+ entity_1 = FooModel(name='foo', key_name='foo', somekey=str(key))
+ entity_1.put()
+
+ entity_2 = FooModel(name='bar', key_name='bar', somekey=entity_1)
+ entity_2.put()
+
+ entity_2 = FooModel.get_by_key_name('bar')
+ self.assertEqual(entity_2.somekey, entity_1.key())
+
+ def test_key_property4(self):
+ key = db.Key.from_path('Bar', 'bar-key')
+ entity_1 = FooModel(name='foo', somekey=str(key))
+ self.assertRaises(db.BadValueError, FooModel, name='bar',
key_name='bar', somekey=entity_1)
+
+ def test_key_property5(self):
+ self.assertRaises(TypeError, FooModel, name='foo', key_name='foo',
somekey=['foo'])
+
+ def test_key_property6(self):
+ self.assertRaises(datastore_errors.BadKeyError, FooModel, name='foo',
key_name='foo', somekey='foo')
+
+
#===========================================================================
+ # @db.retry_on_timeout
+
#===========================================================================
+ def test_retry_on_timeout_1(self):
+ counter = [0]
+ test_timeout_1(counter=counter)
+ self.assertEqual(counter[0], 3)
+
+ def test_retry_on_timeout_2(self):
+ counter = [0]
+ self.assertRaises(ValueError, test_timeout_2, counter=counter)
+ self.assertEqual(counter[0], 5)
+
+ def test_retry_on_timeout_3(self):
+ counter = [0]
+ self.assertRaises(db.Timeout, test_timeout_3, counter=counter)
+ self.assertEqual(counter[0], 3)
+
+
#===========================================================================
+ # @db.load_entity
+
#===========================================================================
+ def test_load_entity_with_key(self):
+ @ext_db.load_entity(FooModel, 'foo_key', 'foo', 'key')
+ def get(*args, **kwargs):
+ return kwargs['foo']
+
+ foo = FooModel(name='foo')
+ foo.put()
+
+ loaded_foo = get(foo_key=str(foo.key()))
+ self.assertEqual(str(loaded_foo.key()), str(foo.key()))
+ self.assertEqual(get(foo_key=None), None)
+
+ def test_load_entity_with_key_2(self):
+ @ext_db.load_entity(FooModel, 'foo_key', 'foo', 'key')
+ def get(*args, **kwargs):
+ return kwargs['foo']
+
+ self.assertRaises(NotFound, get,
foo_key=str(db.Key.from_path('FooModel', 'bar')))
+
+ def test_load_entity_with_id(self):
+ @ext_db.load_entity(FooModel, 'foo_id', 'foo', 'id')
+ def get(*args, **kwargs):
+ return kwargs['foo']
+
+ foo = FooModel(name='foo')
+ foo.put()
+
+ loaded_foo = get(foo_id=foo.key().id())
+ self.assertEqual(str(loaded_foo.key()), str(foo.key()))
+
+ def test_load_entity_with_id_2(self):
+ @ext_db.load_entity(FooModel, 'foo_id', 'foo', 'id')
+ def get(*args, **kwargs):
+ return kwargs['foo']
+
+ self.assertRaises(NotFound, get, foo_id=-1)
+
+ def test_load_entity_with_key_name(self):
+ @ext_db.load_entity(FooModel, 'foo_key_name', 'foo', 'key_name')
+ def get(*args, **kwargs):
+ return kwargs['foo']
+
+ foo = FooModel(key_name='foo', name='foo')
+ foo.put()
+
+ loaded_foo = get(foo_key_name='foo')
+ self.assertEqual(str(loaded_foo.key()), str(foo.key()))
+
+ def test_load_entity_with_key_name_2(self):
+ @ext_db.load_entity(FooModel, 'foo_key_name', 'foo', 'key_name')
+ def get(*args, **kwargs):
+ return kwargs['foo']
+
+ self.assertRaises(NotFound, get, foo_key_name='bar')
+
+ def test_load_entity_with_key_with_guessed_fetch_mode(self):
+ @ext_db.load_entity(FooModel, 'foo_key')
+ def get(*args, **kwargs):
+ return kwargs['foo']
+
+ foo = FooModel(name='foo')
+ foo.put()
+
+ loaded_foo = get(foo_key=str(foo.key()))
+ self.assertEqual(str(loaded_foo.key()), str(foo.key()))
+ self.assertEqual(get(foo_key=None), None)
+
+ def test_load_entity_with_key_with_impossible_fetch_mode(self):
+ def test():
+ @ext_db.load_entity(FooModel, 'foo_bar')
+ def get(*args, **kwargs):
+ return kwargs['foo']
+
+ self.assertRaises(NotImplementedError, test)
+
+
#===========================================================================
+ # db.run_in_namespace
+
#===========================================================================
+ def test_run_in_namespace(self):
+ class MyModel(db.Model):
+ name = db.StringProperty()
+
+ def create_entity(name):
+ entity = MyModel(key_name=name, name=name)
+ entity.put()
+
+ def get_entity(name):
+ return MyModel.get_by_key_name(name)
+
+ entity = ext_db.run_in_namespace('ns1', get_entity, 'foo')
+ self.assertEqual(entity, None)
+
+ ext_db.run_in_namespace('ns1', create_entity, 'foo')
+
+ entity = ext_db.run_in_namespace('ns1', get_entity, 'foo')
+ self.assertNotEqual(entity, None)
+
+ entity = ext_db.run_in_namespace('ns2', get_entity, 'foo')
+ self.assertEqual(entity, None)
+
+
#===========================================================================
+ # db.to_key
+
#===========================================================================
+ def test_to_key(self):
+ class MyModel(db.Model):
+ pass
+
+ # None.
+ self.assertEqual(ext_db.to_key(None), None)
+ # Model without key.
+ self.assertEqual(ext_db.to_key(MyModel()), None)
+ # Model with key.
+ self.assertEqual(ext_db.to_key(MyModel(key_name='foo')),
db.Key.from_path('MyModel', 'foo'))
+ # Key.
+ self.assertEqual(ext_db.to_key(db.Key.from_path('MyModel', 'foo')),
db.Key.from_path('MyModel', 'foo'))
+ # Key as string.
+ self.assertEqual(ext_db.to_key(str(db.Key.from_path('MyModel', 'foo'))),
db.Key.from_path('MyModel', 'foo'))
+ # All mixed.
+ keys = [None, MyModel(), MyModel(key_name='foo'),
db.Key.from_path('MyModel', 'foo'), str(db.Key.from_path('MyModel', 'foo'))]
+ result = [None, None, db.Key.from_path('MyModel', 'foo'),
db.Key.from_path('MyModel', 'foo'), db.Key.from_path('MyModel', 'foo')]
+ self.assertEqual(ext_db.to_key(keys), result)
+
+ self.assertRaises(datastore_errors.BadArgumentError, ext_db.to_key, {})
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/gae_mail_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+"""
+ Tests for tipfy.appengine.mail
+"""
+import os
+import sys
+import unittest
+
+from tipfy import Rule, Tipfy
+from
tipfy.app import local
+
+from google.appengine.api.xmpp import Message as ApiMessage
+
+import test_utils
+
+MESSAGE = """Subject: Hello there!
+From: Me <
m...@myself.com>
+To: You <
y...@yourself.com>
+Content-Type: text/plain; charset=ISO-8859-1
+
+Test message!"""
+
+
+def get_app():
+ return Tipfy(rules=[
+ Rule('/', name='xmpp-test',
handler='resources.mail_handlers.MailHandler'),
+ Rule('/test2', name='xmpp-test',
handler='resources.mail_handlers.MailHandler2'),
+ ], debug=True)
+
+
+class TestInboundMailHandler(test_utils.BaseTestCase):
+ def test_mail(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ response = client.open(method='POST', path='/', data=MESSAGE,
content_type='text/plain')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, 'Test message!')
+
+ def test_not_implemented(self):
+ app = get_app()
+ app.config['tipfy']['enable_debugger'] = False
+ client = app.get_test_client()
+
+ self.assertRaises(NotImplementedError, client.open, method='POST',
path='/test2', data=MESSAGE, content_type='text/plain')
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/gae_sharded_counter_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+"""
+ Tests for tipfy.appengine.sharded_counter
+"""
+from datetime import datetime, timedelta
+import unittest
+
+from google.appengine.api import memcache
+from google.appengine.ext import db
+
+from tipfy import Request, RequestHandler, Tipfy
+from
tipfy.app import local
+from tipfy.appengine.sharded_counter import Counter
+
+import test_utils
+
+
+class TestCounter(test_utils.BaseTestCase):
+ def setUp(self):
+ app = Tipfy()
+ local.current_handler = RequestHandler(app, Request.from_values())
+ test_utils.BaseTestCase.setUp(self)
+
+ def test_counter(self):
+ # Build a new counter that uses the unique key name 'hits'.
+ hits = Counter('hits')
+
+ self.assertEqual(hits.count, 0)
+
+ # Increment by 1.
+ hits.increment()
+ # Increment by 10.
+ hits.increment(10)
+ # Decrement by 3.
+ hits.increment(-3)
+ # This is the current count.
+ self.assertEqual(hits.count, 8)
+
+ # Forces fetching a non-cached count of all shards.
+ self.assertEqual(hits.get_count(nocache=True), 8)
+
+ # Set the counter to an arbitrary value.
+ hits.count = 6
+
+ self.assertEqual(hits.get_count(nocache=True), 6)
+
+ def test_cache(self):
+ # Build a new counter that uses the unique key name 'hits'.
+ hits = Counter('hits')
+
+ self.assertEqual(hits.count, 0)
+
+ # Increment by 1.
+ hits.increment()
+ # Increment by 10.
+ hits.increment(10)
+ # Decrement by 3.
+ hits.increment(-3)
+ # This is the current count.
+ self.assertEqual(hits.count, 8)
+
+ # Forces fetching a non-cached count of all shards.
+ self.assertEqual(hits.get_count(nocache=True), 8)
+
+ # Set the counter to an arbitrary value.
+ hits.delete()
+
+ self.assertEqual(hits.get_count(), 8)
+ self.assertEqual(hits.get_count(nocache=True), 0)
+
+ hits.memcached.delete_count()
+
+ self.assertEqual(hits.get_count(), 0)
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/gae_taskqueue_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+"""
+ Tests for tipfyext.appenginetaskqueue
+"""
+import time
+import unittest
+
+from google.appengine.ext import deferred
+
+from google.appengine.api import taskqueue
+from google.appengine.ext import db
+
+from tipfy import Rule, Tipfy
+from
tipfy.app import local
+
+import test_utils
+
+
+def get_rules():
+ # Fake get_rules() for testing.
+ return [
+ Rule('/_tasks/process-mymodel/', name='tasks/process-mymodel',
+ handler='%s.TaskTestModelEntityTaskHandler' % __name__),
+ Rule('/_tasks/process-mymodel/<string:key>',
name='tasks/process-mymodel',
+ handler='%s.TaskTestModelEntityTaskHandler' % __name__),
+ ]
+
+
+def get_url_rules():
+ # Fake get_rules() for testing.
+ rules = [
+ Rule('/_ah/queue/deferred', name='tasks/deferred',
handler='tipfy.appengine.taskqueue.DeferredHandler'),
+ ]
+
+ return Map(rules)
+
+
+def get_app():
+ return Tipfy({
+ 'tipfy': {
+ 'dev': True,
+ },
+ }, rules=get_url_rules())
+
+
+class TaskTestModel(db.Model):
+ number = db.IntegerProperty()
+
+
+def save_entities(numbers):
+ entities = []
+ for number in numbers:
+ entities.append(TaskTestModel(key_name=str(number), number=number))
+
+ res = db.put(entities)
+
+ import sys
+ sys.exit(res)
+
+
+class TestDeferredHandler(test_utils.BaseTestCase):
+ """TODO"""
+
+ def test_simple_deferred(self):
+ numbers = [1234, 1577, 988]
+ keys = [db.Key.from_path('TaskTestModel', str(number)) for number in
numbers]
+ entities = db.get(keys)
+ self.assertEqual(entities, [None, None, None])
+
+ deferred.defer(save_entities, numbers)
+
+
+class TestTasks(test_utils.BaseTestCase):
+ """TODO"""
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/gae_xmpp_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,159 @@
+# -*- coding: utf-8 -*-
+"""
+ Tests for tipfy.appengine.xmpp
+"""
+import os
+import sys
+import unittest
+
+from tipfy import Rule, Tipfy
+from
tipfy.app import local
+
+from google.appengine.api.xmpp import Message as ApiMessage
+
+import test_utils
+
+fake_local = {}
+
+
+def get_app():
+ return Tipfy(rules=[
+ Rule('/', name='xmpp-test',
handler='resources.xmpp_handlers.XmppHandler'),
+ Rule('/test2', name='xmpp-test',
handler='resources.xmpp_handlers.XmppHandler2'),
+ ], debug=True)
+
+
+def send_message(jids, body, from_jid=None, message_type='chat',
+ raw_xml=False):
+ fake_local['message'] = {
+ 'body': body,
+ }
+
+
+class InvalidMessageError(Exception):
+ pass
+
+
+class Message(ApiMessage):
+ """Encapsulates an XMPP message received by the application."""
+ def __init__(self, vars):
+ """Constructs a new XMPP Message from an HTTP request.
+
+ Args:
+ vars: A dict-like object to extract message arguments from.
+ """
+ try:
+ self.__sender = vars["from"]
+ self.__to = vars["to"]
+ self.__body = vars["body"]
+ except KeyError, e:
+ raise InvalidMessageError(e[0])
+
+ self.__command = None
+ self.__arg = None
+
+ def reply(self, body, message_type='chat', raw_xml=False,
+ send_message=send_message):
+ """Convenience function to reply to a message.
+
+ Args:
+ body: str: The body of the message
+ message_type, raw_xml: As per send_message.
+ send_message: Used for testing.
+
+ Returns:
+ A status code as per send_message.
+
+ Raises:
+ See send_message.
+ """
+ return send_message([self.sender], body, from_jid=
self.to,
+ message_type=message_type, raw_xml=raw_xml)
+
+
+class TestCommandHandler(test_utils.BaseTestCase):
+ def setUp(self):
+ from tipfy.appengine import xmpp
+ self.xmpp_module = xmpp.xmpp
+ xmpp.xmpp = sys.modules[__name__]
+ test_utils.BaseTestCase.setUp(self)
+
+ def tearDown(self):
+ fake_local.clear()
+
+ from tipfy.appengine import xmpp
+ xmpp.xmpp = self.xmpp_module
+ test_utils.BaseTestCase.tearDown(self)
+
+ def test_no_command(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ data = {}
+ client.open(method='POST', data=data)
+
+ self.assertEqual(fake_local.get('message', None), None)
+
+ def test_not_implemented(self):
+ app = get_app()
+ app.config['tipfy']['enable_debugger'] = False
+ client = app.get_test_client()
+
+ data = {
+ 'from': '
m...@myself.com',
+ 'to': '
y...@yourself.com',
+ 'body': '/inexistent_command foo bar',
+ }
+ self.assertRaises(NotImplementedError,
client.post, path='/test2',
data=data)
+
+ def test_unknown_command(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ data = {
+ 'from': '
m...@myself.com',
+ 'to': '
y...@yourself.com',
+ 'body': '/inexistent_command foo bar',
+ }
+ client.open(method='POST', data=data)
+
+ self.assertEqual(fake_local.get('message', None), {'body': 'Unknown
command'})
+
+ def test_command(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ data = {
+ 'from': '
m...@myself.com',
+ 'to': '
y...@yourself.com',
+ 'body': '/foo foo bar',
+ }
+ client.open(method='POST', data=data)
+
+ self.assertEqual(fake_local.get('message', None), {'body': 'Foo
command!'})
+
+ data = {
+ 'from': '
m...@myself.com',
+ 'to': '
y...@yourself.com',
+ 'body': '/bar foo bar',
+ }
+ client.open(method='POST', data=data)
+
+ self.assertEqual(fake_local.get('message', None), {'body': 'Bar
command!'})
+
+ def test_text_message(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ data = {
+ 'from': '
m...@myself.com',
+ 'to': '
y...@yourself.com',
+ 'body': 'Hello, text message!',
+ }
+ client.open(method='POST', data=data)
+
+ self.assertEqual(fake_local.get('message', None), {'body': 'Hello, text
message!'})
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/i18n_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,491 @@
+# -*- coding: utf-8 -*-
+"""
+ Tests for tipfy.i18n
+"""
+from __future__ import with_statement
+
+import datetime
+import gettext as gettext_stdlib
+import os
+import unittest
+
+from babel.numbers import NumberFormatError
+
+from pytz.gae import pytz
+
+from tipfy import (Tipfy, RequestHandler, Request, Response, Rule,
+ current_handler)
+from
tipfy.app import local
+import tipfy.i18n as i18n
+from tipfy.sessions import SessionMiddleware
+
+import test_utils
+
+
+class BaseTestCase(test_utils.BaseTestCase):
+ def setUp(self):
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=RequestHandler)
+ ], config={
+ 'tipfy.sessions': {
+ 'secret_key': 'secret',
+ },
+ 'tipfy.i18n': {
+ 'timezone': 'UTC'
+ },
+ })
+ local.current_handler = RequestHandler(app,
Request.from_values('/'))
+ test_utils.BaseTestCase.setUp(self)
+
+
+#==============================================================================
+# I18nMiddleware
+#==============================================================================
+class TestI18nMiddleware(BaseTestCase):
+ def test_middleware_multiple_changes(self):
+ class MyHandler(RequestHandler):
+ middleware = [SessionMiddleware(), i18n.I18nMiddleware()]
+
+ def get(self, **kwargs):
+ locale = self.i18n.locale
+ return Response(locale)
+
+ app = Tipfy(rules=[
+ Rule('/', name='home', handler=MyHandler)
+ ], config={
+ 'tipfy.sessions': {
+ 'secret_key': 'secret',
+ },
+ 'tipfy.i18n': {
+ 'locale_request_lookup': [('args', 'lang'),
('session', '_locale')],
+ }
+ })
+
+ client = app.get_test_client()
+ response = client.get('/')
+ self.assertEqual(response.data, 'en_US')
+
+ response = client.get('/?lang=pt_BR')
+ self.assertEqual(response.data, 'pt_BR')
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'pt_BR')
+
+ response = client.get('/?lang=en_US')
+ self.assertEqual(response.data, 'en_US')
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'en_US')
+
+
+#==============================================================================
+# _(), gettext(), ngettext(), lazy_gettext(), lazy_ngettext()
+#==============================================================================
+class TestGettext(BaseTestCase):
+ def test_translations_not_set(self):
+ # TODO: shouldn't need to release local here
+ local.__release_local__()
+ self.assertRaises(RuntimeError, i18n.gettext, 'foo')
+
+ def test_gettext(self):
+ self.assertEqual(i18n.gettext('foo'), u'foo')
+
+ def test_gettext_(self):
+ self.assertEqual(i18n._('foo'), u'foo')
+
+ def test_gettext_with_variables(self):
+ self.assertEqual(i18n.gettext('foo %(foo)s'), u'foo %(foo)s')
+ self.assertEqual(i18n.gettext('foo %(foo)s') % {'foo': 'bar'},
u'foo bar')
+ self.assertEqual(i18n.gettext('foo %(foo)s', foo='bar'), u'foo
bar')
+
+ def test_ngettext(self):
+ self.assertEqual(i18n.ngettext('One foo', 'Many foos', 1), u'One
foo')
+ self.assertEqual(i18n.ngettext('One foo', 'Many foos', 2), u'Many
foos')
+
+ def test_ngettext_with_variables(self):
+ self.assertEqual(i18n.ngettext('One foo %(foo)s', 'Many
foos %(foo)s', 1), u'One foo %(foo)s')
+ self.assertEqual(i18n.ngettext('One foo %(foo)s', 'Many
foos %(foo)s', 2), u'Many foos %(foo)s')
+ self.assertEqual(i18n.ngettext('One foo %(foo)s', 'Many
foos %(foo)s', 1, foo='bar'), u'One foo bar')
+ self.assertEqual(i18n.ngettext('One foo %(foo)s', 'Many
foos %(foo)s', 2, foo='bar'), u'Many foos bar')
+ self.assertEqual(i18n.ngettext('One foo %(foo)s', 'Many
foos %(foo)s', 1) % {'foo': 'bar'}, u'One foo bar')
+ self.assertEqual(i18n.ngettext('One foo %(foo)s', 'Many
foos %(foo)s', 2) % {'foo': 'bar'}, u'Many foos bar')
+
+ def test_lazy_gettext(self):
+ self.assertEqual(i18n.lazy_gettext('foo'), u'foo')
+
+ def test_lazy_ngettext(self):
+ self.assertEqual(i18n.lazy_ngettext('One foo', 'Many foos', 1),
u'One foo')
+ self.assertEqual(i18n.lazy_ngettext('One foo', 'Many foos', 2),
u'Many foos')
+
+
+#==============================================================================
+# I18nStore.get_store_for_request()
+#==============================================================================
+class TestStoreForRequest(BaseTestCase):
+ def get_app(self):
+ return Tipfy(rules=[
+ Rule('/', name='home', handler=RequestHandler)
+ ], config={
+ 'tipfy.sessions': {
+ 'secret_key': 'secret',
+ },
+ 'tipfy.i18n': {
+ 'timezone': 'UTC'
+ },
+ })
+
+ def test_get_store_for_request(self):
+ app = self.get_app()
+ app.config['tipfy.i18n']['locale'] = 'jp_JP'
+
+ with app.get_test_handler('/') as handler:
+ self.assertEqual(handler.i18n.locale, 'jp_JP')
+
+ def test_get_store_for_request_args(self):
+ app = self.get_app()
+ app.config['tipfy.i18n']['locale_request_lookup'] =
[('args', 'language')]
+
+ with app.get_test_handler('/', query_string={'language': 'es_ES'})
as handler:
+ self.assertEqual(handler.i18n.locale, 'es_ES')
+
+ def test_get_store_for_request_form(self):
+ app = self.get_app()
+ app.config['tipfy.i18n']['locale_request_lookup'] =
[('form', 'language')]
+
+ with app.get_test_handler('/', data={'language': 'es_ES'},
method='POST') as handler:
+ self.assertEqual(handler.i18n.locale, 'es_ES')
+
+ def test_get_store_for_request_cookies(self):
+ app = self.get_app()
+ app.config['tipfy.i18n']['locale_request_lookup'] =
[('cookies', 'language')]
+
+ with app.get_test_handler('/',
headers=[('Cookie', 'language="es_ES"; Path=/')]) as handler:
+ self.assertEqual(handler.i18n.locale, 'es_ES')
+
+ def test_get_store_for_request_args_cookies(self):
+ app = self.get_app()
+ app.config['tipfy.i18n']['locale_request_lookup'] = [
+ ('args', 'foo'),
+ ('cookies', 'language')
+ ]
+
+ with app.get_test_handler('/',
headers=[('Cookie', 'language="es_ES"; Path=/')]) as handler:
+ self.assertEqual(handler.i18n.locale, 'es_ES')
+
+ def test_get_store_for_request_rule_args(self):
+ app = self.get_app()
+ app.config['tipfy.i18n']['locale_request_lookup'] =
[('rule_args', 'locale'),]
+
+ with app.get_test_handler('/') as handler:
+ handler.request.rule_args = {'locale': 'es_ES'}
+ self.assertEqual(handler.i18n.locale, 'es_ES')
+
+
+#==============================================================================
+# Date formatting
+#==============================================================================
+class TestDates(BaseTestCase):
+ def test_format_date(self):
+ value = datetime.datetime(2009, 11, 10, 16, 36, 05)
+
+ self.assertEqual(i18n.format_date(value, format='short'),
u'11/10/09')
+ self.assertEqual(i18n.format_date(value, format='medium'), u'Nov
10, 2009')
+ self.assertEqual(i18n.format_date(value, format='long'),
u'November 10, 2009')
+ self.assertEqual(i18n.format_date(value, format='full'),
u'Tuesday, November 10, 2009')
+
+ def test_format_date_no_format(self):
+ value = datetime.datetime(2009, 11, 10, 16, 36, 05)
+ self.assertEqual(i18n.format_date(value), u'Nov 10, 2009')
+
+ def test_format_date_no_format_but_configured(self):
+ app = Tipfy(config={
+ 'tipfy.sessions': {
+ 'secret_key': 'secret',
+ },
+ 'tipfy.i18n': {
+ 'timezone': 'UTC',
+ 'date_formats': {
+ 'time': 'medium',
+ 'date': 'medium',
+ 'datetime': 'medium',
+ 'time.short': None,
+ 'time.medium': None,
+ 'time.full': None,
+ 'time.long': None,
+ 'date.short': None,
+ 'date.medium': 'full',
+ 'date.full': None,
+ 'date.long': None,
+ 'datetime.short': None,
+ 'datetime.medium': None,
+ 'datetime.full': None,
+ 'datetime.long': None,
+ }
+ }
+ })
+ local.current_handler = RequestHandler(app,
Request.from_values('/'))
+
+ value = datetime.datetime(2009, 11, 10, 16, 36, 05)
+ self.assertEqual(i18n.format_date(value), u'Tuesday, November 10,
2009')
+
+ def test_format_date_pt_BR(self):
+ i18n.set_locale('pt_BR')
+ value = datetime.datetime(2009, 11, 10, 16, 36, 05)
+
+ self.assertEqual(i18n.format_date(value, format='short'),
u'10/11/09')
+ self.assertEqual(i18n.format_date(value, format='medium'),
u'10/11/2009')
+ self.assertEqual(i18n.format_date(value, format='long'), u'10 de
novembro de 2009')
+ self.assertEqual(i18n.format_date(value, format='full'),
u'terça-feira, 10 de novembro de 2009')
+
+ def test_format_datetime(self):
+ value = datetime.datetime(2009, 11, 10, 16, 36, 05)
+
+ self.assertEqual(i18n.format_datetime(value, format='short'),
u'11/10/09 4:36 PM')
+ self.assertEqual(i18n.format_datetime(value, format='medium'),
u'Nov 10, 2009 4:36:05 PM')
+ self.assertEqual(i18n.format_datetime(value, format='long'),
u'November 10, 2009 4:36:05 PM +0000')
+ #self.assertEqual(i18n.format_datetime(value, format='full'),
u'Tuesday, November 10, 2009 4:36:05 PM World (GMT) Time')
+ self.assertEqual(i18n.format_datetime(value, format='full'),
u'Tuesday, November 10, 2009 4:36:05 PM GMT+00:00')
+
+ i18n.set_timezone('America/Chicago')
+ self.assertEqual(i18n.format_datetime(value, format='short'),
u'11/10/09 10:36 AM')
+
+ def test_format_datetime_no_format(self):
+ value = datetime.datetime(2009, 11, 10, 16, 36, 05)
+ self.assertEqual(i18n.format_datetime(value), u'Nov 10, 2009
4:36:05 PM')
+
+ def test_format_datetime_pt_BR(self):
+ i18n.set_locale('pt_BR')
+ value = datetime.datetime(2009, 11, 10, 16, 36, 05)
+
+ self.assertEqual(i18n.format_datetime(value, format='short'),
u'10/11/09 16:36')
+ self.assertEqual(i18n.format_datetime(value, format='medium'),
u'10/11/2009 16:36:05')
+ #self.assertEqual(i18n.format_datetime(value, format='long'), u'10
de novembro de 2009 16:36:05 +0000')
+ self.assertEqual(i18n.format_datetime(value, format='long'), u'10
de novembro de 2009 16h36min05s +0000')
+ #self.assertEqual(i18n.format_datetime(value, format='full'),
u'terça-feira, 10 de novembro de 2009 16h36min05s Horário Mundo (GMT)')
+ self.assertEqual(i18n.format_datetime(value, format='full'),
u'ter\xe7a-feira, 10 de novembro de 2009 16h36min05s GMT+00:00')
+
+ def test_format_time(self):
+ value = datetime.datetime(2009, 11, 10, 16, 36, 05)
+
+ self.assertEqual(i18n.format_time(value, format='short'), u'4:36
PM')
+ self.assertEqual(i18n.format_time(value, format='medium'),
u'4:36:05 PM')
+ self.assertEqual(i18n.format_time(value, format='long'), u'4:36:05
PM +0000')
+ #self.assertEqual(i18n.format_time(value, format='full'),
u'4:36:05 PM World (GMT) Time')
+ self.assertEqual(i18n.format_time(value, format='full'), u'4:36:05
PM GMT+00:00')
+
+ def test_format_time_no_format(self):
+ value = datetime.datetime(2009, 11, 10, 16, 36, 05)
+ self.assertEqual(i18n.format_time(value), u'4:36:05 PM')
+
+ def test_format_time_pt_BR(self):
+ i18n.set_locale('pt_BR')
+ value = datetime.datetime(2009, 11, 10, 16, 36, 05)
+
+ self.assertEqual(i18n.format_time(value, format='short'), u'16:36')
+ self.assertEqual(i18n.format_time(value, format='medium'),
u'16:36:05')
+ #self.assertEqual(i18n.format_time(value, format='long'),
u'16:36:05 +0000')
+ self.assertEqual(i18n.format_time(value, format='long'),
u'16h36min05s +0000')
+ #self.assertEqual(i18n.format_time(value, format='full'),
u'16h36min05s Horário Mundo (GMT)')
+ self.assertEqual(i18n.format_time(value, format='full'),
u'16h36min05s GMT+00:00')
+
+ i18n.set_timezone('America/Chicago')
+ self.assertEqual(i18n.format_time(value, format='short'), u'10:36')
+
+ def test_parse_date(self):
+ i18n.set_locale('en_US')
+ self.assertEqual(i18n.parse_date('4/1/04'), datetime.date(2004, 4,
1))
+ i18n.set_locale('de_DE')
+ self.assertEqual(i18n.parse_date('01.04.2004'),
datetime.date(2004, 4, 1))
+
+ def test_parse_datetime(self):
+ i18n.set_locale('en_US')
+ self.assertRaises(NotImplementedError,
i18n.parse_datetime, '4/1/04 16:08:09')
+
+ def test_parse_time(self):
+ i18n.set_locale('en_US')
+ self.assertEqual(i18n.parse_time('18:08:09'), datetime.time(18, 8,
9))
+ i18n.set_locale('de_DE')
+ self.assertEqual(i18n.parse_time('18:08:09'), datetime.time(18, 8,
9))
+
+ def test_format_timedelta(self):
+ # This is only present in Babel dev, so skip if not available.
+ if not getattr(i18n, 'format_timedelta', None):
+ return
+
+ i18n.set_locale('en_US')
+ # ???
+ #
self.assertEqual(i18n.format_timedelta(datetime.timedelta(weeks=12)), u'3
months')
+
self.assertEqual(i18n.format_timedelta(datetime.timedelta(weeks=12)), u'3
mths')
+ i18n.set_locale('es')
+ #
self.assertEqual(i18n.format_timedelta(datetime.timedelta(seconds=1)), u'1
segundo')
+
self.assertEqual(i18n.format_timedelta(datetime.timedelta(seconds=1)), u'1
s')
+ i18n.set_locale('en_US')
+
self.assertEqual(i18n.format_timedelta(datetime.timedelta(hours=3),
granularity='day'), u'1 day')
+
self.assertEqual(i18n.format_timedelta(datetime.timedelta(hours=23),
threshold=0.9), u'1 day')
+ #
self.assertEqual(i18n.format_timedelta(datetime.timedelta(hours=23),
threshold=1.1), u'23 hours')
+
self.assertEqual(i18n.format_timedelta(datetime.timedelta(hours=23),
threshold=1.1), u'23 hrs')
+ self.assertEqual(i18n.format_timedelta(datetime.datetime.now() -
datetime.timedelta(days=5)), u'5 days')
+
+ def test_format_iso(self):
+ value = datetime.datetime(2009, 11, 10, 16, 36, 05)
+
+ self.assertEqual(i18n.format_date(value, format='iso'),
u'2009-11-10')
+ self.assertEqual(i18n.format_time(value, format='iso'),
u'16:36:05')
+ self.assertEqual(i18n.format_datetime(value, format='iso'),
u'2009-11-10T16:36:05+0000')
+
+#==============================================================================
+# Timezones
+#==============================================================================
+class TestTimezones(BaseTestCase):
+ def test_set_timezone(self):
+ current_handler.i18n.set_timezone('UTC')
+ self.assertEqual(current_handler.i18n.tzinfo.zone, 'UTC')
+
+ current_handler.i18n.set_timezone('America/Chicago')
+
self.assertEqual(current_handler.i18n.tzinfo.zone, 'America/Chicago')
+
+ current_handler.i18n.set_timezone('America/Sao_Paulo')
+
self.assertEqual(current_handler.i18n.tzinfo.zone, 'America/Sao_Paulo')
+
+ def test_to_local_timezone(self):
+ current_handler.i18n.set_timezone('US/Eastern')
+
+ format = '%Y-%m-%d %H:%M:%S %Z%z'
+
+ # Test datetime with timezone set
+ base = datetime.datetime(2002, 10, 27, 6, 0, 0, tzinfo=pytz.UTC)
+ localtime = i18n.to_local_timezone(base)
+ result = localtime.strftime(format)
+ self.assertEqual(result, '2002-10-27 01:00:00 EST-0500')
+
+ # Test naive datetime - no timezone set
+ base = datetime.datetime(2002, 10, 27, 6, 0, 0)
+ localtime = i18n.to_local_timezone(base)
+ result = localtime.strftime(format)
+ self.assertEqual(result, '2002-10-27 01:00:00 EST-0500')
+
+ def test_to_utc(self):
+ current_handler.i18n.set_timezone('US/Eastern')
+
+ format = '%Y-%m-%d %H:%M:%S'
+
+ # Test datetime with timezone set
+ base = datetime.datetime(2002, 10, 27, 6, 0, 0, tzinfo=pytz.UTC)
+ localtime = i18n.to_utc(base)
+ result = localtime.strftime(format)
+
+ self.assertEqual(result, '2002-10-27 06:00:00')
+
+ # Test naive datetime - no timezone set
+ base = datetime.datetime(2002, 10, 27, 6, 0, 0)
+ localtime = i18n.to_utc(base)
+ result = localtime.strftime(format)
+ self.assertEqual(result, '2002-10-27 11:00:00')
+
+ def test_get_timezone_location(self):
+ i18n.set_locale('de_DE')
+
self.assertEqual(i18n.get_timezone_location(pytz.timezone('America/St_Johns')),
u'Kanada (St. John\'s)')
+ i18n.set_locale('de_DE')
+
self.assertEqual(i18n.get_timezone_location(pytz.timezone('America/Mexico_City')),
u'Mexiko (Mexiko-Stadt)')
+ i18n.set_locale('de_DE')
+
self.assertEqual(i18n.get_timezone_location(pytz.timezone('Europe/Berlin')),
u'Deutschland')
+
+
+#==============================================================================
+# Number formatting
+#==============================================================================
+class TestNumberFormatting(BaseTestCase):
+ def test_format_number(self):
+ i18n.set_locale('en_US')
+ self.assertEqual(i18n.format_number(1099), u'1,099')
+
+ def test_format_decimal(self):
+ i18n.set_locale('en_US')
+ self.assertEqual(i18n.format_decimal(1.2345), u'1.234')
+ self.assertEqual(i18n.format_decimal(1.2346), u'1.235')
+ self.assertEqual(i18n.format_decimal(-1.2346), u'-1.235')
+ self.assertEqual(i18n.format_decimal(12345.5), u'12,345.5')
+
+ i18n.set_locale('sv_SE')
+ self.assertEqual(i18n.format_decimal(1.2345), u'1,234')
+
+ i18n.set_locale('de')
+ self.assertEqual(i18n.format_decimal(12345), u'12.345')
+
+ def test_format_currency(self):
+ i18n.set_locale('en_US')
+ self.assertEqual(i18n.format_currency(1099.98, 'USD'),
u'$1,099.98')
+ self.assertEqual(i18n.format_currency(1099.98, 'EUR', u'\xa4\xa4
#,##0.00'), u'EUR 1,099.98')
+
+ i18n.set_locale('es_CO')
+ self.assertEqual(i18n.format_currency(1099.98, 'USD'),
u'US$\xa01.099,98')
+
+ i18n.set_locale('de_DE')
+ self.assertEqual(i18n.format_currency(1099.98, 'EUR'),
u'1.099,98\xa0\u20ac')
+
+ def test_format_percent(self):
+ i18n.set_locale('en_US')
+ self.assertEqual(i18n.format_percent(0.34), u'34%')
+ self.assertEqual(i18n.format_percent(25.1234), u'2,512%')
+ self.assertEqual(i18n.format_percent(25.1234, u'#,##0\u2030'),
u'25,123\u2030')
+
+ i18n.set_locale('sv_SE')
+ self.assertEqual(i18n.format_percent(25.1234), u'2\xa0512\xa0%')
+
+ def test_format_scientific(self):
+ i18n.set_locale('en_US')
+ self.assertEqual(i18n.format_scientific(10000), u'1E4')
+ self.assertEqual(i18n.format_scientific(1234567, u'##0E00'),
u'1.23E06')
+
+ def test_parse_number(self):
+ i18n.set_locale('en_US')
+ self.assertEqual(i18n.parse_number('1,099'), 1099L)
+
+ i18n.set_locale('de_DE')
+ self.assertEqual(i18n.parse_number('1.099'), 1099L)
+
+ def test_parse_number2(self):
+ i18n.set_locale('de')
+ self.assertRaises(NumberFormatError, i18n.parse_number, '1.099,98')
+
+ def test_parse_decimal(self):
+ i18n.set_locale('en_US')
+ self.assertEqual(i18n.parse_decimal('1,099.98'), 1099.98)
+
+ i18n.set_locale('de')
+ self.assertEqual(i18n.parse_decimal('1.099,98'), 1099.98)
+
+ def test_parse_decimal_error(self):
+ i18n.set_locale('de')
+ self.assertRaises(NumberFormatError,
i18n.parse_decimal, '2,109,998')
+
+
+#==============================================================================
+# Miscelaneous
+#==============================================================================
+class TestMiscelaneous(BaseTestCase):
+ def test_list_translations(self):
+ cwd = os.getcwd()
+
os.chdir(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'resources'))
+
+ translations = i18n.list_translations()
+
+ self.assertEqual(len(translations), 2)
+ self.assertEqual(translations[0].language, 'en')
+ self.assertEqual(translations[0].territory, 'US')
+ self.assertEqual(translations[1].language, 'pt')
+ self.assertEqual(translations[1].territory, 'BR')
+
+ os.chdir(cwd)
+
+ def test_list_translations_no_locale_dir(self):
+ cwd = os.getcwd()
+
os.chdir(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'resources', 'locale'))
+
+ self.assertEqual(i18n.list_translations(), [])
+
+ os.chdir(cwd)
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/manage_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,196 @@
+# -*- coding: utf-8 -*-
+"""
+ Tests for tipfy.scripts.manage
+"""
+from __future__ import with_statement
+
+import ConfigParser
+import os
+import textwrap
+import StringIO
+import sys
+import unittest
+import StringIO
+
+import test_utils
+'''
+from tipfy.manage.config import Config
+
+
+class TestConfig(test_utils.BaseTestCase):
+ def get_fp(self, config):
+ return StringIO.StringIO(textwrap.dedent(config))
+
+ def test_get(self):
+ fp = self.get_fp("""\
+ [DEFAULT]
+ foo = bar
+
+ [section_1]
+ baz = ding
+ """)
+ config = Config()
+ config.readfp(fp)
+
+ self.assertEqual(config.get('section_1', 'foo'), 'bar')
+ self.assertEqual(config.get('section_1', 'baz'), 'ding')
+
+ # Invalid key.
+ self.assertEqual(config.get('section_1', 'invalid'), None)
+
+ def test_getboolean(self):
+ fp = self.get_fp("""\
+ [DEFAULT]
+ true_1 = 1
+ true_2 = yes
+ false_1 = 0
+ false_2 = no
+
+ [section_1]
+ true_3 = on
+ true_4 = true
+ false_3 = off
+ false_4 = false
+ invalid = bar
+ """)
+ config = Config()
+ config.readfp(fp)
+
+ self.assertEqual(config.getboolean('section_1', 'true_1'), True)
+ self.assertEqual(config.getboolean('section_1', 'true_2'), True)
+ self.assertEqual(config.getboolean('section_1', 'true_3'), True)
+ self.assertEqual(config.getboolean('section_1', 'true_4'), True)
+ self.assertEqual(config.getboolean('section_1', 'false_1'), False)
+ self.assertEqual(config.getboolean('section_1', 'false_2'), False)
+ self.assertEqual(config.getboolean('section_1', 'false_3'), False)
+ self.assertEqual(config.getboolean('section_1', 'false_4'), False)
+
+ # Invalid boolean.
+ self.assertEqual(config.getboolean('section_1', 'invalid'), None)
+
+ def test_getfloat(self):
+ fp = self.get_fp("""\
+ [DEFAULT]
+ foo = 0.1
+
+ [section_1]
+ baz = 0.2
+ invalid = bar
+ """)
+ config = Config()
+ config.readfp(fp)
+
+ self.assertEqual(config.getfloat('section_1', 'foo'), 0.1)
+ self.assertEqual(config.getfloat('section_1', 'baz'), 0.2)
+
+ # Invalid float.
+ self.assertEqual(config.getboolean('section_1', 'invalid'), None)
+
+ def test_getint(self):
+ fp = self.get_fp("""\
+ [DEFAULT]
+ foo = 999
+
+ [section_1]
+ baz = 1999
+ invalid = bar
+ """)
+ config = Config()
+ config.readfp(fp)
+
+ self.assertEqual(config.getint('section_1', 'foo'), 999)
+ self.assertEqual(config.getint('section_1', 'baz'), 1999)
+
+ # Invalid int.
+ self.assertEqual(config.getboolean('section_1', 'invalid'), None)
+
+ def test_getlist(self):
+ fp = self.get_fp("""\
+ [DEFAULT]
+ animals =
+ rhino
+ rhino
+ hamster
+ hamster
+ goat
+ goat
+
+ [section_1]
+ fruits =
+ orange
+ watermellow
+ grape
+ """)
+ config = Config()
+ config.readfp(fp)
+
+ # Non-unique values.
+ self.assertEqual(config.getlist('section_1', 'animals'), [
+ 'rhino',
+ 'rhino',
+ 'hamster',
+ 'hamster',
+ 'goat',
+ 'goat',
+ ])
+ self.assertEqual(config.getlist('section_1', 'fruits'), [
+ 'orange',
+ 'watermellow',
+ 'grape',
+ ])
+
+ # Unique values.
+ self.assertEqual(config.getlist('section_1', 'animals', unique=True), [
+ 'rhino',
+ 'hamster',
+ 'goat',
+ ])
+
+ def test_interpolation(self):
+ fp = self.get_fp("""\
+ [DEFAULT]
+ path = /path/to/%(path_name)s
+
+ [section_1]
+ path_name = foo
+ path_1 = /special%(path)s
+ path_2 = /special/%(path_name)s
+
+ [section_2]
+ path_name = bar
+
+ [section_3]
+ path_1 = /path/to/%(section_1|path_name)s
+ path_2 = /path/to/%(section_2|path_name)s
+ path_3 = /%(section_1|path_name)s/%(section_2|path_name)s/%(section_1|
path_name)s/%(section_2|path_name)s
+ path_error_1 = /path/to/%(section_3|path_error_1)s
+ path_error_2 = /path/to/%(section_3|path_error_3)s
+ path_error_3 = /path/to/%(section_3|path_error_2)s
+ path_not_really = /path/to/%(foo
+ """)
+ config = Config()
+ config.readfp(fp)
+
+ self.assertEqual(config.get('section_1', 'path'), '/path/to/foo')
+
self.assertEqual(config.get('section_1', 'path_1'), '/special/path/to/foo')
+ self.assertEqual(config.get('section_1', 'path_2'), '/special/foo')
+ self.assertEqual(config.get('section_2', 'path'), '/path/to/bar')
+
+ self.assertEqual(config.get('section_3', 'path_1'), '/path/to/foo')
+ self.assertEqual(config.get('section_3', 'path_2'), '/path/to/bar')
+ self.assertEqual(config.get('section_3', 'path_3'), '/foo/bar/foo/bar')
+
+ # Failed interpolation (recursive)
+ self.assertRaises(ConfigParser.InterpolationError, config.get,
+ 'section_3', 'path_error_1')
+ self.assertRaises(ConfigParser.InterpolationError, config.get,
+ 'section_3', 'path_error_2')
+ self.assertRaises(ConfigParser.InterpolationError, config.get,
+ 'section_3', 'path_error_3')
+
+
self.assertEqual(config.get('section_3', 'path_not_really'), '/path/to/%(foo')
+'''
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/routing_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,169 @@
+from __future__ import with_statement
+
+from tipfy import Tipfy, RequestHandler, Response
+from tipfy.routing import HandlerPrefix, NamePrefix, Router, Rule
+from tipfy.utils import url_for
+
+import test_utils
+
+
+class TestRouter(test_utils.BaseTestCase):
+ def test_add(self):
+ app = Tipfy()
+ router = Router(app)
+ self.assertEqual(len(list(router.map.iter_rules())), 0)
+
+ router.add(Rule('/', name='home', handler='HomeHandler'))
+ self.assertEqual(len(list(router.map.iter_rules())), 1)
+
+ router.add([
+ Rule('/about', name='about', handler='AboutHandler'),
+ Rule('/contact', name='contact', handler='ContactHandler'),
+ ])
+ self.assertEqual(len(list(router.map.iter_rules())), 3)
+
+
+class TestRouting(test_utils.BaseTestCase):
+
#==========================================================================
+ # HandlerPrefix
+
#==========================================================================
+ def test_handler_prefix(self):
+ rules = [
+ HandlerPrefix('resources.handlers.', [
+ Rule('/', name='home', handler='HomeHandler'),
+ Rule('/defaults', name='defaults', handler='HandlerWithRuleDefaults',
defaults={'foo': 'bar'}),
+ ])
+ ]
+
+ app = Tipfy(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'Hello, World!')
+
+ response = client.get('/defaults')
+ self.assertEqual(response.data, 'bar')
+
+
#==========================================================================
+ # NamePrefix
+
#==========================================================================
+ def test_name_prefix(self):
+ class DummyHandler(RequestHandler):
+ def get(self, **kwargs):
+ return ''
+
+ rules = [
+ NamePrefix('company-', [
+ Rule('/', name='home', handler=DummyHandler),
+ Rule('/about', name='about', handler=DummyHandler),
+ Rule('/contact', name='contact', handler=DummyHandler),
+ ]),
+ ]
+
+ app = Tipfy(rules)
+
+ with app.get_test_handler('/') as handler:
+ self.assertEqual(handler.url_for('company-home'), '/')
+ self.assertEqual(handler.url_for('company-about'), '/about')
+ self.assertEqual(handler.url_for('company-contact'), '/contact')
+
+ with app.get_test_handler('/') as handler:
+ self.assertEqual(
handler.request.rule.name, 'company-home')
+
+ with app.get_test_handler('/about') as handler:
+ self.assertEqual(
handler.request.rule.name, 'company-about')
+
+ with app.get_test_handler('/contact') as handler:
+ self.assertEqual(
handler.request.rule.name, 'company-contact')
+
+
#==========================================================================
+ # RegexConverter
+
#==========================================================================
+ def test_regex_converter(self):
+ class TestHandler(RequestHandler):
+ def get(self, **kwargs):
+ return Response(kwargs.get('path'))
+
+ app = Tipfy([
+ Rule('/<regex(".*"):path>', name='home', handler=TestHandler),
+ ])
+ client = app.get_test_client()
+
+ response = client.get('/foo')
+ self.assertEqual(response.data, 'foo')
+
+ response = client.get('/foo/bar')
+ self.assertEqual(response.data, 'foo/bar')
+
+ response = client.get('/foo/bar/baz')
+ self.assertEqual(response.data, 'foo/bar/baz')
+
+ def test_url_for(self):
+ class DummyHandler(RequestHandler):
+ def get(self, **kwargs):
+ return ''
+
+ rules = [
+ NamePrefix('company-', [
+ Rule('/', name='home', handler=DummyHandler),
+ Rule('/about', name='about', handler=DummyHandler),
+ Rule('/contact', name='contact', handler=DummyHandler),
+ ]),
+ ]
+
+ app = Tipfy(rules)
+
+ with app.get_test_handler('/') as handler:
+ self.assertEqual(url_for('company-home'), '/')
+ self.assertEqual(url_for('company-about'), '/about')
+ self.assertEqual(url_for('company-contact'), '/contact')
+
+
+class TestAlternativeRouting(test_utils.BaseTestCase):
+ def test_handler(self):
+ rules = [
+ HandlerPrefix('resources.alternative_routing.', [
+ Rule('/', name='home', handler='HomeHandler'),
+ Rule('/foo', name='home/foo', handler='HomeHandler:foo'),
+ Rule('/bar', name='home/bar', handler='HomeHandler:bar'),
+ Rule('/other/foo', name='other/foo', handler='OtherHandler:foo'),
+ Rule('/other/bar', name='other/bar', handler='OtherHandler:bar'),
+ ])
+ ]
+
+ app = Tipfy(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'home-get')
+ response = client.get('/foo')
+ self.assertEqual(response.data, 'home-foo')
+ response = client.get('/bar')
+ self.assertEqual(response.data, 'home-bar')
+ response = client.get('/other/foo')
+ self.assertEqual(response.data, 'other-foo')
+ response = client.get('/other/bar')
+ self.assertEqual(response.data, 'other-bar')
+
+ def test_function_handler(self):
+ rules = [
+ HandlerPrefix('resources.alternative_routing.', [
+ Rule('/', name='home', handler='home'),
+ Rule('/foo', name='home/foo', handler='foo'),
+ Rule('/bar', name='home/bar', handler='bar'),
+ ])
+ ]
+
+ app = Tipfy(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'home')
+ response = client.get('/foo')
+ self.assertEqual(response.data, 'foo')
+ response = client.get('/bar')
+ self.assertEqual(response.data, 'bar')
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/secure_cookie_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,55 @@
+import time
+import unittest
+
+from tipfy import Tipfy, Request, Response
+from tipfy.sessions import SessionStore, SecureCookieStore,
SecureCookieSession
+
+import test_utils
+
+
+class TestSecureCookie(test_utils.BaseTestCase):
+ def _get_app(self):
+ return Tipfy(config={
+ 'tipfy.sessions': {
+ 'secret_key': 'something very secret',
+ }
+ })
+
+ def test_get_cookie_no_cookie(self):
+ store = SecureCookieStore('secret')
+ request = Request.from_values('/')
+ self.assertEqual(store.get_cookie(request, 'session'), None)
+
+ def test_get_cookie_invalid_parts(self):
+ store = SecureCookieStore('secret')
+ request = Request.from_values('/',
headers=[('Cookie', 'session="invalid"; Path=/')])
+ self.assertEqual(store.get_cookie(request, 'session'), None)
+
+ def test_get_cookie_invalid_signature(self):
+ store = SecureCookieStore('secret')
+ request = Request.from_values('/', headers=[('Cookie', 'session="foo|bar|
baz"; Path=/')])
+ self.assertEqual(store.get_cookie(request, 'session'), None)
+
+ def test_get_cookie_expired(self):
+ store = SecureCookieStore('secret')
+ request = Request.from_values('/',
headers=[('Cookie', 'session="eyJmb28iOiJiYXIifQ==|1284849476|
847b472f2fabbf1efef55748a394b6f182acd8be"; Path=/')])
+ self.assertEqual(store.get_cookie(request, 'session', max_age=-86400),
None)
+
+ def test_get_cookie_badly_encoded(self):
+ store = SecureCookieStore('secret')
+ timestamp = str(int(time.time()))
+ value = 'foo'
+ signature = store._get_signature('session', value, timestamp)
+ cookie_value = '|'.join([value, timestamp, signature])
+
+ request = Request.from_values('/', headers=[('Cookie', 'session="%s";
Path=/' % cookie_value)])
+ self.assertEqual(store.get_cookie(request, 'session'), None)
+
+ def test_get_cookie_valid(self):
+ store = SecureCookieStore('secret')
+ request = Request.from_values('/',
headers=[('Cookie', 'session="eyJmb28iOiJiYXIifQ==|1284849476|
847b472f2fabbf1efef55748a394b6f182acd8be"; Path=/')])
+ self.assertEqual(store.get_cookie(request, 'session'), {'foo': 'bar'})
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/sessions_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,449 @@
+import os
+import unittest
+
+from werkzeug import cached_property
+
+from tipfy import Tipfy, Request, RequestHandler, Response, Rule
+from
tipfy.app import local
+from tipfy.sessions import (SecureCookieSession, SecureCookieStore,
+ SessionMiddleware, SessionStore)
+from tipfy.utils import json_b64decode
+from tipfy.appengine.sessions import (DatastoreSession, MemcacheSession,
+ SessionModel)
+
+import test_utils
+
+
+class BaseHandler(RequestHandler):
+ middleware = [SessionMiddleware()]
+
+
+class TestSessionStoreBase(test_utils.BaseTestCase):
+ def _get_app(self):
+ return Tipfy(config={
+ 'tipfy.sessions': {
+ 'secret_key': 'secret',
+ }
+ })
+
+ def test_secure_cookie_store(self):
+ local.current_handler = handler = RequestHandler(self._get_app(),
Request.from_values())
+ store = SessionStore(handler)
+
+ self.assertEqual(isinstance(store.secure_cookie_store,
SecureCookieStore), True)
+
+ def test_secure_cookie_store_no_secret_key(self):
+ local.current_handler = handler = RequestHandler(Tipfy(),
Request.from_values())
+ store = SessionStore(handler)
+
+ self.assertRaises(KeyError, getattr, store, 'secure_cookie_store')
+
+ def test_get_cookie_args(self):
+ local.current_handler = handler = RequestHandler(self._get_app(),
Request.from_values())
+ store = SessionStore(handler)
+
+ self.assertEqual(store.get_cookie_args(), {
+ 'max_age': None,
+ 'domain': None,
+ 'path': '/',
+ 'secure': None,
+ 'httponly': False,
+ })
+
+ self.assertEqual(store.get_cookie_args(max_age=86400,
domain='.
foo.com'), {
+ 'max_age': 86400,
+ 'domain': '.
foo.com',
+ 'path': '/',
+ 'secure': None,
+ 'httponly': False,
+ })
+
+ def test_get_save_session(self):
+ local.current_handler = handler = RequestHandler(self._get_app(),
Request.from_values())
+ store = SessionStore(handler)
+
+ session = store.get_session()
+ self.assertEqual(isinstance(session, SecureCookieSession), True)
+ self.assertEqual(session, {})
+
+ session['foo'] = 'bar'
+
+ response = Response()
+ store.save(response)
+
+ request = Request.from_values('/',
headers={'Cookie': '\n'.join(response.headers.getlist('Set-Cookie'))})
+ local.current_handler = handler = RequestHandler(self._get_app(),
request)
+ store = SessionStore(handler)
+
+ session = store.get_session()
+ self.assertEqual(isinstance(session, SecureCookieSession), True)
+ self.assertEqual(session, {'foo': 'bar'})
+
+ def test_set_delete_cookie(self):
+ local.current_handler = handler = RequestHandler(self._get_app(),
Request.from_values())
+ store = SessionStore(handler)
+
+ store.set_cookie('foo', 'bar')
+ store.set_cookie('baz', 'ding')
+
+ response = Response()
+ store.save(response)
+
+ headers = {'Cookie': '\n'.join(response.headers.getlist('Set-Cookie'))}
+ request = Request.from_values('/', headers=headers)
+
+ self.assertEqual(request.cookies.get('foo'), 'bar')
+ self.assertEqual(request.cookies.get('baz'), 'ding')
+
+ store.delete_cookie('foo')
+ store.save(response)
+
+ headers = {'Cookie': '\n'.join(response.headers.getlist('Set-Cookie'))}
+ request = Request.from_values('/', headers=headers)
+
+ self.assertEqual(request.cookies.get('foo', None), '')
+ self.assertEqual(request.cookies['baz'], 'ding')
+
+ def test_set_cookie_encoded(self):
+ local.current_handler = handler = RequestHandler(self._get_app(),
Request.from_values())
+ store = SessionStore(handler)
+
+ store.set_cookie('foo', 'bar', format='json')
+ store.set_cookie('baz', 'ding', format='json')
+
+ response = Response()
+ store.save(response)
+
+ headers = {'Cookie': '\n'.join(response.headers.getlist('Set-Cookie'))}
+ request = Request.from_values('/', headers=headers)
+
+ self.assertEqual(json_b64decode(request.cookies.get('foo')), 'bar')
+ self.assertEqual(json_b64decode(request.cookies.get('baz')), 'ding')
+
+
+class TestSessionStore(test_utils.BaseTestCase):
+ def setUp(self):
+ SessionStore.default_backends.update({
+ 'datastore': DatastoreSession,
+ 'memcache': MemcacheSession,
+ 'securecookie': SecureCookieSession,
+ })
+ test_utils.BaseTestCase.setUp(self)
+
+ def _get_app(self, *args, **kwargs):
+ app = Tipfy(config={
+ 'tipfy.sessions': {
+ 'secret_key': 'secret',
+ },
+ })
+ local.current_handler = handler = RequestHandler(app,
Request.from_values(*args, **kwargs))
+ return app
+
+ def test_set_session(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ res = self.session.get('key')
+ if not res:
+ res = 'undefined'
+ session = SecureCookieSession()
+ session['key'] = 'a session value'
+
self.session_store.set_session(self.session_store.config['cookie_name'],
session)
+
+ return Response(res)
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a session value')
+
+ def test_set_session_datastore(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ session = self.session_store.get_session(backend='datastore')
+ res = session.get('key')
+ if not res:
+ res = 'undefined'
+ session = DatastoreSession(None, 'a_random_session_id')
+ session['key'] = 'a session value'
+
self.session_store.set_session(self.session_store.config['cookie_name'],
session, backend='datastore')
+
+ return Response(res)
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a session value')
+
+ def test_get_memcache_session(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ session = self.session_store.get_session(backend='memcache')
+ res = session.get('test')
+ if not res:
+ res = 'undefined'
+ session['test'] = 'a memcache session value'
+
+ return Response(res)
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a memcache session value')
+
+ def test_get_datastore_session(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ session = self.session_store.get_session(backend='datastore')
+ res = session.get('test')
+ if not res:
+ res = 'undefined'
+ session['test'] = 'a datastore session value'
+
+ return Response(res)
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a datastore session value')
+
+ def test_set_delete_cookie(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ res = self.request.cookies.get('test')
+ if not res:
+ res = 'undefined'
+ self.session_store.set_cookie('test', 'a cookie value')
+ else:
+ self.session_store.delete_cookie('test')
+
+ return Response(res)
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a cookie value')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a cookie value')
+
+ def test_set_unset_cookie(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ res = self.request.cookies.get('test')
+ if not res:
+ res = 'undefined'
+ self.session_store.set_cookie('test', 'a cookie value')
+
+ self.session_store.unset_cookie('test')
+ return Response(res)
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'undefined')
+
+ def test_set_get_secure_cookie(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ response = Response()
+
+ cookie = self.session_store.get_secure_cookie('test') or {}
+ res = cookie.get('test')
+ if not res:
+ res = 'undefined'
+ self.session_store.set_secure_cookie(response, 'test', {'test': 'a
secure cookie value'})
+
+ response.data = res
+ return response
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a secure cookie value')
+
+ def test_set_get_flashes(self):
+ class MyHandler(BaseHandler):
+ def get(self):
+ res = [msg for msg, level in self.session.get_flashes()]
+ if not res:
+ res = [{'body': 'undefined'}]
+ self.session.flash({'body': 'a flash value'})
+
+ return Response(res[0]['body'])
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'undefined')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a flash value')
+
+ def test_set_get_messages(self):
+ class MyHandler(BaseHandler):
+ @cached_property
+ def messages(self):
+ """A list of status messages to be displayed to the user."""
+ messages = []
+ flashes = self.session.get_flashes(key='_messages')
+ for msg, level in flashes:
+ msg['level'] = level
+ messages.append(msg)
+
+ return messages
+
+ def set_message(self, level, body, title=None, life=None, flash=False):
+ """Adds a status message.
+
+ :param level:
+ Message level. Common values are "success", "error", "info" or
+ "alert".
+ :param body:
+ Message contents.
+ :param title:
+ Optional message title.
+ :param life:
+ Message life time in seconds. User interface can implement
+ a mechanism to make the message disappear after the elapsed time.
+ If not set, the message is permanent.
+ :returns:
+ None.
+ """
+ message = {'title': title, 'body': body, 'life': life}
+ if flash is True:
+ self.session.flash(message, level, '_messages')
+ else:
+ self.messages.append(message)
+
+ def get(self):
+ self.set_message('success', 'a normal message value')
+ self.set_message('success', 'a flash message value', flash=True)
+ return Response('|'.join(msg['body'] for msg in self.messages))
+
+ rules = [Rule('/', name='test', handler=MyHandler)]
+
+ app = self._get_app('/')
+ app.router.add(rules)
+ client = app.get_test_client()
+
+ response = client.get('/')
+ self.assertEqual(response.data, 'a normal message value')
+
+ response = client.get('/', headers={
+ 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
+ })
+ self.assertEqual(response.data, 'a flash message value|a normal message
value')
+
+
+class TestSessionModel(test_utils.BaseTestCase):
+ def setUp(self):
+
self.app = Tipfy()
+ test_utils.BaseTestCase.setUp(self)
+
+ def test_get_by_sid_without_cache(self):
+ sid = 'test'
+ entity = SessionModel.create(sid, {'foo': 'bar', 'baz': 'ding'})
+ entity.put()
+
+ cached_data = SessionModel.get_cache(sid)
+ self.assertNotEqual(cached_data, None)
+
+ entity.delete_cache()
+ cached_data = SessionModel.get_cache(sid)
+ self.assertEqual(cached_data, None)
+
+ entity = SessionModel.get_by_sid(sid)
+ self.assertNotEqual(entity, None)
+
+ # Now will fetch cache.
+ entity = SessionModel.get_by_sid(sid)
+ self.assertNotEqual(entity, None)
+
+ self.assertEqual('foo' in entity.data, True)
+ self.assertEqual('baz' in entity.data, True)
+ self.assertEqual(entity.data['foo'], 'bar')
+ self.assertEqual(entity.data['baz'], 'ding')
+
+ entity.delete()
+ entity = SessionModel.get_by_sid(sid)
+ self.assertEqual(entity, None)
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/template_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,31 @@
+import os
+import unittest
+
+from tipfy import template
+
+import test_utils
+
+TEMPLATES_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ 'resources', 'templates'))
+TEMPLATES_ZIP_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ 'resources', 'templates.zip'))
+
+
+class TestTemplate(test_utils.BaseTestCase):
+ def test_generate(self):
+ t = template.Template('<html>{{ myvalue }}</html>')
+ self.assertEqual(t.generate(myvalue='XXX'), '<html>XXX</html>')
+
+ def test_loader(self):
+ loader = template.Loader(TEMPLATES_DIR)
+ t = loader.load('template_tornado1.html')
+
self.assertEqual(t.generate(students=['calvin', 'hobbes', 'moe']), '\n\ncalvin\n\n\n\nhobbes\n\n\n\nmoe\n\n\n')
+
+ def test_loader2(self):
+ loader = template.ZipLoader(TEMPLATES_ZIP_DIR, 'templates')
+ t = loader.load('template1.html')
+ self.assertEqual(t.generate(message='Hello, World!'), 'Hello, World!\n')
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /dev/null
+++ /tests/utils_test.py Sat Apr 2 13:52:07 2011
@@ -0,0 +1,192 @@
+# -*- coding: utf-8 -*-
+"""
+ Tests for tipfy utils
+"""
+import unittest
+
+import werkzeug
+
+from tipfy import RequestHandler, Request, Response, Rule, Tipfy
+from
tipfy.app import local
+
+from tipfy.utils import (xhtml_escape, xhtml_unescape, json_encode,
+ json_decode, render_json_response, url_escape, url_unescape, utf8,
+ _unicode)
+
+import test_utils
+
+
+class HomeHandler(RequestHandler):
+ def get(self, **kwargs):
+ return 'Hello, World!'
+
+
+class ProfileHandler(RequestHandler):
+ def get(self, **kwargs):
+ return 'Username: %s' % kwargs.get('username')
+
+
+class RedirectToHandler(RequestHandler):
+ def get(self, **kwargs):
+ username = kwargs.get('username', None)
+ if username:
+ return redirect_to('profile', username=username)
+ else:
+ return redirect_to('home')
+
+
+class RedirectTo301Handler(RequestHandler):
+ def get(self, **kwargs):
+ username = kwargs.get('username', None)
+ if username:
+ return redirect_to('profile', username=username, _code=301)
+ else:
+ return redirect_to('home', _code=301)
+
+
+class RedirectToInvalidCodeHandler(RequestHandler):
+ def get(self, **kwargs):
+ return redirect_to('home', _code=405)
+
+
+def get_app():
+ return Tipfy(rules=[
+ Rule('/', name='home', handler=HomeHandler),
+ Rule('/people/<string:username>', name='profile',
handler=ProfileHandler),
+ Rule('/redirect_to/', name='redirect_to', handler=RedirectToHandler),
+ Rule('/redirect_to/<string:username>', name='redirect_to',
handler=RedirectToHandler),
+ Rule('/redirect_to_301/', name='redirect_to',
handler=RedirectTo301Handler),
+ Rule('/redirect_to_301/<string:username>', name='redirect_to',
handler=RedirectTo301Handler),
+ Rule('/redirect_to_invalid', name='redirect_to_invalid',
handler=RedirectToInvalidCodeHandler),
+ ])
+
+
+class TestRedirect(test_utils.BaseTestCase):
+ '''
+
#===========================================================================
+ # redirect()
+
#===========================================================================
+ def test_redirect(self):
+ response = redirect('
http://www.google.com/')
+
+ self.assertEqual(response.headers['location'], '
http://www.google.com/')
+ self.assertEqual(response.status_code, 302)
+
+ def test_redirect_301(self):
+ response = redirect('
http://www.google.com/', 301)
+
+ self.assertEqual(response.headers['location'], '
http://www.google.com/')
+ self.assertEqual(response.status_code, 301)
+
+ def test_redirect_no_response(self):
+ response = redirect('
http://www.google.com/')
+
+ self.assertEqual(isinstance(response, werkzeug.BaseResponse), True)
+ self.assertEqual(response.headers['location'], '
http://www.google.com/')
+ self.assertEqual(response.status_code, 302)
+
+ def test_redirect_no_response_301(self):
+ response = redirect('
http://www.google.com/', 301)
+
+ self.assertEqual(isinstance(response, werkzeug.BaseResponse), True)
+ self.assertEqual(response.headers['location'], '
http://www.google.com/')
+ self.assertEqual(response.status_code, 301)
+
+ def test_redirect_invalid_code(self):
+ self.assertRaises(AssertionError, redirect, '
http://www.google.com/',
404)
+
+
#===========================================================================
+ # redirect_to()
+
#===========================================================================
+ def test_redirect_to(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ response = client.get('/redirect_to/', base_url='
http://foo.com')
+ self.assertEqual(response.headers['location'], '
http://foo.com/')
+ self.assertEqual(response.status_code, 302)
+
+
+ def test_redirect_to2(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ response = client.get('/redirect_to/calvin', base_url='
http://foo.com')
+
self.assertEqual(response.headers['location'], '
http://foo.com/people/calvin')
+ self.assertEqual(response.status_code, 302)
+
+ response = client.get('/redirect_to/hobbes', base_url='
http://foo.com')
+
self.assertEqual(response.headers['location'], '
http://foo.com/people/hobbes')
+ self.assertEqual(response.status_code, 302)
+
+ response = client.get('/redirect_to/moe', base_url='
http://foo.com')
+
self.assertEqual(response.headers['location'], '
http://foo.com/people/moe')
+ self.assertEqual(response.status_code, 302)
+
+ def test_redirect_to_301(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ response = client.get('/redirect_to_301/calvin',
base_url='
http://foo.com')
+
self.assertEqual(response.headers['location'], '
http://foo.com/people/calvin')
+ self.assertEqual(response.status_code, 301)
+
+ response = client.get('/redirect_to_301/hobbes',
base_url='
http://foo.com')
+
self.assertEqual(response.headers['location'], '
http://foo.com/people/hobbes')
+ self.assertEqual(response.status_code, 301)
+
+ response = client.get('/redirect_to_301/moe', base_url='
http://foo.com')
+
self.assertEqual(response.headers['location'], '
http://foo.com/people/moe')
+ self.assertEqual(response.status_code, 301)
+
+ def test_redirect_to_invalid_code(self):
+ app = get_app()
+ client = app.get_test_client()
+
+ response = client.get('/redirect_to_invalid', base_url='
http://foo.com')
+ self.assertEqual(response.status_code, 500)
+ '''
+
+class TestRenderJson(test_utils.BaseTestCase):
+
#===========================================================================
+ # render_json_response()
+
#===========================================================================
+ def test_render_json_response(self):
+ local.current_handler = HomeHandler(Tipfy(), Request.from_values())
+ response = render_json_response({'foo': 'bar'})
+
+ self.assertEqual(isinstance(response, Response), True)
+ self.assertEqual(response.mimetype, 'application/json')
+ self.assertEqual(response.data, '{"foo":"bar"}')
+
+
+class TestUtils(test_utils.BaseTestCase):
+ def test_xhtml_escape(self):
+ self.assertEqual(xhtml_escape('"foo"'), '"foo"')
+
+ def test_xhtml_unescape(self):
+ self.assertEqual(xhtml_unescape('"foo"'), '"foo"')
+
+ def test_json_encode(self):
+
self.assertEqual(json_encode('<script>alert("hello")</script>'), '"<script>alert(\\"hello\\")<\\/script>"')
+
+ def test_json_decode(self):
+
self.assertEqual(json_decode('"<script>alert(\\"hello\\")<\\/script>"'), '<script>alert("hello")</script>')
+
+ def test_url_escape(self):
+ self.assertEqual(url_escape('somewords&some more
words'), 'somewords%26some+more+words')
+
+ def test_url_unescape(self):
+
self.assertEqual(url_unescape('somewords%26some+more+words'), 'somewords&some
more words')
+
+ def test_utf8(self):
+ self.assertEqual(isinstance(utf8(u'ááá'), str), True)
+ self.assertEqual(isinstance(utf8('ááá'), str), True)
+
+ def test_unicode(self):
+ self.assertEqual(isinstance(_unicode(u'ááá'), unicode), True)
+ self.assertEqual(isinstance(_unicode('ááá'), unicode), True)
+
+
+if __name__ == '__main__':
+ test_utils.main()
=======================================
--- /tests/project/README.txt Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,62 +0,0 @@
-Tipfy Installation
-==================
-Tipfy is a small but powerful framework made specifically for Google App
-Engine. Here are some quick instructions to get started with it. More
details
-are available in tipfy's wiki:
-
-
http://www.tipfy.org/wiki/guide/installation/
-
-Our goal gere is to provide a smooth installation process so that you can
see
-a tipfy application up and running in a few minutes. If you have any
problems,
-please post a message to the discussion group:
-
-
http://groups.google.com/group/tipfy
-
-
-All-in-one installation
------------------------
-If you downloaded the all-in-one pack, all you need to do is to start the
-development server pointing to the /app dir inside the uncompressed
archive:
-
-- Run the dev_appserver tool from the App Engine SDK (or the App Engine
- Launcher) pointing to the /app directory inside the uncompressed archive:
-
- dev_appserver.py /path/to/project/app
-
-- Open a browser and test the URLs:
-
-
http://localhost:8080/
-
http://localhost:8080/pretty
-
-You should see a Hello, World! message. If you do, that's all. Now you have
-a project environment to start developing your app.
-
-
-Do-it-yourself installation
----------------------------
-If you downloaded the do-it-yourself pack, you need to first install the
-needed libraries before running de development server. Here's how:
-
-- Access the project directory and call the bootstrap script using your
- Python 2.5 interpreter. We pass the command --distribute because it
- is preferable to the default setuptools. This will prepare buildout to
run:
-
- python bootstrap.py --distribute
-
-- Build the project calling bin/buildout. This will download and setup
- tipfy and all libraries inside the /app directory. It may take a while.
-
- bin/buildout
-
-- Start the development server calling bin/dev_appserver. It will use the
- application from /app by default:
-
- bin/dev_appserver
-
-- Open a browser and test the URLs:
-
-
http://localhost:8080/
-
http://localhost:8080/pretty
-
-You should see a Hello, World! message. If you do, that's all. Now you have
-a project environment to start developing your app.
=======================================
--- /tests/project/app/app.yaml Sat Nov 27 13:14:25 2010
+++ /dev/null
@@ -1,24 +0,0 @@
-application: my-app
-version: 1
-runtime: python
-api_version: 1
-
-builtins:
-- appstats: on
-- datastore_admin: on
-- remote_api: on
-
-handlers:
-- url: /(robots\.txt|favicon\.ico)
- static_files: static/\1
- upload: static/(.*)
-
-- url: /static
- static_dir: static
-
-- url: /_ah/queue/deferred
- script: main.py
- login: admin
-
-- url: /.*
- script: main.py
=======================================
--- /tests/project/app/config.py Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,3 +0,0 @@
-# -*- coding: utf-8 -*-
-"""App configuration."""
-config = {}
=======================================
--- /tests/project/app/hello_world/__init__.py Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,1 +0,0 @@
-
=======================================
--- /tests/project/app/hello_world/handlers.py Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,27 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- hello_world.handlers
- ~~~~~~~~~~~~~~~~~~~~
-
- Hello, World!: the simplest tipfy app.
-
- :copyright: 2009 by
tipfy.org.
- :license: BSD, see LICENSE for more details.
-"""
-from tipfy import RequestHandler, Response
-from tipfyext.jinja2 import Jinja2Mixin
-
-
-class HelloWorldHandler(RequestHandler):
- def get(self):
- """Simply returns a Response object with an enigmatic
salutation."""
- return Response('Hello, World!')
-
-
-class PrettyHelloWorldHandler(RequestHandler, Jinja2Mixin):
- def get(self):
- """Simply returns a rendered template with an enigmatic
salutation."""
- context = {
- 'message': 'Hello, World!',
- }
- return self.render_response('hello_world.html', **context)
=======================================
--- /tests/project/app/index.yaml Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,11 +0,0 @@
-indexes:
-
-# AUTOGENERATED
-
-# This index.yaml is automatically updated whenever the dev_appserver
-# detects that a new type of query is run. If you want to manage the
-# index.yaml file manually, remove the above marker line (the line
-# saying "# AUTOGENERATED"). If you want to manage some indexes
-# manually, move them above the marker line. The index.yaml file is
-# automatically uploaded to the admin console when you next deploy
-# your application using appcfg.py.
=======================================
--- /tests/project/app/lib/README.txt Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,10 +0,0 @@
-/lib
-====
-
-Use this directory to place your libraries.
-
-This directory is searched first when looking for a package, and
-"/distlib" is searched in sequence.
-
-This directory is not editted by a build tool. It is a safe place for
custom
-libraries or to override packages from the "/distlib" directory.
=======================================
--- /tests/project/app/locale/README.txt Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,5 +0,0 @@
-/locale
-=======
-
-This directory is used to store translations for the app. If you are not
-going to use internationalization in your app, you can ignore or remove it.
=======================================
--- /tests/project/app/main.py Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,50 +0,0 @@
-# -*- coding: utf-8 -*-
-"""WSGI app setup."""
-import os
-import sys
-
-if 'lib' not in sys.path:
- # Add lib as primary libraries directory, with fallback to lib/dist
- # and optionally to lib/dist.zip, loaded using zipimport.
- sys.path[0:0] = ['lib', 'lib/dist', 'lib/dist.zip']
-
-from tipfy import Tipfy
-from config import config
-from urls import rules
-
-
-def enable_appstats(app):
- """Enables appstats middleware."""
- if debug:
- return
-
- from google.appengine.ext.appstats.recording import
appstats_wsgi_middleware
- app.wsgi_app = appstats_wsgi_middleware(app.wsgi_app)
-
-
-def enable_jinja2_debugging():
- """Enables blacklisted modules that help Jinja2 debugging."""
- if not debug:
- return
-
- # This enables better debugging info for errors in Jinja2 templates.
- from google.appengine.tools.dev_appserver import HardenedModulesHook
- HardenedModulesHook._WHITE_LIST_C_MODULES += ['_ctypes', 'gestalt']
-
-
-# Is this the development server?
-debug = os.environ.get('SERVER_SOFTWARE', '').startswith('Dev')
-
-# Instantiate the application.
-app = Tipfy(rules=rules, config=config, debug=debug)
-enable_appstats(app)
-enable_jinja2_debugging()
-
-
-def main():
- # Run the app.
- app.run()
-
-
-if __name__ == '__main__':
- main()
=======================================
--- /tests/project/app/static/favicon.ico Thu Nov 25 23:54:11 2010
+++ /dev/null
Binary file, no diff available.
=======================================
--- /tests/project/app/static/robots.txt Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,1 +0,0 @@
-
=======================================
--- /tests/project/app/urls.py Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,8 +0,0 @@
-# -*- coding: utf-8 -*-
-"""URL definitions."""
-from tipfy import Rule
-
-rules = [
- Rule('/', name='hello-world',
handler='hello_world.handlers.HelloWorldHandler'),
- Rule('/pretty', name='hello-world-pretty',
handler='hello_world.handlers.PrettyHelloWorldHandler'),
-]
=======================================
--- /tests/project/babel.cfg Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,9 +0,0 @@
-# Extraction from Python source files
-[python: **.py]
-# Extraction from Jinja2 HTML templates
-[jinja2: **/**.html]
-encoding = utf-8
-line_statement_prefix = %
-
-[extractors]
-jinja2 = jinja2.ext:babel_extract
=======================================
--- /tests/project/bin/README.txt Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,7 +0,0 @@
-/bin
-====
-
-This directory contains scripts and tools to help development.
-
-For example, the development server (dev_appserver) and the deployment
-tool (appcfg) are stored here, but there are more.
=======================================
--- /tests/project/bootstrap.py Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,260 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2006 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Bootstrap a buildout-based project
-
-Simply run this script in a directory containing a buildout.cfg.
-The script accepts buildout command-line options, so you can
-use the -c option to specify an alternate configuration file.
-"""
-
-import os, shutil, sys, tempfile, textwrap, urllib, urllib2, subprocess
-from optparse import OptionParser
-
-if sys.platform == 'win32':
- def quote(c):
- if ' ' in c:
- return '"%s"' % c # work around spawn lamosity on windows
- else:
- return c
-else:
- quote = str
-
-# See zc.buildout.easy_install._has_broken_dash_S for motivation and
comments.
-stdout, stderr = subprocess.Popen(
- [sys.executable, '-Sc',
- 'try:\n'
- ' import ConfigParser\n'
- 'except ImportError:\n'
- ' print 1\n'
- 'else:\n'
- ' print 0\n'],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
-has_broken_dash_S = bool(int(stdout.strip()))
-
-# In order to be more robust in the face of system Pythons, we want to
-# run without site-packages loaded. This is somewhat tricky, in
-# particular because Python 2.6's distutils imports site, so starting
-# with the -S flag is not sufficient. However, we'll start with that:
-if not has_broken_dash_S and 'site' in sys.modules:
- # We will restart with python -S.
- args = sys.argv[:]
- args[0:0] = [sys.executable, '-S']
- args = map(quote, args)
- os.execv(sys.executable, args)
-# Now we are running with -S. We'll get the clean sys.path, import site
-# because distutils will do it later, and then reset the path and clean
-# out any namespace packages from site-packages that might have been
-# loaded by .pth files.
-clean_path = sys.path[:]
-import site
-sys.path[:] = clean_path
-for k, v in sys.modules.items():
- if k in ('setuptools', 'pkg_resources') or (
- hasattr(v, '__path__') and
- len(v.__path__)==1 and
- not os.path.exists(os.path.join(v.__path__[0],'__init__.py'))):
- # This is a namespace package. Remove it.
- sys.modules.pop(k)
-
-is_jython = sys.platform.startswith('java')
-
-setuptools_source = '
http://peak.telecommunity.com/dist/ez_setup.py'
-distribute_source = '
http://python-distribute.org/distribute_setup.py'
-
-# parsing arguments
-def normalize_to_url(option, opt_str, value, parser):
- if value:
- if '://' not in value: # It doesn't smell like a URL.
- value = 'file://%s' % (
- urllib.pathname2url(
- os.path.abspath(os.path.expanduser(value))),)
- if opt_str == '--download-base' and not value.endswith('/'):
- # Download base needs a trailing slash to make the world happy.
- value += '/'
- else:
- value = None
- name = opt_str[2:].replace('-', '_')
- setattr(parser.values, name, value)
-
-usage = '''\
-[DESIRED PYTHON FOR BUILDOUT] bootstrap.py [options]
-
-Bootstraps a buildout-based project.
-
-Simply run this script in a directory containing a buildout.cfg, using the
-Python that you want bin/buildout to use.
-
-Note that by using --setup-source and --download-base to point to
-local resources, you can keep this script from going over the network.
-'''
-
-parser = OptionParser(usage=usage)
-parser.add_option("-v", "--version", dest="version",
- help="use a specific zc.buildout version")
-parser.add_option("-d", "--distribute",
- action="store_true", dest="use_distribute",
default=True,
- help="Use Distribute rather than Setuptools.")
-parser.add_option("--setup-source", action="callback", dest="setup_source",
- callback=normalize_to_url, nargs=1, type="string",
- help=("Specify a URL or file location for the setup
file. "
- "If you use Setuptools, this will default to " +
- setuptools_source + "; if you use Distribute,
this "
- "will default to " + distribute_source +"."))
-parser.add_option("--download-base", action="callback",
dest="download_base",
- callback=normalize_to_url, nargs=1, type="string",
- help=("Specify a URL or directory for downloading "
- "zc.buildout and either Setuptools or Distribute. "
- "Defaults to PyPI."))
-parser.add_option("--eggs",
- help=("Specify a directory for storing eggs. Defaults
to "
- "a temporary directory that is deleted when the "
- "bootstrap script completes."))
-parser.add_option("-t", "--accept-buildout-test-releases",
- dest='accept_buildout_test_releases',
- action="store_true", default=False,
- help=("Normally, if you do not specify a --version, the "
- "bootstrap script and buildout gets the newest "
- "*final* versions of zc.buildout and its recipes
and "
- "extensions for you. If you use this flag, "
- "bootstrap and buildout will get the newest
releases "
- "even if they are alphas or betas."))
-parser.add_option("-c", None, action="store", dest="config_file",
- help=("Specify the path to the buildout configuration "
- "file to be used."))
-
-options, args = parser.parse_args()
-
-# if -c was provided, we push it back into args for buildout's main
function
-if options.config_file is not None:
- args += ['-c', options.config_file]
-
-if options.eggs:
- eggs_dir = os.path.abspath(os.path.expanduser(options.eggs))
-else:
- eggs_dir = tempfile.mkdtemp()
-
-if options.setup_source is None:
- if options.use_distribute:
- options.setup_source = distribute_source
- else:
- options.setup_source = setuptools_source
-
-if options.accept_buildout_test_releases:
- args.append('buildout:accept-buildout-test-releases=true')
-args.append('bootstrap')
-
-try:
- import pkg_resources
- import setuptools # A flag. Sometimes pkg_resources is installed
alone.
- if not hasattr(pkg_resources, '_distribute'):
- raise ImportError
-except ImportError:
- ez_code = urllib2.urlopen(
- options.setup_source).read().replace('\r\n', '\n')
- ez = {}
- exec ez_code in ez
- setup_args = dict(to_dir=eggs_dir, download_delay=0)
- if options.download_base:
- setup_args['download_base'] = options.download_base
- if options.use_distribute:
- setup_args['no_fake'] = True
- ez['use_setuptools'](**setup_args)
- if 'pkg_resources' in sys.modules:
- reload(sys.modules['pkg_resources'])
- import pkg_resources
- # This does not (always?) update the default working set. We will
- # do it.
- for path in sys.path:
- if path not in pkg_resources.working_set.entries:
- pkg_resources.working_set.add_entry(path)
-
-cmd = [quote(sys.executable),
- '-c',
- quote('from setuptools.command.easy_install import main; main()'),
- '-mqNxd',
- quote(eggs_dir)]
-
-if not has_broken_dash_S:
- cmd.insert(1, '-S')
-
-find_links = options.download_base
-if not find_links:
- find_links = os.environ.get('bootstrap-testing-find-links')
-if find_links:
- cmd.extend(['-f', quote(find_links)])
-
-if options.use_distribute:
- setup_requirement = 'distribute'
-else:
- setup_requirement = 'setuptools'
-ws = pkg_resources.working_set
-setup_requirement_path = ws.find(
- pkg_resources.Requirement.parse(setup_requirement)).location
-env = dict(
- os.environ,
- PYTHONPATH=setup_requirement_path)
-
-requirement = 'zc.buildout'
-version = options.version
-if version is None and not options.accept_buildout_test_releases:
- # Figure out the most recent final version of zc.buildout.
- import setuptools.package_index
- _final_parts = '*final-', '*final'
- def _final_version(parsed_version):
- for part in parsed_version:
- if (part[:1] == '*') and (part not in _final_parts):
- return False
- return True
- index = setuptools.package_index.PackageIndex(
- search_path=[setup_requirement_path])
- if find_links:
- index.add_find_links((find_links,))
- req = pkg_resources.Requirement.parse(requirement)
- if index.obtain(req) is not None:
- best = []
- bestv = None
- for dist in index[req.project_name]:
- distv = dist.parsed_version
- if _final_version(distv):
- if bestv is None or distv > bestv:
- best = [dist]
- bestv = distv
- elif distv == bestv:
- best.append(dist)
- if best:
- best.sort()
- version = best[-1].version
-if version:
- requirement = '=='.join((requirement, version))
-cmd.append(requirement)
-
-if is_jython:
- import subprocess
- exitcode = subprocess.Popen(cmd, env=env).wait()
-else: # Windows prefers this, apparently; otherwise we would prefer
subprocess
- exitcode = os.spawnle(*([os.P_WAIT, sys.executable] + cmd + [env]))
-if exitcode != 0:
- sys.stdout.flush()
- sys.stderr.flush()
- print ("An error occurred when trying to install zc.buildout. "
- "Look above this message for any errors that "
- "were output by easy_install.")
- sys.exit(exitcode)
-
-ws.add_entry(eggs_dir)
-ws.require(requirement)
-import zc.buildout.buildout
-zc.buildout.buildout.main(args)
-if not options.eggs: # clean up temporary egg directory
- shutil.rmtree(eggs_dir)
=======================================
--- /tests/project/buildout.cfg Sun Feb 27 11:05:22 2011
+++ /dev/null
@@ -1,81 +0,0 @@
-[buildout]
-parts =
- gae_sdk
- gae_tools
- app_lib
-
-develop = ../../
-
-# Generate relative paths for eggs so that the buildout can be moved
around.
-relative-paths = true
-
-# Unzip eggs automatically, if needed.
-unzip = true
-
-# Define versions for installed packages.
-extends = versions.cfg
-versions = versions
-
-# Enable this to save all picked versions in the versions.cfg file.
-# extensions = buildout.dumppickedversions
-# dump-picked-versions-file = versions.cfg
-
-# Keep internal stuff in a subdirectory.
-download-cache = var/downloads
-# Buildout bug: it doesn't honor custom egg dir this in
parts/buildout/site.py
-# Until it is fixed we need to use the standard eggs dir.
-# eggs-directory = var/eggs
-develop-eggs-directory = var/develop-eggs
-parts-directory = var/parts
-
-[gae_sdk]
-# Dowloads and extracts the App Engine SDK.
-recipe = appfy.recipe.gae:sdk
-url =
http://googleappengine.googlecode.com/files/google_appengine_1.4.2.zip
-
-[gae_tools]
-# Installs appcfg, bulkload_client, bulkloader, dev_appserver,
remote_api_shell
-# and python executables in the bin directory.
-recipe = appfy.recipe.gae:tools
-# Add these paths to sys.path in the generated scripts.
-extra-paths =
- app
- app/lib
- app/lib/dist
-
-[app_lib]
-# Sets the library dependencies for the app.
-recipe = appfy.recipe.gae:app_lib
-lib-directory = app/lib/dist
-use-zipimport = false
-
-# Define the packages to download. Only tipfy is included, but you can add
-# others or uncomment the extra lines to add those common packages.
-eggs =
- tipfy
- babel
- gaepytz
- jinja2
- mako
- wtforms
-
-# Don't copy files that match these glob patterns.
-ignore-globs =
- *.c
- *.pyc
- *.pyo
- *.so
- */test
- */tests
- */testsuite
- */django
- */sqlalchemy
-
-# Don't install these packages or modules.
-ignore-packages =
- distribute
- setuptools
- easy_install
- site
- ssl
- pkg_resources
=======================================
--- /tests/project/gaetools.cfg Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,10 +0,0 @@
-[dev_appserver]
-# Set default values to start the dev_appserver. All options from the
command
-# line are allowed, one per line. If an option is provided when calling
-# dev_appserver, it will override the default value if it is set. Values
are
-# used as they are; don't use variables here.
-defaults =
- --datastore_path=var/data.store
- --history_path=var/history.store
- --blobstore_path=var/blob.store
- app
=======================================
--- /tests/project/var/README.txt Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,8 +0,0 @@
-/var
-====
-
-This directory contains variable files used by the application.
-
-For example, the datastore and blobstore data are stored here by default.
-
-You generally don't need to care about this directory; it's internal stuff.
=======================================
--- /tests/project/var/downloads/README.txt Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,5 +0,0 @@
-/etc/downloads
-==============
-
-This directory is used to cache downloads of package distributions and the
-App Engine SDK.
=======================================
--- /tests/project/versions.cfg Thu Nov 25 23:54:11 2010
+++ /dev/null
@@ -1,1 +0,0 @@
-[versions]
=======================================
--- /tests/run_tests.py Sun Feb 6 07:35:23 2011
+++ /dev/null
@@ -1,46 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-To run the tests, first install the following packages:
-
- nose
- nosegae==0.1.7
- webtest
- gaetestbed
- coverage
-
-Then run run_tests.py from the repository root.
-"""
-import os
-import sys
-
-import nose
-
-# Explicitly defining to not cover tipfy.template.
-cover_packages = [
- '
tipfy.app',
- 'tipfy.appengine',
- 'tipfy.auth',
- 'tipfy.config',
- 'tipfy.debugger',
- '
tipfy.dev',
- 'tipfy.i18n',
- 'tipfy.middleware',
- 'tipfy.routing',
- 'tipfy.manage',
- 'tipfy.sessions',
- 'tipfy.testing',
- 'tipfy.utils',
- 'tipfyext',
-]
-
-if __name__ == '__main__':
- base = os.path.abspath(os.path.dirname(__file__))
- tipfy = os.path.join(base, '..')
- app = os.path.join(base, 'project', 'app', 'lib', 'dist')
- gae = os.path.join(base, 'project', 'var', 'parts', 'google_appengine')
- sys.path[0:0] = [tipfy, app, gae]
-
- argv = [__file__]
- argv += '-d --with-gae -P --without-sandbox --with-coverage
--cover-erase --gae-application=./project/app'.split()
- argv += ['--cover-package=%s' % ','.join(cover_packages)]
- nose.run(argv=argv)
=======================================
--- /tests/test_app.py Thu Dec 2 15:15:14 2010
+++ /dev/null
@@ -1,510 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Tests for
tipfy.app
-"""
-from __future__ import with_statement
-
-import os
-import sys
-import unittest
-
-from . import BaseTestCase
-
-from tipfy import Request, RequestHandler, Response, Rule, Tipfy
-from tipfy.utils import json_encode
-
-
-class AllMethodsHandler(RequestHandler):
- def get(self, **kwargs):
- return Response('Method: %s' % self.request.method)
-
- delete = head = options = post = put = trace = get
-
-
-class BrokenHandler(RequestHandler):
- def get(self, **kwargs):
- raise ValueError('booo!')
-
-
-class BrokenButFixedHandler(BrokenHandler):
- def handle_exception(self, exception=None):
- # Let's fix it.
- return Response('That was close!', status=200)
-
-
-class Handle404(RequestHandler):
- def handle_exception(self, exception=None):
- return Response('404 custom handler', status=404)
-
-
-class Handle405(RequestHandler):
- def handle_exception(self, exception=None):
- response = Response('405 custom handler', status=405)
- response.headers['Allow'] = 'GET'
- return response
-
-
-class Handle500(RequestHandler):
- def handle_exception(self, exception=None):
- return Response('500 custom handler', status=500)
-
-
-class TestRequestHandler(BaseTestCase):
- def test_200(self):
- app = Tipfy(rules=[Rule('/', name='home',
handler=AllMethodsHandler)])
- client = app.get_test_client()
-
- for method in app.allowed_methods:
- response = client.open('/', method=method)
- self.assertEqual(response.status_code, 200, method)
- if method == 'HEAD':
- self.assertEqual(response.data, '')
- else:
- self.assertEqual(response.data, 'Method: %s' % method)
-
- # App Engine mode.
- self._set_dev_server_flag(True)
- response = client.get('/')
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'Method: GET')
-
- def test_404(self):
- """No URL rules defined."""
- app = Tipfy()
- client = app.get_test_client()
-
- # Normal mode.
- response = client.get('/')
- self.assertEqual(response.status_code, 404)
-
- # Debug mode.
- app.debug = True
- response = client.get('/')
- self.assertEqual(response.status_code, 404)
-
- def test_500(self):
- """Handler import will fail."""
- app = Tipfy(rules=[Rule('/', name='home',
handler='non.existent.handler')])
- client = app.get_test_client()
-
- # Normal mode.
- response = client.get('/')
- self.assertEqual(response.status_code, 500)
-
- # Debug mode.
- app.debug = True
- app.config['tipfy']['enable_debugger'] = False
- self.assertRaises(ImportError, client.get, '/')
-
- def test_501(self):
- """Method is not in app.allowed_methods."""
- app = Tipfy()
- client = app.get_test_client()
-
- # Normal mode.
- response = client.open('/', method='CONNECT')
- self.assertEqual(response.status_code, 501)
-
- # Debug mode.
- app.debug = True
- response = client.open('/', method='CONNECT')
- self.assertEqual(response.status_code, 501)
-
- def test_abort(self):
- class HandlerWithAbort(RequestHandler):
- def get(self, **kwargs):
- self.abort(kwargs.get('status_code'))
-
- app = Tipfy(rules=[
- Rule('/<int:status_code>', name='abort-me',
handler=HandlerWithAbort),
- ])
- client = app.get_test_client()
-
- response = client.get('/400')
- self.assertEqual(response.status_code, 400)
-
- response = client.get('/403')
- self.assertEqual(response.status_code, 403)
-
- response = client.get('/404')
- self.assertEqual(response.status_code, 404)
-
- def test_get_config(self):
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- ], config = {
- 'foo': {
- 'bar': 'baz',
- }
- })
- with app.get_test_handler('/') as handler:
- self.assertEqual(handler.get_config('foo', 'bar'), 'baz')
-
- def test_handle_exception(self):
- app = Tipfy([
- Rule('/', handler=AllMethodsHandler, name='home'),
- Rule('/broken', handler=BrokenHandler, name='broken'),
- Rule('/broken-but-fixed', handler=BrokenButFixedHandler,
name='broken-but-fixed'),
- ], debug=False)
- client = app.get_test_client()
-
- response = client.get('/broken')
- self.assertEqual(response.status_code, 500)
-
- response = client.get('/broken-but-fixed')
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'That was close!')
-
- def test_redirect(self):
- class HandlerWithRedirect(RequestHandler):
- def get(self, **kwargs):
- return self.redirect('/')
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- Rule('/redirect-me', name='redirect',
handler=HandlerWithRedirect),
- ])
-
- client = app.get_test_client()
- response = client.get('/redirect-me', follow_redirects=True)
- self.assertEqual(response.data, 'Method: GET')
-
- def test_redirect_empty(self):
- class HandlerWithRedirect(RequestHandler):
- def get(self, **kwargs):
- return self.redirect('/', empty=True)
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- Rule('/redirect-me', name='redirect',
handler=HandlerWithRedirect),
- ])
-
- client = app.get_test_client()
- response = client.get('/redirect-me', follow_redirects=False)
- self.assertEqual(response.data, '')
-
- def test_redirect_to(self):
- class HandlerWithRedirectTo(RequestHandler):
- def get(self, **kwargs):
- return self.redirect_to('home')
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- Rule('/redirect-me', name='redirect',
handler=HandlerWithRedirectTo),
- ])
-
- client = app.get_test_client()
- response = client.get('/redirect-me', follow_redirects=True)
- self.assertEqual(response.data, 'Method: GET')
-
- def test_redirect_to_empty(self):
- class HandlerWithRedirectTo(RequestHandler):
- def get(self, **kwargs):
- return self.redirect_to('home', _empty=True)
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- Rule('/redirect-me', name='redirect',
handler=HandlerWithRedirectTo),
- ])
-
- client = app.get_test_client()
- response = client.get('/redirect-me', follow_redirects=False)
- self.assertEqual(response.data, '')
-
- def test_redirect_relative_uris(self):
- class MyHandler(RequestHandler):
- def get(self):
- return self.redirect(self.request.args.get('redirect'))
-
- app = Tipfy(rules=[
- Rule('/foo/bar', name='test1', handler=MyHandler),
- Rule('/foo/bar/', name='test2', handler=MyHandler),
- ])
- client = app.get_test_client()
-
- response = client.get('/foo/bar/',
query_string={'redirect': '/baz'})
-
self.assertEqual(response.headers['Location'], '
http://localhost/baz')
-
- response = client.get('/foo/bar/',
query_string={'redirect': './baz'})
-
self.assertEqual(response.headers['Location'], '
http://localhost/foo/bar/baz')
-
- response = client.get('/foo/bar/',
query_string={'redirect': '../baz'})
-
self.assertEqual(response.headers['Location'], '
http://localhost/foo/baz')
-
- response = client.get('/foo/bar',
query_string={'redirect': '/baz'})
-
self.assertEqual(response.headers['Location'], '
http://localhost/baz')
-
- response = client.get('/foo/bar',
query_string={'redirect': './baz'})
-
self.assertEqual(response.headers['Location'], '
http://localhost/foo/baz')
-
- response = client.get('/foo/bar',
query_string={'redirect': '../baz'})
-
self.assertEqual(response.headers['Location'], '
http://localhost/baz')
-
- def test_url_for(self):
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- Rule('/about', name='about', handler='handlers.About'),
- Rule('/contact', name='contact', handler='handlers.Contact'),
- ])
- with app.get_test_handler('/') as handler:
- self.assertEqual(handler.url_for('home'), '/')
- self.assertEqual(handler.url_for('about'), '/about')
- self.assertEqual(handler.url_for('contact'), '/contact')
-
- # Extras
- self.assertEqual(handler.url_for('about',
_anchor='history'), '/about#history')
- self.assertEqual(handler.url_for('about',
_full=True), '
http://localhost/about')
- self.assertEqual(handler.url_for('about',
_netloc='
www.google.com'), '
http://www.google.com/about')
- self.assertEqual(handler.url_for('about',
_scheme='https'), '
https://localhost/about')
-
- def test_store_instances(self):
- from tipfy.appengine.auth import AuthStore
- from tipfy.i18n import I18nStore
- from tipfy.sessions import SecureCookieSession
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- ], config={
- 'tipfy.sessions': {
- 'secret_key': 'secret',
- },
- })
- with app.get_test_handler('/') as handler:
- self.assertEqual(isinstance(handler.session,
SecureCookieSession), True)
- self.assertEqual(isinstance(handler.auth, AuthStore), True)
- self.assertEqual(isinstance(handler.i18n, I18nStore), True)
-
-
-class TestHandlerMiddleware(BaseTestCase):
- def test_before_dispatch(self):
- res = 'Intercepted!'
-
- class MyMiddleware(object):
- def before_dispatch(self, handler):
- return Response(res)
-
- class MyHandler(RequestHandler):
- middleware = [MyMiddleware()]
-
- def get(self, **kwargs):
- return Response('default')
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=MyHandler),
- ])
- client = app.get_test_client()
- response = client.get('/')
- self.assertEqual(response.data, res)
-
- def test_after_dispatch(self):
- res = 'Intercepted!'
-
- class MyMiddleware(object):
- def after_dispatch(self, handler, response):
- response.data += res
- return response
-
- class MyHandler(RequestHandler):
- middleware = [MyMiddleware()]
-
- def get(self, **kwargs):
- return Response('default')
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=MyHandler),
- ])
- client = app.get_test_client()
- response = client.get('/')
- self.assertEqual(response.data, 'default' + res)
-
- def test_handle_exception(self):
- res = 'Catched!'
-
- class MyMiddleware(object):
- def handle_exception(self, handler, exception):
- return Response(res)
-
- class MyHandler(RequestHandler):
- middleware = [MyMiddleware()]
-
- def get(self, **kwargs):
- raise ValueError()
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=MyHandler),
- ])
- client = app.get_test_client()
- response = client.get('/')
- self.assertEqual(response.data, res)
-
- def test_handle_exception_2(self):
- res = 'I fixed it!'
-
- class MyMiddleware(object):
- def handle_exception(self, handler, exception):
- raise ValueError()
-
- class MyHandler(RequestHandler):
- middleware = [MyMiddleware()]
-
- def get(self, **kwargs):
- raise ValueError()
-
- class ErrorHandler(RequestHandler):
- def handle_exception(self, exception):
- return Response(res)
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=MyHandler),
- ], debug=False)
- app.error_handlers[500] = ErrorHandler
-
- client = app.get_test_client()
- response = client.get('/')
- self.assertEqual(response.data, res)
-
- def test_handle_exception_3(self):
- class MyMiddleware(object):
- def handle_exception(self, handler, exception):
- pass
-
- class MyHandler(RequestHandler):
- middleware = [MyMiddleware()]
-
- def get(self, **kwargs):
- raise ValueError()
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=MyHandler),
- ])
- client = app.get_test_client()
- response = client.get('/')
- self.assertEqual(response.status_code, 500)
-
- def test_handle_exception_with_app_error_handler(self):
- class MyMiddleware(object):
- def handle_exception(self, handler, exception):
- raise ValueError()
-
- class MyHandler(RequestHandler):
- middleware = [MyMiddleware()]
-
- def get(self, **kwargs):
- raise ValueError()
-
- class ErrorHandler(RequestHandler):
- def handle_exception(self, exception):
- raise ValueError()
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=MyHandler),
- ], debug=False)
- app.error_handlers[500] = ErrorHandler
-
- client = app.get_test_client()
- response = client.get('/')
- self.assertEqual(response.status_code, 500)
-
-
-class TestTipfy(BaseTestCase):
- def test_custom_error_handlers(self):
- app = Tipfy([
- Rule('/', handler=AllMethodsHandler, name='home'),
- Rule('/broken', handler=BrokenHandler, name='broken'),
- ], debug=False)
- app.error_handlers = {
- 404: Handle404,
- 405: Handle405,
- 500: Handle500,
- }
- client = app.get_test_client()
-
- res = client.get('/nowhere')
- self.assertEqual(res.status_code, 404)
- self.assertEqual(res.data, '404 custom handler')
-
- res = client.put('/broken')
- self.assertEqual(res.status_code, 405)
- self.assertEqual(res.data, '405 custom handler')
- self.assertEqual(res.headers.get('Allow'), 'GET')
-
- res = client.get('/broken')
- self.assertEqual(res.status_code, 500)
- self.assertEqual(res.data, '500 custom handler')
-
- def test_store_classes(self):
- from tipfy.appengine.auth import AuthStore
- from tipfy.i18n import I18nStore
- from tipfy.sessions import SessionStore
-
- app = Tipfy()
- self.assertEqual(app.auth_store_class, AuthStore)
- self.assertEqual(app.i18n_store_class, I18nStore)
- self.assertEqual(app.session_store_class, SessionStore)
-
- def test_make_response(self):
- app = Tipfy()
- request = Request.from_values()
-
- # Empty.
- response = app.make_response(request)
- self.assertEqual(isinstance(response, app.response_class), True)
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, '')
-
- # From Response.
- response = app.make_response(request, Response('Hello, World!'))
- self.assertEqual(isinstance(response, app.response_class), True)
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'Hello, World!')
-
- # From string.
- response = app.make_response(request, 'Hello, World!')
- self.assertEqual(isinstance(response, app.response_class), True)
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'Hello, World!')
-
- # From tuple.
- response = app.make_response(request, 'Hello, World!', 404)
- self.assertEqual(isinstance(response, app.response_class), True)
- self.assertEqual(response.status_code, 404)
- self.assertEqual(response.data, 'Hello, World!')
-
- # From None.
- self.assertRaises(ValueError, app.make_response, request, None)
-
- def test_dev_run(self):
- self._set_dev_server_flag(True)
-
- os.environ['APPLICATION_ID'] = 'my-app'
- os.environ['SERVER_SOFTWARE'] = 'Development'
- os.environ['SERVER_NAME'] = 'localhost'
- os.environ['SERVER_PORT'] = '8080'
- os.environ['REQUEST_METHOD'] = 'GET'
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=AllMethodsHandler),
- ], debug=True)
-
- app.run()
- self.assertEqual(sys.stdout.getvalue(), 'Status: 200
OK\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length:
11\r\n\r\nMethod: GET')
-
- def test_get_config(self):
- app = Tipfy(config={'tipfy': {'foo': 'bar'}})
- self.assertEqual(app.get_config('tipfy', 'foo'), 'bar')
-
-
-class TestRequest(BaseTestCase):
- def test_json(self):
- class JsonHandler(RequestHandler):
- def get(self, **kwargs):
- return Response(self.request.json['foo'])
-
- app = Tipfy(rules=[
- Rule('/', name='home', handler=JsonHandler),
- ], debug=True)
-
- data = json_encode({'foo': 'bar'})
- client = app.get_test_client()
- response = client.get('/', content_type='application/json',
data=data)
- self.assertEqual(response.data, 'bar')
=======================================
--- /tests/test_auth.py Thu Oct 7 10:52:28 2010
+++ /dev/null
@@ -1,760 +0,0 @@
-import os
-import unittest
-
-from gaetestbed import DataStoreTestCase
-
-from tipfy import Request, RequestHandler, Response, Rule, Tipfy
-from
tipfy.app import local
-
-import tipfy.auth
-from tipfy.auth import (AdminRequiredMiddleware, LoginRequiredMiddleware,
- UserRequiredMiddleware, UserRequiredIfAuthenticatedMiddleware,
- admin_required, login_required, user_required,
- user_required_if_authenticated, check_password_hash,
generate_password_hash,
- create_session_id, MultiAuthStore)
-from tipfy.appengine.auth import AuthStore, MixedAuthStore
-from tipfy.appengine.auth.model import User
-
-
-class LoginHandler(RequestHandler):
- def get(self, **kwargs):
- return Response('login')
-
-
-class LogoutHandler(RequestHandler):
- def get(self, **kwargs):
- return Response('logout')
-
-
-class SignupHandler(RequestHandler):
- def get(self, **kwargs):
- return Response('signup')
-
-
-class HomeHandler(RequestHandler):
- def get(self, **kwargs):
- return Response('home sweet home')
-
-
-def get_app():
- app = Tipfy(rules=[
- Rule('/login', name='auth/login', handler=LoginHandler),
- Rule('/logout', name='auth/logout', handler=LogoutHandler),
- Rule('/signup', name='auth/signup', handler=SignupHandler),
- ])
- return app
-
-
-class TestAuthStore(unittest.TestCase):
- def tearDown(self):
- local.__release_local__()
-
- def test_user_model(self):
- app = get_app()
- app.router.add(Rule('/', name='home', handler=HomeHandler))
-
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- store = AuthStore(local.current_handler)
- self.assertEqual(store.user_model, User)
-
- def test_login_url(self):
- app = get_app()
- app.router.add(Rule('/', name='home', handler=HomeHandler))
-
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
- app.router.match(request)
-
- store = AuthStore(local.current_handler)
- self.assertEqual(store.login_url(),
local.current_handler.url_for('auth/login', redirect='/'))
-
- tipfy.auth.DEV_APPSERVER_APPSERVER = False
- store.config['secure_urls'] = True
- self.assertEqual(store.login_url(),
local.current_handler.url_for('auth/login', redirect='/', _scheme='https'))
- tipfy.auth.DEV_APPSERVER_APPSERVER = True
-
- def test_logout_url(self):
- app = get_app()
- app.router.add(Rule('/', name='home', handler=HomeHandler))
-
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
- app.router.match(request)
-
- store = AuthStore(local.current_handler)
- self.assertEqual(store.logout_url(),
local.current_handler.url_for('auth/logout', redirect='/'))
-
- def test_signup_url(self):
- app = get_app()
- app.router.add(Rule('/', name='home', handler=HomeHandler))
-
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
- app.router.match(request)
-
- store = AuthStore(local.current_handler)
- self.assertEqual(store.signup_url(),
local.current_handler.url_for('auth/signup', redirect='/'))
-
-
-class TestMiddleware(DataStoreTestCase, unittest.TestCase):
- def setUp(self):
- DataStoreTestCase.setUp(self)
-
- def tearDown(self):
- local.__release_local__()
-
- os.environ.pop('USER_EMAIL', None)
- os.environ.pop('USER_ID', None)
- os.environ.pop('USER_IS_ADMIN', None)
-
- def test_login_required_middleware_invalid(self):
- class MyHandler(HomeHandler):
- middleware = [LoginRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
-
- def test_login_required_decorator_invalid(self):
- class MyHandler(HomeHandler):
- @login_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
-
- def test_login_required_middleware(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- middleware = [LoginRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_login_required_decorator(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- @login_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_user_required_middleware_invalid(self):
- class MyHandler(HomeHandler):
- middleware = [UserRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
-
- def test_user_required_decorator_invalid(self):
- class MyHandler(HomeHandler):
- @user_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
-
- def test_user_required_middleware_logged_in(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- middleware = [UserRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
-
- def test_user_required_decorator_logged_in(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- @user_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
-
- def test_user_required_middleware_with_user(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me')
-
- class MyHandler(HomeHandler):
- middleware = [UserRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_user_required_decorator_with_user(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me')
-
- class MyHandler(HomeHandler):
- @user_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_user_required_if_authenticated_middleware(self):
- class MyHandler(HomeHandler):
- middleware = [UserRequiredIfAuthenticatedMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_user_required_if_authenticate_decorator(self):
- class MyHandler(HomeHandler):
- @user_required_if_authenticated
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_user_required_if_authenticated_middleware_logged_in(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- middleware = [UserRequiredIfAuthenticatedMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
-
- def test_user_required_if_authenticate_decorator_logged_in(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- @user_required_if_authenticated
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/signup?redirect=%2F')
-
- def test_user_required_if_authenticated_middleware_with_user(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me')
-
- class MyHandler(HomeHandler):
- middleware = [UserRequiredIfAuthenticatedMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_user_required_if_authenticate_decorator_with_user(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me')
-
- class MyHandler(HomeHandler):
- @user_required_if_authenticated
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_admin_required_middleware(self):
- class MyHandler(HomeHandler):
- middleware = [AdminRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
-
- def test_admin_required_decorator(self):
- class MyHandler(HomeHandler):
- @admin_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 302)
-
self.assertEqual(response.headers['Location'], '
http://localhost/login?redirect=%2F')
-
- def test_admin_required_middleware_logged_in(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- middleware = [AdminRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 403)
-
- def test_admin_required_decorator_logged_in(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- class MyHandler(HomeHandler):
- @admin_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 403)
-
- def test_admin_required_middleware_with_user(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me')
-
- class MyHandler(HomeHandler):
- middleware = [AdminRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 403)
-
- def test_admin_required_decorator_withd_user(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me')
-
- class MyHandler(HomeHandler):
- @admin_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 403)
-
- def test_admin_required_middleware_with_admin(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me', is_admin=True)
-
- class MyHandler(HomeHandler):
- middleware = [AdminRequiredMiddleware()]
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
- def test_admin_required_decorator_with_admin(self):
- os.environ['USER_EMAIL'] = '
m...@myself.com'
- os.environ['USER_ID'] = 'me'
-
- User.create('me', 'gae|me', is_admin=True)
-
- class MyHandler(HomeHandler):
- @admin_required
- def get(self, **kwargs):
- return Response('home sweet home')
-
- app = get_app()
- app.router.add(Rule('/', name='home', handler=MyHandler))
- client = app.get_test_client()
-
- response = client.get('/')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data, 'home sweet home')
-
-
-class TestUserModel(DataStoreTestCase, unittest.TestCase):
- def setUp(self):
- DataStoreTestCase.setUp(self)
-
- def tearDown(self):
- local.__release_local__()
-
- def test_create(self):
- user = User.create('my_username', 'my_id')
- self.assertEqual(isinstance(user, User), True)
-
- # Second one will fail to be created.
- user = User.create('my_username', 'my_id')
- self.assertEqual(user, None)
-
- def test_create_with_password_hash(self):
- user = User.create('my_username', 'my_id', password_hash='foo')
-
- self.assertEqual(isinstance(user, User), True)
- self.assertEqual(user.password, 'foo')
-
- def test_create_with_password(self):
- user = User.create('my_username', 'my_id', password='foo')
-
- self.assertEqual(isinstance(user, User), True)
- self.assertNotEqual(user.password, 'foo')
- self.assertEqual(len(user.password.split('$')), 3)
-
- def test_set_password(self):
- user = User.create('my_username', 'my_id', password='foo')
- self.assertEqual(isinstance(user, User), True)
-
- password = user.password
-
- user.set_password('bar')
- self.assertNotEqual(user.password, password)
-
- self.assertNotEqual(user.password, 'bar')
- self.assertEqual(len(user.password.split('$')), 3)
-
- def test_check_password(self):
- app = Tipfy()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- user = User.create('my_username', 'my_id', password='foo')
-
- self.assertEqual(user.check_password('foo'), True)
- self.assertEqual(user.check_password('bar'), False)
-
- def test_check_session(self):
- app = Tipfy()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- user = User.create('my_username', 'my_id', password='foo')
-
- session_id = user.session_id
- self.assertEqual(user.check_session(session_id), True)
- self.assertEqual(user.check_session('bar'), False)
-
- def test_get_by_username(self):
- user = User.create('my_username', 'my_id')
- user_1 = User.get_by_username('my_username')
-
- self.assertEqual(isinstance(user, User), True)
- self.assertEqual(isinstance(user_1, User), True)
- self.assertEqual(str(user.key()), str(user_1.key()))
-
- def test_get_by_auth_id(self):
- user = User.create('my_username', 'my_id')
- user_1 = User.get_by_auth_id('my_id')
-
- self.assertEqual(isinstance(user, User), True)
- self.assertEqual(isinstance(user_1, User), True)
- self.assertEqual(str(user.key()), str(user_1.key()))
-
- def test_unicode(self):
- user_1 = User(username='Calvin', auth_id='test', session_id='test')
- self.assertEqual(unicode(user_1), u'Calvin')
-
- def test_str(self):
- user_1 = User(username='Hobbes', auth_id='test', session_id='test')
- self.assertEqual(str(user_1), u'Hobbes')
-
- def test_eq(self):
- user_1 = User(key_name='test', username='Calvin', auth_id='test',
session_id='test')
- user_2 = User(key_name='test', username='Calvin', auth_id='test',
session_id='test')
-
- self.assertEqual(user_1, user_2)
- self.assertNotEqual(user_1, '')
-
- def test_ne(self):
- user_1 = User(key_name='test', username='Calvin', auth_id='test',
session_id='test')
- user_2 = User(key_name='test_2', username='Calvin',
auth_id='test', session_id='test')
-
- self.assertEqual((user_1 != user_2), True)
-
- def test_renew_session(self):
- app = Tipfy()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- user = User.create('my_username', 'my_id')
- user.renew_session(max_age=86400)
-
- def test_renew_session_force(self):
- app = Tipfy()
- user = User.create('my_username', 'my_id')
- user.renew_session(force=True, max_age=86400)
-
-
-class TestMiscelaneous(DataStoreTestCase, unittest.TestCase):
- def setUp(self):
- DataStoreTestCase.setUp(self)
-
- def tearDown(self):
- local.__release_local__()
-
- def test_create_session_id(self):
- self.assertEqual(len(create_session_id()), 32)
-
-
-class TestMultiAuthStore(DataStoreTestCase, unittest.TestCase):
- def setUp(self):
- DataStoreTestCase.setUp(self)
-
- def tearDown(self):
- local.__release_local__()
-
- def get_app(self):
- app = Tipfy(config={'tipfy.sessions': {
- 'secret_key': 'secret',
- }})
- local.current_handler = RequestHandler(app,
Request.from_values('/'))
- return app
-
- def test_login_with_form_invalid(self):
- app = self.get_app()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- store = MultiAuthStore(local.current_handler)
- res = store.login_with_form('foo', 'bar', remember=True)
-
- self.assertEqual(res, False)
-
- def test_login_with_form(self):
- user = User.create('foo', 'foo_id', password='bar')
- app = self.get_app()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- store = MultiAuthStore(local.current_handler)
- res = store.login_with_form('foo', 'bar', remember=True)
- self.assertEqual(res, True)
-
- res = store.login_with_form('foo', 'bar', remember=False)
- self.assertEqual(res, True)
-
- def test_login_with_auth_id(self):
- app = self.get_app()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- store = MultiAuthStore(local.current_handler)
- store.login_with_auth_id('foo_id', remember=False)
-
- user = User.create('foo', 'foo_id', password='bar')
- app = self.get_app()
- store.login_with_auth_id('foo_id', remember=True)
-
- def test_real_login(self):
- user = User.create('foo', 'foo_id', auth_remember=True)
- app = self.get_app()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- store = MultiAuthStore(local.current_handler)
- store.login_with_auth_id('foo_id', remember=False)
-
- response = Response()
- local.current_handler.session_store.save(response)
-
- request = Request.from_values('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- local.current_handler = RequestHandler(app, request)
- store = MultiAuthStore(local.current_handler)
- self.assertNotEqual(store.user, None)
- self.assertEqual(store.user.username, 'foo')
- self.assertEqual(store.user.auth_id, 'foo_id')
-
- def test_real_logout(self):
- user = User.create('foo', 'foo_id', auth_remember=True)
- app = self.get_app()
- request = Request.from_values('/')
- local.current_handler = RequestHandler(app, request)
-
- store = MultiAuthStore(local.current_handler)
- store.login_with_auth_id('foo_id', remember=False)
-
- response = Response()
- local.current_handler.session_store.save(response)
-
- request = Request.from_values('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- local.current_handler = RequestHandler(app, request)
- store = MultiAuthStore(local.current_handler)
- self.assertNotEqual(store.user, None)
- self.assertEqual(store.user.username, 'foo')
- store.logout()
-
- response = Response()
- local.current_handler.session_store.save(response)
-
- request = Request.from_values('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- local.current_handler = RequestHandler(app, request)
- store = MultiAuthStore(local.current_handler)
-
- #self.assertEqual(store.user, None)
- self.assertEqual(store.session, None)
-
- def test_real_login_no_user(self):
- app = self.get_app()
- store = MultiAuthStore(local.current_handler)
- user = store.create_user('foo', 'foo_id')
- store.login_with_auth_id('foo_id', remember=False)
-
- response = Response()
- local.current_handler.session_store.save(response)
-
- user.delete()
-
- request = Request.from_values('/', headers={
- 'Cookie': '\n'.join(response.headers.getlist('Set-Cookie')),
- })
- local.current_handler = RequestHandler(app, request)
- store = MultiAuthStore(local.current_handler)
- self.assertEqual(store.session['id'], 'foo_id')
- self.assertEqual(store.user, None)
-
- def test_real_login_invalid(self):
- app = self.get_app()
- store = MultiAuthStore(local.current_handler)
- self.assertEqual(store.user, None)
- self.assertEqual(store.session, None)
-
-
=======================================
--- /tests/test_config.py Thu Mar 31 07:24:20 2011
+++ /dev/null
@@ -1,376 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Tests for tipfy config
-"""
-from __future__ import with_statement
-
-import unittest
-
-from tipfy import Tipfy, RequestHandler, REQUIRED_VALUE
-from
tipfy.app import local
-from tipfy.config import Config
-
-
-class TestConfig(unittest.TestCase):
- def tearDown(self):
- local.__release_local__()
-
- def test_get(self):
- config = Config({'foo': {
- 'bar': 'baz',
- 'doo': 'ding',
- }})
-
- self.assertEqual(config.get('foo'), {
- 'bar': 'baz',
- 'doo': 'ding',
- })
-
- self.assertEqual(config.get('bar'), {})
-
- def test_get_existing_keys(self):
- config = Config({'foo': {
- 'bar': 'baz',
- 'doo': 'ding',
- }})
-
- self.assertEqual(config.get_config('foo', 'bar'), 'baz')
- self.assertEqual(config.get_config('foo', 'doo'), 'ding')
-
- def test_get_existing_keys_from_default(self):
- config = Config({}, {'foo': {
- 'bar': 'baz',
- 'doo': 'ding',
- }})
-
- self.assertEqual(config.get_config('foo', 'bar'), 'baz')
- self.assertEqual(config.get_config('foo', 'doo'), 'ding')
-
- def test_get_non_existing_keys(self):
- config = Config()
-
- self.assertRaises(KeyError, config.get_config, 'foo', 'bar')
-
- def test_get_dict_existing_keys(self):
- config = Config({'foo': {
- 'bar': 'baz',
- 'doo': 'ding',
- }})
-
- self.assertEqual(config.get_config('foo'), {
- 'bar': 'baz',
- 'doo': 'ding',
- })
-
- def test_get_dict_non_existing_keys(self):
- config = Config()
-
- self.assertRaises(KeyError, config.get_config, 'bar')
-
- def test_get_with_default(self):
- config = Config()
-
- self.assertRaises(KeyError,
config.get_config, 'foo', 'bar', 'ooops')
- self.assertRaises(KeyError,
config.get_config, 'foo', 'doo', 'wooo')
-
- def test_get_with_default_and_none(self):
- config = Config({'foo': {
- 'bar': None,
- }})
-
- self.assertEqual(config.get_config('foo', 'bar', 'ooops'), None)
-
- def test_update(self):
- config = Config({'foo': {
- 'bar': 'baz',
- 'doo': 'ding',
- }})
-
- self.assertEqual(config.get_config('foo', 'bar'), 'baz')
- self.assertEqual(config.get_config('foo', 'doo'), 'ding')
-
- config.update('foo', {'bar': 'other'})
-
- self.assertEqual(config.get_config('foo', 'bar'), 'other')
- self.assertEqual(config.get_config('foo', 'doo'), 'ding')
-
- def test_setdefault(self):
- config = Config()
-
- self.assertRaises(KeyError, config.get_config, 'foo')
-
- config.setdefault('foo', {
- 'bar': 'baz',
- 'doo': 'ding',
- })
-
- self.assertEqual(config.get_config('foo', 'bar'), 'baz')
- self.assertEqual(config.get_config('foo', 'doo'), 'ding')
-
- def test_setdefault2(self):
- config = Config({'foo': {
- 'bar': 'baz',
- }})
-
- self.assertEqual(config.get_config('foo'), {
- 'bar': 'baz',
- })
-
- config.setdefault('foo', {
- 'bar': 'wooo',
- 'doo': 'ding',
- })
-
- self.assertEqual(config.get_config('foo', 'bar'), 'baz')
- self.assertEqual(config.get_config('foo', 'doo'), 'ding')
-
- def test_setitem(self):
- config = Config()
- config['foo'] = {'bar': 'baz'}
-
- self.assertEqual(config, {'foo': {'bar': 'baz'}})
- self.assertEqual(config['foo'], {'bar': 'baz'})
-
- def test_init_no_dict_values(self):
- self.assertRaises(AssertionError, Config, {'foo': 'bar'})
- self.assertRaises(AssertionError, Config, {'foo': None})
- self.assertRaises(AssertionError, Config, 'foo')
-
- def test_init_no_dict_default(self):
- self.assertRaises(AssertionError, Config, {}, {'foo': 'bar'})
- self.assertRaises(AssertionError, Config, {}, {'foo': None})
- self.assertRaises(AssertionError, Config, {}, 'foo')
-
- def test_update_no_dict_values(self):
- config = Config()
-
- self.assertRaises(AssertionError, config.update,
{'foo': 'bar'}, 'baz')
- self.assertRaises(AssertionError, config.update, {'foo':
None}, 'baz')
- self.assertRaises(AssertionError, config.update, 'foo', 'bar')
-
- def test_setdefault_no_dict_values(self):
- config = Config()
-
- self.assertRaises(AssertionError, config.setdefault, 'foo', 'bar')
- self.assertRaises(AssertionError, config.setdefault, 'foo', None)
-
- def test_setitem_no_dict_values(self):
- config = Config()
-
- def setitem(key, value):
- config[key] = value
- return config
-
- self.assertRaises(AssertionError, setitem, 'foo', 'bar')
- self.assertRaises(AssertionError, setitem, 'foo', None)
-
-
-class TestLoadConfig(unittest.TestCase):
- def tearDown(self):
- local.__release_local__()
-
- def test_default_config(self):
- config = Config()
-
- from resources.template import default_config as template_config
- from resources.i18n import default_config as i18n_config
-
-
self.assertEqual(config.get_config('resources.template', 'templates_dir'),
template_config['templates_dir'])
- self.assertEqual(config.get_config('resources.i18n', 'locale'),
i18n_config['locale'])
- self.assertEqual(config.get_config('resources.i18n', 'timezone'),
i18n_config['timezone'])
-
- def test_default_config_with_non_existing_key(self):
- config = Config()
-
- from resources.i18n import default_config as i18n_config
-
- # In the first time the module config will be loaded normally.
- self.assertEqual(config.get_config('resources.i18n', 'locale'),
i18n_config['locale'])
-
- # In the second time it won't be loaded, but won't find the value
and then use the default.
-
self.assertEqual(config.get_config('resources.i18n', 'i_dont_exist', 'foo'), 'foo')
-
- def test_override_config(self):
- config = Config({
- 'resources.template': {
- 'templates_dir': 'apps/templates'
- },
- 'resources.i18n': {
- 'locale': 'pt_BR',
- 'timezone': 'America/Sao_Paulo',
- },
- })
-
-
self.assertEqual(config.get_config('resources.template', 'templates_dir'), 'apps/templates')
-
self.assertEqual(config.get_config('resources.i18n', 'locale'), 'pt_BR')
-
self.assertEqual(config.get_config('resources.i18n', 'timezone'), 'America/Sao_Paulo')
-
- def test_override_config2(self):
- config = Config({
- 'resources.i18n': {
- 'timezone': 'America/Sao_Paulo',
- },
- })
-
-
self.assertEqual(config.get_config('resources.i18n', 'locale'), 'en_US')
-
self.assertEqual(config.get_config('resources.i18n', 'timezone'), 'America/Sao_Paulo')
-
- def test_get(self):
- config = Config({'foo': {
- 'bar': 'baz',
- }})
-
- self.assertEqual(config.get_config('foo', 'bar'), 'baz')
-
- def test_get_with_default(self):
- config = Config()
-
-
self.assertEqual(config.get_config('resources.i18n', 'bar', 'baz'), 'baz')
-
- def test_get_with_default_and_none(self):
- config = Config({'foo': {
- 'bar': None,
- }})
-
- self.assertEqual(config.get_config('foo', 'bar', 'ooops'), None)
-
- def test_get_with_default_and_module_load(self):
- config = Config()
-
self.assertEqual(config.get_config('resources.i18n', 'locale'), 'en_US')
-
self.assertEqual(config.get_config('resources.i18n', 'locale', 'foo'), 'en_US')
-
- def test_required_config(self):
- config = Config()
- self.assertRaises(KeyError,
config.get_config, 'resources.i18n', 'foo')
-
- def test_missing_module(self):
- config = Config()
- self.assertRaises(KeyError,
config.get_config, 'i_dont_exist', 'i_dont_exist')
-
- def test_missing_module2(self):
- config = Config()
- self.assertRaises(KeyError, config.get_config, 'i_dont_exist')
-
- def test_missing_key(self):
- config = Config()
- self.assertRaises(KeyError,
config.get_config, 'resources.i18n', 'i_dont_exist')
-
- def test_missing_default_config(self):
- config = Config()
- self.assertRaises(KeyError, config.get_config, 'tipfy', 'foo')
-
- def test_request_handler_get_config(self):
- app = Tipfy()
- with app.get_test_context() as request:
- handler = RequestHandler(request)
-
-
self.assertEqual(handler.get_config('resources.i18n', 'locale'), 'en_US')
-
self.assertEqual(handler.get_config('resources.i18n', 'locale', 'foo'), 'en_US')
- self.assertEqual(handler.get_config('resources.i18n'), {
- 'locale': 'en_US',
- 'timezone': 'America/Chicago',
- 'required': REQUIRED_VALUE,
- })
-
-
-class TestLoadConfigGetItem(unittest.TestCase):
- def tearDown(self):
- local.__release_local__()
-
- def test_default_config(self):
- config = Config()
-
- from resources.template import default_config as template_config
- from resources.i18n import default_config as i18n_config
-
- self.assertEqual(config['resources.template']['templates_dir'],
template_config['templates_dir'])
- self.assertEqual(config['resources.i18n']['locale'],
i18n_config['locale'])
- self.assertEqual(config['resources.i18n']['timezone'],
i18n_config['timezone'])
-
- def test_default_config_with_non_existing_key(self):
- config = Config()
-
- from resources.i18n import default_config as i18n_config
-
- # In the first time the module config will be loaded normally.
- self.assertEqual(config['resources.i18n']['locale'],
i18n_config['locale'])
-
- # In the second time it won't be loaded, but won't find the value
and then use the default.
-
self.assertEqual(config['resources.i18n'].get('i_dont_exist', 'foo'), 'foo')
-
- def test_override_config(self):
- config = Config({
- 'resources.template': {
- 'templates_dir': 'apps/templates'
- },
- 'resources.i18n': {
- 'locale': 'pt_BR',
- 'timezone': 'America/Sao_Paulo',
- },
- })
-
-
self.assertEqual(config['resources.template']['templates_dir'], 'apps/templates')
- self.assertEqual(config['resources.i18n']['locale'], 'pt_BR')
-
self.assertEqual(config['resources.i18n']['timezone'], 'America/Sao_Paulo')
-
- def test_override_config2(self):
- config = Config({
- 'resources.i18n': {
- 'timezone': 'America/Sao_Paulo',
- },
- })
-
- self.assertEqual(config['resources.i18n']['locale'], 'en_US')
-
self.assertEqual(config['resources.i18n']['timezone'], 'America/Sao_Paulo')
-
- def test_get(self):
- config = Config({'foo': {
- 'bar': 'baz',
- }})
-
- self.assertEqual(config['foo']['bar'], 'baz')
-
- def test_get_with_default(self):
- config = Config()
-
- self.assertEqual(config['resources.i18n'].get('bar', 'baz'), 'baz')
-
- def test_get_with_default_and_none(self):
- config = Config({'foo': {
- 'bar': None,
- }})
-
- self.assertEqual(config['foo'].get('bar', 'ooops'), None)
-
- def test_get_with_default_and_module_load(self):
- config = Config()
- self.assertEqual(config['resources.i18n']['locale'], 'en_US')
-
self.assertEqual(config['resources.i18n'].get('locale', 'foo'), 'en_US')
-
- def test_required_config(self):
- config = Config()
- self.assertRaises(KeyError,
config['resources.i18n'].__getitem__, 'foo')
- self.assertRaises(KeyError,
config['resources.i18n'].__getitem__, 'required')
-
- def test_missing_module(self):
- config = Config()
- self.assertRaises(KeyError, config.__getitem__, 'i_dont_exist')
-
- def test_missing_key(self):
- config = Config()
- self.assertRaises(KeyError,
config['resources.i18n'].__getitem__, 'i_dont_exist')
-
- def test_missing_default_config(self):
- config = Config()
- self.assertRaises(KeyError, config['tipfy'].__getitem__, 'foo')
-
-
-class TestGetConfig(unittest.TestCase):
- def tearDown(self):
- local.__release_local__()
-
- '''
- def test_get_config(self):
- app = Tipfy()
- self.assertEqual(get_config('resources.i18n', 'locale'), 'en_US')
- '''
=======================================
--- /tests/test_dev.py Thu Sep 30 14:29:41 2010
+++ /dev/null
@@ -1,28 +0,0 @@
-import unittest
-
-from tipfy import Tipfy
-
-
-class TestSysPath(unittest.TestCase):
- def test_ultimate_sys_path(self):
- """Mostly here to not be marked as uncovered."""
- from
tipfy.dev import _ULTIMATE_SYS_PATH, fix_sys_path
- fix_sys_path()
-
- def test_ultimate_sys_path2(self):
- """Mostly here to not be marked as uncovered."""
- from
tipfy.dev import _ULTIMATE_SYS_PATH, fix_sys_path
- _ULTIMATE_SYS_PATH = []
- fix_sys_path()
-
- def test_ultimate_sys_path3(self):
- """Mostly here to not be marked as uncovered."""
- import sys
- path = list(sys.path)
- sys.path = []
-
- from
tipfy.dev import _ULTIMATE_SYS_PATH, fix_sys_path
- _ULTIMATE_SYS_PATH = []
- fix_sys_path()
-
- sys.path = path
=======================================
--- /tests/test_ext_jinja2.py Sat Nov 27 04:58:08 2010
+++ /dev/null
@@ -1,215 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Tests for tipfyext.jinja2
-"""
-import os
-import sys
-import unittest
-
-from jinja2 import FileSystemLoader, Environment
-
-from tipfy import RequestHandler, Request, Response, Tipfy
-from
tipfy.app import local
-from tipfyext.jinja2 import Jinja2, Jinja2Mixin
-
-current_dir = os.path.abspath(os.path.dirname(__file__))
-templates_dir = os.path.join(current_dir, 'resources', 'templates')
-templates_compiled_target =
os.path.join(current_dir, 'resources', 'templates_compiled')
-
-
-class TestJinja2(unittest.TestCase):
- def tearDown(self):
- local.__release_local__()
-
- def test_render_template(self):
- app = Tipfy(config={'tipfyext.jinja2': {'templates_dir':
templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- message = 'Hello, World!'
- res =
jinja2.render_template(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(res, message)
-
- def test_render_template_with_i18n(self):
- app = Tipfy(config={
- 'tipfyext.jinja2': {
- 'templates_dir': templates_dir,
- 'environment_args': dict(
- autoescape=True,
-
extensions=['jinja2.ext.autoescape', 'jinja2.ext.with_', 'jinja2.ext.i18n'],
- ),
- },
- 'tipfy.sessions': {
- 'secret_key': 'secret',
- },
- })
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- message = 'Hello, i18n World!'
- res =
jinja2.render_template(local.current_handler, 'template2.html',
message=message)
- self.assertEqual(res, message)
-
- def test_render_response(self):
- app = Tipfy(config={'tipfyext.jinja2': {'templates_dir':
templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- message = 'Hello, World!'
- response =
jinja2.render_response(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(isinstance(response, Response), True)
- self.assertEqual(response.mimetype, 'text/html')
- self.assertEqual(response.data, message)
-
- def test_render_response_force_compiled(self):
- app = Tipfy(config={
- 'tipfyext.jinja2': {
- 'templates_compiled_target': templates_compiled_target,
- 'force_use_compiled': True,
- }
- }, debug=False)
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- message = 'Hello, World!'
- response =
jinja2.render_response(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(isinstance(response, Response), True)
- self.assertEqual(response.mimetype, 'text/html')
- self.assertEqual(response.data, message)
-
- def test_jinja2_mixin_render_template(self):
- class MyHandler(RequestHandler, Jinja2Mixin):
- def __init__(self, app, request):
-
self.app = app
- self.request = request
- self.context = {}
-
- app = Tipfy(config={'tipfyext.jinja2': {'templates_dir':
templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = MyHandler(app, request)
- jinja2 = Jinja2(app)
- message = 'Hello, World!'
-
- response = handler.render_template('template1.html',
message=message)
- self.assertEqual(response, message)
-
- def test_jinja2_mixin_render_response(self):
- class MyHandler(RequestHandler, Jinja2Mixin):
- def __init__(self, app, request):
-
self.app = app
- self.request = request
- self.context = {}
-
- app = Tipfy(config={'tipfyext.jinja2': {'templates_dir':
templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = MyHandler(app, request)
- jinja2 = Jinja2(app)
- message = 'Hello, World!'
-
- response = handler.render_response('template1.html',
message=message)
- self.assertEqual(isinstance(response, Response), True)
- self.assertEqual(response.mimetype, 'text/html')
- self.assertEqual(response.data, message)
-
- def test_get_template_attribute(self):
- app = Tipfy(config={'tipfyext.jinja2': {'templates_dir':
templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- hello = jinja2.get_template_attribute('hello.html', 'hello')
- self.assertEqual(hello('World'), 'Hello, World!')
-
- def test_engine_factory(self):
- def get_jinja2_env():
- app =
local.current_handler.app
- cfg = app.get_config('tipfyext.jinja2')
-
- loader = FileSystemLoader(cfg.get( 'templates_dir'))
-
- return Environment(loader=loader)
-
- app = Tipfy(config={'tipfyext.jinja2': {
- 'templates_dir': templates_dir,
- 'engine_factory': get_jinja2_env,
- }})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- message = 'Hello, World!'
- res =
jinja2.render_template(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(res, message)
-
- def test_engine_factory2(self):
- old_sys_path = sys.path[:]
- sys.path.insert(0, current_dir)
-
- app = Tipfy(config={'tipfyext.jinja2': {
- 'templates_dir': templates_dir,
- 'engine_factory': 'resources.get_jinja2_env',
- }})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- message = 'Hello, World!'
- res =
jinja2.render_template(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(res, message)
-
- sys.path = old_sys_path
-
- def test_engine_factory3(self):
- app = Tipfy()
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- _globals = {'message': 'Hey there!'}
- filters = {'ho': lambda e: e + ' Ho!'}
- jinja2 = Jinja2(app, _globals=_globals, filters=filters)
-
- template = jinja2.environment.from_string("""{{ message|ho }}""")
-
- self.assertEqual(template.render(), 'Hey there! Ho!')
-
- def test_after_environment_created(self):
- def after_creation(environment):
- environment.filters['ho'] = lambda x: x + ', Ho!'
-
- app = Tipfy(config={'tipfyext.jinja2':
{'after_environment_created': after_creation}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- template = jinja2.environment.from_string("""{{ 'Hey'|ho }}""")
- self.assertEqual(template.render(), 'Hey, Ho!')
-
- def test_after_environment_created_using_string(self):
- app = Tipfy(config={'tipfyext.jinja2':
{'after_environment_created': 'resources.jinja2_after_environment_created.after_creation'}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- template = jinja2.environment.from_string("""{{ 'Hey'|ho }}""")
- self.assertEqual(template.render(), 'Hey, Ho!')
-
- def test_translations(self):
- app = Tipfy(config={
- 'tipfyext.jinja2': {
- 'environment_args': {
- 'extensions': ['jinja2.ext.i18n',],
- },
- },
- 'tipfy.sessions': {
- 'secret_key': 'foo',
- },
- })
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- jinja2 = Jinja2(app)
-
- template = jinja2.environment.from_string("""{{ _('foo = %(bar)s',
bar='foo') }}""")
- self.assertEqual(template.render(), 'foo = foo')
=======================================
--- /tests/test_ext_mako.py Thu Dec 9 16:19:23 2010
+++ /dev/null
@@ -1,75 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Tests for tipfyext.mako
-"""
-import os
-import sys
-import unittest
-
-from tipfy import RequestHandler, Request, Response, Tipfy
-from
tipfy.app import local
-from tipfyext.mako import Mako, MakoMixin
-
-current_dir = os.path.abspath(os.path.dirname(__file__))
-templates_dir = os.path.join(current_dir, 'resources', 'mako_templates')
-
-
-class TestMako(unittest.TestCase):
- def tearDown(self):
- local.__release_local__()
-
- def test_render_template(self):
- app = Tipfy(config={'tipfyext.mako': {'templates_dir':
templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- mako = Mako(app)
-
- message = 'Hello, World!'
- res =
mako.render_template(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(res, message + '\n')
-
- def test_render_response(self):
- app = Tipfy(config={'tipfyext.mako': {'templates_dir':
templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = RequestHandler(app, request)
- mako = Mako(app)
-
- message = 'Hello, World!'
- response =
mako.render_response(local.current_handler, 'template1.html',
message=message)
- self.assertEqual(isinstance(response, Response), True)
- self.assertEqual(response.mimetype, 'text/html')
- self.assertEqual(response.data, message + '\n')
-
- def test_mako_mixin_render_template(self):
- class MyHandler(RequestHandler, MakoMixin):
- def __init__(self, app, request):
-
self.app = app
- self.request = request
- self.context = {}
-
- app = Tipfy(config={'tipfyext.mako': {'templates_dir':
templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = MyHandler(app, request)
- mako = Mako(app)
- message = 'Hello, World!'
-
- response = handler.render_template('template1.html',
message=message)
- self.assertEqual(response, message + '\n')
-
- def test_mako_mixin_render_response(self):
- class MyHandler(RequestHandler, MakoMixin):
- def __init__(self, app, request):
-
self.app = app
- self.request = request
- self.context = {}
-
- app = Tipfy(config={'tipfyext.mako': {'templates_dir':
templates_dir}})
- request = Request.from_values()
- local.current_handler = handler = MyHandler(app, request)
- mako = Mako(app)
- message = 'Hello, World!'
-
- response = handler.render_response('template1.html',
message=message)
- self.assertEqual(isinstance(response, Response), True)
- self.assertEqual(response.mimetype, 'text/html')
- self.assertEqual(response.data, message + '\n')
=======================================
--- /tests/test_gae_acl.py Wed Dec 1 02:59:15 2010
+++ /dev/null
@@ -1,434 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Tests for tipfy.appengine.acl
-"""
-import unittest
-
-from gaetestbed import DataStoreTestCase, MemcacheTestCase
-
-from google.appengine.api import memcache
-
-from tipfy import Tipfy, Request, RequestHandler, CURRENT_VERSION_ID
-from
tipfy.app import local
-from tipfy.appengine.acl import Acl, AclRules, _rules_map, AclMixin
-
-from
tipfy.app import local
-
-
-class TestAcl(DataStoreTestCase, MemcacheTestCase, unittest.TestCase):
- def setUp(self):
- # Clean up datastore.
- super(TestAcl, self).setUp()
-
-
self.app = Tipfy()
- self.app.config['tipfy']['dev'] = False
- local.current_handler = RequestHandler(
self.app,
Request.from_values())
-
- Acl.roles_map = {}
- Acl.roles_lock = CURRENT_VERSION_ID
- _rules_map.clear()
-
- def tearDown(self):
- local.__release_local__()
- self.app.config['tipfy']['dev'] = True
-
- Acl.roles_map = {}
- Acl.roles_lock = CURRENT_VERSION_ID
- _rules_map.clear()
-
- def test_test_insert_or_update(self):
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertEqual(user_acl, None)
-
- # Set empty rules.
- user_acl = AclRules.insert_or_update(area='test', user='test')
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertNotEqual(user_acl, None)
- self.assertEqual(user_acl.rules, [])
- self.assertEqual(user_acl.roles, [])
-
- rules = [
- ('topic_1', 'name_1', True),
- ('topic_1', 'name_2', True),
- ('topic_2', 'name_1', False),
- ]
-
- user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertNotEqual(user_acl, None)
- self.assertEqual(user_acl.rules, rules)
- self.assertEqual(user_acl.roles, [])
-
- extra_rule = ('topic_3', 'name_3', True)
- rules.append(extra_rule)
-
- user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules, roles=['foo', 'bar', 'baz'])
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertNotEqual(user_acl, None)
- self.assertEqual(user_acl.rules, rules)
- self.assertEqual(user_acl.roles, ['foo', 'bar', 'baz'])
-
- def test_set_rules(self):
- """Test setting and appending rules."""
- rules = [
- ('topic_1', 'name_1', True),
- ('topic_1', 'name_2', True),
- ('topic_2', 'name_1', False),
- ]
- extra_rule = ('topic_3', 'name_3', True)
-
- # Set empty rules.
- user_acl = AclRules.insert_or_update(area='test', user='test')
-
- # Set rules and save the record.
- user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
-
- # Fetch the record again, and compare.
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertEqual(user_acl.rules, rules)
-
- # Append more rules.
- user_acl.rules.append(extra_rule)
- user_acl.put()
- rules.append(extra_rule)
-
- # Fetch the record again, and compare.
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertEqual(user_acl.rules, rules)
-
- def test_delete_rules(self):
- rules = [
- ('topic_1', 'name_1', True),
- ('topic_1', 'name_2', True),
- ('topic_2', 'name_1', False),
- ]
- user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
-
- # Fetch the record again, and compare.
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- self.assertEqual(user_acl.rules, rules)
-
- key_name = AclRules.get_key_name('test', 'test')
- acl = Acl('test', 'test')
-
- cached = memcache.get(key_name, namespace=AclRules.__name__)
- self.assertEqual(key_name in _rules_map, True)
- self.assertEqual(cached, _rules_map[key_name])
-
- user_acl.delete()
- user_acl2 = AclRules.get_by_area_and_user('test', 'test')
-
- cached = memcache.get(key_name, namespace=AclRules.__name__)
- self.assertEqual(user_acl2, None)
- self.assertEqual(key_name not in _rules_map, True)
- self.assertEqual(cached, None)
-
- def test_is_rule_set(self):
- rules = [
- ('topic_1', 'name_1', True),
- ('topic_1', 'name_2', True),
- ('topic_2', 'name_1', False),
- ]
- user_acl = AclRules.insert_or_update(area='test', user='test',
rules=rules)
-
- # Fetch the record again, and compare.
- user_acl = AclRules.get_by_area_and_user('test', 'test')
-
- self.assertEqual(user_acl.is_rule_set(*rules[0]), True)
- self.assertEqual(user_acl.is_rule_set(*rules[1]), True)
- self.assertEqual(user_acl.is_rule_set(*rules[2]), True)
- self.assertEqual(user_acl.is_rule_set('topic_1', 'name_3', True),
False)
-
- def test_no_area_or_no_user(self):
- acl1 = Acl('foo', None)
- acl2 = Acl(None, 'foo')
-
- self.assertEqual(acl1.has_any_access(), False)
- self.assertEqual(acl2.has_any_access(), False)
-
- def test_default_roles_lock(self):
- Acl.roles_lock = None
- acl2 = Acl('foo', 'foo')
-
- self.assertEqual(acl2.roles_lock, CURRENT_VERSION_ID)
-
- def test_set_invalid_rules(self):
- rules = {}
- self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
-
- rules = ['foo', 'bar', True]
- self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
-
- rules = [('foo',)]
- self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
-
- rules = [('foo', 'bar')]
- self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
-
- rules = [(1, 2, 3)]
- self.assertRaises(AssertionError, AclRules.insert_or_update,
area='test', user='test', rules=rules)
-
- rules = [('foo', 'bar', True)]
- AclRules.insert_or_update(area='test', user='test', rules=rules)
- user_acl = AclRules.get_by_area_and_user('test', 'test')
- user_acl.rules.append((1, 2, 3))
- self.assertRaises(AssertionError, user_acl.put)
-
- def test_example(self):
- """Tests the example set in the acl module."""
- # Set a dict of roles with an 'admin' role that has full access
and assign
- # users to it. Each role maps to a list of rules. Each rule, a
tuple
- # (topic, name, flag), where flag, as bool to allow or disallow
access.
- # Wildcard '*' can be used to match all topics and/or names.
- Acl.roles_map = {
- 'admin': [
- ('*', '*', True),
- ],
- }
-
- # Assign users 'user_1' and 'user_2' to the 'admin' role.
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['admin'])
- AclRules.insert_or_update(area='my_area', user='user_2',
roles=['admin'])
-
- # Restrict 'user_2' from accessing a specific resource, adding a
new rule
- # with flag set to False. Now this user has access to everything
except this
- # resource.
- user_acl = AclRules.get_by_area_and_user('my_area', 'user_2')
- user_acl.rules.append(('UserAdmin', '*', False))
- user_acl.put()
-
- # Check 'user_2' permission.
- acl = Acl(area='my_area', user='user_2')
- self.assertEqual(acl.has_access(topic='UserAdmin', name='save'),
False)
- self.assertEqual(acl.has_access(topic='UserAdmin', name='get'),
False)
- self.assertEqual(acl.has_access(topic='AnythingElse', name='put'),
True)
-
- def test_is_one(self):
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
-
- acl = Acl(area='my_area', user='user_1')
- self.assertEqual(acl.is_one('editor'), True)
- self.assertEqual(acl.is_one('designer'), True)
- self.assertEqual(acl.is_one('admin'), False)
-
- def test_is_any(self):
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
-
- acl = Acl(area='my_area', user='user_1')
- self.assertEqual(acl.is_any(['editor', 'admin']), True)
- self.assertEqual(acl.is_any(['admin', 'designer']), True)
- self.assertEqual(acl.is_any(['admin', 'user']), False)
-
- def test_is_all(self):
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
-
- acl = Acl(area='my_area', user='user_1')
- self.assertEqual(acl.is_all(['editor', 'admin']), False)
- self.assertEqual(acl.is_all(['admin', 'designer']), False)
- self.assertEqual(acl.is_all(['admin', 'user']), False)
- self.assertEqual(acl.is_all(['editor', 'designer']), True)
-
- def test_non_existent_user(self):
- acl = Acl(area='my_area', user='user_3')
- self.assertEqual(acl.has_any_access(), False)
-
- def test_has_any_access(self):
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor', 'designer'])
- AclRules.insert_or_update(area='my_area', user='user_2',
rules=[('*', '*', True)])
- AclRules.insert_or_update(area='my_area', user='user_3')
-
- acl = Acl(area='my_area', user='user_1')
- self.assertEqual(acl.has_any_access(), True)
-
- acl = Acl(area='my_area', user='user_2')
- self.assertEqual(acl.has_any_access(), True)
-
- acl = Acl(area='my_area', user='user_3')
- self.assertEqual(acl.has_any_access(), False)
- self.assertEqual(acl._rules, [])
- self.assertEqual(acl._roles, [])
-
- def test_has_access_invalid_parameters(self):
- AclRules.insert_or_update(area='my_area', user='user_1',
rules=[('*', '*', True)])
-
- acl1 = Acl(area='my_area', user='user_1')
-
- self.assertRaises(ValueError, acl1.has_access, 'content', '*')
- self.assertRaises(ValueError, acl1.has_access, '*', 'content')
-
- def test_has_access(self):
- AclRules.insert_or_update(area='my_area', user='user_1',
rules=[('*', '*', True)])
- AclRules.insert_or_update(area='my_area', user='user_2',
rules=[('content', '*', True), ('content', 'delete', False)])
- AclRules.insert_or_update(area='my_area', user='user_3',
rules=[('content', 'read', True)])
-
- acl1 = Acl(area='my_area', user='user_1')
- acl2 = Acl(area='my_area', user='user_2')
- acl3 = Acl(area='my_area', user='user_3')
-
- self.assertEqual(acl1.has_access('content', 'read'), True)
- self.assertEqual(acl1.has_access('content', 'update'), True)
- self.assertEqual(acl1.has_access('content', 'delete'), True)
-
- self.assertEqual(acl2.has_access('content', 'read'), True)
- self.assertEqual(acl2.has_access('content', 'update'), True)
- self.assertEqual(acl2.has_access('content', 'delete'), False)
-
- self.assertEqual(acl3.has_access('content', 'read'), True)
- self.assertEqual(acl3.has_access('content', 'update'), False)
- self.assertEqual(acl3.has_access('content', 'delete'), False)
-
- def test_has_access_with_roles(self):
- Acl.roles_map = {
- 'admin': [('*', '*', True),],
- 'editor': [('content', '*', True),],
- 'contributor': [('content', '*', True), ('content', 'delete',
False)],
- 'designer': [('design', '*', True),],
- }
-
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['admin'])
- acl1 = Acl(area='my_area', user='user_1')
-
- AclRules.insert_or_update(area='my_area', user='user_2',
roles=['admin'], rules=[('ManageUsers', '*', False)])
- acl2 = Acl(area='my_area', user='user_2')
-
- AclRules.insert_or_update(area='my_area', user='user_3',
roles=['editor'])
- acl3 = Acl(area='my_area', user='user_3')
-
- AclRules.insert_or_update(area='my_area', user='user_4',
roles=['contributor'], rules=[('design', '*', True),])
- acl4 = Acl(area='my_area', user='user_4')
-
- self.assertEqual(acl1.has_access('ApproveUsers', 'save'), True)
- self.assertEqual(acl1.has_access('ManageUsers', 'edit'), True)
- self.assertEqual(acl1.has_access('ManageUsers', 'delete'), True)
-
- self.assertEqual(acl1.has_access('ApproveUsers', 'save'), True)
- self.assertEqual(acl2.has_access('ManageUsers', 'edit'), False)
- self.assertEqual(acl2.has_access('ManageUsers', 'delete'), False)
-
- self.assertEqual(acl3.has_access('ApproveUsers', 'save'), False)
- self.assertEqual(acl3.has_access('ManageUsers', 'edit'), False)
- self.assertEqual(acl3.has_access('ManageUsers', 'delete'), False)
- self.assertEqual(acl3.has_access('content', 'edit'), True)
- self.assertEqual(acl3.has_access('content', 'delete'), True)
- self.assertEqual(acl3.has_access('content', 'save'), True)
- self.assertEqual(acl3.has_access('design', 'edit'), False)
- self.assertEqual(acl3.has_access('design', 'delete'), False)
-
- self.assertEqual(acl4.has_access('ApproveUsers', 'save'), False)
- self.assertEqual(acl4.has_access('ManageUsers', 'edit'), False)
- self.assertEqual(acl4.has_access('ManageUsers', 'delete'), False)
- self.assertEqual(acl4.has_access('content', 'edit'), True)
- self.assertEqual(acl4.has_access('content', 'delete'), False)
- self.assertEqual(acl4.has_access('content', 'save'), True)
- self.assertEqual(acl4.has_access('design', 'edit'), True)
- self.assertEqual(acl4.has_access('design', 'delete'), True)
-
- def test_roles_lock_unchanged(self):
- roles_map1 = {
- 'editor': [('content', '*', True),],
- 'contributor': [('content', '*', True), ('content', 'delete',
False)],
- }
- Acl.roles_map = roles_map1
- Acl.roles_lock = 'initial'
-
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor'])
- acl1 = Acl(area='my_area', user='user_1')
-
- AclRules.insert_or_update(area='my_area', user='user_2',
roles=['contributor'])
- acl2 = Acl(area='my_area', user='user_2')
-
- self.assertEqual(acl1.has_access('content', 'add'), True)
- self.assertEqual(acl1.has_access('content', 'edit'), True)
- self.assertEqual(acl1.has_access('content', 'delete'), True)
-
- self.assertEqual(acl2.has_access('content', 'add'), True)
- self.assertEqual(acl2.has_access('content', 'edit'), True)
- self.assertEqual(acl2.has_access('content', 'delete'), False)
-
- roles_map2 = {
- 'editor': [('content', '*', True),],
- 'contributor': [('content', '*', True), ('content', 'delete',
False), ('content', 'add', False)],
- }
- Acl.roles_map = roles_map2
- # Don't change the lock to check that the cache will be kept.
- # Acl.roles_lock = 'changed'
-
- acl1 = Acl(area='my_area', user='user_1')
- acl2 = Acl(area='my_area', user='user_2')
-
- self.assertEqual(acl1.has_access('content', 'add'), True)
- self.assertEqual(acl1.has_access('content', 'edit'), True)
- self.assertEqual(acl1.has_access('content', 'delete'), True)
-
- self.assertEqual(acl2.has_access('content', 'add'), True)
- self.assertEqual(acl2.has_access('content', 'edit'), True)
- self.assertEqual(acl2.has_access('content', 'delete'), False)
-
- def test_roles_lock_changed(self):
- roles_map1 = {
- 'editor': [('content', '*', True),],
- 'contributor': [('content', '*', True), ('content', 'delete',
False)],
- }
- Acl.roles_map = roles_map1
- Acl.roles_lock = 'initial'
-
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor'])
- acl1 = Acl(area='my_area', user='user_1')
-
- AclRules.insert_or_update(area='my_area', user='user_2',
roles=['contributor'])
- acl2 = Acl(area='my_area', user='user_2')
-
- self.assertEqual(acl1.has_access('content', 'add'), True)
- self.assertEqual(acl1.has_access('content', 'edit'), True)
- self.assertEqual(acl1.has_access('content', 'delete'), True)
-
- self.assertEqual(acl2.has_access('content', 'add'), True)
- self.assertEqual(acl2.has_access('content', 'edit'), True)
- self.assertEqual(acl2.has_access('content', 'delete'), False)
-
- roles_map2 = {
- 'editor': [('content', '*', True),],
- 'contributor': [('content', '*', True), ('content', 'delete',
False), ('content', 'add', False)],
- }
- Acl.roles_map = roles_map2
- Acl.roles_lock = 'changed'
-
- acl1 = Acl(area='my_area', user='user_1')
- acl2 = Acl(area='my_area', user='user_2')
-
- self.assertEqual(acl1.has_access('content', 'add'), True)
- self.assertEqual(acl1.has_access('content', 'edit'), True)
- self.assertEqual(acl1.has_access('content', 'delete'), True)
-
- self.assertEqual(acl2.has_access('content', 'add'), False)
- self.assertEqual(acl2.has_access('content', 'edit'), True)
- self.assertEqual(acl2.has_access('content', 'delete'), False)
-
- def test_acl_mixin(self):
- roles_map1 = {
- 'editor': [('content', '*', True),],
- 'contributor': [('content', '*', True), ('content', 'delete',
False)],
- }
- AclRules.insert_or_update(area='my_area', user='user_1',
roles=['editor'])
-
- class Area(object):
- def key(self):
- return 'my_area'
-
- class User(object):
- def key(self):
- return 'user_1'
-
- class MyHandler(AclMixin):
- roles_map = roles_map1
- roles_lock = 'foo'
-
- def __init__(self):
- self.area = Area()
- self.current_user = User()
-
- handler = MyHandler()
- self.assertEqual(handler.acl.has_access('content', 'add'), True)
- self.assertEqual(handler.acl.has_access('content', 'edit'), True)
- self.assertEqual(handler.acl.has_access('content', 'delete'), True)
- self.assertEqual(handler.acl.has_access('foo', 'delete'), False)
=======================================
--- /tests/test_gae_blobstore.py Wed Dec 1 02:59:15 2010
+++ /dev/null
@@ -1,90 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Tests for tipfy.appengine.blobstore
-"""
-import datetime
-import decimal
-import email
-import StringIO
-import time
-import unittest
-
-from google.appengine.ext import blobstore
-
-from tipfy.appengine.blobstore import (CreationFormatError,
parse_blob_info,
- parse_creation)
-
-from werkzeug import FileStorage
-
-
-class TestParseCreation(unittest.TestCase):
- """YYYY-mm-dd HH:MM:SS.ffffff"""
- def test_invalid_format(self):
- self.assertRaises(CreationFormatError, parse_creation, '2010-05-20
6:20:35', 'my_field_name')
-
- def test_invalid_format2(self):
- self.assertRaises(CreationFormatError, parse_creation, '2010-05-20
6:20:35.1234.5678', 'my_field_name')
-
- def test_invalid_format3(self):
- self.assertRaises(CreationFormatError,
parse_creation, 'youcannot.parseme', 'my_field_name')
-
- def test_parse(self):
- timestamp = time.time()
- parts = str(timestamp).split('.', 1)
- ms = parts[1][:4]
- timestamp = decimal.Decimal(parts[0] + '.' + ms)
- curr_date = datetime.datetime.fromtimestamp(timestamp)
-
- to_convert = '%s.%s' % (curr_date.strftime('%Y-%m-%d %H:%M:%S'),
ms)
-
- res = parse_creation(to_convert, 'my_field_name')
-
- self.assertEqual(res.timetuple(), curr_date.timetuple())
-
-
-
-class TestParseBlobInfo(unittest.TestCase):
- def test_none(self):
- self.assertEqual(parse_blob_info(None, 'my_field_name'), None)
-
- def test_file(self):
- stream = StringIO.StringIO()
- stream.write("""\
-Content-Type: application/octet-stream
-Content-Length: 1
-X-AppEngine-Upload-Creation: 2010-10-01 05:34:00.000000
-""")
- stream.seek(0)
- headers = {}
- headers['Content-Type'] = 'image/png; blob-key=foo'
-
- f = FileStorage(stream=stream, headers=headers)
- self.assertNotEqual(parse_blob_info(f, 'my_field_name'), None)
-
- def test_invalid_size(self):
- stream = StringIO.StringIO()
- stream.write("""\
-Content-Type: application/octet-stream
-Content-Length: zzz
-X-AppEngine-Upload-Creation: 2010-10-01 05:34:00.000000
-""")
- stream.seek(0)
- headers = {}
- headers['Content-Type'] = 'image/png; blob-key=foo'
-
- f = FileStorage(stream=stream, headers=headers)
- self.assertRaises(blobstore.BlobInfoParseError,
parse_blob_info,f, 'my_field_name')
-
- def test_invalid_CREATION(self):
- stream = StringIO.StringIO()
- stream.write("""\
-Content-Type: application/octet-stream
-Content-Length: 1
-X-AppEngine-Upload-Creation: XXX
-""")
- stream.seek(0)
- headers = {}
- headers['Content-Type'] = 'image/png; blob-key=foo'
-
- f = FileStorage(stream=stream, headers=headers)
- self.assertRaises(blobstore.BlobInfoParseError,
parse_blob_info,f, 'my_field_name')
=======================================
***Additional files exist in this changeset.***