RPC extension and heartbeat when my service runs a long task (over 20 min)

ts...@atlas.ru

Apr 2, 2018, 5:31:16 AM
to nameko-dev
The problem is that a task my service has received from the queue is returned to the queue some time later. The logs show an error related to timeouts.
Detailed logs are below. My question: how do I increase the number of heartbeats?

>> TRACE

2018-03-07 15:14:07,102 - WARNING - Connection to broker lost, trying to re-establish connection...
Traceback (most recent call last):
  File "/home/etl/.local/lib/python3.5/site-packages/kombu/mixins.py", line 199, in consume
    conn.drain_events(timeout=safety_interval)
  File "/home/etl/.local/lib/python3.5/site-packages/kombu/connection.py", line 288, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/home/etl/.local/lib/python3.5/site-packages/kombu/transport/pyamqp.py", line 95, in drain_events
    return connection.drain_events(**kwargs)
  File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py", line 303, in drain_events
    chanmap, None, timeout=timeout,
  File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py", line 366, in _wait_multiple
    channel, method_sig, args, content = read_timeout(timeout)
  File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py", line 337, in read_timeout
    return self.method_reader.read_method()
  File "/home/etl/.local/lib/python3.5/site-packages/amqp/method_framing.py", line 189, in read_method
    raise m
  File "/home/etl/.local/lib/python3.5/site-packages/amqp/method_framing.py", line 107, in _next_method
    frame_type, channel, payload = read_frame()
  File "/home/etl/.local/lib/python3.5/site-packages/amqp/transport.py", line 154, in read_frame
    frame_header = read(7, True)
  File "/home/etl/.local/lib/python3.5/site-packages/amqp/transport.py", line 277, in _read
    s = recv(n - len(rbuf))
  File "/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py", line 360, in recv
    return self._recv_loop(self.fd.recv, b'', bufsize, flags)
  File "/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py", line 354, in _recv_loop
    self._read_trampoline()
  File "/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py", line 325, in _read_trampoline
    timeout_exc=socket_timeout('timed out'))
  File "/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py", line 207, in _trampoline
    mark_as_closed=self._mark_as_closed)
  File "/home/etl/.local/lib/python3.5/site-packages/eventlet/hubs/__init__.py", line 163, in trampoline
    return hub.switch()
  File "/home/etl/.local/lib/python3.5/site-packages/eventlet/hubs/hub.py", line 295, in switch
    return self.greenlet.switch()
socket.timeout: timed out

Thanks all!

juko...@gmail.com

Apr 2, 2018, 12:24:55 PM
to nameko-dev
I had a similar issue.

The default is to send a heartbeat every 60 seconds, with 3 retries.

You can increase the heartbeat timeout with the `HEARTBEAT` config value.
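
As a minimal sketch of what that might look like, here is a config expressed as a Python dict. Only the `HEARTBEAT` key comes from this thread; the `AMQP_URI` value and the 120-second figure are placeholder assumptions, and the same keys would normally live in your YAML config file. The interval actually used may still be lower if the broker negotiates a smaller value.

```python
# Hypothetical service config. HEARTBEAT is in seconds; the URI and the
# value 120 are illustrative assumptions, not recommendations.
config = {
    "AMQP_URI": "pyamqp://guest:guest@localhost",
    "HEARTBEAT": 120,
}

# A dict like this would be handed to whatever starts the service,
# in place of (or loaded from) a config file.
print(config["HEARTBEAT"])
```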

If your 'long task' happens to be a subprocess call (like mine was), make sure you import the 'greened' subprocess module from eventlet. The subprocess module is not monkey-patched automatically; you have to import the green version specifically.

Geoff

alex...@gmail.com

Apr 12, 2018, 5:41:06 AM
to nameko-dev
Yep! Thanks for the advice! I fixed it with eventlet.

Davis Kirkendall

May 23, 2018, 11:54:21 AM
to nameko-dev
I'm having problems setting the "HEARTBEAT" parameter. It seems that kombu (or the underlying amqp lib) uses the minimum of the server heartbeat and the client heartbeat. So if the server heartbeat is set to 30 seconds, we can't increase the heartbeat to, say, 5 minutes.

Is there any (functional) reason that the client heartbeat can't be larger than the server heartbeat?

By the way, this issue: https://github.com/nameko/nameko/issues/477 deals with the same problem and has some more solutions (unfortunately the tpool solution does not work in all cases, hence the heartbeat question above).

Davis

Matt Yule-Bennett

Jul 16, 2018, 4:24:51 AM
to nameko-dev
Davis,

Sorry, I completely missed this post. 

Having the server set the maximum heartbeat is a RabbitMQ design decision. See https://www.rabbitmq.com/heartbeats.html#heartbeats-timeout:

> Note that in case RabbitMQ server has a non-zero heartbeat timeout configured (which is the default in versions starting with 3.6.x), the client can only lower the value but not increase it.
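
To make the rule concrete, here is a small sketch of the negotiation. `negotiate_heartbeat` is a hypothetical helper written for this thread, not kombu or py-amqp code. The non-zero case follows the note above; treating 0 as "no preference" (so the larger value wins) is my understanding of the RabbitMQ docs and should be read as an assumption.

```python
def negotiate_heartbeat(client_heartbeat, server_heartbeat):
    """Return the heartbeat timeout (seconds) the connection ends up using."""
    # Assumption: 0 means that side has no preference, so the other
    # side's value is used (both 0 leaves heartbeats disabled).
    if client_heartbeat == 0 or server_heartbeat == 0:
        return max(client_heartbeat, server_heartbeat)
    # Both non-zero: the client can lower the value but not raise it.
    return min(client_heartbeat, server_heartbeat)


# A client asking for 5 minutes against a 30-second server still gets 30.
print(negotiate_heartbeat(300, 30))  # 30
# A client asking for less than the server gets what it asked for.
print(negotiate_heartbeat(10, 30))   # 10
```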

However, a service method that blocks for more than a few seconds is a bad idea. As I described in https://github.com/nameko/nameko/issues/477, this is just part of how implicitly yielding threads work. You must yield occasionally, otherwise other threads will starve (which is what's happening to the AMQP consumer thread here).

In this situation you generally have two choices:

1. Explicitly yield. If you've got some code that's purely CPU bound with no I/O, make sure to insert an `eventlet.sleep(0)` occasionally.
2. If you can't insert a yield, for example because the blocking code is in some library that you don't control, use the tpool (this is exactly what tpool is for).