On Wed, Jan 13, 2010 at 02:06:57PM -0500, Jim Irrer wrote:
> I have several consumers reading from the same queue. I would like
> to be able to interrupt their pending read to suspend and resume any
> one of them. I've played around with:
>
> Channel.basicCancel(consumerTag) : don't know how to resume
> Channel.abort() produces com.rabbitmq.client.ShutdownSignalException
> Channel.close() produces com.rabbitmq.client.ShutdownSignalException
>
> The close() and abort() methods seem to act about the same. I can resume
> reading from the queue by creating a new channel and a new QueueingConsumer.
> I could not figure out how to resume after a basicCancel.
>
> Is using close() and then re-constructing the channel and QueueingConsumer
> the right way to go? Will resources be properly taken care of by garbage
> collection?
What I would do is set QoS prefetch to 1, and make sure that you're
acking manually (i.e. don't set noAck). Then, when you want to
"suspend", just delay acking the last message you received. With a
prefetch of 1 and that message still unacknowledged, the broker won't
send you anything further. Once you want to resume, send the ack and
the next message will be sent down to you. Does that help?
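
A minimal sketch of the suspend-by-withholding-the-ack idea, in Java. It
assumes the channel was set up with channel.basicQos(1) and that the
consumer was registered with noAck set to false. AckGate and its method
names are hypothetical, and the LongConsumer is only a stand-in for a call
such as channel.basicAck(deliveryTag, false), which in real code would also
need its IOException handled.

```java
import java.util.function.LongConsumer;

/**
 * Holds back the ack for the last delivery while "suspended".
 * Assumption: prefetch is 1 and acks are manual, so an unacked
 * delivery stops the broker from sending the next one.
 */
final class AckGate {
    // Hypothetical stand-in for: tag -> channel.basicAck(tag, false)
    private final LongConsumer ack;
    private boolean suspended = false;
    private boolean holding = false;
    private long heldTag;

    AckGate(LongConsumer ack) {
        this.ack = ack;
    }

    /** Call once a delivery has been processed. */
    synchronized void done(long deliveryTag) {
        if (suspended) {
            // Keep the tag; the ack is sent later by resume().
            heldTag = deliveryTag;
            holding = true;
        } else {
            ack.accept(deliveryTag);
        }
    }

    /** Later calls to done() will hold their ack instead of sending it. */
    synchronized void suspend() {
        suspended = true;
    }

    /** Sends the held ack, if any, so the next message can be delivered. */
    synchronized void resume() {
        suspended = false;
        if (holding) {
            holding = false;
            ack.accept(heldTag);
        }
    }
}
```

The consumer loop would call done(tag) after handling each delivery, and a
control thread would call suspend() and resume(). Note that the message
whose ack is held stays assigned to this consumer until resume() is called.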
> BTW - I use the consumerTag returned by
> Channel.basicConsume(queueName, consumer) for the argument to
> Channel.basicCancel, e.g. amq.ctag-1m/H7+SDcZpbzTMgsUyhNg== .
> When called, it prints:
>
> Consumer null method handleCancelOk for channel AMQChannel(amqp://gu...@172.20.125.34:5672/,1) threw an exception:
> java.lang.NullPointerException
>     at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:728)
>     at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:721)
>     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.handleCommand(AMQChannel.java:327)
>     at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
>     at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:110)
>     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:456)
The Javadoc says:
/**
 * Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}
 * method before returning.
 * @param consumerTag a client- or server-generated consumer tag to establish context
 * @throws java.io.IOException if an error is encountered
 * @see com.rabbitmq.client.AMQP.Basic.Cancel
 * @see com.rabbitmq.client.AMQP.Basic.CancelOk
 */
void basicCancel(String consumerTag) throws IOException;
The DefaultConsumer does have handleCancelOk implemented, and Consumer
is an interface, so I'm a little alarmed by the possibility that it
can't find the handleCancelOk method. What consumer class are you using?
In short, could you send us a small code example that exhibits this
behaviour?
Best wishes,
Matthew
_______________________________________________
rabbitmq-discuss mailing list
rabbitmq...@lists.rabbitmq.com
http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
On Thu, Jan 14, 2010 at 11:19:41AM -0500, Jim Irrer wrote:
> Matthew -
>
> Thanks for trying but that does not really help. I want to interrupt a
> free-running process that has already posted the read. To explore this,
> I modified RabbitMQ's SimpleConsumer class (below) to post a read (via
> the nextDelivery method), and while it is waiting, have another thread
> close the channel.
Hmm. Could you explain why you want to do this? I'm confused enough
to suspect there's a better solution to the problem you're trying to
solve.
On Thu, Jan 14, 2010 at 01:30:13PM -0500, Jim Irrer wrote:
> Servers generally run in a loop, and in many cases spend most of
> their time (very efficiently) waiting on read. Given such a server,
> we wish to suspend it before it processes another message.
>
> If there are several servers reading from the same queue, and
> we want to suspend one of them, and if we used the ack-ing manually
> approach, then one message would be held by the suspended server
> and could not be processed by any of the others. We would like that
> message to instead be processed by one of the other servers. I suppose
> that the suspended server could write the message back to the queue,
> but that seems a little like a hack.
Ok, that makes sense. Is there a reason why you prefer suspending and
resuming rather than just stopping/killing a consumer and starting a new
one up?
From your description, I'd have thought that closing the channel, and
maybe the connection too, is the way to go.
Closing the channel sounds like a reasonable thing to do.
Tony
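
The close-and-recreate approach can be modelled without a broker. In this
sketch, which uses only the JDK, a BlockingQueue stands in for the shared
queue, a thread blocked in take() stands in for a consumer blocked in
nextDelivery(), and Thread.interrupt() stands in for another thread closing
the channel. SuspendableWorker is a hypothetical name and is not part of
the RabbitMQ client; real code would close the Channel and later create a
new Channel and consumer instead.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

/**
 * Model of "stop the blocked reader, start a fresh one later".
 * Assumption: interrupting the reader thread plays the role of
 * Channel.close() breaking a pending nextDelivery() call.
 */
final class SuspendableWorker {
    private final BlockingQueue<String> queue;   // stand-in for the shared queue
    private final List<String> processed;        // records what this worker handled
    private Thread thread;

    SuspendableWorker(BlockingQueue<String> queue, List<String> processed) {
        this.queue = queue;
        this.processed = processed;
    }

    /** Starts (or restarts) the read loop on a new thread. */
    synchronized void start() {
        if (thread != null) {
            return; // already running
        }
        thread = new Thread(() -> {
            try {
                while (true) {
                    // Blocks here, like a pending read on the queue.
                    processed.add(queue.take());
                }
            } catch (InterruptedException e) {
                // Suspended: leave the loop without holding any message.
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    /** Breaks the pending read and waits for the reader thread to exit. */
    synchronized void suspend() {
        if (thread == null) {
            return; // not running
        }
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        thread = null;
    }
}
```

Because the suspended worker is stopped while waiting and holds no message,
anything published in the meantime stays on the queue for the other
readers, which is the property asked for earlier in the thread.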