how to stop and start consuming after a few messages

233 views
Skip to first unread message

james jasper

unread,
Jan 25, 2023, 12:48:38 PM1/25/23
to rabbitmq-users
Hi,
Im currently using pika version 1.3.1, and erlang 22.2.7.  Im currently using blocking connection. My consumer can process only few messages i just wanted to know how  can i stop consuming after a few messages and start again and repeat the process.  

Luke Bakken

unread,
Jan 25, 2023, 6:30:52 PM1/25/23
to rabbitmq-users
Hello,

A great way to start this discussion would be to provide a git repository I could clone to see what you're doing now.

One option is to set the channel prefetch to the number of messages you'd like to process at a time. Only acknowledge the batch of messages when all are done, then you would get the next batch. No need to stop / start consuming in that case.

Thanks,
Luke

james jasper

unread,
Jan 26, 2023, 3:31:52 AM1/26/23
to rabbitmq-users

I apologize for not adding the repo, here it is https://github.com/Opperessor/rabbitmq_pika/blob/main/consumer.py  . So im trying to consume messages based on size. once the size limit(4000 bytes ) it reached i have stop consuming and process it and start consuming again.

Luke Bakken

unread,
Jan 26, 2023, 12:06:47 PM1/26/23
to rabbitmq-users
Hi James,

Please see the following PR:

You were not using getsizeof() correctly. The function is not recursive by default - https://docs.python.org/3/library/sys.html#sys.getsizeof

I made corrections to your use of Pika as well. If you need further assistance let me know.

Thanks,
Luke

james jasper

unread,
Jan 27, 2023, 5:52:42 AM1/27/23
to rabbitmq-users
hi luke,i
thanks for the reply and correction. From the modified code i'm not able to process messages for 20 minutes its not able to publish, referring to this(for long running process) document https://github.com/pika/pika/blob/main/examples/basic_consumer_threaded.py , i made few changes to the code in this repo https://github.com/Opperessor/rabbitmq_pika/blob/test_branch/consumer.py
getting error as  raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer'). Could you pls help me out.

Thanks,
james

Luke Bakken

unread,
Jan 27, 2023, 12:23:19 PM1/27/23
to rabbitmq-users
What is logged by RabbitMQ at that point in time?

james jasper

unread,
Jan 27, 2023, 1:34:00 PM1/27/23
to rabbitmq-users
hi luke, checked the log, got this missed heartbeats from client, timeout: 5s on server side log.

Luke Bakken

unread,
Jan 27, 2023, 1:56:15 PM1/27/23
to rabbitmq-users
Thanks. I see the issue with your code and will open a new PR. Basically you're still doing work on the same thread that Pika is using and thus your work blocks Pika's I/O loop.

Luke Bakken

unread,
Jan 27, 2023, 4:13:10 PM1/27/23
to rabbitmq-users
https://github.com/Opperessor/rabbitmq_pika/pull/3

I will comment on the PR itself as to what I changed.

james jasper

unread,
Feb 6, 2023, 9:40:15 AM2/6/23
to rabbitmq-users
Hi, Luke thank you so much for the reply, there seems to be no connection timeout error. However when the queue is declared as a quorum queue multiple consumers are produced and few messages are in unackd state(if the messages in the consume queue are in odd number like 9, one message goes to unack state and 2 consumers can be seen), I tried to set it as x-single-active-consumer or using cancel() method didnt seem to work. This is my code https://github.com/Opperessor/rabbitmq_pika/blob/feature_branch/consumer.py made few changes referring from the pr you raised(thank you so much)  changed to consume generator to make use of the timeout option and i tried to avoid recursive way, the consumer is being increased still could you pls help me out.
Thanks,
james

Luke Bakken

unread,
Feb 6, 2023, 3:33:30 PM2/6/23
to rabbitmq-users
Hello,

Please re-read what I wrote in my previous pull request: https://github.com/Opperessor/rabbitmq_pika/pull/3#issue-1560439218

I don't think you should use a "for" loop consumer if you plan on cancelling it.

At this point your consuming code is complicated enough that you should consider using SelectConnection rather than BlockingConnection.

I'll see if I have time to assist you further.

Thanks,
Luke

james jasper

unread,
Feb 7, 2023, 1:59:22 AM2/7/23
to rabbitmq-users
Sure Luke,
thanks 

Luke Bakken

unread,
Feb 9, 2023, 2:06:02 PM2/9/23
to rabbitmq-users

james jasper

unread,
Feb 19, 2023, 11:52:52 PM2/19/23
to rabbitmq-users
Thank you so much! Luke
Reply all
Reply to author
Forward
0 new messages