twisted reactor not running properly inside celery


Prashant Gaur

Apr 13, 2020, 8:53:21 AM
to celery-users
**System/Dependencies details:**

    CPU --> 4
    requirements --> celery==4.3.0, twisted==19.7.0, python3.7


Below is my Celery setup:

    from threading import Thread
    from celery import Celery
    from twisted.internet import threads, reactor, defer
    from twisted.web.error import Error
    from celery import signals

    app = Celery('tasks', broker='pyamqp://guest@localhost//')


    @signals.worker_process_init.connect
    def configure_infrastructure(**kwargs):
        Thread(target=reactor.run, name="reactor.run", args=(False,)).start()
        print('started new thread')


    @signals.worker_process_shutdown.connect()
    def shutdown_reactor(**kwargs):
        """
        This is invoked when the individual workers shut down. It just stops the twisted reactor
        @param kwargs:
        @return:
        """
        reactor.callFromThread(reactor.stop)
        print('REACTOR SHUTDOWN')


    def getPage(inp):
        print(inp)
        return inp


    def inThread():
        print('inside inthread method')
        try:
            result = threads.blockingCallFromThread(
                reactor, getPage, "http://twistedmatrix.com/")
        except Exception as exc:
            print(exc)
        else:
            print(result)
        #reactor.callFromThread(reactor.stop)


    @app.task
    def add(x, y):
        print('inside add method')
        inThread()
        #    reactor.run()
        return x + y



**Running the Celery worker:**

   
    celery -A run worker --loglevel=info


**Logs when Celery starts:**

    (2_env) ubuntu@gpy:~/app/env/src$ celery -A run worker --loglevel=info

    [tasks]
      . run.add

    [2020-04-09 07:25:29,357: WARNING/Worker-1] started new thread
    [2020-04-09 07:25:29,362: WARNING/Worker-4] started new thread
    [2020-04-09 07:25:29,362: WARNING/Worker-3] started new thread
    [2020-04-09 07:25:29,364: WARNING/Worker-2] started new thread
    [2020-04-09 07:25:29,367: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//

    
     
**Calling the task:**

    >>> run.add.delay(1,2)
    <AsyncResult: d41680fd-7cc1-4e75-81be-6496bad0cc16>
    >>>

 

**Sometimes it works fine:**

    [2020-04-09 07:27:17,998: INFO/MainProcess] Received task: run.add[00934769-48c4-48b8-852c-8b746bdd5e03]
    [2020-04-09 07:27:17,999: WARNING/Worker-4] inside add method
    [2020-04-09 07:27:17,999: WARNING/Worker-4] inside inthread method
    [2020-04-09 07:27:18,000: WARNING/Worker-4] http://twistedmatrix.com/
    [2020-04-09 07:27:18,000: WARNING/Worker-4] http://twistedmatrix.com/
    [2020-04-09 07:27:18,000: INFO/MainProcess] Task run.add[00934769-48c4-48b8-852c-8b746bdd5e03] succeeded in 0.00144551398989s: 3


**Sometimes the task never reaches the `getPage` method and hangs, as in the logs below:**

    [2020-04-09 07:27:22,318: INFO/MainProcess] Received task: run.add[d41680fd-7cc1-4e75-81be-6496bad0cc16]
    [2020-04-09 07:27:22,319: WARNING/Worker-2] inside add method
    [2020-04-09 07:27:22,319: WARNING/Worker-2] inside inthread method


Is there any issue with running `reactor.run` inside a `Thread`?
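
For anyone trying to reproduce this: `threads.blockingCallFromThread` blocks the calling thread until the reactor thread has actually run the function, so if the reactor never picks the call up, the task waits forever without logging anything. A minimal diagnostic sketch, assuming it is acceptable to fail the task after a timeout, is below. The helper name `blocking_call_with_timeout` and its `call_from_thread` parameter are hypothetical (not part of Twisted or Celery); in the worker, `call_from_thread` would be `reactor.callFromThread`. Unlike the real Twisted function, this sketch does not wait on Deferreds; it only handles plain return values.

```python
import queue


def get_page(inp):
    # Stand-in for the getPage function in the post: just echoes its input.
    return inp


def blocking_call_with_timeout(call_from_thread, func, *args, timeout=5.0):
    """Hypothetical helper: run func(*args) via call_from_thread and wait
    at most `timeout` seconds for the result.

    call_from_thread is any callable that schedules a zero-argument function
    on the event-loop thread (assumed to be reactor.callFromThread in the
    Celery worker).
    """
    results = queue.Queue()

    def _run():
        # Executed on the event-loop thread; report success or failure back.
        try:
            results.put((True, func(*args)))
        except Exception as exc:
            results.put((False, exc))

    call_from_thread(_run)
    try:
        ok, value = results.get(timeout=timeout)
    except queue.Empty:
        # The loop never ran the call: surface the hang instead of blocking.
        raise TimeoutError(
            "event loop did not run the call within %.1f seconds" % timeout
        )
    if not ok:
        raise value
    return value
```

Inside `inThread`, the call would then look like `blocking_call_with_timeout(reactor.callFromThread, getPage, "http://twistedmatrix.com/", timeout=5.0)`, so a stalled reactor shows up as a `TimeoutError` in the worker log rather than a silent hang.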
