--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
/** * Holds elements of type A for a given finite duration after a predicate p first yields true and as long as subsequent * elements matching that first element (e.g. are equal) still satisfy the predicate. If a matching element arrives during * the given FiniteDuration for which the predicate p does not hold, the original element will NOT be pushed downstream. * Only when the timer expires and no matching elements have been seen for which p does not hold, will elem be pushed * downstream. * * @param duration The polling interval during which p has to hold true * @param p The predicate that has to remain true during the duration * @param system implicit required to schedule timers * @tparam A type of the elements */ class FilterFor[A](duration : FiniteDuration)(p: A => Boolean)(implicit system: ActorSystem) extends PushStage[A,A] {
var state : Map[A,Cancellable] = Map.empty
override def onPush(elem: A, ctx: Context[A]): Directive = state.get(elem) match {
case Some(timer) if !p(elem) => // pending timer but condition no longer holds => cancel timer timer.cancel() state = state - elem ctx.pull()
case None if p(elem) => // no pending timer and predicate true -> start and cache new timer val timer = system.scheduler.scheduleOnce(duration) { // when timer fires, remove from state and push elem downstream state = state - elem ctx.push(elem); // is this safe? } state = state + (elem -> timer) ctx.pull()
case _ => ctx.pull() // otherwise simply wait for the next upstream element }
} val ticks = Source(1 second, 1 second, () => "Hello")
val flow = ticks.transform(() => new FilterFor[String](10 seconds)(x => true)).to(Sink.foreach(println(_)))
flow.run()java.lang.ArrayIndexOutOfBoundsException: -1 at akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$currentOp(Interpreter.scala:175) at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.push(Interpreter.scala:209) at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.push(Interpreter.scala:278) at experiments.streams.time$FilterFor$$anonfun$1.apply$mcV$sp(time.scala:46) at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)2) How do I react to upstream or downstream finish or cancel events - do I have to?
Update, in a simple test scenario like soval ticks = Source(1 second, 1 second, () => "Hello")val flow = ticks.transform(() => new FilterFor[String](10 seconds)(x => true)).to(Sink.foreach(println(_)))flow.run()
I'm seeing the following error, so this doesn't work at all and I'm not sure it is because of threading:java.lang.ArrayIndexOutOfBoundsException: -1at akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$currentOp(Interpreter.scala:175)at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.push(Interpreter.scala:209)at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.push(Interpreter.scala:278)at experiments.streams.time$FilterFor$$anonfun$1.apply$mcV$sp(time.scala:46)at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I think I'm violating the one very important rule mentioned in the docs - when the timer fires it calls a push on the context but there is also a pull going on concurrently(?) - and this is indeed breaking in spectacular ways as expected....
I have no idea how to implement this correctly. It looked pretty simple at first, but alas...
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/QAJou4yCW3k/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.