I have a client pipeline that needs to post a request to an external calculation server and then retrieve the results. The calculation takes some time, so the external server processes it in a task queue. The client pipeline doesn't know when the results will be ready; instead it has to poll the server (get_status(id)) and, once status==Completed, retrieve the results (get_results(id)).
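To make the polling protocol concrete, here is a minimal plain-Python sketch of what the client has to do, outside of any pipeline. Only the get_status and get_results call names come from my setup; the status constants, the poll_until_done name, and the wait_seconds, max_polls and sleep parameters are placeholders for illustration, and the real server's status values may differ.

```python
import time

# Placeholder status values; the real server's constants may differ.
QUEUED, COMPLETED, ERROR = "Queued", "Completed", "Error"


def poll_until_done(job_id, get_status, get_results,
                    wait_seconds=1.0, max_polls=100, sleep=time.sleep):
    """Poll get_status(job_id) until the job finishes.

    Returns the results on completion, or None if the server reports an
    error or the job is still unfinished after max_polls checks.
    """
    for _ in range(max_polls):
        status = get_status(job_id)
        if status == COMPLETED:
            return get_results(job_id)
        if status == ERROR:
            return None
        sleep(wait_seconds)  # still queued or running; wait, then poll again
    return None
```

A None return value is how a failed calculation shows up to the caller, which mirrors what the pipeline version does.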
The problem is that the server calculation occasionally fails, in which case I need the client pipeline to retry. I have a pipeline pattern based on this post:
which looks as follows:
    class ReducePipeline(pipeline.Pipeline):

        def run(self, *args):
            results = [result for result in args if result]
            if not results:
                return None
            return results[0]


    class CallbackPipeline(pipeline.Pipeline):

        def run(self, id):
            status = get_status(id)  # call external server status
            results, future_results = None, None
            if status == Error:
                pass
            elif status == Completed:
                results = get_results(id)  # get results from external server
            else:
                with pipeline.InOrder():
                    yield Delay(seconds=CallbackWait)  # NB
                    future_results = yield CallbackPipeline(id)
            yield ReducePipeline(results, future_results)


    class StartPipeline(pipeline.Pipeline):

        def run(self, request):
            id = start(request)  # post request to external server; get job id in return
            yield CallbackPipeline(id)

        """
        is this really the best way to retry the pipeline? dev_appserver.py results don't look promising :-(
        """

        def finalized(self):
            if not self.outputs.default:
                raise pipeline.Retry()
However, this doesn't seem to work under dev_appserver.py: it simply causes repeated errors at the StartPipeline finalisation stage. I was hoping to get the entire StartPipeline to retry instead.
Can anyone advise on a sensible pattern for retrying the StartPipeline when the results come back as None?
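In plain Python, ignoring the Pipeline API entirely, the behaviour I am after is roughly the following. This is only a sketch of the intent, not something I know how to express with pipelines: run_with_retries, wait_for_results and max_attempts are names made up for illustration, and start stands in for the call that posts the request and returns a job id.

```python
def run_with_retries(request, start, wait_for_results, max_attempts=3):
    """Resubmit the request until a non-None result comes back.

    start(request) posts a fresh job and returns its id.
    wait_for_results(job_id) blocks until that job ends and returns the
    results, or None on failure. Both are caller-supplied placeholders.
    """
    for attempt in range(1, max_attempts + 1):
        job_id = start(request)  # every attempt gets a brand new job id
        results = wait_for_results(job_id)
        if results is not None:
            return results, attempt
    raise RuntimeError("no results after %d attempts" % max_attempts)
```

The key point is that a retry has to go back to start(request) and obtain a new job id, rather than polling the id of the failed job again.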
Thanks.