// Actor system for the stream, plus the materializer created from it.
final ActorSystem system = ActorSystem.create();
ActorMaterializer materializer = ActorMaterializer.create(system);

// Single broker address shared by the consumer and the producer settings below.
final String bootstrapServers = "localhost:9092";

// Consumer settings: Avro deserializers for key and value, consumer group "group5",
// and the auto-offset-reset property set to "earliest".
final ConsumerSettings<SpecificRecord, SpecificRecord> consumerSettings =
        ConsumerSettings
                .create(system, new SpecificAvroDeserializer(), new SpecificAvroDeserializer())
                .withBootstrapServers(bootstrapServers)
                .withGroupId("group5")
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Producer settings: Avro serializers for key and value, same broker address.
final ProducerSettings<SpecificRecord, SpecificRecord> producerSettings =
        ProducerSettings
                .create(system, new SpecificAvroSerializer(), new SpecificAvroSerializer())
                .withBootstrapServers(bootstrapServers);

// Committable source subscribed to the "sanitation" topic; asJava() is applied to the
// result of Consumer.committableSource before it is assigned to the Source variable.
Source<CommittableMessage<SpecificRecord, SpecificRecord>, Control> s =
        Consumer
                .committableSource(consumerSettings, Subscriptions.topics("sanitation"))
                .asJava();
//Graph<SinkShape<ConsumerMessage.CommittableMessage<SpecificRecord,SpecificRecord>>,Materialize>, Materialize>
s.map(new Function<ConsumerMessage.CommittableMessage<SpecificRecord,SpecificRecord>, ConsumerMessage.CommittableMessage<SpecificRecord,SpecificRecord>>() {
/**
 * Prints the given committable message to standard output and hands it back unchanged.
 *
 * @param msg the message passed to this function
 * @return the same {@code msg} reference that was passed in
 * @throws Exception declared by the signature; nothing in this body throws explicitly
 */
@Override
public CommittableMessage<SpecificRecord, SpecificRecord> apply(
        final CommittableMessage<SpecificRecord, SpecificRecord> msg) throws Exception {
    // Debug trace: the message is rendered through string concatenation.
    final String debugLine = "msg----------------" + msg;
    System.out.println(debugLine);
    return msg;
}
//}).via(Producer.flow(producerSettings)
//}).runWith(Producer.commitableSink(producerSettings), materializer);