Hello,
firstly I would like to thank for the all effort and nice work you have done on the project.
I am using Kombu with rabbitmq as message passing RPC middleware between python processes where I pass
the python pickle serialized objects to python consumers.
Sometimes and I unfortunately don't have the reproduction scenario to validate the consumer seems to be ignoring the
message and acking it without processing (according to logs):
The producer looks like this (the consumer giving me issues is on "notifier" queue):
self.exchange = Exchange('tasks', type='direct')
try:
ensured_connection = self.connection.ensure_connection(max_retries=20)
producer = ensured_connection.Producer()
ensured_publish = ensured_connection.ensure(producer, producer.publish, max_retries=20)
ensured_publish(payload,
serializer='pickle',
compression='bzip2',
exchange=self.exchange,
routing_key="notifier")
ensured_publish(payload2,
serializer='pickle',
compression='bzip2',
exchange=self.exchange,
routing_key="mmsender")
except Exception as e:
print e
pass
Here are the rabbitmq queues:
+-------+--------------------------------+-------------+-----------+---------+------------------------+---------------------+--------+-----------------+--------+---------+
| vhost | name | auto_delete | consumers | durable | exclusive_consumer_tag | idle_since | memory | node | policy | status |
+-------+--------------------------------+-------------+-----------+---------+------------------------+---------------------+--------+-----------------+--------+---------+
| / | amq.gen-O_-sQGcQepHz4aFJ2Q464A | False | 1 | False | | 2013-10-02 5:37:48 | 7156 | rabbit@gruppuco | | running |
| / | mmsender_queue | False | 1 | True | | 2013-10-02 9:26:43 | 11044 | rabbit@gruppuco | | running |
| / | mmsparse_queue | False | 1 | True | | 2013-10-02 9:49:54 | 11004 | rabbit@gruppuco | | running |
| / | notifier_queue | False | 1 | True | | 2013-10-02 9:26:43 | 11044 | rabbit@gruppuco | | running |
| / | transcoder_queue | False | 1 | True | | 2013-10-01 23:25:11 | 10964 | rabbit@gruppuco | | running |
+-------+--------------------------------+-------------+-----------+---------+------------------------+---------------------+--------+-----------------+--------+---------+
Here are the bindings:
+-------+--------------------+--------------------------------+------------------+--------------------------------+--------------------------------+
| vhost | source | destination | destination_type | routing_key | properties_key |
+-------+--------------------+--------------------------------+------------------+--------------------------------+--------------------------------+
| / | | amq.gen-O_-sQGcQepHz4aFJ2Q464A | queue | amq.gen-O_-sQGcQepHz4aFJ2Q464A | amq.gen-O_-sQGcQepHz4aFJ2Q464A |
| / | | mmsender_queue | queue | mmsender_queue | mmsender_queue |
| / | | mmsparse_queue | queue | mmsparse_queue | mmsparse_queue |
| / | | notifier_queue | queue | notifier_queue | notifier_queue |
| / | | transcoder_queue | queue | transcoder_queue | transcoder_queue |
| / | amq.rabbitmq.trace | amq.gen-O_-sQGcQepHz4aFJ2Q464A | queue | # | %23 |
| / | tasks | mmsender_queue | queue | mmsender | mmsender |
| / | tasks | mmsender_queue | queue | transcoder | transcoder |
| / | tasks | mmsparse_queue | queue | mmsparse | mmsparse |
| / | tasks | notifier_queue | queue | notifier | notifier |
| / | tasks | transcoder_queue | queue | transcoder | transcoder |
+-------+--------------------+--------------------------------+------------------+--------------------------------+--------------------------------+
Here are the connections:
+-------+-----------------------------------+---------+----------------+----------+-----------+-----------+------------------+-----------------+-----------------+------------------+-------------------+--------------------+-----------+-----------+------+--------------+----------+----------+----------+----------+-----------+-------+------------+----------+------------------+--------------+---------+---------+-------+
| vhost | name | type | auth_mechanism | channels | frame_max | host | last_blocked_age | last_blocked_by | node | peer_cert_issuer | peer_cert_subject | peer_cert_validity | peer_host | peer_port | port | protocol | recv_cnt | recv_oct | send_cnt | send_oct | send_pend | ssl | ssl_cipher | ssl_hash | ssl_key_exchange | ssl_protocol | state | timeout | user |
+-------+-----------------------------------+---------+----------------+----------+-----------+-----------+------------------+-----------------+-----------------+------------------+-------------------+--------------------+-----------+-----------+------+--------------+----------+----------+----------+----------+-----------+-------+------------+----------+------------------+--------------+---------+---------+-------+
| / | 127.0.0.1:35219 -> 127.0.0.1:5672 | network | AMQPLAIN | 1 | 131072 | 127.0.0.1 | infinity | none | rabbit@gruppuco | | | | 127.0.0.1 | 35219 | 5672 | AMQP 0-9-1 | 13 | 464 | 12 | 6584 | 0 | False | | | | | running | 0 | guest | | / | 127.0.0.1:46465 -> 127.0.0.1:5672 | network | AMQPLAIN | 1 | 131072 | 127.0.0.1 | infinity | none | rabbit@gruppuco | | | | 127.0.0.1 | 46465 | 5672 | AMQP 0-9-1 | 9 | 388 | 8 | 462 | 0 | False | | | | | running | 0 | guest | | / | 127.0.0.1:55919 -> 127.0.0.1:5672 | network | AMQPLAIN | 1 | 131072 | 127.0.0.1 | infinity | none | rabbit@gruppuco | | | | 127.0.0.1 | 55919 | 5672 | AMQP 0-9-1 | 23 | 674 | 22 | 17583 | 0 | False | | | | | running | 0 | guest | | / | <rab...@gruppuco.1.452.0> | direct | | | | | | | rabbit@gruppuco | | | | | | | Direct 0-9-1 | | | | | | | | | | | | | guest |
| / | [::1]:47227 -> [::1]:5672 | network | PLAIN | 1 | 131072 | ::1 | infinity | none | rabbit@gruppuco | | | | ::1 | 47227 | 5672 | AMQP 0-9-1 | 9 | 342 | 8 | 460 | 0 | False | | | | | running | 0 | guest |
| / | [::1]:47598 -> [::1]:5672 | network | PLAIN | 0 | 131072 | ::1 | infinity | none | rabbit@gruppuco | | | | ::1 | 47598 | 5672 | AMQP 0-9-1 | 4 | 178 | 3 | 371 | 0 | False | | | | | running | 0 | guest |
+-------+-----------------------------------+---------+----------------+----------+-----------+-----------+------------------+-----------------+-----------------+------------------+-------------------+--------------------+-----------+-----------+------+--------------+----------+----------+----------+----------+-----------+-------+------------+----------+------------------+--------------+---------+---------+-------+
Also the tracing is on and I can see that the message comes in and it's sent to consumer:
2013-10-02 4:53:42: Message published
Node: rabbit@gruppuco
Exchange: tasks
Routing keys: [<<"notifier">>]
Properties: [{<<"priority">>,signedint,0},
{<<"delivery_mode">>,signedint,2},
{<<"headers">>,table,
[{<<"compression">>,longstr,<<"application/x-bz2">>}]},
{<<"content_encoding">>,longstr,<<"binary">>},
{<<"content_type">>,longstr,
<<"application/x-python-serialize">>}]
2013-10-02 4:53:42: Message received
Node: rabbit@gruppuco
Exchange: tasks
Queue: notifier_queue
Routing keys: [<<"notifier">>]
Properties: [{<<"priority">>,signedint,0},
{<<"delivery_mode">>,signedint,2},
{<<"headers">>,table,
[{<<"compression">>,longstr,<<"application/x-bz2">>}]},
{<<"content_encoding">>,longstr,<<"binary">>},
{<<"content_type">>,longstr,
<<"application/x-python-serialize">>}]
When I strace the process, I can see that the process is receiving the message:
15042 epoll_ctl(5, EPOLL_CTL_DEL, 3, {EPOLLIN, {u32=3, u64=3}}) = 0
15042 recv(3, "\1\0\1\0\0\0\37\0<\0<\0011\0\0\0\0\0\0\0\5\0\0\16notifier_queue\316\2\0\1\0\0\0\23\0<\0\0\0\0\0\0\0\0\0\0060\0\0\0\0\0\1\316\3\0\1\0\0\0\6testik\316", 131072, 0) = 80
15042 send(3, "\1\0\1\0\0\0\r\0<\0P\0\0\0\0\0\0\0\5\0\316", 21, 0) = 21
15042 recv(3, 0xacc38cc, 131072, 0) = -1 EAGAIN (Resource temporarily unavailable)
15042 epoll_ctl(5, EPOLL_CTL_ADD, 3, {EPOLLIN, {u32=3, u64=3}}) = 0
15042 epoll_wait(5, {}, 32, 1000) = 0
I would like to use Kombu but this issue is driving me crazy, unfortunately I don't have skills and knowledge to properly troubleshoot this one, if you see something obviously wrong please let me know.
Thanks,
Jorge Sanchez