What is the best way to integrate Kafka and Akka?


Filippo De Luca

Apr 12, 2014, 5:17:29 AM
to akka...@googlegroups.com
Hi,
I need to have actors reacting to events arriving from some Kafka topics. I would like to leverage the Akka supervision mechanism to keep the Kafka consumer up and running.

I know that an actor should never block waiting for IO, but the Kafka consumer is blocking, so having an actor run a Kafka consumer doesn't seem a good option, also because it would never release its thread.

On the other hand, if I use a thread not controlled by Akka to run the Kafka consumer, it will not benefit from Akka supervision.

My idea is to use an actor with a dedicated dispatcher to run the Kafka consumer. The problem is: if this actor is blocked by the Kafka stream, how can it react to other messages? I am thinking of using a timeout on the Kafka consumer and re-sending a message like Consume when the timeout elapses, so the actor can process other messages in between. This works if the Kafka stream doesn't always have data available within the timeout window, but not otherwise.
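To make the starvation problem concrete, here is a minimal, dependency-free Scala sketch (not Akka or Kafka code). It assumes a single-threaded queue standing in for the actor's mailbox, a pre-filled queue standing in for a Kafka stream that always has data, and "stream is empty" standing in for the consumer timeout. `ConsumeLoopModel` and the `Status` message are hypothetical names for illustration only.

```scala
import scala.collection.mutable

// Hypothetical model, not Akka/Kafka API: one Queue plays the actor's mailbox,
// another plays a Kafka stream, and "stream is empty" plays the consumer timeout.
object ConsumeLoopModel {
  sealed trait Msg
  case object Consume extends Msg
  case object Status extends Msg // some other message the actor should react to

  def run(available: Seq[String]): Vector[String] = {
    val stream = mutable.Queue.empty[String]
    stream ++= available
    val mailbox = mutable.Queue[Msg](Consume)
    val log = Vector.newBuilder[String]
    var statusSent = false

    while (mailbox.nonEmpty) {
      mailbox.dequeue() match {
        case Consume =>
          if (stream.nonEmpty) {
            // Keep consuming until the poll "times out"; with data always
            // available, that only happens once the stream runs dry.
            while (stream.nonEmpty) {
              log += s"handled ${stream.dequeue()}"
              // Another sender's message arrives while the actor is busy.
              if (!statusSent) { mailbox.enqueue(Status); statusSent = true }
            }
            mailbox.enqueue(Consume) // timeout elapsed: re-send Consume
          }
          // Nothing arrived at all: the model stops instead of polling forever.
        case Status =>
          log += "handled Status"
      }
    }
    log.result()
  }
}
```

With three messages already available, `ConsumeLoopModel.run(Seq("m1", "m2", "m3"))` handles m1, m2 and m3 before it ever gets to `Status`, which is exactly the case where the timeout trick stops helping.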

Another option may be to implement a mailbox backed by Kafka.

What is the best approach in your opinion? Thanks.  

Filippo De Luca

Apr 14, 2014, 11:15:19 AM
to akka...@googlegroups.com

Hi,
Can no one help, or is the question too silly?


√iktor Ҡlang

Apr 14, 2014, 11:28:25 AM
to Akka User List
Should it poll if nobody is interested in reading anything?
--
Cheers,

Filippo De Luca

Apr 14, 2014, 12:54:31 PM
to akka...@googlegroups.com

Yes, it should.

Filippo De Luca

Apr 14, 2014, 1:59:20 PM
to akka...@googlegroups.com

I think I will need something very similar to the ZeroMQ extension. What do you think?

√iktor Ҡlang

Apr 14, 2014, 2:47:26 PM
to Akka User List
Do you want to read and write, etc.?

Filippo De Luca

Apr 14, 2014, 4:02:11 PM
to akka...@googlegroups.com

Yes, polling messages and sending them.

√iktor Ҡlang

Apr 14, 2014, 4:09:22 PM
to Akka User List
Sounds like an IO extension to me!

Scott Clasen

Apr 14, 2014, 4:14:18 PM
to akka...@googlegroups.com
I'm actually working on an actor-based Kafka consumer as we speak.

You will have to configure the consumer not to block indefinitely:

    "consumer.timeout.ms" -> "500"

Then you can have a hasNext() that returns false instead of blocking forever (it waits at most consumer.timeout.ms):

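    import kafka.consumer.ConsumerTimeoutException

    // msgIterator: the ConsumerIterator of one KafkaStream (stream.iterator())
    def hasNext(): Boolean = try {
      msgIterator.hasNext()
    } catch {
      // thrown when no message arrives within consumer.timeout.ms
      case _: ConsumerTimeoutException => false
    }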
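Without a broker at hand, the bounded-wait behaviour can be approximated with a `java.util.concurrent.LinkedBlockingQueue`, whose `poll(timeout, unit)` returns `null` when nothing arrives in time. In this sketch the queue stands in for the consumer's stream, and `TimedIterator` and `ConsumerTimeout` are hypothetical stand-ins for the Kafka iterator and `ConsumerTimeoutException`; the `hasNext` wrapper is the same try/catch pattern as above.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical stand-ins, not the Kafka API: an iterator that waits at most
// timeoutMs for the next element and signals a timeout with an exception.
object BoundedPoll {
  final class ConsumerTimeout extends RuntimeException("no message within timeout")

  final class TimedIterator[A](queue: LinkedBlockingQueue[A], timeoutMs: Long) {
    private var buffered: Option[A] = None

    /** Waits at most timeoutMs; throws ConsumerTimeout if nothing arrived. */
    def hasNextOrThrow(): Boolean = buffered.nonEmpty || {
      val item = queue.poll(timeoutMs, TimeUnit.MILLISECONDS) // null on timeout
      if (item == null) throw new ConsumerTimeout
      buffered = Some(item)
      true
    }

    def next(): A = {
      hasNextOrThrow()
      val a = buffered.get
      buffered = None
      a
    }
  }

  /** The wrapper pattern from the post: a timeout becomes `false`. */
  def hasNext[A](it: TimedIterator[A]): Boolean =
    try it.hasNextOrThrow()
    catch { case _: ConsumerTimeout => false }
}
```

An empty queue makes `BoundedPoll.hasNext` return false after roughly the timeout; once an element is `put` on the queue it returns true and `next()` yields that element.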

Once you have these, it's down to modeling how you want the actors to react to messages being available (or not). FSMs are great here, but without knowing what you want to do it's hard to offer more advice.



Richard Rodseth

Apr 14, 2014, 4:29:16 PM
to akka...@googlegroups.com
Here's the Kafka TCP protocol


I believe they're redoing the high-level consumer and producer APIs as we speak.

Lee Mighdoll

Apr 14, 2014, 5:35:03 PM
to akka...@googlegroups.com
Nice idea with the timeout. I wrote a Kafka -> rx.observable wrapper the other week too (KafkaReader.scala).

I'll ponder adding your timeout trick to that interface too. Have you tried it under load? Our ops folks have been in the habit of tuning up the Kafka consumer timeout; I'm not sure whether a short timeout will create other problems.

Hopefully a truly async interface will come from the Kafka core team.

Cheers,
Lee 


Scott Clasen

Apr 14, 2014, 5:59:03 PM
to akka...@googlegroups.com, l...@underneath.ca
Hey man! 

So far so good. Under load you don't really have to worry about the timeout ever firing.

The client is structured as a supervisor actor, responsible for committing the offsets periodically or after N messages, which spins up a number of child Stream actors, each of which manages a single KafkaStream/MessageIterator.
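One way to express the supervisor's commit rule ("periodically or after N messages") is a small tracker consulted on every processed message and on a periodic tick. This is a plain-Scala sketch, not the code described in the post: `CommitTracker`, `everyN`, `intervalMs`, and the choice to skip a commit when nothing new was processed are assumptions. In an actor, the tick would typically come from the scheduler, and a `true` result would trigger the offset commit.

```scala
// Hypothetical commit trigger: commit after everyN messages, or when
// intervalMs has elapsed since the last commit and something is pending.
object CommitPolicyModel {
  final case class CommitPolicy(everyN: Int, intervalMs: Long) {
    require(everyN > 0 && intervalMs > 0, "both thresholds must be positive")
  }

  final class CommitTracker(policy: CommitPolicy, startMs: Long) {
    private var sinceCommit = 0
    private var lastCommitMs = startMs

    /** Record one processed message; true means "commit offsets now". */
    def onMessage(nowMs: Long): Boolean = {
      sinceCommit += 1
      check(nowMs)
    }

    /** Periodic tick; true means "commit offsets now". */
    def onTick(nowMs: Long): Boolean = check(nowMs)

    private def check(nowMs: Long): Boolean = {
      val due = sinceCommit >= policy.everyN ||
        (sinceCommit > 0 && nowMs - lastCommitMs >= policy.intervalMs)
      if (due) { sinceCommit = 0; lastCommitMs = nowMs } // reset after a commit
      due
    }
  }
}
```

Keeping the rule in a plain class like this leaves the supervisor actor free to decide how the commit itself is performed, and makes the policy easy to test without Kafka.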

I have tested with a topic with 32 partitions in two cases: one where all partitions have a few million messages, and one where only a single partition of the 32 has a few million messages. Both cases are working just fine.

Cheers!

Filippo De Luca

Apr 15, 2014, 2:37:54 PM
to akka...@googlegroups.com, l...@underneath.ca
The thing is, under load the actor will always have Kafka messages available, so it will never process other messages. That may fit the use case anyway, but it may not.

By the way, that was one of my initial ideas.
Filippo De Luca
---------------------

Scott Clasen

Apr 15, 2014, 2:59:13 PM
to akka...@googlegroups.com, l...@underneath.ca
Not necessarily true; it depends on how you model it. I'm not suggesting sending all messages received from the stream to the actor, but sending control messages to the actor to tell it what to do with the stream.

For instance, model it so that...

streamActor ! Receive  

makes the actor poll the stream and do something if there is a result. Once you receive a message and have done something with it, send

self ! Receive

to continue processing the stream.

In the meantime, between processing the received message and the `self ! Receive`, the actor could have received other messages, which it will process before processing the next Receive message.
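The effect of re-sending `Receive` can be checked with a minimal, dependency-free model (again not Akka code): a `Queue` stands in for the mailbox, and `mailbox.enqueue(Receive)` stands in for `self ! Receive`. `ReceiveLoopModel` and the `Status` message are hypothetical. Because each `Receive` handles at most one stream message and then goes to the back of the mailbox, a `Status` that arrives in the meantime is handled right after the first stream message instead of after the whole stream.

```scala
import scala.collection.mutable

// Hypothetical model, not Akka/Kafka API: each Receive polls the "stream"
// once, then re-enqueues Receive behind whatever else has arrived.
object ReceiveLoopModel {
  sealed trait Msg
  case object Receive extends Msg
  case object Status extends Msg // some other message the actor should react to

  def run(available: Seq[String]): Vector[String] = {
    val stream = mutable.Queue.empty[String]
    stream ++= available
    val mailbox = mutable.Queue[Msg](Receive)
    val log = Vector.newBuilder[String]
    var statusSent = false

    while (mailbox.nonEmpty) {
      mailbox.dequeue() match {
        case Receive =>
          if (stream.nonEmpty) {
            log += s"handled ${stream.dequeue()}"
            // Another sender's message arrives while the actor is busy.
            if (!statusSent) { mailbox.enqueue(Status); statusSent = true }
            mailbox.enqueue(Receive) // the equivalent of `self ! Receive`
          }
          // Empty stream: the model stops here so that it terminates.
        case Status =>
          log += "handled Status"
      }
    }
    log.result()
  }
}
```

For `ReceiveLoopModel.run(Seq("m1", "m2", "m3"))` the log reads m1, Status, m2, m3. In a real actor an empty poll would more likely lead to another `Receive`, possibly scheduled after a delay, rather than stopping.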

Filippo De Luca

Apr 15, 2014, 4:08:13 PM
to akka...@googlegroups.com, l...@underneath.ca
Oh, I see.
Of course, between one Receive and the next the mailbox could receive other messages, and those get processed before the next Receive.

Got it, it's cool. Thanks!