Hi Huan,
What you need is a back-channel from the Cassandra Sink to the Kafka Source. In Akka Streams the way to solve this is to create an explicit backward channel with streams.
For example if you have an async Kafka API that gives back the result of your read as a Future, together with some metadata usable for acking, you can model it like this (pseudocode, I don't know how Kafka API looks like):
val kafkaReader: Flow[AckId, (MyData, AckId), Unit] = Flow[AckId].mapAsync(parallelism = 4) { ack =>
kafka.acknowledge(ack)
kafka.readAsync() // Returns Future[(MayData, AckId)]
}
val cassandraWriter: Flow[(MyData, AckId), AckId, Unit] = Flow[(MayData, AckId)].mapAsync(parallelism = 4) { case (data, ackid) =>
cassandra.writeAsync(data).map(_ => ackid) // Returns Future[AckId]
}
kafkaReader.join(cassandraWriter).run() // Close the loop and start
The above code does not handle failed acknowledgments, you need to add that handling yourself (for example instead of sending Ackid from cassandra Flow, you send a Success(Ackid) or Failure(AckId)). Also, the above cycle will not run, because there are no initial acknowledgments in the loop to start it. You can do this by adding 4 dummy acknowledgments:
kafkaReader.join(cassandraWriter ++ Source(List(dummyAck1, ...))).run()
(or you can alternatively concat kafkaReader with a Source that injects 4 reads from Kafka)
-Endre