[akka-stream] Understanding conflate and zip


Vladimir Koshelev

Oct 14, 2014, 5:07:47 PM
to akka...@googlegroups.com
Hi,

I'm trying to use akka-stream and struggling to understand how conflate works. Here is a snippet of the code I'm using:

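import akka.actor.ActorSystem
import akka.stream.scaladsl2._
import akka.stream.scaladsl2.FlowGraphImplicits._
import scala.concurrent.duration._

object ConflateDemo extends App {

  implicit val system = ActorSystem("demo")
  implicit val fm = FlowMaterializer()

  case object Tick

  // Fast source: a Tick every 100 ms; conflate counts the ticks that pile up
  // while downstream is not pulling.
  val fast = FlowFrom(Duration.Zero, 100.millis, () => Tick).conflate[Int](_ => 1, (acc, _) => acc + 1)

  // Slow source: a Tick every second, used as the "clock" for the zip.
  val slow = FlowFrom(Duration.Zero, 1.second, () => Tick)

  val g = FlowGraph { implicit b =>
    val zip = Zip[Int, Tick.type]
    slow ~> zip.right
    fast ~> zip.left
    // Keep only the conflated count and print it.
    zip.out ~> FlowFrom[(Int, Tick.type)].map(_._1) ~> ForeachSink[Int](v => print(" " + v))
  }.run

}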

My intention is to create two streams, the first of which produces events 10 times faster than the second. Conflate on the fast stream should count the events. Then I zip the two streams and print out the results of conflate.
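To make the intended counting concrete, here is a hypothetical, single-threaded model of what `conflate(seed, aggregate)` does with the `_ => 1` / `(acc, _) => acc + 1` pair from the snippet above. `ConflateModel` is an illustrative name, not an Akka class, and the model ignores concurrency and the actual demand signalling between stages.

```scala
// Hypothetical single-threaded model of conflate semantics; not Akka's implementation.
final class ConflateModel[In, S](seed: In => S, aggregate: (S, In) => S) {
  // The value accumulated since the last pull, if any.
  private var pending: Option[S] = None

  // Upstream pushes an element: start a new accumulation or fold into the pending one.
  def offer(in: In): Unit =
    pending = pending match {
      case None      => Some(seed(in))
      case Some(acc) => Some(aggregate(acc, in))
    }

  // Downstream pulls: hand over the accumulated value (if any) and reset.
  def poll(): Option[S] = {
    val out = pending
    pending = None
    out
  }
}

object ConflateModelDemo {
  case object Tick

  def main(args: Array[String]): Unit = {
    // Same seed/aggregate pair as in the question: count ticks between pulls.
    val counter = new ConflateModel[Tick.type, Int](_ => 1, (acc, _) => acc + 1)
    (1 to 10).foreach(_ => counter.offer(Tick))
    println(counter.poll()) // Some(10)
    println(counter.poll()) // None: nothing arrived since the last pull
  }
}
```

If the downstream pulled only once per second, this model would hand out 10 on every pull, which is the output the question expects.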

My understanding of how it should work:

  • conflate consumes events produced by the fast stream as fast as they can be produced, so there is always demand and all ticks are emitted and counted.
  • As soon as an event from the slow flow arrives at the zip, the zip demands an element from conflate and sends the resulting pair into the sink.
So I expected output like: 10 10 10 10 10 ...
What I get:  1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 55 1 1 1 1 1 1 1 73 1 1 1 1 1 1 1 73 1 1 1 1 1 1 1 73 1 1 1 1 1 1 1 73 1 1 1 1 1 1 1 72 1 1 1 1 1 1 1 74 1 1 1 1 1 1 1 72 1 1 ...

What am I missing? Is there a mistake in the code, or do I misunderstand how conflate and zip work?


Vladimir



Martynas Mickevičius

Oct 15, 2014, 6:38:31 AM
to akka...@googlegroups.com
Hi Vladimir,

you have hit an interesting issue. Your understanding is correct, but the output of your example is not what you expect because every stage in the stream works in batches. The batch size is the size of the input buffer, which can be set with akka.stream.materializer.{initial, max}-input-buffer-size.

So when zip asks upstream for more elements, it asks for more than one element. The fast source can fulfil that request with an element every 100 milliseconds, which it does, so conflate never gets a chance to kick in. That is why you see lots of 1s printed out.

From time to time the buffers align, and zip's request for more elements is fulfilled first by the value accumulated by conflate and then again by 1s every 100 milliseconds.
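The batching effect can be reproduced with a toy, single-threaded simulation of the graph in the question. Everything here is an assumption made for illustration, not a description of Akka's internals: the zip's left input first requests `bufferSize` elements and afterwards tops up its demand in batches of `max(1, bufferSize / 2)`, conflate pushes its accumulated value as soon as there is outstanding demand, and a fast tick is processed before a slow tick that falls on the same millisecond. `BatchedDemandModel` and `simulate` are hypothetical names.

```scala
import scala.collection.mutable

// Toy model of: fast source -> conflate(count) -> zip.left, slow source -> zip.right.
// ASSUMED demand policy (not Akka's code): request bufferSize up front, then
// re-request in batches of max(1, bufferSize / 2) once that many were consumed.
object BatchedDemandModel {
  def simulate(bufferSize: Int, fastMs: Int, slowMs: Int, durationMs: Int): Vector[Int] = {
    val batch = math.max(1, bufferSize / 2)
    var demand = bufferSize               // outstanding request from zip.left to conflate
    var pending: Option[Int] = None       // conflate's accumulated count
    val left = mutable.Queue.empty[Int]   // zip's left input buffer
    var rightWaiting = 0                  // slow ticks waiting at zip.right
    var consumed = 0                      // elements taken from zip.left since the last request
    val out = Vector.newBuilder[Int]

    // conflate pushes its accumulated value as soon as there is demand
    def flush(): Unit = pending.foreach { n =>
      if (demand > 0) { left.enqueue(n); pending = None; demand -= 1 }
    }

    // zip emits while both inputs have an element, re-requesting in batches
    def zipEmit(): Unit =
      while (left.nonEmpty && rightWaiting > 0) {
        out += left.dequeue()
        rightWaiting -= 1
        consumed += 1
        if (consumed == batch) { consumed = 0; demand += batch; flush() }
      }

    for (t <- 0 to durationMs) {
      if (t % fastMs == 0) { pending = Some(pending.fold(1)(_ + 1)); flush(); zipEmit() }
      if (t % slowMs == 0) { rightWaiting += 1; zipEmit() }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    println(simulate(bufferSize = 1, fastMs = 100, slowMs = 1000, durationMs = 5000))
    // Vector(1, 1, 9, 10, 10, 10)
    println(simulate(bufferSize = 16, fastMs = 100, slowMs = 1000, durationMs = 24000).mkString(" "))
    // 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 55 1 1 1 1 1 1 1 73
  }
}
```

In this model a one-element buffer settles at 10 per second, while a 16-element buffer (a size picked here for illustration) yields long runs of 1s broken by large accumulated values, similar in shape to the output reported in the question.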

A quick fix is setting the input buffer size to 1 for the whole stream. However, that makes the whole stream slower.

val set = MaterializerSettings(system).withInputBuffer(1, 1)
implicit val fm = FlowMaterializer(set)

A more proper fix would be to provide a "synchronize" element, which would be the same as zip under the hood but with a buffer size of one. Could you create an issue in the Akka issue tracker for that?

--
Martynas Mickevičius
Typesafe - Reactive Apps on the JVM

Vladimir Koshelev

Oct 15, 2014, 4:03:22 PM
to akka...@googlegroups.com
Hi Martynas,

thank you a lot for the clarification! Now it all makes sense to me again :)
I'm not sure it's worth creating an issue. As you noticed, this is an unusual use case: having conflate on one side of a zip and a kind of timer on the other. I think a TimedTransform is a better fit for implementing such functionality (folding over a stream and emitting the intermediate result periodically). But even with conflate and zip, downstream should be able to deal with those kinds of elements anyway.
What do you think?
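The "fold and emit periodically" alternative can be sketched without Akka as a stage with two entry points: one called for each incoming element and one called when a timer fires. `PeriodicFold`, `onElement` and `onTimer` are hypothetical names chosen for this sketch; they are not taken from the TimedTransform API, and the timer is simulated with a millisecond loop.

```scala
// Hypothetical, single-threaded sketch: fold incoming elements and emit the
// intermediate result (then reset) whenever a timer fires.
final class PeriodicFold[A, S](zero: S, step: (S, A) => S) {
  private var acc: S = zero

  // Called for every upstream element.
  def onElement(a: A): Unit = acc = step(acc, a)

  // Called on every timer tick: emit what was folded so far and start over.
  def onTimer(): S = { val out = acc; acc = zero; out }
}

object PeriodicFoldDemo {
  def main(args: Array[String]): Unit = {
    val counter = new PeriodicFold[Unit, Int](0, (n, _) => n + 1)
    val emitted = Vector.newBuilder[Int]
    for (t <- 1 to 3000) {                              // simulated milliseconds
      if (t % 100 == 0) counter.onElement(())           // fast source: one tick per 100 ms
      if (t % 1000 == 0) emitted += counter.onTimer()   // timer: emit once per second
    }
    println(emitted.result()) // Vector(10, 10, 10)
  }
}
```

Because the timer drives emission directly, the count does not depend on how many elements a downstream buffer happens to request.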

Alec Zorab

Oct 16, 2014, 4:49:16 AM
to akka...@googlegroups.com
I think this is a very common use case for applications streaming time-sensitive data (for example prices), where we would want to do an n-way zip and then receive the most recent set of data for those n things.

In that case you'd want several instruments, each conflating with (identity, _._2) so that only the latest element is kept, and a synchronised zip. The timer here is a bit of a red herring: there are plenty of cases where I just want the most recent element of several streams when I pull from them.
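The n-way "latest value" pattern can be modelled in plain Scala as below. This is a hypothetical sketch, not an Akka API: each input behaves like conflate with seed `identity` and an aggregate that keeps the newest element, and the pull behaves like a zip with a one-element buffer per input, producing a tuple only when every input has something new.

```scala
// Hypothetical model (not an Akka API) of n inputs that each run
// conflate(identity, (_, newest) => newest), feeding a zip that buffers one element per input.
final class LatestZip[A](n: Int) {
  // Newest not-yet-pulled element per input; None means nothing arrived since the last pull.
  private var latest: Vector[Option[A]] = Vector.fill(n)(None)

  // Input i emits a value: conflate keeps only the newest one.
  def offer(i: Int, a: A): Unit = latest = latest.updated(i, Some(a))

  // Like Zip, a pull yields a tuple only when every input has an element; it then consumes them.
  def pull(): Option[Vector[A]] =
    if (latest.forall(_.isDefined)) {
      val snapshot = latest.collect { case Some(a) => a }
      latest = Vector.fill(n)(None)
      Some(snapshot)
    } else None
}

object LatestZipDemo {
  def main(args: Array[String]): Unit = {
    val prices = new LatestZip[BigDecimal](2)
    prices.offer(0, BigDecimal("1.10")) // instrument 0 ticks twice before the pull...
    prices.offer(0, BigDecimal("1.12")) // ...so only the newer price survives
    prices.offer(1, BigDecimal("99.5"))
    println(prices.pull()) // Some(Vector(1.12, 99.5))
    println(prices.pull()) // None: no new prices since the last pull
  }
}
```

Whether a pull should wait for fresh data on every input (as here, mirroring Zip) or reuse the last seen value is a design choice; this sketch follows the conflate-plus-zip semantics discussed in the thread.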

Endre Varga

Oct 17, 2014, 8:50:33 AM
to akka...@googlegroups.com
On Thu, Oct 16, 2014 at 10:49 AM, Alec Zorab <alec...@gmail.com> wrote:
> I think this is a very common use case for applications streaming time sensitive data (for example prices) where we would want to do an n-way zip and then receive the most recent set of data for those n things.

I agree, and it is also useful in other timing-related contexts (throttling, implementing traffic shaping, etc.). The simple solution is to keep Zip as it is now for deterministic inputs (since it has higher throughput) and add a counterpart that is useful for time-oriented, non-deterministic streams. As Martynas said, the latter is internally just a buffer-1 version of Zip, so it is trivial to implement.

-Endre