Redesigned AMQP module now on master (using RabbitMQ 1.8.0)


Irmo Manie

Jul 2, 2010, 7:40:03 AM
to akka...@googlegroups.com
Hi all,

I just pushed the redesign of the AMQP module to master.

The biggest change is that it should now be properly fault tolerant and even easier to configure and use, thanks to more configuration defaults.
Also, there is no longer a connection (factory) per channel; the connection is now reused across channels.

All tests are disabled because they need a local RabbitMQ running.
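
For readers who want tests like these to skip themselves rather than be disabled, one option is to probe for the broker before running them. A minimal sketch in plain Scala, assuming the broker listens on the default AMQP port 5672 on localhost; `BrokerProbe` and `isReachable` are hypothetical names and not part of Akka or its AMQP module:

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper, not part of Akka: checks whether something accepts
// TCP connections on the given host and port.
object BrokerProbe {
  /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
  def isReachable(host: String, port: Int, timeoutMs: Int = 500): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Connection refused, timeout, unknown host, and so on.
      case _: java.io.IOException => false
    } finally {
      // Ignore errors on close; we only care about the connect result.
      Try(socket.close())
    }
  }
}
```

A test setup could call `BrokerProbe.isReachable("localhost", 5672)` and skip the AMQP tests when it returns false. Note that this only shows that something is listening on the port, not that it is actually RabbitMQ.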

I will update the docs later today, but if you want to have a peek: the ExampleSession.scala is updated too, showing the simplicity.

Any reviews or comments are much appreciated.

Cheers,
Irmo

Debasish Ghosh

Jul 2, 2010, 7:49:44 AM
to akka...@googlegroups.com
Great stuff! Will look into it more very soon.

--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.



--
Debasish Ghosh
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Jonas Bonér

Jul 2, 2010, 8:04:19 AM
to akka...@googlegroups.com
Great.
But disabling the tests the way you have done makes them all fail, so the
build fails now.

[info] Test Starting: initializationError
[error] Test Failed: initializationError
java.lang.Exception: No runnable methods
at org.junit.runners.BlockJUnit4ClassRunner.validateInstanceMethods(BlockJUnit4ClassRunner.java:154)
at org.junit.runners.BlockJUnit4ClassRunner.collectInitializationErrors(BlockJUnit4ClassRunner.java:112)
at org.junit.runners.ParentRunner.validate(ParentRunner.java:253)
at org.junit.runners.ParentRunner.<init>(ParentRunner.java:55)
at org.junit.runners.BlockJUnit4ClassRunner.<init>(BlockJUnit4ClassRunner.java:56)
at org.junit.internal.builders.JUnit4Builder.runnerForClass(JUnit4Builder.java:13)
at org.junit.runners.model.RunnerBuilder.safeRunnerForClass(RunnerBuilder.java:57)
at org.junit.internal.builders.AllDefaultPossibilitiesBuilder.runnerForClass(AllDefaultPossibilitiesBuilder.java:29)
at org.junit.runners.model.RunnerBuilder.safeRunnerForClass(RunnerBuilder.java:57)
at org.junit.runners.model.RunnerBuilder.runners(RunnerBuilder.java:93)
at org.junit.runners.model.RunnerBuilder.runners(RunnerBuilder.java:84)
at org.junit.runners.Suite.<init>(Suite.java:66)
at org.junit.runner.Request.classes(Request.java:68)
at org.junit.runner.JUnitCore.run(JUnitCore.java:107)
at org.scalatest.junit.JUnitSuite$class.run(JUnitSuite.scala:261)
at se.scalablesolutions.akka.amqp.test.AMQPProducerMessageTest.run(AMQPProducerMessageTest.scala:16)
at org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
at sbt.TestRunner.run(TestFramework.scala:52)
at sbt.TestRunner.runTest$1(TestFramework.scala:66)
at sbt.TestRunner.run(TestFramework.scala:75)
at sbt.TestFramework$$anonfun$9$$anonfun$apply$11.runTest$2(TestFramework.scala:192)
at sbt.TestFramework$$anonfun$9$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:203)
at sbt.TestFramework$$anonfun$9$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:203)
at sbt.NamedTestTask.run(TestFramework.scala:91)
at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:187)
at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:187)
at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
at sbt.impl.RunTask.runTask(RunTask.scala:85)
at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
at sbt.Control$.trapUnit(Control.scala:19)
at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)


--
Jonas Bonér

work: http://jayway.com
code: http://akkasource.com
blog: http://jonasboner.com
twitter: @jboner

Irmo Manie

Jul 2, 2010, 8:11:56 AM
to akka...@googlegroups.com
Oh damn... let me 'fix' that

Irmo Manie

Jul 2, 2010, 8:16:57 AM
to akka...@googlegroups.com
> test-only se.scalablesolutions.akka.amqp.test.AMQP*
.....

[info] == akka-amqp / test-finish ==
[info] Passed: : Total 8, Failed 0, Errors 0, Passed 8, Skipped 0
[info]
[info] All tests PASSED.
[info] == akka-amqp / test-finish ==


:)

Irmo Manie

Jul 2, 2010, 1:15:51 PM
to akka...@googlegroups.com
And updated the docs: http://doc.akkasource.org/amqp

Any review, advice, changes, complaints and love letters are welcome :P

Cheers,
Irmo

Jonas Bonér

Jul 2, 2010, 1:20:12 PM
to akka...@googlegroups.com
Looks great. Thank you.

Jonathan Schwietert

Jul 6, 2010, 12:13:24 PM
to Akka User List
When might this feature set be included in a release tag? When you say
"master", where exactly could that be found? Would that be a source
build, or something similar to the 0.9.1 release package?

Irmo Manie

Jul 6, 2010, 12:27:23 PM
to akka...@googlegroups.com
By 'master' we mean the master branch on GitHub: http://github.com/jboner/akka
So yes: a source build :)

Everything in 'master' automatically goes into the next release; anything else is kept on separate branches.
So this would be included in 0.10 as a released package.

Cheers,
Irmo

Jonathan Schwietert

Jul 6, 2010, 12:34:50 PM
to Akka User List
Awesome, thanks! Guess I need to look into an Akka build :)

Irmo Manie

Jul 6, 2010, 12:50:49 PM
to akka...@googlegroups.com
If you need any help, just post it here on the mailing list :)




Irmo Manie

Jul 8, 2010, 11:20:15 AM
to akka...@googlegroups.com
I've already been refactoring a bit and just pushed the changes into master.

Some minor changes and one bigger addition: actor-based RPC over RabbitMQ.

From the ExampleSession.scala:


    // One connection, shared by the RPC server and client below.
    val connection = AMQP.newConnection()

    val exchangeParameters = ExchangeParameters("my_rpc_exchange", ExchangeType.Topic)

    // Minimal String <-> bytes serializer, used for both requests and responses.
    val stringSerializer = new Serializer {
      def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes)
      def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes
    }

    // Server side: a pattern-matching function maps each request to its response.
    val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer, {
      case "rpc_request" => "rpc_response"
      case _ => error("unknown request")
    })

    // Client side: uses the same exchange and routing key as the server.
    val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer)

    // !! sends the request and waits for the reply.
    val response = (rpcClient !! "rpc_request")
    log.info("Response: " + response)


The next step is to enhance this further with better typed handling and serialization. :-)

I will add an example to the docs later on.

Cheers,
Irmo

Irmo Manie

Jul 13, 2010, 12:03:32 PM
to akka...@googlegroups.com
Mm, the user group got dropped from the recipients, so FYI :)

---------- Forwarded message ----------
From: Irmo Manie <irmo....@gmail.com>
Date: Tue, Jul 13, 2010 at 6:02 PM
Subject: Re: [akka-user] Re: Redesigned AMQP module now on master (using RabbitMQ 1.8.0)
To: Sebastian Latza <ma...@sebastian-latza.de>


Apparently, making the delivery actor fail doesn't make the consumer actor restart properly.
I'll get this fixed and push this back into master later on.

Thanks for spotting! :-)


On Tue, Jul 13, 2010 at 5:46 PM, Irmo Manie <irmo....@gmail.com> wrote:
On Tue, Jul 13, 2010 at 5:23 PM, Sebastian Latza <ma...@sebastian-latza.de> wrote:
> Hi Irmo,


Hi Sebastian,

 
> some details I came across while briefly trying out the code:
> - it would be nice if I could specify the name of my queues. Right now
> they seem to be autogenerated, but in my usage scenario I need access
> to the naming scheme since other consumers are accessing the queues
> too.

You can optionally specify your own queue name with the ConsumerParameters, see:

case class ConsumerParameters(exchangeParameters: ExchangeParameters,
                                routingKey: String,
                                deliveryHandler: ActorRef,
                                queueName: Option[String] = None,
                                queueDurable: Boolean = false,
                                queueAutoDelete: Boolean = true,
                                queuePassive: Boolean = false,
                                queueExclusive: Boolean = false,
                                selfAcknowledging: Boolean = true,
                                channelParameters: Option[ChannelParameters] = None) 

So something like: 

val consParams = new ConsumerParameters(exParams, "my.key", handlerActor, Some("my.queue"))

 
> - As I mentioned a while ago on the list, you might run into
> starvation if the consumer fails to acknowledge the received messages in
> non-selfAcknowledging mode when hitting basicQos. A scheduler with
> some kind of message timeout tracking would help here I guess. (Maybe
> just a fault on my side, missing this mechanism in the code.)

If something goes wrong in the handling actor that prevents acknowledgement, you should throw an exception or kill your actor.
Because the handling actor is automatically supervised by the consumer actor, both are restarted, which kills the current channel too.
When the channel is killed, the broker puts the unacknowledged messages back in the 'messages ready' state so another consumer can pick them up.

"Let it fail" ;-)

I might make a nice example and/or unit test that proves this.
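
The requeue behavior described above can be illustrated with a toy in-memory model. This is only a sketch of the idea in plain Scala, not the RabbitMQ or Akka API; `ToyQueue` and all of its methods are hypothetical names, and a real broker handles ordering and redelivery flags in ways this model ignores:

```scala
import scala.collection.mutable

// Toy in-memory model of a broker queue; NOT the RabbitMQ or Akka API.
class ToyQueue {
  // Messages waiting to be delivered ("messages ready").
  private val ready = mutable.Queue[String]()
  // Channel id -> messages delivered on that channel but not yet acknowledged.
  private val unacked = mutable.Map[Int, List[String]]()

  def publish(msg: String): Unit = ready.enqueue(msg)

  /** Delivers the next ready message to a channel and tracks it as unacked. */
  def deliver(channel: Int): Option[String] =
    if (ready.isEmpty) None
    else {
      val msg = ready.dequeue()
      unacked(channel) = unacked.getOrElse(channel, Nil) :+ msg
      Some(msg)
    }

  /** Acknowledges a message, removing it from the channel's unacked list. */
  def ack(channel: Int, msg: String): Unit =
    unacked(channel) = unacked.getOrElse(channel, Nil).filterNot(_ == msg)

  /** Closing a channel moves its unacked messages back to the ready state. */
  def closeChannel(channel: Int): Unit =
    unacked.remove(channel).foreach(msgs => msgs.foreach(m => ready.enqueue(m)))

  def readyCount: Int = ready.size
}
```

In this model, a message that was delivered on channel 1 but never acknowledged becomes available to a consumer on channel 2 as soon as `closeChannel(1)` is called, which mirrors the "let it fail" approach: the failing actor takes the channel down and the message is not lost.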

 
> - In the documentation:
>
> val exchangeParameters = ExchangeParameters("my_topic_exchange",
> ExchangeType.Topic)
> val myConsumer = AMQP.newConsumer(connection, ConsumerParameters(..
>
> I think exchangeParameters should be ConsumerParameters there.


Ah, no, I see what's wrong. In the example, ConsumerParameters currently takes channelParameters as a parameter, but that was changed to the new ExchangeParameters. ChannelParameters can still be set, but only if needed.
Thanks for spotting it; it's fixed now :)
 
> Anyway, great work, especially the explicit ack'ing solves some major
> problems for me.


You're welcome. :-)
I use it heavily too, so I will continuously improve and optimize it, but the main API should be fine as it is now.
I will make some more improvements to the convenience RPC actors and their serialization possibilities, but I have to use/implement them a couple of times first to find the pain points.


> Regards
> Sebastian



Cheers,
Irmo
 

Irmo Manie

Jul 14, 2010, 4:28:56 AM
to akka...@googlegroups.com, Sebastian Latza
Hi Sebastian,

A fix for the manual acknowledgement is now on master. :-)
http://github.com/jboner/akka/commit/7c347637a001b70843207850ddf85fef...

So if you can't acknowledge, just let your actor fail; this will cause a restart of the consumer and your actor.

If you want to re-consume the unacknowledged message with the restarted actors, be sure to use a named, non-auto-delete queue.
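
As a rough illustration of such a configuration, the sketch below builds a ConsumerParameters for a named, non-auto-delete queue with manual acknowledgement. To keep it compilable on its own, `ExchangeParameters`, `ChannelParameters` and `ActorRef` are replaced by simplified stand-ins (their fields here are assumptions), and the ConsumerParameters definition is copied from the one quoted earlier in this thread; `NamedQueueExample` is a hypothetical name:

```scala
// Stand-in types so the sketch compiles on its own; the real ones live in Akka
// and its AMQP module and look different (for example, the exchange type is
// not a plain String there).
case class ExchangeParameters(exchangeName: String, exchangeType: String)
case class ChannelParameters()
class ActorRef

// Copied from the definition quoted earlier in this thread.
case class ConsumerParameters(exchangeParameters: ExchangeParameters,
                              routingKey: String,
                              deliveryHandler: ActorRef,
                              queueName: Option[String] = None,
                              queueDurable: Boolean = false,
                              queueAutoDelete: Boolean = true,
                              queuePassive: Boolean = false,
                              queueExclusive: Boolean = false,
                              selfAcknowledging: Boolean = true,
                              channelParameters: Option[ChannelParameters] = None)

object NamedQueueExample {
  val exParams = ExchangeParameters("my_topic_exchange", "topic")
  val handler = new ActorRef

  // Named queue that survives the consumer going away, with manual acks,
  // so a restarted consumer can pick the unacknowledged message up again.
  val consParams = ConsumerParameters(exParams, "my.key", handler,
    queueName = Some("my.queue"),
    queueAutoDelete = false,
    selfAcknowledging = false)
}
```

Using named arguments means only the settings that differ from the defaults need to be spelled out; everything else keeps its default value.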

Cheers,
Irmo

Jonas Bonér

Jul 14, 2010, 4:31:36 AM
to akka...@googlegroups.com, Sebastian Latza
Great. Thanks.

Santi Manninen

Jul 23, 2010, 4:16:42 PM
to akka...@googlegroups.com
Looks great.

I've been having trouble killing 'temporary' consumers (i.e. ones I need to get
rid of cleanly later) in 0.9.1 recently.

Sending a ! Stop to a consumer kills the entire process! After a lot
of trial and error, I have resorted to sending a new
MessageConsumerListener with a bad queue name and route key to get rid
of them in order to (maybe) replace them later. They won't go away (an
AMQP thread with a connection persists whatever I do), however.

This new AMQP stuff changes a lot, and I haven't tried this with the
master branch yet .. so I'm just saying.. and stuck with the current
badly leaking setup. :)

I'll look into it better after Monday, can't touch it before then.
Just thought I'd mention it (remember this concerns 0.9.1)

Santi

Irmo Manie

Jul 23, 2010, 4:27:15 PM
to akka...@googlegroups.com
The new AMQP stuff in 0.10 changes quite a bit, yes, but it's worth switching to. If you need any help with that, don't hesitate to ask your questions here on the mailing list. :-)

opyate

Aug 11, 2010, 11:29:44 AM
to Akka User List
Hi hAkkers,

Can I suggest hosting an http://rabbitmq-instance.akkasource.org for
the tests to pass?
Perhaps also listening on port 80 so folks behind a proxy don't have
problems.

Thanks,
Juan


Irmo Manie

Aug 12, 2010, 10:31:03 AM
to akka...@googlegroups.com
Hi Juan,

I don't think that would really make sense, because you would not know what else is going on in that RabbitMQ instance.
Other people could be using the same exchanges, queue names, bindings, etc., and you would get weird, unexplained behavior.

Especially for unit tests you want a clear and predictable environment to run your tests on.

Cheers,
Irmo



Santi Manninen

Aug 24, 2010, 1:37:48 PM
to akka...@googlegroups.com
Hi,

Thought I'd just update this to say that the below problem has gone
away with the new AMQP API, because the stop()s work properly and the
connection is handled by the user, so I can stop() that too when I
need to. No more leaking threads or connections. Excellent work! :)

Cheers,

Santi

Irmo Manie

Aug 24, 2010, 1:46:50 PM
to akka...@googlegroups.com
On Tue, Aug 24, 2010 at 7:37 PM, Santi Manninen <santi...@gmail.com> wrote:
> Hi,
>
> Thought I'd just update this to say that the below problem has gone
> away with the new AMQP API, because the stop()s work properly and the
> connection is handled by the user, so I can stop() that too when I
> need to. No more leaking threads or connections.

Cool :)

And yes, calling stop on a connection stops the whole hierarchy of that connection (connection, channels, consumers, producers, etc.).
If you have multiple connections, you can also call AMQP.shutdownAll to stop all open connections.
The AMQP main supervisor stays alive, so you can start new connections again afterwards.
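
To make the cascading stop concrete, here is a toy model in plain Scala. It is only a sketch of the idea and not how the Akka supervisors are implemented; `Node` and `ToyAmqp` are hypothetical names, and only the name `shutdownAll` is borrowed from the description above:

```scala
import scala.collection.mutable

// Toy model of a stop that cascades down a hierarchy; NOT the Akka supervisor API.
class Node(val name: String) {
  private val children = mutable.ListBuffer[Node]()
  private var running = true

  /** Registers a child (e.g. a channel under a connection) and returns it. */
  def add(child: Node): Node = { children += child; child }

  def isRunning: Boolean = running

  /** Stops all children first, then this node. */
  def stop(): Unit = {
    children.foreach(_.stop())
    running = false
  }
}

// Toy stand-in for the top-level registry that outlives its connections.
object ToyAmqp {
  private val connections = mutable.ListBuffer[Node]()

  def newConnection(name: String): Node = {
    val c = new Node(name)
    connections += c
    c
  }

  /** Stops every registered connection; the registry itself stays usable. */
  def shutdownAll(): Unit = {
    connections.foreach(_.stop())
    connections.clear()
  }

  def openConnections: Int = connections.size
}
```

Stopping a connection node in this model also stops the channel and consumer nodes beneath it, and after `ToyAmqp.shutdownAll()` a new connection can still be created, which is the behavior described for the main supervisor.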
 
> Excellent work! :)
>
> Cheers,
>
> Santi


You're welcome. Thanks for using, testing and reporting your experiences here. Much appreciated! :)