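// Imports below are assumed; the original post omitted the akka ones.
import akka.actor.ActorSystem
import akka.stream.scaladsl2._
import akka.stream.scaladsl2.FlowGraphImplicits._
import scala.concurrent.duration._

object ConflateDemo extends App {
  implicit val system = ActorSystem("demo")
  implicit val fm = FlowMaterializer()

  case object Tick

  // Fast source: one Tick every 100 ms; conflate counts the ticks it absorbs.
  val fast = FlowFrom(Duration.Zero, 100.millis, () => Tick)
    .conflate[Int](_ => 1, (acc, _) => acc + 1)
  // Slow source: one Tick per second.
  val slow = FlowFrom(Duration.Zero, 1.second, () => Tick)

  val g = FlowGraph { implicit b =>
    val zip = Zip[Int, Tick.type]
    slow ~> zip.right
    fast ~> zip.left
    // Keep only the conflated count from each zipped pair and print it.
    zip.out ~> FlowFrom[(Int, Tick.type)].map(_._1) ~> ForeachSink[Int](v => print(" " + v))
  }.run
}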
My intention is to create two streams, where the first produces events 10 times faster than the second. Conflate on the fast stream should count the events it receives. I then zip the two streams and print the conflated counts.
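To make that expectation concrete, here is a minimal sketch in plain Scala (not Akka code). It assumes an idealized pipeline with no buffering between stages, where the zip pulls the conflated count exactly when each slow tick arrives. `ConflateModel`, `conflatedCounts` and its parameters are hypothetical names used only for illustration; a real run may print different numbers, for example if stage input buffers hold elements.

```scala
// Idealized model: conflate acts as a counter that is read and reset
// each time the slow side produces a tick. This is an assumption, not
// a description of how akka-stream schedules elements.
object ConflateModel {
  // Fast ticks occur at 0, fastPeriodMs, 2 * fastPeriodMs, ...
  // Slow ticks occur at 0, slowPeriodMs, 2 * slowPeriodMs, ...
  // Returns, for each slow tick, the number of fast ticks since the previous one.
  def conflatedCounts(fastPeriodMs: Int, slowPeriodMs: Int, slowTicks: Int): Seq[Int] =
    (0 until slowTicks).map { i =>
      val now  = i * slowPeriodMs
      val prev = now - slowPeriodMs // negative before the first slow tick
      // Count fast ticks in the half-open interval (prev, now].
      (0 to now by fastPeriodMs).count(t => t > prev && t <= now)
    }
}
```

With `ConflateModel.conflatedCounts(100, 1000, 4)` this model yields one count for the initial tick followed by a count of ten per second, which is the "10 times faster" ratio described above.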
My understanding of how it should work:
val set = MaterializerSettings(system).withInputBuffer(1, 1)
implicit val fm = FlowMaterializer(set)
I think this is a very common use case for applications that stream time-sensitive data (for example, prices): we want to do an n-way zip and then receive the most recent set of values for those n inputs.
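One possible reading of "the most recent set of data for those n things" can be sketched in plain Scala, without any stream library. Note that this is closer to a "combine latest" than to a strict zip: it emits a snapshot on every update once all n inputs have produced at least one value. `LatestSnapshot`, `snapshots` and the `(index, value)` event encoding are hypothetical and only illustrate the idea.

```scala
// Sketch under stated assumptions: events arrive as (source index, value)
// pairs in a single ordered sequence, and indices are in 0 until n.
object LatestSnapshot {
  def snapshots[A](n: Int, events: Seq[(Int, A)]): Seq[Vector[A]] = {
    // State: latest value per source (None until seen) plus emitted snapshots.
    val init = (Vector.fill[Option[A]](n)(None), Vector.empty[Vector[A]])
    val (_, out) = events.foldLeft(init) { case ((latest, acc), (idx, value)) =>
      val updated = latest.updated(idx, Some(value))
      // Emit only once every source has produced at least one value.
      if (updated.forall(_.isDefined)) (updated, acc :+ updated.flatten)
      else (updated, acc)
    }
    out
  }
}
```

For two price feeds, `LatestSnapshot.snapshots(2, Seq((0, 1.0), (1, 2.0), (0, 1.5)))` produces a snapshot after the second event and another after the third, each holding the newest value of both feeds.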