akka kafka stream parallel processing


Shannon Ma

May 1, 2017, 1:22:04 PM
to Akka User List
Hi,

How does Akka Streams Kafka handle parallel processing based on topic partitions (or how can I set that up)? If I run multiple threads within one instance, or run multiple instances, how does it rebalance or fail over when a thread dies or when I add or remove instances?


Thanks
Shannon 

Shannon Ma

May 1, 2017, 3:48:51 PM
to Akka User List
In my testing I got this simple source -> map -> sink pipeline working:


Source<CommittableMessage<SpecificRecord, SpecificRecord>, Control> s =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("sanitation")).asJava();
s.map(new Function<ConsumerMessage.CommittableMessage<SpecificRecord, SpecificRecord>,
        ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>>() {

    public Message<SpecificRecord, SpecificRecord, Committable> apply(
            CommittableMessage<SpecificRecord, SpecificRecord> msg)
            throws Exception {
        // Print the thread name to see which thread handles each message.
        System.out.println(Thread.currentThread().getName() + "msg----------------" + msg);
        return new ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>(
            new ProducerRecord<SpecificRecord, SpecificRecord>("akkatest", msg.record().key(), msg.record().value()),
            msg.committableOffset());
    }
}).runWith(Producer.commitableSink(producerSettings), materializer);




Now I want to test parallel processing by partition. I don't know what to put in the CompletionStage; can someone help or point me to some docs or examples?


TopicPartition par = new TopicPartition("test", 3);

Source<Tuple2<TopicPartition, akka.stream.scaladsl.Source<CommittableMessage<SpecificRecord, SpecificRecord>, NotUsed>>, Control> s 
= Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("test")).asJava();
s.mapAsync(3, new Function<Tuple2<TopicPartition,Source<CommittableMessage<SpecificRecord,SpecificRecord>,NotUsed>>, 
CompletionStage<ProducerMessage.Message<SpecificRecord,SpecificRecord, ConsumerMessage.Committable>>>() {

public CompletionStage<Message<SpecificRecord, SpecificRecord, Committable>> apply(
Tuple2<TopicPartition, Source<CommittableMessage<SpecificRecord, SpecificRecord>, NotUsed>> param)
throws Exception {
// TODO Auto-generated method stub
return null;
}

});

Thanks
Shannon

Shannon Ma

May 1, 2017, 6:02:05 PM
to Akka User List
I made some changes, still using mapAsync. In apply() the input is a Source, not a Message. How can I get the messages from it? I cannot find anything.


TopicPartition par = new TopicPartition("test", 3);
 
Source<Tuple2<TopicPartition, akka.stream.scaladsl.Source<CommittableMessage<SpecificRecord, SpecificRecord>, NotUsed>>, Control> s 
= Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("test")).asJava();
  
s.mapAsync(3, new Function<Tuple2<TopicPartition,akka.stream.scaladsl.Source<CommittableMessage<SpecificRecord,SpecificRecord>,NotUsed>>, 
CompletionStage<ProducerMessage.Message<SpecificRecord,SpecificRecord, ConsumerMessage.Committable>>>() {

public CompletionStage<Message<SpecificRecord, SpecificRecord, Committable>> apply(
Tuple2<TopicPartition, akka.stream.scaladsl.Source<CommittableMessage<SpecificRecord, SpecificRecord>, NotUsed>> param)
throws Exception {
System.out.println("thread == " + Thread.currentThread().getName() + " part is ----------------" + param._1);
System.out.println("thread == " + Thread.currentThread().getName() + " value  is ----------------" + param._2.asJava());
//System.out.println("thread == " + Thread.currentThread().getName() + "msg----------------" + msg);
return null;
//return new ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>(
// new ProducerRecord<SpecificRecord, SpecificRecord>("akkatest", param._2.record().key(), msg.record().value()), msg.committableOffset());
}

}).runWith(Producer.commitableSink(producerSettings), materializer);

Michal Borowiecki

May 1, 2017, 6:29:39 PM
to akka...@googlegroups.com

I think you got your imports wrong. Given the call to .asJava(), I'm guessing you've imported Consumer from the scaladsl.

If you use javadsl you'll get akka.stream.javadsl.Source, not akka.stream.scaladsl.Source in the Tuple2.


committablePartitionedSource() gives you a Source of tuples, one per partition; the second element of each tuple is the actual Source of messages.

If you just want a simple app, then committableSource() directly gives you a Source of messages, without the per-partition breakdown, which you probably don't need at this stage. But it's up to you.


Examples are in the docs:

http://doc.akka.io/docs/akka-stream-kafka/current/consumer.html


Hope that helps,

Michal



Shannon Ma

May 2, 2017, 12:37:04 PM
to Akka User List
Thanks, you are right. Once I used only javadsl classes, it compiled.

I got the simple app working, and now I'd like to see how the partitioned version works. The inner apply() is similar to the simple logic (no partitions). What I am not sure about is:

1. What should the CompletionStage contain? I am using

CompletionStage<ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>>

but I am not sure this is right. If it is, what should I return, and how? Or should I return CompletionStage<Done>? That leads to my second question.

2. Where should I put runWith(): in the outer apply() or the inner one? I have been looking at the examples, but it is still not clear to me.



Source<Pair<TopicPartition, Source<CommittableMessage<SpecificRecord, SpecificRecord>, NotUsed>>, Control> s 
= Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("sanitation"));

s.mapAsync(3, new Function<Pair<TopicPartition,Source<CommittableMessage<SpecificRecord,SpecificRecord>,NotUsed>>, 
CompletionStage<ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>>>() {

public CompletionStage<ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>> apply(
Pair<TopicPartition, Source<ConsumerMessage.CommittableMessage<SpecificRecord, SpecificRecord>, NotUsed>> param)
throws Exception {
param.second().map(new Function<ConsumerMessage.CommittableMessage<SpecificRecord,SpecificRecord>, 
ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>>() {

public ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable> apply(
CommittableMessage<SpecificRecord, SpecificRecord> msg)
throws Exception {
System.out.println(Thread.currentThread().getName() + "msg----------------" + msg);
return new ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>(
new ProducerRecord<SpecificRecord, SpecificRecord>("akkatest", msg.record().key(), msg.record().value()), msg.committableOffset());
}
});
 
CompletableFuture<ProducerMessage.Message<SpecificRecord,SpecificRecord,Committable>> fu = new CompletableFuture<ProducerMessage.Message<SpecificRecord,SpecificRecord,Committable>>();;
return fu;
}
}).runWith(Producer.commitableSink(producerSettings), materializer);
   

Shannon Ma

May 2, 2017, 1:23:44 PM
to Akka User List
The above code does not seem to consume any messages. From the log:

12:04| INFO | AbstractCoordinator.java 349 | Successfully joined group part1 with generation 1
12:04| INFO | ConsumerCoordinator.java 225 | Setting newly assigned partitions [sanitation-1, sanitation-0, sanitation-2] for group part1
12:04| DEBUG | KafkaConsumer.java 1358 | Pausing partition sanitation-1
12:04| DEBUG | KafkaConsumer.java 1358 | Pausing partition sanitation-0
12:04| DEBUG | KafkaConsumer.java 1358 | Pausing partition sanitation-2

12:04| DEBUG | ConsumerCoordinator.java 708 | Group part1 fetching committed offsets for partitions: [sanitation-1, sanitation-0, sanitation-2]
12:04| DEBUG | ConsumerCoordinator.java 751 | Group part1 has no committed offset for partition sanitation-1
12:04| DEBUG | ConsumerCoordinator.java 751 | Group part1 has no committed offset for partition sanitation-0
12:04| DEBUG | ConsumerCoordinator.java 751 | Group part1 has no committed offset for partition sanitation-2

12:04| DEBUG | Fetcher.java 340 | Resetting offset for partition sanitation-1 to earliest offset.
12:04| DEBUG | Fetcher.java 340 | Resetting offset for partition sanitation-0 to earliest offset.
12:04| DEBUG | Fetcher.java 340 | Resetting offset for partition sanitation-2 to earliest offset.

12:04| DEBUG | Fetcher.java 583 | Fetched {timestamp=-1, offset=58} for partition sanitation-1
12:04| DEBUG | Fetcher.java 583 | Fetched {timestamp=-1, offset=16} for partition sanitation-0
12:04| DEBUG | Fetcher.java 583 | Fetched {timestamp=-1, offset=16} for partition sanitation-2

12:04| DEBUG | KafkaConsumer.java 1358 | Pausing partition sanitation-1
12:04| DEBUG | KafkaConsumer.java 1358 | Pausing partition sanitation-0
12:04| DEBUG | KafkaConsumer.java 1358 | Pausing partition sanitation-2

Shannon Ma

May 3, 2017, 11:21:39 AM
to Akka User List
Many examples use lambda expressions, like this:


Consumer.Control c =
Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
.map(pair -> pair.second().via(business()).toMat(Sink.ignore(), Keep.both()).run(materializer))
.mapAsyncUnordered(maxPartitions, (pair) -> pair.second()).to(Sink.ignore()).run(materializer);


But mapAsyncUnordered expects a CompletionStage<Object>, while pair.second() is a Source. Did I miss anything?

Thanks
Shannon
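
On the type mismatch above: the function passed to mapAsyncUnordered has to return a CompletionStage, so each per-partition Source needs to be run first, and the stage produced by running it is what the function hands back. Below is a minimal sketch of that shape using only the JDK, not the Akka API. `drain` is a hypothetical stand-in for running one partition's inner stream, and the map of lists stands in for the partitioned source; both are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class PartitionFutures {
    // Hypothetical stand-in for running one partition's inner stream:
    // the returned stage completes with the number of messages handled.
    static CompletionStage<Integer> drain(int partition, List<String> messages) {
        return CompletableFuture.supplyAsync(() -> {
            int handled = 0;
            for (String m : messages) {
                // Real code would transform and publish each message here.
                handled++;
            }
            return handled;
        });
    }

    // Start one asynchronous run per partition, then collect the results.
    static Map<Integer, Integer> processAll(Map<Integer, List<String>> byPartition) {
        Map<Integer, CompletableFuture<Integer>> running = new TreeMap<>();
        byPartition.forEach((p, msgs) -> running.put(p, drain(p, msgs).toCompletableFuture()));

        Map<Integer, Integer> handledPerPartition = new TreeMap<>();
        // join() blocks, which is acceptable only at the edge of this demo.
        running.forEach((p, f) -> handledPerPartition.put(p, f.join()));
        return handledPerPartition;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> input = new TreeMap<>();
        input.put(0, Arrays.asList("a", "b"));
        input.put(1, Arrays.asList("c"));
        System.out.println(processAll(input)); // prints {0=2, 1=1}
    }
}
```

The point of the sketch is only the return type: the per-partition function yields a stage that completes when that partition's work is done, rather than yielding the source itself.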

Shannon Ma

May 4, 2017, 2:31:10 PM
to Akka User List
Got it working with this. I am still trying to understand the last part (runForeach).




s.mapAsyncUnordered(3, new Function<Pair<TopicPartition, Source<CommittableMessage<SpecificRecord, SpecificRecord>, NotUsed>>,
        CompletionStage<Done>>() {

    public CompletionStage<Done> apply(
            Pair<TopicPartition, Source<ConsumerMessage.CommittableMessage<SpecificRecord, SpecificRecord>, NotUsed>> param)
            throws Exception {
        System.out.println("----------------part -----------------" + param.first().partition());
        Source<Message<SpecificRecord, SpecificRecord, Committable>, NotUsed> f =
            param.second().map(new Function<ConsumerMessage.CommittableMessage<SpecificRecord, SpecificRecord>,
                    ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>>() {

                public ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable> apply(
                        CommittableMessage<SpecificRecord, SpecificRecord> msg)
                        throws Exception {
                    //System.out.println(Thread.currentThread().getName() + "msg---------------------------------------------------" + msg);
                    System.out.println(Thread.currentThread().getName() + "msg---------------------------------------------------" + msg.record().value().getClass());
                    return new ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>(
                        new ProducerRecord<SpecificRecord, SpecificRecord>("akkatest", msg.record().key(), msg.record().value()),
                        msg.committableOffset());
                }
            });

        return f.runWith(Producer.commitableSink(producerSettings), materializer);
    }
}).runForeach(new Procedure<Done>() {
    @Override
    public void apply(Done param) throws Exception {
    }
}, materializer);

Shannon Ma

May 4, 2017, 6:21:18 PM
to Akka User List
Hi,

Some more newbie questions. To continue my testing and learning, I want to use an actor to process each message, so in my map() I invoke the actor.

1. Is this a good use of an actor?
2. Await.result is blocking. Is it possible, and a good idea, to make this non-blocking?


Thanks
Shannon






public ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable> apply(
CommittableMessage<SpecificRecord, SpecificRecord> msg)
throws Exception {
//logger.debug(Thread.currentThread().getName() + "msg---------------------------------------------------" + msg);
logger.debug(Thread.currentThread().getName() + "  msg---------------------------------------------------" + msg.record().value().getClass());
final Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
Future<Object> future = Patterns.ask(tpactor, msg, timeout);
logger.debug(Thread.currentThread().getName() + "  asking---------------------------------------------------" + msg.record().value().getClass());
CommittableMessage<SpecificRecord, SpecificRecord> outmsg = (CommittableMessage<SpecificRecord, SpecificRecord>) Await.result(future, timeout.duration());
logger.debug(Thread.currentThread().getName() + "  back---------------------------------------------------" + outmsg.record().value().getClass());
return new ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>(
new ProducerRecord<SpecificRecord, SpecificRecord>("akkatest2", outmsg.record().key(), outmsg.record().value()), outmsg.committableOffset());
}
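
On the second question, the general non-blocking alternative to waiting on a future is to attach the follow-up transformation to it and return the resulting stage, which is the shape that a stage like mapAsync accepts. The sketch below shows this with plain JDK futures only; `ask` here is a hypothetical stand-in for asking the actor, not Akka's ask pattern, and the "akkatest2:" prefix merely stands in for building the outgoing producer message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

class NonBlockingAsk {
    // Hypothetical stand-in for asking an actor: the reply arrives asynchronously.
    static CompletionStage<String> ask(String msg) {
        return CompletableFuture.supplyAsync(() -> msg.toUpperCase());
    }

    // Non-blocking: the transformation runs when the reply arrives,
    // so no thread sits waiting the way it does with Await.result.
    static CompletionStage<String> toOutgoing(String msg) {
        return ask(msg).thenApply(reply -> "akkatest2:" + reply);
    }

    public static void main(String[] args) throws Exception {
        // Blocking with a timeout only at the edge of this demo, to print the result.
        String out = toOutgoing("hello").toCompletableFuture().get(5, TimeUnit.SECONDS);
        System.out.println(out); // prints akkatest2:HELLO
    }
}
```

In the stream itself, this would correspond to replacing the blocking map() with a stage that takes a function returning a CompletionStage, so the stream rather than a parked thread waits for the actor's reply.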

Shannon Ma

May 16, 2017, 12:43:38 PM
to Akka User List
hi, 

In the code below, outmsg comes from the actor's result. In one case I need to flatten it into multiple output messages, and I am stuck on where and how to do that. I would really appreciate it if someone could help.


Thanks
Shannon



Source<Pair<TopicPartition, Source<CommittableMessage<SpecificRecord, SpecificRecord>, NotUsed>>, Control> s 
= Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("akkatest"));

s.mapAsyncUnordered(3, new Function<Pair<TopicPartition,Source<CommittableMessage<SpecificRecord,SpecificRecord>,NotUsed>>, 
CompletionStage<Done>>() {

public CompletionStage<Done> apply(
Pair<TopicPartition, Source<ConsumerMessage.CommittableMessage<SpecificRecord, SpecificRecord>, NotUsed>> param)
throws Exception {
logger.debug("----------------part -----------------" + param.first().partition());
   Source<Message<SpecificRecord, SpecificRecord, Committable>, NotUsed> f = 
    param.second().map(new Function<ConsumerMessage.CommittableMessage<SpecificRecord,SpecificRecord>, 
ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>>() {

public ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable> apply(
CommittableMessage<SpecificRecord, SpecificRecord> msg)
throws Exception {
//logger.debug(Thread.currentThread().getName() + "msg---------------------------------------------------" + msg);
logger.debug(Thread.currentThread().getName() + "  msg---------------------------------------------------" + msg.record().value().getClass());
final Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
Future<Object> future = Patterns.ask(tpperactor, msg, timeout);
logger.debug(Thread.currentThread().getName() + "  asking---------------------------------------------------" + msg.record().value().getClass());
List<CommittableMessage<SpecificRecord, SpecificRecord>> outlist = new ArrayList<>();
CommittableMessage<SpecificRecord, SpecificRecord> outmsg = (CommittableMessage<SpecificRecord, SpecificRecord>) Await.result(future, timeout.duration());
logger.debug(Thread.currentThread().getName() + "  back---------------------------------------------------" + outmsg.record().value().getClass());
ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable> result =
new ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>(
new ProducerRecord<SpecificRecord, SpecificRecord>("akkatest2", outmsg.record().key(), outmsg.record().value()), outmsg.committableOffset());
return result; 
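
On the flattening question above: the usual shape is a one-to-many step, where each reply is expanded into a list of outputs and the lists are concatenated into a single flow (Akka Streams offers mapConcat for this kind of step). A minimal JDK-only sketch of the idea follows. The `expand` rule, splitting on commas, is purely hypothetical, and offset handling is deliberately left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlattenReplies {
    // Hypothetical expansion rule: one reply becomes one output per comma-separated part.
    static List<String> expand(String reply) {
        return Arrays.asList(reply.split(","));
    }

    // One-to-many step: expand every reply, then concatenate the results in order.
    static List<String> flatten(List<String> replies) {
        return replies.stream()
                .flatMap(reply -> expand(reply).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(flatten(Arrays.asList("a,b", "c"))); // prints [a, b, c]
    }
}
```

With committable offsets there is one more thing to decide, which this sketch does not address: when several outputs come from one input, the input's offset should presumably be committed only after all of those outputs have been produced.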