Pipeline API Causing TaskTooLargeError when Payload is Small

Aaron

Apr 28, 2011, 9:26:34 PM
to Google App Engine Pipeline API
Hi,

I posted this question a few days ago in the general app engine group,
but felt this would be more appropriate here.

I'm getting a TaskTooLargeError when starting a pipeline, but I've
logged the arguments passed in, and the sizes of the parameters are
all well within the limit. I suspect the Pipeline API stores all
child tasks within some larger task, but I can't find anything in
the documentation related to this.

I've attached the pipeline that is causing problems and the
traceback. As you can see from my logs, the run function executes
completely before the traceback is raised, and I'm not sure what
happens after this that could cause the error. Are the tasks
enqueued in the background by the Pipeline API only created after
the run method has been completely processed? And if so, are
pipeline future objects really that big? It seems like a list of 3
pipeline futures is causing payload problems.
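For reference, here's a crude size check of the exact arguments from my log (JSON is just a stand-in serialization; I don't know the actual wire format the Pipeline API uses), which shows they're nowhere near the 10KB limit:

```python
import json

# The exact arguments from the "Running ..." log line in my traceback.
# JSON here is only a rough stand-in for whatever serialization the
# Pipeline API really uses.
args = [u'ahF3b3JkZmluLXN0YWdpbmcxMnINCxIFVXNlcnMYm-8CDA',
        [1303937953], True, 11738, 1]

payload = json.dumps(args)
print(len(payload))  # about 80 bytes, nowhere near the 10240-byte limit
```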

Any help would be greatly appreciated.


Code:

class Blocker(pipeline.Pipeline):
    def run(self, *pipeline_values):
        return 1


class QueueDateContentReportPulls(pipeline.Pipeline):
    def run(self, userkey, dates, organic, no_results, *pipeline_values):
        date = dates[0]
        batch_size = 100
        this_date_tasks = []
        index = 1
        logging.info("%s results in queuedatecontent" % no_results)
        # Yield one PullContentReportPipelineItem per batch of results.
        while index + batch_size <= no_results:
            this_date_tasks += [(yield PullContentReportPipelineItem(
                userkey, [date], index, organic))]
            index += batch_size
        # Collapse the futures through layers of Blocker pipelines so no
        # single pipeline is handed more than blocker_batch_size futures.
        blocker_batch_size = 3
        while len(this_date_tasks) > blocker_batch_size:
            tmp = []
            new_date_tasks = []
            for i in range(len(this_date_tasks)):
                tmp += [this_date_tasks[i]]
                if len(tmp) >= blocker_batch_size:
                    new_date_tasks += [(yield Blocker(*tmp))]
                    tmp = []
            if len(tmp) > 0:
                new_date_tasks += [(yield Blocker(*tmp))]
            this_date_tasks = new_date_tasks
        this_date_tasks += [(yield Blocker(*this_date_tasks))]
        logging.info("is it reaching this?")
        return

Traceback:
Running logic.ga.QueueDateContentReportPulls(*(u'ahF3b3JkZmluLXN0YWdpbmcxMnINCxIFVXNlcnMYm-8CDA', [1303937953], True, 11738, 1), **{})#e6b7d5dc71fb11e09dad87b25f39df37
I 2011-04-28 18:01:17.285
11738 results in queuedatecontent
I 2011-04-28 18:01:17.301
is it reaching this?
E 2011-04-28 18:01:19.165
Task size must be less than 10240; found 13911
Traceback (most recent call last):
  File "/base/python_runtime/python_lib/versions/1/google/appengine/ext/webapp/__init__.py", line 636, in __call__
    handler.post(*groups)
  File "/base/data/home/apps/wordfin-staging12/1.350050126581756725/pipeline/pipeline.py", line 2242, in post
    attempt=int(self.request.get('attempt', '0')))
  File "/base/data/home/apps/wordfin-staging12/1.350050126581756725/pipeline/pipeline.py", line 1985, in evaluate
    pipelines_to_run=pipelines_to_run)
  File "/base/data/home/apps/wordfin-staging12/1.350050126581756725/pipeline/pipeline.py", line 2095, in transition_run
    db.run_in_transaction(txn)
  File "/base/python_runtime/python_lib/versions/1/google/appengine/api/datastore.py", line 2148, in RunInTransaction
    DEFAULT_TRANSACTION_RETRIES, function, *args, **kwargs)
  File "/base/python_runtime/python_lib/versions/1/google/appengine/api/datastore.py", line 2247, in RunInTransactionCustomRetries
    ok, result = _DoOneTry(new_connection, function, args, kwargs)
  File "/base/python_runtime/python_lib/versions/1/google/appengine/api/datastore.py", line 2269, in _DoOneTry
    result = function(*args, **kwargs)
  File "/base/data/home/apps/wordfin-staging12/1.350050126581756725/pipeline/pipeline.py", line 2072, in txn
    params=dict(pipeline_key=[str(key) for key in pipelines_to_run]))
  File "/base/python_runtime/python_lib/versions/1/google/appengine/api/labs/taskqueue/taskqueue.py", line 532, in __init__
    (MAX_TASK_SIZE_BYTES, self.size))
TaskTooLargeError: Task size must be less than 10240; found 13911

Aaron

Apr 29, 2011, 4:30:15 AM
to Google App Engine Pipeline API
I haven't yet figured out a solution to this problem. I wanted to
update this to clarify what I'm trying to accomplish by using the
Pipeline API.

Essentially, I have 101 tasks that I need to schedule. However, the
last task must wait until the first 100 tasks are completed.

My first attempt was to schedule the first 100 tasks with the
Pipeline API, store the pipeline future objects from those 100
tasks, and pass them as a keyword argument into the last task, which
needed to wait until the first batch completed. This failed with a
TaskTooLargeError; I assumed it was because the list of 100 pipeline
future objects was too big.

So, the second solution I tried (shown in the code above) was to
create intermediate Blocker tasks. Each Blocker task waits for 3 of
the first 100 tasks to finish, so I end up with 34 Blocker tasks.
Then I create another layer of Blocker tasks, each of which waits
for 3 of the initial Blockers, and continue until no point in my
code passes more than 3 pipeline future objects into a pipeline
task. However, I am still getting the TaskTooLargeError.
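To make the shape of that Blocker tree concrete, here is a tiny plain-Python simulation (my own helper for this post, not Pipeline API code) of how 100 futures collapse with a batch size of 3:

```python
def reduce_layers(n_tasks, batch_size=3):
    """Return the task count at each layer when futures are repeatedly
    grouped into batches of `batch_size` until a single blocker remains."""
    layers = [n_tasks]
    while layers[-1] > 1:
        # Each batch of `batch_size` futures feeds one Blocker (ceil division).
        layers.append((layers[-1] + batch_size - 1) // batch_size)
    return layers

print(reduce_layers(100))  # [100, 34, 12, 4, 2, 1]
```

So no single pipeline ever receives more than 3 futures, which is why the error surprises me.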

I hope this clarifies my problem. I am more than happy to provide more
detail if it would help.

Brett Slatkin

Apr 29, 2011, 12:07:21 PM
to app-engine-...@googlegroups.com
Hey Aaron,

On Thu, Apr 28, 2011 at 6:26 PM, Aaron <aaron.t...@gmail.com> wrote:
> Hi,
>
> I posted this question a few days ago in the general app engine group,
> but felt this would be more appropriate here.

Ahh sorry I missed that; I monitor this list much more than the other ones.

> I'm getting a TaskTooLarge Error when starting a pipeline, but I've
> logged the  arguments passed in, and the size of the parameters are
> all well within the size limit.  I feel like maybe the pipeline api
> stores all child tasks within some larger task, but can't find
> anything in the documentation related to this.

This is a known limitation and I'm sorry for the trouble. The task
payload limit is 10KB right now, which is what's hurting you. Everyone
would like that limit to be higher and I believe that's in the cards.

What your log tells me is your pipeline is getting scheduled just fine
(so job.start() runs okay), but when it runs and tries to schedule
child jobs it explodes. The reason is the "fanout handler" task
payload has all the keys of the child pipelines. I've described an
alternative approach in this issue which I think will solve your
problem:

http://code.google.com/p/appengine-pipeline/issues/detail?id=26

Not sure when I'll be able to get a patch out to fix this but hopefully soon.

In the meantime, the trivial workaround is to split your iteration
into two child pipelines, one issuing children 1-50 and the other
51-100. The task payload of each of those should be about 13KB/2,
so they should get scheduled fine. You'll just need to tweak your
parameters and introduce a new parent pipeline to coordinate this.
The parent pipeline can also send off the 101st pipeline that relies
on the other 100.
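For illustration, the index math for that split could look like this (plain Python; `split_starts` and its parameter names are made-up names for this sketch, not part of the Pipeline API):

```python
def split_starts(no_results, batch_size, parts):
    """Compute the batch start indexes (1, 1+batch_size, ...) that the
    original while-loop iterates over, then split them into `parts`
    chunks so each child pipeline yields only its own share."""
    starts = [i for i in range(1, no_results + 1, batch_size)
              if i + batch_size <= no_results]
    chunk = (len(starts) + parts - 1) // parts  # ceil division
    return [starts[i:i + chunk] for i in range(0, len(starts), chunk)]

halves = split_starts(11738, 100, 2)
print([len(h) for h in halves])  # [59, 58] -- two fanout tasks instead of one
```

Each half carries roughly half the child-pipeline keys, which is what keeps each fanout task's payload under the limit.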

Hope that helps!

-Brett

Brett Slatkin

Apr 29, 2011, 1:04:51 PM
to app-engine-...@googlegroups.com, aaron.t...@gmail.com
Hey Aaron,

I had time right now, so I've checked in a fix for your issue:

http://code.google.com/p/appengine-pipeline/source/detail?r=39

Can you pull the source again and try it out? Let me know whether it
fixes your issue.

-Brett

Aaron

Apr 29, 2011, 8:38:24 PM
to Google App Engine Pipeline API
Hi Brett,

Thanks a lot for your time. Your update fixed the scenario where I'm
only delaying a batch of ~100 tasks. However, when I handle larger
data sets, I still run into this error. In particular, the number of
tasks I need to wait for can range anywhere from 100 to 2000, and I
hit the same error when I needed to wait for 1280 tasks before
executing the last task.

From the description of the fix, it seems like this isn't expected
behavior. Is there something I'm missing?

-Aaron


Brett Slatkin

May 2, 2011, 2:16:20 PM
to app-engine-...@googlegroups.com
Hey Aaron,

On Fri, Apr 29, 2011 at 5:38 PM, Aaron <aaron.t...@gmail.com> wrote:
> Thanks a lot for your time.  Your update fixed the scenario when I'm
> only delaying a batch of ~100 tasks.  However, when I am trying to
> handle larger data sets, I am still running into this error.  In
> particular, the number of tasks that I need to wait for can range
> anywhere from 100-2000.  I ran into the same error when I needed to
> wait for 1280 tasks before executing the last task.
>
> From the description of the fix, it seems like this isn't expected
> behavior.  Is there something I'm missing?

Even with the optimization I added, you're going to hit the 10KB task
size limit. Until the task size limit is raised, you should try the
work-arounds I described in my first response to you. Essentially,
I'm suggesting you split the 1000+ child pipelines across multiple
intermediate pipelines (say, 10 of them, each yielding 100 children)
and then have a parent pipeline wait on just those 10 intermediates,
so the tree as a whole can cover thousands of tasks.

-Brett
