[rabbitmq-discuss] suspending and resuming a QueueingConsumer


Jim Irrer

Jan 13, 2010, 2:06:57 PM
to rabbitmq-discuss, Jim Irrer
Hi -

I have several consumers reading from the same queue.  I would like
to be able to interrupt their pending read to suspend and resume any
one of them.   I've played around with:

Channel.basicCancel(consumerTag): don't know how to resume
Channel.abort(): produces com.rabbitmq.client.ShutdownSignalException
Channel.close(): produces com.rabbitmq.client.ShutdownSignalException

The close() and abort() methods seem to act about the same.  I can resume
reading from the queue by creating a new channel and a new QueueingConsumer.
I could not figure out how to resume after a basicCancel.

Is using close() and then re-constructing the channel and QueueingConsumer
the right way to go?  Will resources be properly taken care of by garbage collection?

BTW - I use the consumerTag returned by Channel.basicConsume(queueName, consumer)
for the argument to Channel.basicCancel, eg: amq.ctag-1m/H7+SDcZpbzTMgsUyhNg== .
When called, it prints:

Consumer null method handleCancelOk for channel AMQChannel(amqp://gu...@172.20.125.34:5672/,1) threw an exception:
java.lang.NullPointerException
    at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:728)
    at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:721)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.handleCommand(AMQChannel.java:327)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:110)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:456)

Thanks for any hints,

- Jim

Jim Irrer     ir...@umich.edu       (734) 647-4409
University of Michigan Hospital Radiation Oncology
519 W. William St.             Ann Arbor, MI 48103

Matthew Sackman

Jan 14, 2010, 6:24:59 AM
to rabbitmq...@lists.rabbitmq.com
Hi Jim,

On Wed, Jan 13, 2010 at 02:06:57PM -0500, Jim Irrer wrote:
> I have several consumers reading from the same queue. I would like
> to be able to interrupt their pending read to suspend and resume any
> one of them. I've played around with:
>
> Channel.basicCancel(consumerTag) : don't know how to resume
> Channel.abort() produces com.rabbitmq.client.ShutdownSignalException
> Channel.close() produces com.rabbitmq.client.ShutdownSignalException
>
> The close() and abort() methods seem to act about the same. I can resume
> reading from the queue by creating a new channel and a new QueueingConsumer.
> I could not figure out how to resume after a basicCancel.
>
> Is using close() and then re-constructing the channel and QueueingConsumer
> the right way to go? Will resources be properly taken care of by garbage
> collection?

What I would do is set QoS prefetch to 1, and make sure that you're
doing ack-ing manually (i.e. don't set noAck). Then, when you want to
"suspend", just delay acking the last message you received. That'll
prevent further messages being sent to you. Once you want to resume,
send the ack and then the next message will be sent down to you. Does
that help?
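
To illustrate the suggestion above, here is a toy model in plain Java. It is not the RabbitMQ client API: `PrefetchModel` and its methods are hypothetical names that only mimic the broker rule that a consumer with a prefetch limit of 1 receives nothing new while one message is still unacknowledged. In the real client the corresponding calls would be `channel.basicQos(1)`, `channel.basicConsume(queueName, false, consumer)` (noAck off) and `channel.basicAck(deliveryTag, false)`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of broker-side flow control with a prefetch limit.
// Hypothetical class for illustration only; not part of the RabbitMQ client.
final class PrefetchModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final int prefetch;
    private int unacked = 0;

    PrefetchModel(int prefetch) {
        this.prefetch = prefetch;
    }

    void publish(String body) {
        ready.addLast(body);
    }

    // Returns the next message, or null if the queue is empty or the
    // consumer already holds `prefetch` unacknowledged messages.
    String deliver() {
        if (unacked >= prefetch || ready.isEmpty()) {
            return null;
        }
        unacked++;
        return ready.removeFirst();
    }

    // Acknowledge one outstanding message, freeing a prefetch slot.
    void ack() {
        if (unacked == 0) {
            throw new IllegalStateException("nothing to acknowledge");
        }
        unacked--;
    }

    public static void main(String[] args) {
        PrefetchModel broker = new PrefetchModel(1);
        broker.publish("m1");
        broker.publish("m2");

        System.out.println(broker.deliver()); // m1
        System.out.println(broker.deliver()); // null: m1 is not acked yet ("suspended")
        broker.ack();                         // sending the ack "resumes" delivery
        System.out.println(broker.deliver()); // m2
    }
}
```

Note that in this scheme the consumer still holds the one unacknowledged message while it is suspended.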

> BTW - I use the consumerTag returned by Channel.basicConsume(queueName, consumer)
> for the argument to Channel.basicCancel, eg: amq.ctag-1m/H7+SDcZpbzTMgsUyhNg== .
> When called, it prints:
>
> Consumer null method handleCancelOk for channel AMQChannel(amqp://gu...@172.20.125.34:5672/,1) threw an exception:
> java.lang.NullPointerException
>     at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:728)
>     at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:721)
>     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.handleCommand(AMQChannel.java:327)
>     at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
>     at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:110)
>     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:456)

The Javadoc says:

/**
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}
* method before returning.
* @param consumerTag a client- or server-generated consumer tag to establish context
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Cancel
* @see com.rabbitmq.client.AMQP.Basic.CancelOk
*/
void basicCancel(String consumerTag) throws IOException;

The DefaultConsumer does have the handleCancelOk filled in, and Consumer
is an interface, so I'm a little alarmed by the possibility that it
can't find the handleCancelOk method. What consumer class are you using?
In short, could you send us a small code example that exhibits this
behaviour?

Best wishes,

Matthew

_______________________________________________
rabbitmq-discuss mailing list
rabbitmq...@lists.rabbitmq.com
http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

Jim Irrer

Jan 14, 2010, 11:19:41 AM
to rabbitmq...@lists.rabbitmq.com, mat...@lshift.net, Jim Irrer
Matthew -

Thanks for trying but that does not really help.  I want to interrupt a free-running
process that has already posted the read.  To explore this, I modified RabbitMQ's
SimpleConsumer class (below) to post a read (via the nextDelivery method), and
while it is waiting, have another thread close the channel.

Thanks,

- Jim

//   The contents of this file are subject to the Mozilla Public License
//   Version 1.1 (the "License"); you may not use this file except in
//   compliance with the License. You may obtain a copy of the License at
//   http://www.mozilla.org/MPL/
//
//   Software distributed under the License is distributed on an "AS IS"
//   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
//   License for the specific language governing rights and limitations
//   under the License.
//
//   The Original Code is RabbitMQ.
//
//   The Initial Developers of the Original Code are LShift Ltd,
//   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
//
//   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
//   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
//   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
//   Technologies LLC, and Rabbit Technologies Ltd.
//
//   Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
//   Ltd. Portions created by Cohesive Financial Technologies LLC are
//   Copyright (C) 2007-2009 Cohesive Financial Technologies
//   LLC. Portions created by Rabbit Technologies Ltd are Copyright
//   (C) 2007-2009 Rabbit Technologies Ltd.
//
//   All Rights Reserved.
//
//   Contributor(s): ______________________________________.
//

package com.rabbitmq.examples;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

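public class SimpleConsumerCancel implements Runnable {

    // Number of messages received so far; only touched by the main thread.
    private static int count = 0;

    // Shared between the main (read) thread and the cancel thread, hence volatile.
    private static volatile Channel channel = null;

    private static volatile String consumerTag = null;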

    public static void main(String[] args) {
        try {
            String hostName = (args.length > 0) ? args[0] : "172.20.125.34";
            int portNumber = (args.length > 1) ? Integer.parseInt(args[1]) : AMQP.PROTOCOL.PORT;
            String queueName = (args.length > 2) ? args[2] : "SimpleQueue";

            ConnectionFactory connFactory = new ConnectionFactory();
            Connection conn = connFactory.newConnection(hostName, portNumber);

            channel = conn.createChannel();

            channel.queueDeclare(queueName);

            Thread thread = new Thread(new SimpleConsumerCancel());
            thread.start();

            QueueingConsumer consumer = new QueueingConsumer(channel);
            consumerTag = channel.basicConsume(queueName, consumer);
            System.out.println("consumerTag: " + consumerTag);
            while (true) {
                try {
                    System.out.println("Waiting for delivery.");
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    System.out.println("\nGot delivery.");
                    count++;
                    System.out.println("SimpleConsumerCancel got Message: " + new String(delivery.getBody()) + " : " + count);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
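                catch (Exception ex) {
                    // The blocked nextDelivery() fails with a ShutdownSignalException
                    // once the cancel thread closes the channel. "Resume" by opening
                    // a fresh channel and consumer on the same connection.
                    System.err.println("Read thread caught exception: " + ex);

                    channel = conn.createChannel();
                    channel.queueDeclare(queueName);
                    consumer = new QueueingConsumer(channel);
                    consumerTag = channel.basicConsume(queueName, consumer);
                    System.out.println("consumerTag: " + consumerTag);
                }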
            }
        }
        catch (Exception ex) {
            System.err.println("Main thread caught exception: " + ex);
            //ex.printStackTrace();
        }
        System.out.println("\nMain thread exiting normally.");
    }

    @Override
    public void run() {

        while (true) {
            try {
                System.out.println("Cancel thread is sleeping...");
                Thread.sleep(4000);
                System.out.println("Going to cancel channel with consumerTag: " + consumerTag);
                // Swap in one of the three calls below to compare their effect on the blocked read.
                // channel.basicCancel(consumerTag);
                channel.close();
                // channel.abort();
                System.out.println("\nCancelled channel with consumerTag: " + consumerTag);
            }
            catch (Exception ex) {
                System.err.println("SimpleConsumerCancel ex: " + ex);
                //ex.printStackTrace();

            }
        }

    }
}
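
A possible alternative to breaking the blocked read from a second thread is a timed wait combined with a flag that the loop re-checks. The sketch below is plain Java under stated assumptions: `SuspendableWorker` is a hypothetical class, and a `LinkedBlockingQueue` stands in for the consumer's client-side delivery queue. If your client version provides `QueueingConsumer.nextDelivery(long timeout)`, that would be the analogous timed call. The flag alone does not stop the broker from routing messages to this consumer, so a real suspend would still need to cancel the consumer or close the channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in for a consumer loop. `inbox` plays the role of the client-side
// delivery queue; it is a plain LinkedBlockingQueue, not a RabbitMQ class.
final class SuspendableWorker implements Runnable {
    private final BlockingQueue<String> inbox;
    private final List<String> processed = new ArrayList<>();
    private volatile boolean suspended;
    private volatile boolean stopped;

    SuspendableWorker(BlockingQueue<String> inbox, boolean startSuspended) {
        this.inbox = inbox;
        this.suspended = startSuspended;
    }

    void suspend() { suspended = true; }
    void resume()  { suspended = false; }
    void stop()    { stopped = true; }

    // Snapshot of the messages handled so far.
    synchronized List<String> processed() {
        return new ArrayList<>(processed);
    }

    @Override
    public void run() {
        try {
            while (!stopped) {
                if (suspended) {
                    Thread.sleep(10); // idle without touching the inbox
                    continue;
                }
                // Timed wait: gives up after 50 ms so the flags are re-checked.
                String msg = inbox.poll(50, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    synchronized (this) {
                        processed.add(msg);
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status and exit
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
        SuspendableWorker worker = new SuspendableWorker(inbox, true);
        Thread t = new Thread(worker);
        t.start();

        inbox.put("job-1");
        Thread.sleep(200);
        System.out.println("while suspended: " + worker.processed());

        worker.resume();
        Thread.sleep(200);
        System.out.println("after resume: " + worker.processed());

        worker.stop();
        t.join();
    }
}
```

A suspend request that arrives while a timed wait is in progress takes effect only after that wait returns, so a message arriving in that window is still processed.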





Matthew Sackman

Jan 14, 2010, 12:04:34 PM
to rabbitmq...@lists.rabbitmq.com
Hi Jim,

On Thu, Jan 14, 2010 at 11:19:41AM -0500, Jim Irrer wrote:
> Matthew -
>
> Thanks for trying but that does not really help. I want to interrupt a
> free-running process that has already posted the read. To explore this,
> I modified RabbitMQ's SimpleConsumer class (below) to post a read (via
> the nextDelivery method), and while it is waiting, have another thread
> close the channel.

Hmm. Could you explain why you're wanting to do this? I'm sufficiently
confused that I'm suspecting there's a better solution to the problem
you're trying to solve.

Jim Irrer

Jan 14, 2010, 1:30:13 PM
to rabbitmq...@lists.rabbitmq.com, Jim Irrer
Matthew -

I'm trying to build an infrastructure on top of RabbitMQ that allows
an administrator to remotely suspend and resume a server.

Servers generally run in a loop, and in many cases spend most of
their time (very efficiently) waiting on read.  Given such a server,
we wish to suspend it before it processes another message.

If there are several servers reading from the same queue, and
we want to suspend one of them, and if we used the ack-ing manually
approach, then one message would be held by the suspended server
and could not be processed by any of the others.  We would like that
message to instead be processed by one of the other servers.  I suppose
that the suspended server could write the message back to the queue,
but that seems a little like a hack.

Sorry if I'm not being clear,

- Jim




Matthew Sackman

Jan 15, 2010, 7:28:43 AM
to rabbitmq...@lists.rabbitmq.com
Hi Jim,

On Thu, Jan 14, 2010 at 01:30:13PM -0500, Jim Irrer wrote:
> Servers generally run in a loop, and in many cases spend most of
> their time (very efficiently) waiting on read. Given such a server,
> we wish to suspend it before it processes another message.
>
> If there are several servers reading from the same queue, and
> we want to suspend one of them, and if we used the ack-ing manually
> approach, then one message would be held by the suspended server
> and could not be processed by any of the others. We would like that
> message to instead be processed by one of the other servers. I suppose
> that the suspended server could write the message back to the queue,
> but that seems a little like a hack.

Ok, that makes sense. Is there a reason why you prefer suspending and
resuming rather than just stopping/killing a consumer and starting a new
one up?

From your description, I'd have thought that closing the channel and
maybe connection too, is the way to go.
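
Why closing the channel addresses the "held message" concern can be sketched with another toy model. Again this is plain Java, not the RabbitMQ client: `RequeueModel`, `deliverTo` and `closeChannel` are hypothetical names. The model relies on the broker behaviour that deliveries left unacknowledged on a channel are returned to the queue when that channel closes. Acks and the broker's ordering of requeued messages are not modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one queue shared by several consumers.
// Hypothetical class for illustration only; not part of the RabbitMQ client.
final class RequeueModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<String, List<String>> unacked = new HashMap<>();

    void publish(String body) {
        ready.addLast(body);
    }

    // Hand the next ready message to `consumer`. It stays in that consumer's
    // unacked list until the consumer's channel is closed (acks are not modelled).
    String deliverTo(String consumer) {
        String msg = ready.pollFirst();
        if (msg != null) {
            unacked.computeIfAbsent(consumer, k -> new ArrayList<>()).add(msg);
        }
        return msg;
    }

    // Closing a consumer's channel puts its unacked messages back on the queue.
    void closeChannel(String consumer) {
        List<String> held = unacked.remove(consumer);
        if (held != null) {
            ready.addAll(held);
        }
    }

    public static void main(String[] args) {
        RequeueModel broker = new RequeueModel();
        broker.publish("job-1");

        broker.deliverTo("A");                     // A holds job-1 without acking
        System.out.println(broker.deliverTo("B")); // null: nothing left for B
        broker.closeChannel("A");                  // suspend A by closing its channel
        System.out.println(broker.deliverTo("B")); // job-1
    }
}
```

In the model, the suspended consumer never has to republish anything itself; the message becomes available to the other consumer as a side effect of the close.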

Jim Irrer

Jan 15, 2010, 8:46:27 AM
to rabbitmq...@lists.rabbitmq.com, Jim Irrer
Matthew -

Ok - I think I'll go with the channel close approach.  I was
really just checking to make sure that I was not stressing
the API in a bad way.

Thanks for your advice,


- Jim



Tony Garnock-Jones

Jan 15, 2010, 9:35:19 AM
to rabbitmq...@lists.rabbitmq.com
Matthew Sackman wrote:
> From your description, I'd have thought that closing the channel and
> maybe connection too, is the way to go.

Closing the channel sounds like a reasonable thing to do.

Tony
