strange Kombu + gevent behaviour

197 views
Skip to first unread message

Jorge Sanchez

unread,
Oct 2, 2013, 3:22:16 AM10/2/13
to carrot...@googlegroups.com
Hello,

firstly I would like to thank for the all effort and nice work you have done on the project. 
I am using Kombu with rabbitmq as message passing RPC middleware between python processes where I pass 
the python pickle serialized objects to python consumers. 

Sometimes and I unfortunately don't have the reproduction scenario to validate the consumer seems to be ignoring the
message and acking it without processing (according to logs):


Here is the consumer: http://pastebin.com/evHVr2zP

The producer looks like this (the consumer giving me issues is on "notifier" queue):

            self.exchange = Exchange('tasks', type='direct')
            try:
                    ensured_connection = self.connection.ensure_connection(max_retries=20)
                    producer = ensured_connection.Producer()
                    ensured_publish = ensured_connection.ensure(producer, producer.publish, max_retries=20)
                    ensured_publish(payload,
                                                serializer='pickle',
                                                compression='bzip2',
                                                exchange=self.exchange,
                                                routing_key="notifier")
                        ensured_publish(payload2,
                                                serializer='pickle',
                                                compression='bzip2',
                                                exchange=self.exchange,
                                                routing_key="mmsender")

                except Exception as e:
                        print e
                        pass



Here are the rabbitmq queues:
+-------+--------------------------------+-------------+-----------+---------+------------------------+---------------------+--------+-----------------+--------+---------+
| vhost |              name              | auto_delete | consumers | durable | exclusive_consumer_tag |     idle_since      | memory |      node       | policy | status  |
+-------+--------------------------------+-------------+-----------+---------+------------------------+---------------------+--------+-----------------+--------+---------+
| /     | amq.gen-O_-sQGcQepHz4aFJ2Q464A | False       | 1         | False   |                        | 2013-10-02 5:37:48  | 7156   | rabbit@gruppuco |        | running |
| /     | mmsender_queue                 | False       | 1         | True    |                        | 2013-10-02 9:26:43  | 11044  | rabbit@gruppuco |        | running |
| /     | mmsparse_queue                 | False       | 1         | True    |                        | 2013-10-02 9:49:54  | 11004  | rabbit@gruppuco |        | running |
| /     | notifier_queue                 | False       | 1         | True    |                        | 2013-10-02 9:26:43  | 11044  | rabbit@gruppuco |        | running |
| /     | transcoder_queue               | False       | 1         | True    |                        | 2013-10-01 23:25:11 | 10964  | rabbit@gruppuco |        | running |
+-------+--------------------------------+-------------+-----------+---------+------------------------+---------------------+--------+-----------------+--------+---------+


Here are the bindings:
+-------+--------------------+--------------------------------+------------------+--------------------------------+--------------------------------+
| vhost |       source       |          destination           | destination_type |          routing_key           |         properties_key         |
+-------+--------------------+--------------------------------+------------------+--------------------------------+--------------------------------+
| /     |                    | amq.gen-O_-sQGcQepHz4aFJ2Q464A | queue            | amq.gen-O_-sQGcQepHz4aFJ2Q464A | amq.gen-O_-sQGcQepHz4aFJ2Q464A |
| /     |                    | mmsender_queue                 | queue            | mmsender_queue                 | mmsender_queue                 |
| /     |                    | mmsparse_queue                 | queue            | mmsparse_queue                 | mmsparse_queue                 |
| /     |                    | notifier_queue                 | queue            | notifier_queue                 | notifier_queue                 |
| /     |                    | transcoder_queue               | queue            | transcoder_queue               | transcoder_queue               |
| /     | amq.rabbitmq.trace | amq.gen-O_-sQGcQepHz4aFJ2Q464A | queue            | #                              | %23                            |
| /     | tasks              | mmsender_queue                 | queue            | mmsender                       | mmsender                       |
| /     | tasks              | mmsender_queue                 | queue            | transcoder                     | transcoder                     |
| /     | tasks              | mmsparse_queue                 | queue            | mmsparse                       | mmsparse                       |
| /     | tasks              | notifier_queue                 | queue            | notifier                       | notifier                       |
| /     | tasks              | transcoder_queue               | queue            | transcoder                     | transcoder                     |
+-------+--------------------+--------------------------------+------------------+--------------------------------+--------------------------------+



Here are the connections:
+-------+-----------------------------------+---------+----------------+----------+-----------+-----------+------------------+-----------------+-----------------+------------------+-------------------+--------------------+-----------+-----------+------+--------------+----------+----------+----------+----------+-----------+-------+------------+----------+------------------+--------------+---------+---------+-------+
| vhost |               name                |  type   | auth_mechanism | channels | frame_max |   host    | last_blocked_age | last_blocked_by |      node       | peer_cert_issuer | peer_cert_subject | peer_cert_validity | peer_host | peer_port | port |   protocol   | recv_cnt | recv_oct | send_cnt | send_oct | send_pend |  ssl  | ssl_cipher | ssl_hash | ssl_key_exchange | ssl_protocol |  state  | timeout | user  |
+-------+-----------------------------------+---------+----------------+----------+-----------+-----------+------------------+-----------------+-----------------+------------------+-------------------+--------------------+-----------+-----------+------+--------------+----------+----------+----------+----------+-----------+-------+------------+----------+------------------+--------------+---------+---------+-------+
| /     | 127.0.0.1:35219 -> 127.0.0.1:5672 | network | AMQPLAIN       | 1        | 131072    | 127.0.0.1 | infinity         | none            | rabbit@gruppuco |                  |                   |                    | 127.0.0.1 | 35219     | 5672 | AMQP 0-9-1   | 13       | 464      | 12       | 6584     | 0         | False |            |          |                  |              | running | 0       | guest |
| /     | 127.0.0.1:46465 -> 127.0.0.1:5672 | network | AMQPLAIN       | 1        | 131072    | 127.0.0.1 | infinity         | none            | rabbit@gruppuco |                  |                   |                    | 127.0.0.1 | 46465     | 5672 | AMQP 0-9-1   | 9        | 388      | 8        | 462      | 0         | False |            |          |                  |              | running | 0       | guest |
| /     | 127.0.0.1:55919 -> 127.0.0.1:5672 | network | AMQPLAIN       | 1        | 131072    | 127.0.0.1 | infinity         | none            | rabbit@gruppuco |                  |                   |                    | 127.0.0.1 | 55919     | 5672 | AMQP 0-9-1   | 23       | 674      | 22       | 17583    | 0         | False |            |          |                  |              | running | 0       | guest |
| /     | <rab...@gruppuco.1.452.0>         | direct  |                |          |           |           |                  |                 | rabbit@gruppuco |                  |                   |                    |           |           |      | Direct 0-9-1 |          |          |          |          |           |       |            |          |                  |              |         |         | guest |
| /     | [::1]:47227 -> [::1]:5672         | network | PLAIN          | 1        | 131072    | ::1       | infinity         | none            | rabbit@gruppuco |                  |                   |                    | ::1       | 47227     | 5672 | AMQP 0-9-1   | 9        | 342      | 8        | 460      | 0         | False |            |          |                  |              | running | 0       | guest |
| /     | [::1]:47598 -> [::1]:5672         | network | PLAIN          | 0        | 131072    | ::1       | infinity         | none            | rabbit@gruppuco |                  |                   |                    | ::1       | 47598     | 5672 | AMQP 0-9-1   | 4        | 178      | 3        | 371      | 0         | False |            |          |                  |              | running | 0       | guest |
+-------+-----------------------------------+---------+----------------+----------+-----------+-----------+------------------+-----------------+-----------------+------------------+-------------------+--------------------+-----------+-----------+------+--------------+----------+----------+----------+----------+-----------+-------+------------+----------+------------------+--------------+---------+---------+-------+


Also the tracing is on and I can see that the message comes in and it's sent to consumer:

2013-10-02 4:53:42: Message published

Node:         rabbit@gruppuco
Exchange:     tasks
Routing keys: [<<"notifier">>]
Properties:   [{<<"priority">>,signedint,0},
               {<<"delivery_mode">>,signedint,2},
               {<<"headers">>,table,
                [{<<"compression">>,longstr,<<"application/x-bz2">>}]},
               {<<"content_encoding">>,longstr,<<"binary">>},
               {<<"content_type">>,longstr,
                <<"application/x-python-serialize">>}]


2013-10-02 4:53:42: Message received

Node:         rabbit@gruppuco
Exchange:     tasks
Queue:        notifier_queue
Routing keys: [<<"notifier">>]
Properties:   [{<<"priority">>,signedint,0},
               {<<"delivery_mode">>,signedint,2},
               {<<"headers">>,table,
                [{<<"compression">>,longstr,<<"application/x-bz2">>}]},
               {<<"content_encoding">>,longstr,<<"binary">>},
               {<<"content_type">>,longstr,
                <<"application/x-python-serialize">>}]


When I strace the process, I can see that the process is receiving the message:
15042 epoll_ctl(5, EPOLL_CTL_DEL, 3, {EPOLLIN, {u32=3, u64=3}}) = 0
15042 recv(3, "\1\0\1\0\0\0\37\0<\0<\0011\0\0\0\0\0\0\0\5\0\0\16notifier_queue\316\2\0\1\0\0\0\23\0<\0\0\0\0\0\0\0\0\0\0060\0\0\0\0\0\1\316\3\0\1\0\0\0\6testik\316", 131072, 0) = 80
15042 send(3, "\1\0\1\0\0\0\r\0<\0P\0\0\0\0\0\0\0\5\0\316", 21, 0) = 21
15042 recv(3, 0xacc38cc, 131072, 0)     = -1 EAGAIN (Resource temporarily unavailable)
15042 epoll_ctl(5, EPOLL_CTL_ADD, 3, {EPOLLIN, {u32=3, u64=3}}) = 0
15042 epoll_wait(5, {}, 32, 1000)       = 0



I would like to use Kombu but this issue is driving me crazy, unfortunately I don't have skills and knowledge to properly troubleshoot this one, if you see something obviously wrong please let me know. 


Thanks,

Jorge Sanchez

Ask Solem

unread,
Oct 3, 2013, 11:33:25 AM10/3/13
to carrot...@googlegroups.com

On Oct 2, 2013, at 8:22 AM, Jorge Sanchez <xsa...@gmail.com> wrote:

> Hello,
>
> firstly I would like to thank for the all effort and nice work you have done on the project.
> I am using Kombu with rabbitmq as message passing RPC middleware between python processes where I pass
> the python pickle serialized objects to python consumers.
>
> Sometimes and I unfortunately don't have the reproduction scenario to validate the consumer seems to be ignoring the
> message and acking it without processing (according to logs):
>
>
> Here is the consumer: http://pastebin.com/evHVr2zP


Hey Jorge!

The ConsumerMixin has a default on_decode_error callback set to log an error
and ack the message.

Maybe logging is not setup so that the error message is silent?

from kombu.log import setup_logging
setup_logging()


You can also specify a custom on_decode_error to propagate the error:

class Consumer(ConsumerMixin):

def on_decode_error(self, message, exc):
raise




--
Ask Solem
twitter.com/asksol

Reply all
Reply to author
Forward
0 new messages