Hi all,
I have been experimenting with task handlers that page through a
number of entities, and I have run into some severe memory problems.
I've included my test application below:
- Setup handler that inserts 10,000 entities (100 batches of 100) with
~25 KB of text each
- Basic task handler that enqueues itself as a task (to get the longer
task deadline) and totals the length of the text in all of the entities
using a query iterator
- Page task handler that does the same using cursors and paged
queries (fetch_page).
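As a sanity check on the setup above, here is the arithmetic for what both task handlers should log. The constants are read off the SetupHandler code (100 put_multi calls of 100 entities, 25,000 characters each) and assume /setup was requested exactly once:

```python
# Figures taken from SetupHandler; assumes /setup ran exactly once.
PUT_CALLS = 100          # outer loop in SetupHandler.get
ENTITIES_PER_CALL = 100  # entities per ndb.put_multi call
TEXT_LENGTH = 25000      # len('a' * 25000)


def expected_entity_count():
    return PUT_CALLS * ENTITIES_PER_CALL


def expected_total():
    # The value both task handlers should log as "Total: ..."
    return expected_entity_count() * TEXT_LENGTH


print(expected_entity_count())  # 10000
print(expected_total())         # 250000000
```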
There is a constant called BATCH_SIZE that can be changed to test
various scenarios. To stay within the instance memory limits, I clear
the NDB in-context cache after processing each entity.
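The cursor loop the page handler relies on can be illustrated without the datastore. In this minimal sketch, FakeQuery is a hypothetical in-memory stand-in (not part of ndb); its fetch_page only mimics the (results, cursor, more) shape of ndb's Query.fetch_page, using an integer offset as the cursor. Note that in real ndb, `more` only indicates that there are likely more results, so a final empty page is possible.

```python
class FakeQuery(object):
    """Hypothetical stand-in for an ndb query over a list of texts."""

    def __init__(self, texts):
        self._texts = texts

    def fetch_page(self, page_size, start_cursor=None):
        # The "cursor" here is just an offset into the list.
        start = start_cursor or 0
        page = self._texts[start:start + page_size]
        next_cursor = start + len(page)
        more = next_cursor < len(self._texts)
        return page, next_cursor, more


def total_length(query, batch_size):
    """Sum text lengths page by page; returns (total, pages_fetched)."""
    total = 0
    pages = 0
    cursor = None
    while True:
        texts, cursor, more = query.fetch_page(batch_size, start_cursor=cursor)
        pages += 1
        for text in texts:
            total += len(text)
        # The previous page is no longer referenced by this loop once
        # `texts` is rebound, so at most one page is held here at a time.
        if not more:
            break
    return total, pages


query = FakeQuery(['a' * 25] * 100)
print(total_length(query, 25))  # (2500, 4)
```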
However, the results are not great:
BasicTaskHandler:
- BATCH_SIZE = 25: success, but instance memory usage grows to 100MB and
stays there after the request.
- BATCH_SIZE = 50: success, but memory usage grows to 143MB and the
instance is killed after the request due to high memory usage.
- BATCH_SIZE = 100: failure due to high memory usage.
PageTaskHandler:
- Failure due to memory usage on all batch sizes.
It seems crazy that the application can only process 25 entities per
batch without failing (I'm aware there is a great deal of overhead, but
the raw data is only 25 * 25 KB = 625 KB). Memory also appears to leak
between requests, since usage stays high after a request finishes.
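The raw-data estimate in that parenthesis, extended to the three batch sizes tested, is a simple multiplication. This sketch counts only the text as stored (one byte per ASCII character); it deliberately ignores protobuf decoding, model instances, and Python object overhead (a Python 2 unicode string may take more than one byte per character in memory), all of which are included in the observed instance memory figures:

```python
# Raw text payload per batch, ignoring all decoding and object overhead.
TEXT_BYTES = 25000  # bytes of ASCII text per entity, from SetupHandler


def raw_batch_bytes(batch_size, text_bytes=TEXT_BYTES):
    return batch_size * text_bytes


for size in (25, 50, 100):
    n = raw_batch_bytes(size)
    print("%d entities -> %d bytes (%d KB)" % (size, n, n // 1000))
# 25 entities -> 625000 bytes (625 KB)
# 50 entities -> 1250000 bytes (1250 KB)
# 100 entities -> 2500000 bytes (2500 KB)
```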
I would appreciate any insight from the list.
Thanks,
Nikolaj
import logging

from google.appengine.api import taskqueue
from google.appengine.ext import webapp, ndb
from google.appengine.ext.webapp import util

# Number of entities fetched per datastore round trip.
BATCH_SIZE = 25

class Entity(ndb.Model):
    text = ndb.TextProperty()
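
class SetupHandler(webapp.RequestHandler):
    def get(self):
        # Insert 100 batches of 100 entities, each holding 25,000 characters.
        for _ in xrange(100):
            entities = [Entity(text='a' * 25000) for _ in xrange(100)]
            ndb.put_multi(entities)
            # Drop the just-written entities from the in-context cache.
            ndb.get_context().clear_cache()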

class BasicTaskHandler(webapp.RequestHandler):
    def get(self):
        # Visiting /task/basic enqueues the work as a push task.
        self.queue()

    def post(self):
        # Run only once: skip automatic retries after a failure.
        if self.request.headers.get('X-AppEngine-TaskRetryCount', '0') != '0':
            logging.warning("Preventing retry.")
            return
        total = 0
        entity_query = Entity.query()
        # iter() fetches results from the datastore BATCH_SIZE at a time.
        for entity in entity_query.iter(batch_size=BATCH_SIZE):
            total += len(entity.text)
            ndb.get_context().clear_cache()
        logging.info("Total: %d", total)

    @classmethod
    def queue(cls):
        task = taskqueue.Task(url='/task/basic')
        taskqueue.Queue().add(task)

class PageTaskHandler(webapp.RequestHandler):
    def get(self):
        # Visiting /task/page enqueues the work as a push task.
        self.queue()

    def post(self):
        # Run only once: skip automatic retries after a failure.
        if self.request.headers.get('X-AppEngine-TaskRetryCount', '0') != '0':
            logging.warning("Preventing retry.")
            return
        total = 0
        entity_query = Entity.query()
        cursor = ndb.Cursor()
        while True:
            # fetch_page returns (results, next_cursor, more).
            entities, cursor, more = entity_query.fetch_page(
                BATCH_SIZE, start_cursor=cursor)
            for entity in entities:
                total += len(entity.text)
                ndb.get_context().clear_cache()
            if not more:
                break
        logging.info("Total: %d", total)

    @classmethod
    def queue(cls):
        task = taskqueue.Task(url='/task/page')
        taskqueue.Queue().add(task)

app = webapp.WSGIApplication([
    ('/setup', SetupHandler),
    ('/task/basic', BasicTaskHandler),
    ('/task/page', PageTaskHandler),
], debug=True)


def main():
    util.run_wsgi_app(app)


if __name__ == '__main__':
    main()