from celery import Celery, signature, chain, group
import time
# Celery application used to publish the benchmark tasks.
celery = Celery(
    'some_project',
    # Redis broker: host "redis", port 6379, database 0.
    broker='redis://redis:6379/0',
    # Task messages are serialized as JSON.
    task_serializer='json',
    # Task results are ignored, so no result backend is configured here.
    task_ignore_result=True,
)
def get_sig():
    """Build an immutable signature for the ``tasks.run_node`` task.

    Each call creates a new signature with the same hard-coded keyword
    arguments, targeting the ``default`` queue.

    Returns:
        The object produced by ``celery.signature`` for ``tasks.run_node``.
    """
    # Fixed sample payload; the values are the same on every call.
    task_kwargs = {
        'node_id': 7,
        'job_id': 22,
        'job_task_id': '#xx, Not too long string',
        'backend': {
            'logger_url': 'https://xxxxxxxxxx.yyy/some_url?awdawdawd=awdwadwadawd',
            'job_details_url': 'https://zzzzzzzzzzzzzzzzz.yyy/some_other_url?ddwczcd=qwdwa',
        },
    }
    return signature(
        'tasks.run_node',
        kwargs=task_kwargs,
        queue='default',
        immutable=True,
    )
# Benchmark: for 300, 600, 900 and 1200 signatures, time how long it takes to
# build and publish (a) a chain of all signatures and (b) a chain wrapping a
# single group of the same number of signatures.
for x in range(1, 5):
    task_count = x * 300

    # Case (a): one chain made of task_count signatures. The signatures are
    # created before the timer starts, so only chain() + apply_async() is timed.
    chain_tasks = [get_sig() for _ in range(task_count)]
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by wall-clock adjustments the way time.time() can.
    start = time.perf_counter()
    result = chain(chain_tasks).apply_async()
    print(f'{task_count} tasks chain, took: {time.perf_counter() - start}s')

    # Case (b): a chain whose only element is a group of task_count
    # signatures. The group is likewise built before the timer starts.
    chain_tasks = [group([get_sig() for _ in range(task_count)])]
    start = time.perf_counter()
    result = chain(chain_tasks).apply_async()
    print(f'{task_count} tasks group, took: {time.perf_counter() - start}s')
300 tasks chain, took: 0.910630464553833s
300 tasks group, took: 0.49317383766174316s
600 tasks chain, took: 3.5126118659973145s
600 tasks group, took: 1.040590763092041s
900 tasks chain, took: 7.6711084842681885s
900 tasks group, took: 1.437443733215332s
1200 tasks chain, took: 13.37359619140625s
1200 tasks group, took: 1.9136178493499756s