Hello,
We are having a requirement that if a consumer is slower than producer then discard all the elements that cannot be consumed and whenever the consumer gets ready, feed the latest element from producer.
We tried an approach as follows:
Source.actorRef(0, OverflowStrategy.dropHead) // actor receives data at every 10 milliseconds
.runWith {
println("data received")
Thread.sleep(1000) // mimic consumer processing data in every 1 second
}
We shrank the buffer size to 1 (minimal possible) with following settings
private val actorMaterializerSettings = ActorMaterializerSettings(actorSystem).withInputBuffer(1, 1)
With this buffer size, Sink pulls data 1 to consume and data 2 to put in buffer at initialization.
While data 1 is getting processed we are dropping data from producer.
When data 1 gets processed after 1000 milliseconds (1 second) ideally I should receive data 10 (and drop 2 - 9 as consumer is slow) but instead I receive data 2 from the buffer. data 2 in our domain is extremely useless as it is stale.
Is there a way to disable buffer at Sink totally and always pull latest data from Source ?