Problem with Akka-AMQP and Apache QPid

565 views
Skip to first unread message

Mario Camou

unread,
Mar 14, 2011, 10:10:10 AM3/14/11
to Akka User List
Hi all,

I'm trying to get an Akka-AMQP app running with an Apache QPid server. I'm getting the following exception:

15:07:24.061 [main] DEBUG cc.abstra.gyokuro.gateway.Gateway$ - AMQP initialized. Incoming queue: [gateway], outgoing queue: [ruote]
15:07:24.062 [akka:event-driven:dispatcher:amqp-producers-3] DEBUG akka.dispatch.MonitorableThread - Created thread akka:event-driven:dispatcher:amqp-producers-3
15:07:24.063 [main] INFO  cc.abstra.gyokuro.gateway.Gateway$ - Waiting for connections on port 1234
15:07:24.070 [akka:event-driven:dispatcher:global-4] DEBUG akka.dispatch.MonitorableThread - Created thread akka:event-driven:dispatcher:global-4
15:07:24.071 [akka:event-driven:dispatcher:amqp-producers-3] INFO  akka.amqp.ProducerActor - AMQP.Poducer[id= amqp.producer, exchangeParameters=Some(ExchangeParameters(ruote,direct,ActiveDeclaration(true,false,false),Map()))] is requesting new channel from supervising connection
15:07:24.123 [akka:event-driven:dispatcher:amqp-consumers-2] DEBUG akka.amqp.ConsumerActor - Actively declaring new queue [gateway] for AMQP.Consumer[id= akka.amqp.ConsumerActor, exchangeParameters=Some(ExchangeParameters(gateway,direct,ActiveDeclaration(true,false,false),Map())), queueDeclaration=ActiveDeclaration(true,false,false)]
15:07:24.124 [akka:event-driven:dispatcher:amqp-producers-3] INFO  akka.amqp.ProducerActor - Channel setup for AMQP.Poducer[id= amqp.producer, exchangeParameters=Some(ExchangeParameters(ruote,direct,ActiveDeclaration(true,false,false),Map()))]
15:07:24.130 [akka:event-driven:dispatcher:amqp-consumers-2] DEBUG akka.amqp.ConsumerActor - Binding new queue [gateway] with [gateway] for AMQP.Consumer[id= akka.amqp.ConsumerActor, exchangeParameters=Some(ExchangeParameters(gateway,direct,ActiveDeclaration(true,false,false),Map())), queueDeclaration=ActiveDeclaration(true,false,false)]
15:07:24.140 [akka:event-driven:dispatcher:amqp-consumers-2] ERROR akka.actor.Actor$ - Exception when invoking 
actor [Actor[akka.amqp.ConsumerActor:627e4560-4e44-11e0-b676-000c2906f997]] 
with message [Start]
15:07:24.142 [akka:event-driven:dispatcher:global-5] DEBUG akka.dispatch.MonitorableThread - Created thread akka:event-driven:dispatcher:global-5
15:07:24.151 [akka:event-driven:dispatcher:global-5] ERROR a.amqp.FaultTolerantConnectionActor - ConnectionShutdown is hard error - self terminating
com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:620) ~[amqp-client-1.8.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:467) ~[amqp-client-1.8.1.jar:na]
java.io.EOFException: null
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:273) ~[na:1.6.0_24]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:118) ~[amqp-client-1.8.1.jar:na]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:133) ~[amqp-client-1.8.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:409) ~[amqp-client-1.8.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:437) ~[amqp-client-1.8.1.jar:na]
15:07:24.152 [akka:event-driven:dispatcher:amqp-consumers-2] ERROR akka.actor.Actor$ - Problem
java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:121) ~[amqp-client-1.8.1.jar:na]
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:704) ~[amqp-client-1.8.1.jar:na]
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:668) ~[amqp-client-1.8.1.jar:na]
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:660) ~[amqp-client-1.8.1.jar:na]
at akka.amqp.ConsumerActor.setupChannel(ConsumerActor.scala:58) ~[akka-amqp-1.0.jar:na]
at akka.amqp.FaultTolerantChannelActor.akka$amqp$FaultTolerantChannelActor$$setupChannelInternal(FaultTolerantChannelActor.scala:82) ~[akka-amqp-1.0.jar:na]
at akka.amqp.FaultTolerantChannelActor$$anonfun$channelMessageHandler$1$$anonfun$apply$1$$anonfun$apply$3$$anonfun$apply$4.apply(FaultTolerantChannelActor.scala:32) ~[akka-amqp-1.0.jar:na]
at akka.amqp.FaultTolerantChannelActor$$anonfun$channelMessageHandler$1$$anonfun$apply$1$$anonfun$apply$3$$anonfun$apply$4.apply(FaultTolerantChannelActor.scala:32) ~[akka-amqp-1.0.jar:na]
at scala.Option.foreach(Option.scala:185) ~[scala-library-2.8.1.jar:na]
at akka.amqp.FaultTolerantChannelActor$$anonfun$channelMessageHandler$1$$anonfun$apply$1$$anonfun$apply$3.apply(FaultTolerantChannelActor.scala:32) ~[akka-amqp-1.0.jar:na]
at akka.amqp.FaultTolerantChannelActor$$anonfun$channelMessageHandler$1$$anonfun$apply$1$$anonfun$apply$3.apply(FaultTolerantChannelActor.scala:32) ~[akka-amqp-1.0.jar:na]
at scala.Option.foreach(Option.scala:185) ~[scala-library-2.8.1.jar:na]
at akka.amqp.FaultTolerantChannelActor$$anonfun$channelMessageHandler$1$$anonfun$apply$1.apply(FaultTolerantChannelActor.scala:32) ~[akka-amqp-1.0.jar:na]
at akka.amqp.FaultTolerantChannelActor$$anonfun$channelMessageHandler$1$$anonfun$apply$1.apply(FaultTolerantChannelActor.scala:29) ~[akka-amqp-1.0.jar:na]
at scala.Option.foreach(Option.scala:185) ~[scala-library-2.8.1.jar:na]
at akka.amqp.FaultTolerantChannelActor$$anonfun$channelMessageHandler$1.apply(FaultTolerantChannelActor.scala:28) ~[akka-amqp-1.0.jar:na]
at akka.amqp.FaultTolerantChannelActor$$anonfun$channelMessageHandler$1.apply(FaultTolerantChannelActor.scala:25) ~[akka-amqp-1.0.jar:na]
at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:45) ~[scala-library-2.8.1.jar:na]
at akka.actor.Actor$$anonfun$4.apply(Actor.scala:481) ~[akka-actor-1.0.jar:na]
at akka.actor.Actor$$anonfun$4.apply(Actor.scala:464) ~[akka-actor-1.0.jar:na]
at akka.actor.Actor$class.apply(Actor.scala:435) ~[akka-actor-1.0.jar:na]
at akka.amqp.FaultTolerantChannelActor.apply(FaultTolerantChannelActor.scala:15) ~[akka-amqp-1.0.jar:na]
at akka.actor.LocalActorRef.akka$actor$LocalActorRef$$dispatch(ActorRef.scala:1012) [akka-actor-1.0.jar:na]
at akka.actor.LocalActorRef$$anonfun$invoke$1.apply$mcV$sp(ActorRef.scala:832) [akka-actor-1.0.jar:na]
at akka.actor.LocalActorRef$$anonfun$invoke$1.apply(ActorRef.scala:828) [akka-actor-1.0.jar:na]
at akka.actor.LocalActorRef$$anonfun$invoke$1.apply(ActorRef.scala:828) [akka-actor-1.0.jar:na]
at akka.util.ReentrantGuard.withGuard(LockUtil.scala:19) [akka-actor-1.0.jar:na]
at akka.actor.LocalActorRef.invoke(ActorRef.scala:827) [akka-actor-1.0.jar:na]
at akka.dispatch.MessageInvocation.invoke(MessageHandling.scala:23) [akka-actor-1.0.jar:na]
at akka.dispatch.ExecutableMailbox$class.processMailbox(ExecutorBasedEventDrivenDispatcher.scala:190) [akka-actor-1.0.jar:na]
at akka.dispatch.ExecutorBasedEventDrivenDispatcher$$anon$1.processMailbox(ExecutorBasedEventDrivenDispatcher.scala:109) [akka-actor-1.0.jar:na]
at akka.dispatch.ExecutableMailbox$class.run(ExecutorBasedEventDrivenDispatcher.scala:166) [akka-actor-1.0.jar:na]
at akka.dispatch.ExecutorBasedEventDrivenDispatcher$$anon$1.run(ExecutorBasedEventDrivenDispatcher.scala:109) [akka-actor-1.0.jar:na]
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_24]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_24]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_24]
at akka.dispatch.MonitorableThread.run(ThreadPoolBuilder.scala:185) [akka-actor-1.0.jar:na]
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:81) ~[amqp-client-1.8.1.jar:na]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:47) ~[amqp-client-1.8.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:342) ~[amqp-client-1.8.1.jar:na]
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:702) ~[amqp-client-1.8.1.jar:na]
... 35 common frames omitted
java.io.EOFException: null
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:273) ~[na:1.6.0_24]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:118) ~[amqp-client-1.8.1.jar:na]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:133) ~[amqp-client-1.8.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:409) ~[amqp-client-1.8.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:437) ~[amqp-client-1.8.1.jar:na]
15:07:24.153 [akka:event-driven:dispatcher:amqp-consumers-2] DEBUG a.d.ExecutorBasedEventDrivenDispatcher - Suspending 627e4560-4e44-11e0-b676-000c2906f997

On the server side I get the following:

2011-03-14 14:54:00,370 INFO  [MINANetworkDriver(Acceptor)-5] (Log4jMessageLogger.java:72) - [con:103(/127.0.0.1:43387)] CON-1001 : Open2011-03-14 14:54:00,370 INFO  [pool-3-thread-24] (Log4jMessageLogger.java:72) - [con:103(/127.0.0.1:43387)] CON-1001 : Open : Protocol Version : 8-02011-03-14 14:54:00,381 INFO  [pool-3-thread-4] (Log4jMessageLogger.java:72) - [con:103(gateway@/127.0.0.1:43387/gyokuro)/ch:1] CHN-1001 : Create2011-03-14 14:54:00,383 INFO  [pool-3-thread-3] (Log4jMessageLogger.java:72) - [con:103(gateway@/127.0.0.1:43387/gyokuro)/ch:2] CHN-1001 : Create2011-03-14 14:54:00,385 INFO  [pool-3-thread-7] (Log4jMessageLogger.java:72) - [con:103(gateway@/127.0.0.1:43387/gyokuro)/ch:1] CHN-1004 : Prefetch Size (bytes) 0 : Count 02011-03-14 14:54:00,392 ERROR [MINANetworkDriver(Acceptor)-8] (AMQProtocolEngine.java:262) - Unexpected exception when processing datablockjava.nio.BufferUnderflowException        at java.nio.Buffer.nextGetIndex(Buffer.java:480)
        at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:336)        at org.apache.mina.common.support.BaseByteBuffer.getInt(BaseByteBuffer.java:323)
        at org.apache.mina.common.ByteBuffer.getUnsignedInt(ByteBuffer.java:725)        at org.apache.qpid.framing.EncodingUtils.readFieldTable(EncodingUtils.java:645)
        at org.apache.qpid.framing.AMQMethodBodyImpl.readFieldTable(AMQMethodBodyImpl.java:146)        at org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl.<init>(BasicConsumeBodyImpl.java:81)        at org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl$1.newInstance(BasicConsumeBodyImpl.java:47)        at org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0.convertToBody(MethodRegistry_8_0.java:298)        at org.apache.qpid.framing.AMQMethodBodyFactory.createBody(AMQMethodBodyFactory.java:43)        at org.apache.qpid.framing.AMQFrame.<init>(AMQFrame.java:42)        at org.apache.qpid.framing.AMQDataBlockDecoder.createAndPopulateFrame(AMQDataBlockDecoder.java:97)
        at org.apache.qpid.codec.AMQDecoder.decodeBuffer(AMQDecoder.java:324)
        at org.apache.qpid.server.protocol.AMQProtocolEngine.received(AMQProtocolEngine.java:238)        at org.apache.qpid.server.protocol.AMQProtocolEngine.received(AMQProtocolEngine.java:100)
        at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:101)
        at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:36)
        at org.apache.qpid.transport.network.mina.MINANetworkDriver.messageReceived(MINANetworkDriver.java:337)
        at org.apache.mina.common.support.AbstractIoFilterChain$TailFilter.messageReceived(AbstractIoFilterChain.java:703)
        at org.apache.mina.common.support.AbstractIoFilterChain.callNextMessageReceived(AbstractIoFilterChain.java:362)
        at org.apache.mina.common.support.AbstractIoFilterChain.access$1200(AbstractIoFilterChain.java:54)
        at org.apache.mina.common.support.AbstractIoFilterChain$EntryImpl$1.messageReceived(AbstractIoFilterChain.java:800)
        at org.apache.mina.filter.executor.ExecutorFilter.processEvent(ExecutorFilter.java:243)
        at org.apache.mina.filter.executor.ExecutorFilter$ProcessEventsRunnable.run(ExecutorFilter.java:305)
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:665)
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:690)
        at java.lang.Thread.run(Thread.java:662)


Here's my code (Config.amqpIncomingQueue is "gateway" and Config.amqpOutgoingQueue is "ruote"):

  private def initAmqp = {
    val connectionCallback = actorOf(new Actor { def receive = {
      case Connected => log info "Connection callback: Connected"
      case Reconnecting => log info "Connection callback: Reconnecting"
      case Disconnected => log info "Connection callback: Disconnected"
    }})

    val connection = AMQP.newConnection(ConnectionParameters(host = Config.amqpHost, 
                                                             port = Config.amqpPort,
                                                             username = Config.amqpUser,
                                                             password = Config.amqpPasword,
                                                             virtualHost = Config.amqpVhost,
                                                             connectionCallback = Some(connectionCallback)))

    
    log info "Connected to AMQP server %s:%d vhost %s as user %s".format(Config.amqpHost,
                                                                         Config.amqpPort,
                                                                         Config.amqpVhost,
                                                                         Config.amqpUser)

    val channelCallback = actorOf(new Actor { def receive = {
      case Started => log info "Channel callback: Started"
      case Restarting => log info "Channel callback: Restarting"
      case Stopped => log info"Channel callback: Stopped"
    }})
    
    // This will be the same for both incoming and outgoing queues
    val channelParams = ChannelParameters(channelCallback = Some(channelCallback))
    
    // This will be the same for both incoming and outgoing queues
    val declaration = ActiveDeclaration(durable = true,
                                        autoDelete = false, 
                                        exclusive = false)

    val inExchangeParams = ExchangeParameters(exchangeName = Config.amqpIncomingQueue,
                                              exchangeType = Direct, 
                                              exchangeDeclaration = declaration)
    
    val consumer = AMQP.newConsumer(connection, 
                                    ConsumerParameters(routingKey = Config.amqpIncomingQueue,
                                                       deliveryHandler = actorOf[OutboundMessageActor],
                                                       queueName = Some(Config.amqpIncomingQueue),
                                                       exchangeParameters = Some(inExchangeParams),
                                                       queueDeclaration = declaration,
                                                       selfAcknowledging = false,
                                                       channelParameters = Some(channelParams)))

    val outExchangeParams = ExchangeParameters(exchangeName = Config.amqpOutgoingQueue, 
                                              exchangeType = Direct, 
                                              exchangeDeclaration = declaration)
    
    val producer = AMQP.newProducer(connection, 
                                    ProducerParameters(exchangeParameters = Some(outExchangeParams),
                                                       producerId = Some(AMQP_PRODUCER_ID),
                                                       returnListener = None,
                                                       channelParameters = Some(channelParams)))

    log debug "AMQP initialized. Incoming queue: [%s], outgoing queue: [%s]".format(Config.amqpIncomingQueue,
                                                                                    Config.amqpOutgoingQueue)
    
    (producer, consumer)
  }
}

Any ideas?

TIA,
-Mario.

--
I want to change the world but they won't give me the source code.

Irmo Manie

unread,
Mar 14, 2011, 2:26:15 PM3/14/11
to akka...@googlegroups.com
I quickly tried to run a QPid locally and connect but I get the same errors.
I don't think the rabbitmq java lib is fully compatible with the QPid
broker, but to be sure you could ask the RabbitMQ mailing list.

If there's something we can change to make it compatible let me know,
but I guess theoretically the AMQP protocol should make them
compatible.

- Irmo

Mario Camou

unread,
Mar 16, 2011, 10:28:26 PM3/16/11
to akka...@googlegroups.com, Irmo Manie
Thank you very much Irmo.

That's my take on it too, AMQP should be compatible across implementations. I tried with a Ruby client using the ruby-amqp gem and am having the same problem, and in both cases I also get an exception on the QPid side. I assume there's some problem with their AMQP implementation when talking to clients other than theirs.

For the moment I am reverting to RabbitMQ on the server side. However, with RabbitMQ the binding doesn't seem to get created. The code is the same one I pasted in my original message. Here's the stack trace I get (this occurs when trying to send a message):

03:25:20.040 [akka:event-driven:dispatcher:amqp-producers-9] DEBUG akka.amqp.ProducerActor - Sending message [Message([B@2a48f675,ruote,true,false,None)]   
ReturnListener.handleBasicReturn threw an exception for channel AMQChannel(amqp://gateway@localhost:5672/gyokuro,1):akka.amqp.MessageNotDeliveredException: Could not deliver message [[B@368b1a4f] with reply code [312] with reply text [NO_ROUTE] and routing key [ruote] to exchange [ruote]
        at akka.amqp.ProducerActor$$anon$1.handleBasicReturn(ProducerActor.scala:44)
        at akka.amqp.ProducerActor$$anon$1.handleBasicReturn(ProducerActor.scala:36)        at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:284)
        at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:165)        at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:110)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:453)03:25:20.055 [akka:event-driven:dispatcher:global-10] DEBUG akka.dispatch.MonitorableThread - Created thread akka:event-driven:dispatcher:global-10
03:25:20.063 [akka:event-driven:dispatcher:global-10] ERROR a.amqp.FaultTolerantConnectionActor - ConnectionShutdown is hard error - self terminating
com.rabbitmq.client.ShutdownSignalException: connection error; reason: #method<connection.close>(reply-code=541,reply-text=Internal error in ReturnListener.handleBasicReturn,class-id=0,method-id=0)        at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:620) ~[amqp-client-1.8.1.jar:na]        at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:724) ~[amqp-client-1.8.1.jar:na]        at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:702) ~[amqp-client-1.8.1.jar:na]        at com.rabbitmq.client.impl.DefaultExceptionHandler.handleChannelKiller(DefaultExceptionHandler.java:77) ~[amqp-client-1.8.1.jar:na]        at com.rabbitmq.client.impl.DefaultExceptionHandler.handleReturnListenerException(DefaultExceptionHandler.java:51) ~[amqp-client-1.8.1.jar:na]
        at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:292) ~[amqp-client-1.8.1.jar:na]
        at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:165) ~[amqp-client-1.8.1.jar:na]        at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:110) ~[amqp-client-1.8.1.jar:na]        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:453) ~[amqp-client-1.8.1.jar:na]
Caused by: akka.amqp.MessageNotDeliveredException: Could not deliver message [[B@368b1a4f] with reply code [312] with reply text [NO_ROUTE] and routing key [ruote] to exchange [ruote]        at akka.amqp.ProducerActor$$anon$1.handleBasicReturn(ProducerActor.scala:44) ~[akka-amqp-1.0.jar:na]        at akka.amqp.ProducerActor$$anon$1.handleBasicReturn(ProducerActor.scala:36) ~[akka-amqp-1.0.jar:na]        at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:284) ~[amqp-client-1.8.1.jar:na]
        ... 3 common frames omitted


If I then go in through the web admin interface (I installed the plugin as detailed in http://www.rabbitmq.com/management.html) and manually create the binding from exchange "ruote" to queue "ruote" using routing key "ruote", everything starts working. 

I tried using the default exchange ("") but I get the following:

03:20:55.943 [akka:event-driven:dispatcher:amqp-producers-3] ERROR akka.actor.Actor$ - Exception when invoking
        actor [Actor[amqp.producer:308c67d0-503d-11e0-a666-000c2906f997]]        with message [Start]03:20:55.950 [akka:event-driven:dispatcher:amqp-consumers-2] DEBUG akka.amqp.ConsumerActor - Actively declaring new queue [gateway] for AMQP.Consumer[id= akka.amqp.ConsumerActor, exchangeParameters=Some(ExchangeParameters(gateway,direct,ActiveDeclaration(true,false,false),Map())), queueDeclaration=ActiveDeclaration(true,false,false)]
03:20:55.957 [akka:event-driven:dispatcher:amqp-producers-3] ERROR akka.actor.Actor$ - Problemjava.io.IOException: null        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:121) ~[amqp-client-1.8.1.jar:na]
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:145) ~[amqp-client-1.8.1.jar:na]
        at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:481) ~[amqp-client-1.8.1.jar:na]
       at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:71) ~[amqp-client-1.8.1.jar:na]        at akka.amqp.FaultTolerantChannelActor$$anonfun$akka$amqp$FaultTolerantChannelActor$$setupChannelInternal$1.apply(FaultTolerantChannelActor.scala:71)
 ~[akka-amqp-1.0.jar:na]
        at akka.amqp.FaultTolerantChannelActor$$anonfun$akka$amqp$FaultTolerantChannelActor$$setupChannelInternal$1.apply(FaultTolerantChannelActor.scala:66) ~[akka-amqp-1.0.jar:na]
        at scala.Option.foreach(Option.scala:185) ~[scala-library-2.8.1.jar:na]        at akka.amqp.FaultTolerantChannelActor.akka$amqp$FaultTolerantChannelActor$$setupChannelInternal(FaultTolerantChannelActor.scala:65) ~[akka-amqp-1.0.
jar:na]        at akka.amqp.FaultTolerantChannelActor$$anonfun$channelMessageHandler$1$$anonfun$apply$1$$anonfun$apply$3$$anonfun$apply$4.apply(FaultTolerantChannel
        at akka.dispatch.MonitorableThread.run(ThreadPoolBuilder.scala:185) [akka-actor-1.0.jar:na]Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=403,reply-text=ACCESS_REFUSED - operation n
ot permitted on the default exchange,class-id=40,method-id=10),null,""}
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:81) ~[amqp-client-1.8.1.jar:na]
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:47) ~[amqp-client-1.8.1.jar:na]
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:342) ~[amqp-client-1.8.1.jar:na]
        at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:215) ~[amqp-client-1.8.1.jar:na]
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:139) ~[amqp-client-1.8.1.jar:na]
        ... 37 common frames omittedcom.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=403,reply-text=ACCESS_REFUSED - operation not permitte
d on the default exchange,class-id=40,method-id=10),null,""}
        at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:230) ~[amqp-client-1.8.1.jar:na]
        at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:165) ~[amqp-client-1.8.1.jar:na]
        at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:110) ~[amqp-client-1.8.1.jar:na]
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:453) ~[amqp-client-1.8.1.jar:na]

-Mario.

--
I want to change the world but they won't give me the source code.


--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.


Irmo Manie

unread,
Mar 17, 2011, 3:23:49 AM3/17/11
to Mario Camou, akka...@googlegroups.com
I see you use a configured incoming queue name also as the consumer
exchange name, and a different queue name as the producer exchange
name.
You send a message over an exchange and not to a queue. The broker
delivers to the queue bound to the routing key for you. So be sure to
send over the same exchange name, since that's where the consumer has
its binding.

So:

consumer: 'some_exchange' binding queue 'some_queue' to routing key
'some_routing_key'

producer: 'some_exchange' - sending with routing key 'some_routing_key'

broker than delivers message on exchange 'some_exchange' to queue
'some_queue' because that is bound to the routing key
'some_routing_key' the message is sent with.

In your case, where you send with the mandatory flag to true, the
broker sees that there is no way to deliver the message to any
consumer and you get the message back with the 'no route' error.

Cheers,
Irmo

Mario Camou

unread,
Mar 17, 2011, 4:02:22 AM3/17/11
to akka...@googlegroups.com, Irmo Manie
Aaahhh... now I get it...

So I use the same exchange for sending and receiving. This is so bizarre after coming from other queuing systems.

Thanks!

-Mario.

--
I want to change the world but they won't give me the source code.


Irmo Manie

unread,
Mar 17, 2011, 4:14:08 AM3/17/11
to Mario Camou, akka...@googlegroups.com
There is no such thing in AMQP as an outgoing queue, only incoming ;-)
The exchange, hence then name, exchanges the messages. The queue, only
on consumer side, should be bound to a routing key on a certain
exchange.

Could you create a gist (or some other online scrapbook) with the code
you're trying to run? So also the message sending part. Then I can
have a look where it goes wrong.

Cheers,
Irmo

On Thu, Mar 17, 2011 at 9:08 AM, Mario Camou <mca...@tecnoguru.com> wrote:
> Hmmm... I *THOUGHT* I had it... but no. I changed the ExchangeParameters to
> use the same ExchangeName for both queues, but I still get the 312 error.
> The way this works is, I have another system with which I am communicating,
> so the 'gateway' queue is where the external system sends me the data, and
> the 'ruote' queue is where I send data to the other system. The queues *DO*
> exist, I'm using the queue name as routing key, and the only thing missing
> is the binding for the outgoing queue.


> -Mario.
>
> --
> I want to change the world but they won't give me the source code.
>
>

Mario Camou

unread,
Mar 17, 2011, 4:08:01 AM3/17/11
to akka...@googlegroups.com, Irmo Manie
Hmmm... I *THOUGHT* I had it... but no. I changed the ExchangeParameters to use the same ExchangeName for both queues, but I still get the 312 error.

The way this works is, I have another system with which I am communicating, so the 'gateway' queue is where the external system sends me the data, and the 'ruote' queue is where I send data to the other system. The queues *DO* exist, I'm using the queue name as routing key, and the only thing missing is the binding for the outgoing queue.
-Mario.

--
I want to change the world but they won't give me the source code.


On Thu, Mar 17, 2011 at 09:02, Mario Camou <mca...@tecnoguru.com> wrote:
Reply all
Reply to author
Forward
0 new messages