How to test with webtest and multiprocessing in GAE

89 views
Skip to first unread message

puriketu99

unread,
Aug 6, 2014, 11:49:12 AM8/6/14
to google-a...@googlegroups.com
Hello,experts.

Please tell me how to solve a following problem.
I would like to use webtest with multiprocess but it fails.

result
======================================================================
FAIL: test_answer (testlab.LabTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Library/Python/2.7/site-packages/mock.py", line 1201, in patched
    return func(*args, **keywargs)
  File "/Users/unko/dropbox/test/testlab.py", line 48, in test_answer
    self.assertEqual(u.param,"bar")
AssertionError: 'foo' != 'bar'

----------------------------------------------------------------------
testlab.py
#!-*- coding: utf-8 -*-

import unittest
import webtest
import webapp2
from google.appengine.ext import testbed,ndb
import json
import time
from google.appengine.api import apiproxy_stub_map 
from google.appengine.api import urlfetch_stub 
from mock import patch, Mock
from google.appengine.ext import db
from lab import Lab
from lab import Unko
import multiprocessing


class LabTestCase(unittest.TestCase):
  def setUp(self):
    app = webapp2.WSGIApplication([
      ('/lab', Lab),
      ('/(.*)', Lab)
    ],debug=True)
    self.testapp = webtest.TestApp(app)
    self.testbed = testbed.Testbed()
    self.testbed.setup_env(app_id='sagifugoh')
    self.testbed.activate()
    self.testbed.init_datastore_v3_stub()
    self.testbed.init_memcache_stub()
    self.testbed.init_channel_stub()
    self.testbed.init_urlfetch_stub()
  def tearDown(self):
    self.testbed.deactivate()
  @patch('google.appengine.api.urlfetch.urlfetch_service_pb.URLFetchResponse')
  def test_answer(self, URLFetchResponse):
    def request(param):
      response = self.testapp.post('/lab',{"key":"key","param":param})
    def async(param):
      p = multiprocessing.Process(target=request,args=[param])
      jobs.append(p)
      p.start()
    jobs = []
    u = Unko.get_or_insert("key")
    u.param = "foo"
    u.put()
    async("bar")
    time.sleep(2)
    self.assertEqual(u.param,"bar")

if __name__ == '__main__':
  unittest.main()
lab.py
#!-*- coding: utf-8 -*-
import webapp2
from google.appengine.ext import db

class Unko(db.Model):
  param = db.StringProperty()

class Lab(webapp2.RequestHandler):
  def post(self):
    key =  self.request.get('key')
    param = self.request.get('param')
    u = Unko.get_or_insert(key)
    u.param = param
    u.put()

Vinny P

unread,
Aug 7, 2014, 11:07:49 PM8/7/14
to google-a...@googlegroups.com
On Wed, Aug 6, 2014 at 10:49 AM, puriketu99 <puriket...@gmail.com> wrote:
Hello,experts. Please tell me how to solve a following problem. I would like to use webtest with multiprocess but it fails.
import multiprocessing


 
 
-----------------
-Vinny P
Technology & Media Consultant
Chicago, IL

App Engine Code Samples: http://www.learntogoogleit.com


puriketu.white

unread,
Aug 7, 2014, 11:23:43 PM8/7/14
to google-a...@googlegroups.com
I did not know it.thx.


--
You received this message because you are subscribed to a topic in the Google Groups "Google App Engine" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/google-appengine/9s3lynDu_Cc/unsubscribe.
To unsubscribe from this group and all its topics, send an email to google-appengi...@googlegroups.com.
To post to this group, send email to google-a...@googlegroups.com.
Visit this group at http://groups.google.com/group/google-appengine.
For more options, visit https://groups.google.com/d/optout.

Reply all
Reply to author
Forward
0 new messages