[Akka-Streams] Want to always receive latest element in Sink


sal...@thoughtworks.com

Jan 25, 2018, 2:03:11 AM
to Akka User List
Hello,

We have a requirement: if the consumer is slower than the producer, discard all the elements the consumer cannot keep up with and, whenever the consumer is ready, feed it the latest element from the producer.

We tried the following approach:

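import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}

// Needs an implicit Materializer in scope; the materialized actor receives data every 10 milliseconds
Source.actorRef[String](0, OverflowStrategy.dropHead)
  .runWith(Sink.foreach { data =>
    println(s"$data received")
    Thread.sleep(1000) // mimic a consumer that needs 1 second per element
  })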

We shrank the input buffer to 1 (the minimum possible) with the following settings:

private val actorMaterializerSettings = ActorMaterializerSettings(actorSystem).withInputBuffer(1, 1)

With this buffer size, the Sink pulls data 1 to consume and data 2 to put in its buffer at initialization.

While data 1 is being processed, we drop data from the producer.

When data 1 finishes processing after 1000 milliseconds (1 second), I would ideally receive data 10 (with 2 to 9 dropped because the consumer is slow), but instead I receive data 2 from the buffer. In our domain data 2 is useless because it is stale.

Is there a way to disable the buffer at the Sink entirely and always pull the latest data from the Source?


Tal Pressman

Jan 25, 2018, 2:33:41 AM
to Akka User List
Hi,

I don't know of any built-in graph stage that does this, but you can implement your own fairly easily.
You just need a stage that holds an Option[Value]: in onPush you either push the value downstream or store it, and in either case you pull. In onPull you check the option and, if it's defined, push it (you will also need to pull the first time onPull is called to "seed" the demand).
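A minimal sketch of the stage Tal describes, assuming the Akka 2.5 `GraphStage` API; the class name `LatestOnly` is hypothetical. One small deviation from the description: demand is seeded once in `preStart` rather than on the first `onPull`, and the last stored element is flushed when upstream completes.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Hypothetical stage: always pulls upstream and, while downstream is busy,
// keeps only the most recent element, replacing any older one.
final class LatestOnly[T] extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet[T]("LatestOnly.in")
  val out: Outlet[T] = Outlet[T]("LatestOnly.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Newest element not yet sent downstream, if any.
      private var pending: Option[T] = None

      // Seed demand once so upstream can push before the first onPull.
      override def preStart(): Unit = pull(in)

      override def onPush(): Unit = {
        val elem = grab(in)
        if (isAvailable(out)) push(out, elem) // downstream is waiting: hand it over
        else pending = Some(elem)             // downstream is busy: overwrite the stale element
        pull(in)                              // keep demand open so upstream is never backpressured
      }

      override def onPull(): Unit =
        pending.foreach { elem =>
          push(out, elem)
          pending = None
        }

      // Deliver the last stored element (if any) before completing.
      override def onUpstreamFinish(): Unit = pending match {
        case Some(elem) => emit(out, elem, () => completeStage())
        case None       => completeStage()
      }

      setHandlers(in, out, this)
    }
}
```

Used as `source.via(new LatestOnly[Data])`, this behaves much like `conflate((_, latest) => latest)`. For elements to actually be replaced, the slow consumer must not block the stream's thread (see the next reply); an async boundary directly after the stage may also reintroduce a one-element input buffer.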

HTH,
Tal

Johannes Rudolph

Jan 25, 2018, 11:33:07 AM
to Akka User List
Hi,

in akka-stream, processing usually runs fused, i.e. without further configuration one stream runs in a single actor, so all operations run sequentially. In such a synchronous scenario there's little room for elements ever to get dropped, because the actorRef stage basically always has to wait for the consumer stage to finish before it can do its own work. By that point the `foreach` stage can already process the next element. Fused processing also means that `Thread.sleep` is a bad thing to do, as it blocks the stream infrastructure and dispatcher threads from doing their work.

Try using `mapAsync()` with `akka.pattern.after` to wait (or do the actual processing) without blocking the infrastructure, and it will probably start to work.
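A minimal sketch of this suggestion, assuming Akka 2.5-era APIs and `String` elements; `NonBlockingConsumer` and `slowProcess` are hypothetical names. The blocking `Thread.sleep` is replaced by `akka.pattern.after`, which completes a `Future` after one second without holding a thread, and `mapAsync(parallelism = 1)` keeps at most one element in flight. Since `Source.actorRef` with a buffer size of 0 drops messages when there is no demand, elements arriving while that Future is pending should be dropped rather than queued.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.after
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object NonBlockingConsumer {
  // Simulates one second of processing without blocking a thread:
  // the scheduler completes the returned Future after the delay.
  def slowProcess(data: String)(implicit system: ActorSystem): Future[String] = {
    implicit val ec: ExecutionContext = system.dispatcher
    after(1.second, system.scheduler)(Future.successful(data))
  }

  // Returns the ActorRef the producer sends data to (every ~10 ms in the example).
  def run()(implicit system: ActorSystem, mat: Materializer): ActorRef =
    Source.actorRef[String](0, OverflowStrategy.dropHead) // no buffer: drop when there is no demand
      .mapAsync(parallelism = 1)(data => slowProcess(data)) // one element in flight at a time
      .to(Sink.foreach(data => println(s"$data processed")))
      .run()
}
```

In a real application `slowProcess` would be replaced by the actual asynchronous work; the point is only that nothing in the stream blocks a dispatcher thread.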

Johannes

sal...@thoughtworks.com

Jan 31, 2018, 10:11:20 AM
to Akka User List
Thanks for the answers @Tal Pressman and @Johannes Rudolph. I will try these approaches and report back here whether they work. Thanks again.

Arnout Engelen

Feb 7, 2018, 7:56:11 AM
to akka...@googlegroups.com
Hello Saloni,

I think it would be helpful to have a more realistic example than doing "Thread.sleep(1000)" in the sink.

Could we unpack what this sleep is intended to mimic in your 'real' application? Is it for example doing CPU-intensive data parsing or perhaps some kind of IO?


Kind regards,

Arnout

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Arnout Engelen

Johan Andrén

Feb 8, 2018, 10:54:35 AM
to Akka User List
You should be able to let a faster upstream keep going while emitting the latest value whenever downstream is ready, using conflate like so:

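import akka.stream.ThrottleMode
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

Source(0 to 1000)
  .throttle(10, 1.second, 1, ThrottleMode.shaping) // fast upstream: 10 elements per second
  .conflate((prev, latest) => latest)              // conflate passes (aggregate, newElement): keep the newest
  .throttle(2, 1.second, 1, ThrottleMode.shaping)  // slow downstream: 2 elements per second
  .runForeach(println)                             // needs an implicit Materializer in scope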

--
Johan
Akka Team

sal...@thoughtworks.com

Feb 23, 2018, 5:22:16 AM
to Akka User List
Hello Arnout,

We are providing an API for other developers to use, so in practice a callback written by those developers is executed where the example has Thread.sleep.

Hence, we cannot say whether they will write CPU-intensive code or some kind of IO.
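A sketch combining the earlier suggestions for an arbitrary, possibly blocking callback, under stated assumptions: `LatestOnlyCallback.latestOnly` and its `callback: T => Unit` signature are hypothetical, and the caller supplies a dedicated `ExecutionContext` (for example a separate thread pool or a configured Akka dispatcher) so that blocking or CPU-heavy user code never runs on the stream's own threads. `conflate` keeps only the newest element while the callback is busy, and `mapAsync(1)` runs one callback at a time.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.concurrent.{ExecutionContext, Future}

object LatestOnlyCallback {
  // `callback` stands for the developer-supplied function (hypothetical signature).
  // `callbackEc` should be a dedicated pool, not the stream's default dispatcher.
  def latestOnly[T](callback: T => Unit)(implicit callbackEc: ExecutionContext): Flow[T, T, NotUsed] =
    Flow[T]
      .conflate((_, latest) => latest) // while the callback is busy, keep only the newest element
      .mapAsync(parallelism = 1) { elem =>
        Future { callback(elem); elem } // run user code off the stream's threads
      }
}
```

A possible usage: `source.via(LatestOnlyCallback.latestOnly(userCallback)(callbackEc)).runWith(Sink.ignore)`, where `callbackEc` could be built with `ExecutionContext.fromExecutor(Executors.newFixedThreadPool(n))` and shut down when the application stops.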

Hope this helps.