akka stream kafka newbie question

253 views
Skip to first unread message

Shannon Ma

unread,
Apr 27, 2017, 7:36:44 PM4/27/17
to Akka User List
Hi,

I am new to akka stream kafka, and am trying to follow some the examples to get start.

I try to use

Source.runWith(Sink.ignore()) or Soruce.runWith(Producer.commitableSink(producerSettings) 

and get this compilation error

The method runWith(Graph<SinkShape<ConsumerMessage.CommittableMessage<SpecificRecord,SpecificRecord>>,M>, Materializer) in the type Source<ConsumerMessage.CommittableMessage<SpecificRecord,SpecificRecord>,Consumer.Control> is not applicable for the arguments (Sink<ProducerMessage.Message<SpecificRecord,SpecificRecord,ConsumerMessage.Committable>,CompletionStage<Done>>, null)


The method expects Graph<>, but examples put Sink, what am i missing?

Thanks
Shannon

Konrad Malawski

unread,
Apr 27, 2017, 9:52:03 PM4/27/17
to akka...@googlegroups.com, Shannon Ma
A Sink is a Graph; as is a Flow, and a Source - they're all graphs, of a specific shape.
Here, the method takes a Graph<SinkShape, so yeah you can pass in a Sink.

Seems you're not passing in the materializer?
In Java you have to do that explicitly:


someSource.runWith(sink, materializer)

-- 
Konrad `ktoso` Malawski
Akka @ Lightbend
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Shannon Ma

unread,
Apr 28, 2017, 9:32:56 AM4/28/17
to Akka User List, shan...@gmail.com
Thanks, yes i am putting the materializer, 

runWith(Producer.commitableSink(producerSettings), materializer);

Is this a java and/or scala version issue?

i am using 

<dependency>
 <groupId>com.typesafe.akka</groupId>
 <artifactId>akka-stream-kafka_2.11</artifactId>
 <version>0.11-RC2</version>
</dependency>


and scala library 2.11.4

Shannon Ma

unread,
Apr 28, 2017, 9:33:00 AM4/28/17
to Akka User List, shan...@gmail.com
Here is my code


final ActorSystem system = ActorSystem.create();
ActorMaterializer materializer = ActorMaterializer.create(system);
final ConsumerSettings<SpecificRecord, SpecificRecord> consumerSettings =
   ConsumerSettings.create(system, new SpecificAvroDeserializer(), new SpecificAvroDeserializer())
 .withBootstrapServers("localhost:9092")
 .withGroupId("group5")
 .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final ProducerSettings<SpecificRecord, SpecificRecord> producerSettings =
ProducerSettings.create(system, new SpecificAvroSerializer(), new SpecificAvroSerializer())
 .withBootstrapServers("localhost:9092");
 
Source<CommittableMessage<SpecificRecord, SpecificRecord>, Control> s = Consumer.committableSource(consumerSettings, Subscriptions.topics("sanitation")).asJava();
//Graph<SinkShape<ConsumerMessage.CommittableMessage<SpecificRecord,SpecificRecord>>,Materialize>, Materialize>
s.map(new Function<ConsumerMessage.CommittableMessage<SpecificRecord,SpecificRecord>, ConsumerMessage.CommittableMessage<SpecificRecord,SpecificRecord>>() {

@Override
public CommittableMessage<SpecificRecord, SpecificRecord> apply(
CommittableMessage<SpecificRecord, SpecificRecord> msg)
throws Exception {
System.out.println("msg----------------" + msg);
return msg;
}
//}).via(Producer.flow(producerSettings)
//}).runWith(Producer.commitableSink(producerSettings), materializer);

Shannon Ma

unread,
Apr 28, 2017, 10:02:35 AM4/28/17
to Akka User List, shan...@gmail.com
Looks like it is the message type, from another example when i do this, it passes compilation.



s.map(new Function<ConsumerMessage.CommittableMessage<SpecificRecord,SpecificRecord>, ProducerMessage.Message<SpecificRecord,SpecificRecord, ConsumerMessage.Committable>>() {

public Message<SpecificRecord, SpecificRecord, Committable> apply(
CommittableMessage<SpecificRecord, SpecificRecord> msg)
throws Exception {
System.out.println("msg----------------" + msg);
return new ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>(
       new ProducerRecord<>("topic2", msg.record().value()), msg.committableOffset());
}
}).runWith(Producer.commitableSink(producerSettings), materializer);

Shannon Ma

unread,
Apr 28, 2017, 10:19:40 AM4/28/17
to Akka User List, shan...@gmail.com
My Kafka version : 0.10.1.1, i should use akka-stream-kafka_2.12, right?

Shannon Ma

unread,
Apr 28, 2017, 10:38:04 AM4/28/17
to Akka User List, shan...@gmail.com
With 2.12, i had to update the code,


return new ProducerMessage.Message<SpecificRecord, SpecificRecord, ConsumerMessage.Committable>(
new ProducerRecord<SpecificRecord, SpecificRecord>("akkatest", msg.record().key(), msg.record().value()), msg.committableOffset());


but getting 

Exception in thread "main" java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V
at akka.util.Timeout.<init>(Timeout.scala:13)
at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:328)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:683)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:245)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:288)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:233)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:224)
at akka.actor.ActorSystem$.create(ActorSystem.scala:159)
at akka.actor.ActorSystem.create(ActorSystem.scala)
at com.javacodegeeks.camel.AkkaTest.main(AkkaTest.java:36)

Akka Team

unread,
Apr 28, 2017, 11:07:55 AM4/28/17
to Akka User List
You should combine the same versions of Scala. If you use akka-stream-kafka_2.12 then the Akka version you use (and all other libraries written in Scala you use in fact) must have the same version of Scala since the major versions of Scala (2.11, 2.12) are not binary compatible. In addition to this you must also combine this with a version of Akka that the driver works with. Easiest is probably to just use the latest reactive-kaka and let it pull in the right versions of Akka as transitive dependencies, you can do that by just having:

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-kafka_2.11</artifactId>
    <version>0.15</version>
</dependency>

In your maven pomfile, if you want to explicitly list Akka as well, you could then see in your IDE which versions that kaka-stream-kafka pulls in and add those versions.

--
Johan
Akka Team

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.

Shannon Ma

unread,
Apr 28, 2017, 2:32:25 PM4/28/17
to Akka User List
Thanks, thats what i have


<dependency>
 <groupId>com.typesafe.akka</groupId>
 <artifactId>akka-stream-kafka_2.12</artifactId>
 <version>0.15</version>
</dependency>


and i can see i only have actor 2.12 (which is 2.4.17)

Thanks
Shannon
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.

Shannon Ma

unread,
May 1, 2017, 1:09:32 PM5/1/17
to Akka User List
After i cleaned up my maven repository, it is working now, though i did not narrow down to which lib/jar has the conflict.


Thanks
Shannon
Reply all
Reply to author
Forward
0 new messages