Actors consuming messages from JMS queue

1,618 views
Skip to first unread message

Birju

unread,
Apr 3, 2012, 9:36:48 AM4/3/12
to Akka User List
Hello, I am new to Akka and have a question on its usage for a
following use case.

Use case: one common JMS queue has a lot of messages that need to
consumed and put into dedicated message queues based on the message
type. It feels like Akka can fit here to consume messages from a
common queue and parallelizing work to put into distinguish queues
based on the message type. since use case deals with putting messages
from one common queue to a specialized queue, should actors be used
within a transaction? since this usecase involve transactional
resource, actor might take bit longer than dealing with a simple
usecase with no transactions? Any inputs how best to design?

Thanks

√iktor Ҡlang

unread,
Apr 3, 2012, 9:54:19 AM4/3/12
to akka...@googlegroups.com
Sounds like a job for Apache Camel. (We have a port of the Akka Camel module coming up soon, but it's not officially released yet)

Cheers,



--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.




--
Viktor Klang

Akka Tech Lead
Typesafe - The software stack for applications that scale

Twitter: @viktorklang

Birju

unread,
Apr 3, 2012, 9:57:57 AM4/3/12
to Akka User List
Thanks Viktor for prompt response.

I saw that camel integration has not been ported over to Akka 2.0 and
we needed to get this implementation going. Was really reluctant to
use lower version of Akka given 2.0 is a much improved version
(gathered that sense from reading documentation). In this situation
what best solution you can think of with Akka 2.0?

Thanks,
B
> Typesafe <http://www.typesafe.com/> - The software stack for applications
> that scale
>
> Twitter: @viktorklang

√iktor Ҡlang

unread,
Apr 3, 2012, 10:09:23 AM4/3/12
to akka...@googlegroups.com
On Tue, Apr 3, 2012 at 3:57 PM, Birju <birj...@gmail.com> wrote:
Thanks Viktor for prompt response.

You're most welcome
 

I saw that camel integration has not been ported over to Akka 2.0 and
we needed to get this implementation going. Was really reluctant to
use lower version of Akka given 2.0 is a much improved version
(gathered that sense from reading documentation). In this situation
what best solution you can think of with Akka 2.0?

Don't know the current status of the Camel port for 2.0, but build it locally and try it out?

Cheers,



--
Typesafe - The software stack for applications that scale

Twitter: @viktorklang

Birju

unread,
Apr 3, 2012, 10:17:24 AM4/3/12
to Akka User List
Thanks. Unfortunately building locally to trying it out isn't an
option given tighten policies of the firm where I work. They have
blocked repos and really come after us if we deploy softwares that
aren't released. I appreciate your suggestion though.

Anything else you can think of? Can this not be achieved w/o camel
using Akka 2.0? Like having a Worker actor which consume message from
a common queue and then with pattern matching find out message type,
put the message into appropriate destination queue?

On Apr 3, 10:09 am, √iktor Ҡlang <viktor.kl...@gmail.com> wrote:

Akka Team

unread,
Apr 3, 2012, 10:20:38 AM4/3/12
to akka...@googlegroups.com
On Tue, Apr 3, 2012 at 4:17 PM, Birju <birj...@gmail.com> wrote:
Thanks. Unfortunately building locally to trying it out isn't an
option given tighten policies of the firm where I work. They have
blocked repos and really come after us if we deploy softwares that
aren't released. I appreciate your suggestion though.

Anything else you can think of? Can this not be achieved w/o camel
using Akka 2.0? Like having a Worker actor which consume message from
a common queue and then with pattern matching find out message type,
put the message into appropriate destination queue?

Sure, if you want you can always use raw JMS APIs.

Cheers,



--
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam

Birju

unread,
Apr 3, 2012, 10:31:54 AM4/3/12
to Akka User List
Your rock. I am impressed with your response time Victor:) Appreciate
it.

Now with this, I'll have to make actor transactional to perform this
use case? e.g. retrieve message from queue A-->check message type--
>put message into queue B. Should this be a synchronous actor?

On Apr 3, 10:20 am, Akka Team <akka.offic...@gmail.com> wrote:

Akka Team

unread,
Apr 3, 2012, 10:39:48 AM4/3/12
to akka...@googlegroups.com
On Tue, Apr 3, 2012 at 4:31 PM, Birju <birj...@gmail.com> wrote:
Your rock. I am impressed with your response time Victor:) Appreciate
it.

There's a rumor that "Warp speed" was originally named "Klang speed", but that I had my lawyers force them to change it.
 

Now with this, I'll have to make actor transactional to perform this
use case? e.g. retrieve message from queue A-->check message type--
>put message into queue B. Should this be a synchronous actor?

There is no such thing as a transactional mailbox (yet/still), so whatever transactionality you want needs to be either provided by you or the underlying JMS API you use. I'd probably start out with doing:

1) In preStart, send myself a ReceiveTimeout message
2) When I get a ReceiveTimeout message, I transactionally poll the queue, if I get something, I put it into the other queue and ack, then I send myself a new ReceiveTimeout message. If I get nothing, I set my context.receiveTimeout to my polling frequency as not to hog my thread forever.

Happy hAkking.

Birju

unread,
Apr 5, 2012, 12:23:17 AM4/5/12
to Akka User List
Eventually I ended up downgrading to settle with 1.3 to leverage akka
camel integration.

I have a question on creating mulitple instances of Mediation and
Producer actors for the following type of code (influenced from Akka
camel 1.3 doc)

class ConsumerActor(mediator: ActorRef) extends Actor with Consumer {
def receive = {

case msg: Message => mediator.forward(msg.setBodyAs[String])
}
....
}
class Mediator(producer: ActorRef) extends Actor {
def receive ={
....
case msg:Message =>producer.forward
}
}

And producer is
class ProducerActor extends Actor with Producer {
def endpointUri = "direct:welcome"
}

These actors are started as

val producer = actorOf[ProducerActor]
val mediator = actorOf(new Mediator(producer))
val consumer = actorOf(new ConsumerActor(mediator))

producer.start
mediator.start
consumer.start

I need to create multiple instances of producers and mediators actors.
With above setting (class parameters) it gets tricky to do following
since Mediator has class parameters of Producers.

val mediators = Vector.fill(5)(actorOf[???].start())

Should I find an another way to forward messages to avoid class
parameters and also achieve load balancing?
On Apr 3, 10:39 am, Akka Team <akka.offic...@gmail.com> wrote:

Viktor Klang

unread,
Apr 5, 2012, 6:42:44 AM4/5/12
to akka...@googlegroups.com
On Thu, Apr 5, 2012 at 6:23 AM, Birju <birj...@gmail.com> wrote:
Eventually I ended up downgrading to settle with 1.3 to leverage akka
camel integration.

I have a question on creating mulitple instances of Mediation and
Producer actors for the following type of code (influenced from Akka
camel 1.3 doc)

class ConsumerActor(mediator: ActorRef) extends Actor with Consumer {
def receive = {

   case msg: Message => mediator.forward(msg.setBodyAs[String])
 }
....
}
class Mediator(producer: ActorRef) extends Actor {
         def receive ={
            ....
            case msg:Message =>producer.forward
         }
}

And producer is
class ProducerActor extends Actor with Producer {
 def endpointUri = "direct:welcome"
}

These actors are started as

val producer = actorOf[ProducerActor]
 val mediator = actorOf(new Mediator(producer))
 val consumer = actorOf(new ConsumerActor(mediator))

Why doesn't the producer create the mediator and the mediator the consumer?

Cheers,

Birju

unread,
Apr 5, 2012, 7:49:15 AM4/5/12
to Akka User List
Viktor, thanks for the response.

consumer is a first endpoint which will retrieve the message from JMS
queue (as soon as other side puts a message in there) and then it'll
pass it on to a mediator which will have some business logic/
validations etc. And finally mediator will pass it on to Producer
actor which will have logic to figure out how many subscribers are
there and finally publish it to one or more queues. Consumer is just a
pass thru so okay to have one but now thinking why not have multiple
instances of that as well? let me know if flow is still unclear. Now
given earlier code snippet how to loadbalance them with class
parameters? forward is required as you can gather from the complete
flow I described above.


On Apr 5, 6:42 am, Viktor Klang <viktor.kl...@gmail.com> wrote:

Sean W

unread,
Apr 5, 2012, 9:04:42 AM4/5/12
to Akka User List
We were successful with akka/camel/jms using akka 1.3.1

We also have traditionally used camel's spring integration and you can
also do it that way by autowiring producerTemplates or
consumerTemplates into your akka actors and making those actors
@Component or @Configurable bean.
> > > To unsubscribe from this group, send email...
>
> read more »

Birju

unread,
Apr 5, 2012, 10:14:27 AM4/5/12
to Akka User List
Are you able to share any sample Sean? Also, I need to make JMS
consumer fault-tolerant. I am still sailing through Akka docs.

Thanks,
> > > > > > You received this message because you are...
>
> read more »

Sean W

unread,
Apr 5, 2012, 1:28:20 PM4/5/12
to Akka User List
The integrated akka/camel solution is best but this should work as
well in the absence of that.

1) You need to add spring 3.1x and camel-spring and camel libraries to
your project.
2) You will need runtime and test time spring xml context that also
contains a camel-context that would look like this if you use active
mq for your messaging.

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:camel="http://camel.apache.org/schema/spring"
xsi:schemaLocation="http://www.springframework.org/schema/
beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">

<!-- Set up spring to scan your classpath for beans etc -->
<context:spring-configured/>
<context:annotation-config/>
<context:component-scan base-package="com.myco.myapp"/>

<!-- base JMS connection factory -->
<bean id="jmsConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="localhost:8161"
p:userName="user"
p:password="password" />

<!-- base JMS pooled connection factory -->
<bean id="jmsPooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory" destroy-
method="stop"
p:maxConnections="8"
p:maximumActive="8"
p:connectionFactory-ref="jmsConnectionFactory"/>

<bean id="jmsConfig"
class="org.apache.camel.component.jms.JmsConfiguration"
p:connectionFactory-ref="jmsPooledConnectionFactory"
p:transacted="!{jms.transacted}"
p:concurrentConsumers="8"/>

<!-- camel activemq component to connect to the broker -->
<bean id="jms"
class="org.apache.activemq.camel.component.ActiveMQComponent"
p:configuration-ref="jmsConfig"/>

<!-- The camel context -->
<camel:camelContext id="my-context">

<camel:template id="producerTemplate"/>

<!-- Sample JMS Endpoint -->
<camel:endpoint id="q-my-jms-queue" uri="jms:my.jms.queue"/>

<!-- If this were a test context located in src/test/resources you
can test your logic using this: -->
<camel:endpoint id="q-my-jms-queue" uri="seda:my.jms.queue"/>
</camel:camelContext>
</beans>

3) An actor that consumed a message would look like this:

package com.mycom.myapp

import akka.actor._

@Configurable
class ConsumerActor extends Actor {

/**
* This is where the actor receives its messages.
*/
def receive = {
case someObject: SomeObject => {
// Do your work
}
}

@Consume(ref = "q-my-jms-queue")
def consumeMessage(someObject: SomeObject) {
// Send to self for processing
self ! someObject
> > > > > > > Groups...
>
> read more »

Birju

unread,
Apr 5, 2012, 2:46:11 PM4/5/12
to Akka User List
Victor,

Below is how latest snippets looks like. Now ConsumerActor (first end
point) creates a mediator actor and mediator actor creates a producer
actor. Now I'll have concurrent Actors both for Mediator and Producer
when ConsumerActor sends messages to Mediator. so concurrent actors
for both mediator and producer should work fine but have another
question if you can help.

ConsumerActor picks up message from a camel JMS endpoint, if producer
was supposed to publish to three queues and it was able to publish to
2 queues out of 3 (publishing to third queue failed for some reason)
then we need to keep that message in JMS queue and redeliver (w/o any
filtering logic it'll redeliver to all the queues, which is fine). Do
I need to have some kind of aggregator actor for producers and send
some ACK back to mediator and all the way to consumer? Any inputs?

Thanks,

class ConsumerActor extends Actor with Consumer{
def reveive={
case msg:Message=>{
val mediator = actorOf[Mediator].start
mediator ! msg.setBodyAs[String]
}

}

}

class Mediator extends Actor{
def receive={
case msg:Message =>{
....some logic
val queueNames = ListOfQueueNames;
for(queueName <- queueNames){
val producer = actorOf(new
ProducerActor(queueName)).start
producer ! msg
}
}
}
}
class ProducerActor(uri:String) extends Actor with Producer{
def endpointUri = uri
}

object Booter extends Application{
actorOf[ConsumerActor].start
}


On Apr 5, 6:42 am, Viktor Klang <viktor.kl...@gmail.com> wrote:

Akka Team

unread,
Apr 6, 2012, 5:25:17 PM4/6/12
to akka...@googlegroups.com
Use forward to retain the original sender so you can reply to the correct place?

Cheers,

Piotr Gabryanczyk

unread,
Apr 19, 2012, 3:55:12 PM4/19/12
to akka...@googlegroups.com
This is scary...
Way too much code for my taste... :)

Check this out:
class JmsConsumer extends Consumer{

def endpointUri = "jms:queue:Orders"

protected def receive = {
case msg => println("Received [%s]" format msg)
}
}

object JmsConsumerApp extends App{
val sys = ActorSystem("test")
val connectionFactory = ...
CamelExtension(sys).context.addComponent(new JmsComponent(new JmsConfiguration(connectionFactory)))
sys.actorOf(Props[JmsConsumer])
}

And more here:
http://skillsmatter.com/podcast/scala/akka-2-x

Piotr Gabryanczyk
-
Blog: http://blog.scala4java.com
Twitter: @piotrga

Jonas Boner

unread,
Apr 19, 2012, 5:11:52 PM4/19/12
to akka...@googlegroups.com
On Thu, Apr 19, 2012 at 9:55 PM, Piotr Gabryanczyk <pio...@gmail.com> wrote:
This is scary...
Way too much code for my taste... :)

Check this out:
class JmsConsumer extends Consumer{

 def endpointUri = "jms:queue:Orders"

 protected def receive = {
   case msg => println("Received [%s]" format msg)
 }
}

object JmsConsumerApp extends App{
 val sys = ActorSystem("test")
 val connectionFactory = ...
 CamelExtension(sys).context.addComponent(new JmsComponent(new JmsConfiguration(connectionFactory)))
 sys.actorOf(Props[JmsConsumer])
}

Very slick. 



--
Jonas Bonér
CTO
Typesafe - The software stack for applications that scale
Phone: +46 733 777 123
Twitter: @jboner

Reply all
Reply to author
Forward
0 new messages