chain() very slow with large number of tasks

Arek Oll

Dec 4, 2019, 12:28:22 AM
to celery-users
We have a job with thousands of tasks, some of which can run in parallel while others have to wait.
A combination of chain() and group() works fine when the number of tasks is small, but sending a long chain gets very slow as the number of tasks grows.
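For illustration, a job like this can be mapped onto chain() and group() by splitting its tasks into dependency levels: every task in one level can run in parallel (one group()), and the levels run one after another (one chain()). The sketch below does this with Kahn's topological sort in plain Python. It assumes the dependencies are known up front as a dict from task id to the set of task ids it waits for; the function name `dependency_levels` and the task ids are hypothetical, and this is an illustration, not the code of the job described here.

```python
def dependency_levels(deps):
    """Group task ids into levels using Kahn's topological sort.

    deps maps each task id to the set of task ids it must wait for.
    Tasks in the same level do not depend on each other, so each level
    could become one group(), and the list of levels one chain().
    Raises ValueError if the dependencies contain a cycle.
    """
    # Collect every task, including ones that only appear as prerequisites.
    nodes = set(deps)
    for prereqs in deps.values():
        nodes |= set(prereqs)

    # Count unmet prerequisites per task and record the reverse edges.
    remaining = {n: len(set(deps.get(n, ()))) for n in nodes}
    dependents = {n: [] for n in nodes}
    for node, prereqs in deps.items():
        for p in set(prereqs):
            dependents[p].append(node)

    # Start with tasks that wait for nothing; sort for deterministic output.
    level = sorted(n for n in nodes if remaining[n] == 0)
    levels = []
    while level:
        levels.append(level)
        next_level = []
        for node in level:
            for child in dependents[node]:
                remaining[child] -= 1
                if remaining[child] == 0:
                    next_level.append(child)
        level = sorted(next_level)

    # Any task never released belongs to a cycle.
    if sum(len(lvl) for lvl in levels) != len(nodes):
        raise ValueError("dependency cycle detected")
    return levels


# Hypothetical job: b and c both wait for a; d waits for b and c.
deps = {"a": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}}
print(dependency_levels(deps))  # [['a'], ['b', 'c'], ['d']]
```

With Celery, each inner list could become a group() of signatures and the list of groups could be passed to chain(), so the number of sequential steps is the number of levels rather than the number of tasks. Note that Celery upgrades a group chained to a following task into a chord, and tasks used in a chord must not ignore their results, so `task_ignore_result=True` would need to be revisited for that layout.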

from celery import Celery, signature, chain, group
import time


celery = Celery(
    'some_project',
    broker='redis://redis:6379/0',
    task_serializer='json',
    task_ignore_result=True
)


def get_sig():
    # Every call builds a new immutable signature with the same moderately sized kwargs.
    return signature('tasks.run_node', queue='default', immutable=True, kwargs={
        'node_id': 7,
        'job_id': 22,
        'job_task_id': '#xx, Not too long string',
        'backend': {
            'logger_url': 'https://xxxxxxxxxx.yyy/some_url?awdawdawd=awdwadwadawd',
            'job_details_url': 'https://zzzzzzzzzzzzzzzzz.yyy/some_other_url?ddwczcd=qwdwa'
        }
    })


for x in range(1, 5):

    # Case 1: one chain of x*300 signatures (each runs after the previous one).
    chain_tasks = [get_sig() for i in range(x * 300)]
    # Time only chain construction and apply_async(), not building the signatures.
    start = time.time()
    result = chain(chain_tasks).apply_async()
    print(f'{x*300} tasks chain, took: {str(time.time() - start)}s')

    # Case 2: a chain wrapping a single group of x*300 signatures (all can run in parallel).
    chain_tasks = []
    chain_tasks.append(group([get_sig() for i in range(x * 300)]))
    start = time.time()
    result = chain(chain_tasks).apply_async()
    print(f'{x*300} tasks group, took: {str(time.time() - start)}s')

This results in:
300 tasks chain, took: 0.910630464553833s
300 tasks group, took: 0.49317383766174316s
600 tasks chain, took: 3.5126118659973145s
600 tasks group, took: 1.040590763092041s
900 tasks chain, took: 7.6711084842681885s
900 tasks group, took: 1.437443733215332s
1200 tasks chain, took: 13.37359619140625s
1200 tasks group, took: 1.9136178493499756s
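Dividing each timing by the 300-task timing makes the scaling visible: the chain times grow to about 3.9x, 8.4x and 14.7x for 2x, 3x and 4x the tasks, which is close to quadratic, while the group times grow to about 2.1x, 2.9x and 3.9x, which is close to linear. A small sketch over the numbers posted above; the variable and function names are mine, and the exponent is only a rough two-point estimate from four measurements on one setup, so it says nothing about where inside Celery the time is spent.

```python
import math

# Timings (seconds) copied from the benchmark output above.
sizes = [300, 600, 900, 1200]
chain_s = [0.910630464553833, 3.5126118659973145, 7.6711084842681885, 13.37359619140625]
group_s = [0.49317383766174316, 1.040590763092041, 1.437443733215332, 1.9136178493499756]


def growth(times):
    """Each timing divided by the first (300-task) timing."""
    return [t / times[0] for t in times]


def scaling_exponent(ns, times):
    """Estimate p in time ~ n**p from the first and last data points."""
    return math.log(times[-1] / times[0]) / math.log(ns[-1] / ns[0])


for n, c, g in zip(sizes, growth(chain_s), growth(group_s)):
    print(f"{n:5d} tasks: chain x{c:5.2f}, group x{g:4.2f}")

print(f"chain exponent ~ {scaling_exponent(sizes, chain_s):.2f}")
print(f"group exponent ~ {scaling_exponent(sizes, group_s):.2f}")
```

For these numbers the estimated exponent comes out near 1.94 for the chain and near 0.98 for the group.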

Any idea what is taking so long and how to make it faster?
