[akka streams] question on some time related use cases


Frank Sauer

Jan 14, 2015, 9:57:02 PM
to akka...@googlegroups.com
I have two use cases that I'm used to from CEP systems like Esper, and I'm trying to figure out whether I can implement them (easily) with Akka Streams:

1) Test whether ALL new events in a stream satisfy some predicate during a finite interval of time, which starts when the predicate first yields true. This is useful for generating alerts on a stream of measurements, but only if some faulty condition persists for a given time.

2) Test whether some event does NOT occur within a finite duration after some other event.


My question is whether these are supported by existing Akka Streams flow graph DSL elements or whether a custom transformer is required. If the latter, I'd appreciate any pointers on how to approach writing it.

Thanks,

Frank

Akka Team

Jan 16, 2015, 11:52:03 AM
to Akka User List
Hi Frank!
We do not have such operations off the shelf; however, they are easy to implement using custom stream processing stages:

Be sure to refer to the cookbook for some inspiration on how to implement your own stages:

Hope this helps, and feel free to ask for help in case you get stuck :-)

-- 
Konrad

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam

Frank Sauer

Jan 21, 2015, 8:51:21 PM
to akka...@googlegroups.com
Thanks, I came up with the following, but I have some questions:

/**
 * Holds elements of type A for a given finite duration after a predicate p first yields true and as long as subsequent
 * elements matching that first element (e.g. are equal) still satisfy the predicate. If a matching element arrives during
 * the given FiniteDuration for which the predicate p does not hold, the original element will NOT be pushed downstream.
 * Only when the timer expires and no matching elements have been seen for which p does not hold, will elem be pushed
 * downstream.
 *
 * @param duration The polling interval during which p has to hold true
 * @param p        The predicate that has to remain true during the duration
 * @param system   implicit required to schedule timers
 * @tparam A       type of the elements
 */
class FilterFor[A](duration: FiniteDuration)(p: A => Boolean)(implicit system: ActorSystem) extends PushStage[A, A] {

  var state: Map[A, Cancellable] = Map.empty

  override def onPush(elem: A, ctx: Context[A]): Directive = state.get(elem) match {

    case Some(timer) if !p(elem) => // pending timer but condition no longer holds => cancel timer
      timer.cancel()
      state = state - elem
      ctx.pull()

    case None if p(elem) => // no pending timer and predicate true -> start and cache new timer
      val timer = system.scheduler.scheduleOnce(duration) {
        // when timer fires, remove from state and push elem downstream
        state = state - elem
        ctx.push(elem) // is this safe?
      }
      state = state + (elem -> timer)
      ctx.pull()

    case _ => ctx.pull() // otherwise simply wait for the next upstream element
  }

}

My main concerns are these:

1) Is it safe to invoke ctx.push from the thread on which the timer fires?
2) How do I react to upstream or downstream finish or cancel events - do I have to?
3) Can I integrate this into the DSL without using transform, e.g. can I somehow add a filterFor method via the "pimp my library" pattern?

Any and all pointers would be very much appreciated,

Thanks,

Frank

Frank Sauer

Jan 21, 2015, 11:07:05 PM
to akka...@googlegroups.com
Update: in a simple test scenario like this:

  val ticks = Source(1 second, 1 second, () => "Hello")

  val flow = ticks.transform(() => new FilterFor[String](10 seconds)(x => true)).to(Sink.foreach(println(_)))

  flow.run()

I'm seeing the following error, so this doesn't work at all, and I'm not sure whether it is because of threading:

java.lang.ArrayIndexOutOfBoundsException: -1
at akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$currentOp(Interpreter.scala:175)
at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.push(Interpreter.scala:209)
at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.push(Interpreter.scala:278)
at experiments.streams.time$FilterFor$$anonfun$1.apply$mcV$sp(time.scala:46)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I think I'm violating the one very important rule mentioned in the docs - when the timer fires it calls a push on the context but there is also a pull going on concurrently(?) - and this is indeed breaking in spectacular ways as expected.... 

I have no idea how to implement this correctly. It looked pretty simple at first, but alas... 

Endre Varga

Jan 22, 2015, 3:56:26 AM
to akka...@googlegroups.com
Hi Frank,

No, calling ctx.push from the timer thread is absolutely forbidden. The golden rule for stages is that, in a handler:
 - *Exactly one* method should be called on the
 - *currently* passed Context
 - *exactly once*
 - *as the last statement* in the handler
 - *with the type matching* the expected return type of the handler

The only exceptions are isHolding and isFinished because they are query methods.

Calling any of these methods externally will not work, because the context is not thread-safe, and it violates the rules above.
 
You can approximate the behavior you want without firing a timer: just record the time of the first occurrence of the event, and then check the elapsed time whenever a new element arrives. Obviously this only works if enough elements are flowing, but you can easily inject some filler elements. You can take this recipe and modify it to fit your needs: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html#Injecting_keep-alive_messages_into_a_stream_of_ByteStrings
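The timestamp approach described above can be sketched in plain Scala, independent of any stage API. This is only a minimal sketch under stated assumptions: `PersistenceTracker`, `offer`, and `tick` are hypothetical names that are not part of Akka, timestamps are passed in explicitly, and `tick` stands in for an injected filler element.

```scala
import scala.concurrent.duration._

/** Hypothetical helper (not an Akka API): reports when a predicate has held
  * continuously for at least `duration`, using timestamps instead of a timer. */
final class PersistenceTracker[A](duration: FiniteDuration)(p: A => Boolean) {
  // Time (ms) at which p first became true in the current episode, if any.
  private var firstTrueAt: Option[Long] = None
  // Ensures that at most one alert is raised per episode.
  private var alerted = false

  // True exactly once per episode, as soon as the elapsed time reaches `duration`.
  private def check(nowMillis: Long): Boolean = firstTrueAt match {
    case Some(start) if !alerted && nowMillis - start >= duration.toMillis =>
      alerted = true
      true
    case _ => false
  }

  /** Feed one real element observed at `nowMillis`. */
  def offer(elem: A, nowMillis: Long): Boolean =
    if (!p(elem)) {
      // The condition no longer holds: forget the episode.
      firstTrueAt = None
      alerted = false
      false
    } else {
      if (firstTrueAt.isEmpty) firstTrueAt = Some(nowMillis)
      check(nowMillis)
    }

  /** Feed a filler element: no new data, only the passage of time. */
  def tick(nowMillis: Long): Boolean = check(nowMillis)
}
```

Inside a stage, a handler would call `offer` or `tick` for each incoming element and then make its single push or pull based on the result, so nothing touches the context from another thread.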

We will have more flexible tools though to handle timers in the future. 

If you don't require your alerts to be a stream themselves, you can alternatively use an actor and "ask" it to process the events:

   myEvents.mapAsync(ev => alertingActor ? ev)

The actor needs to reply to each incoming event so that the stream continues to be pulled. The actor is free to schedule timers and fire alerts whenever it wants. Please note that the events should carry sequence numbers, because mapAsync fires multiple asks in parallel. If your events don't have them, you can simply add a stage that assigns sequence numbers before the mapAsync.
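One way the receiving side could use such sequence numbers is to buffer early arrivals and release events strictly in order. The following is a rough sketch in plain Scala, not an Akka API: `Sequenced` and `Resequencer` are hypothetical names, and it assumes sequence numbers start at `firstSeq` and increase by one.

```scala
/** Hypothetical wrapper: an event tagged with its position in the stream. */
final case class Sequenced[A](seq: Long, value: A)

/** Hypothetical helper: releases events in sequence order, buffering early arrivals. */
final class Resequencer[A](firstSeq: Long = 0L) {
  // Next sequence number that may be delivered.
  private var next = firstSeq
  // Events that arrived before their predecessors.
  private var pending = Map.empty[Long, A]

  /** Accepts one event and returns every event that is now deliverable, in order. */
  def offer(ev: Sequenced[A]): List[A] =
    if (ev.seq < next) Nil // already delivered: ignore the duplicate
    else {
      pending = pending + (ev.seq -> ev.value)
      val ready = List.newBuilder[A]
      while (pending.contains(next)) {
        ready += pending(next)
        pending = pending - next
        next += 1
      }
      ready.result()
    }
}
```

An alerting actor could keep one `Resequencer` as private state and feed it from its receive block; tagging the events beforehand amounts to pairing each element with a running counter.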



2) How do I react to upstream or downstream finish or cancel events - do I have to?

No, only if you want to do something special in response to those events. Otherwise the default behavior is just to shut down the stage and propagate the termination signal.

-Endre

Endre Varga

Jan 22, 2015, 4:02:12 AM
to akka...@googlegroups.com
On Thu, Jan 22, 2015 at 5:07 AM, Frank Sauer <fsau...@gmail.com> wrote:

I think I'm violating the one very important rule mentioned in the docs - when the timer fires it calls a push on the context but there is also a pull going on concurrently(?) - and this is indeed breaking in spectacular ways as expected.... 

:)
 

I have no idea how to implement this correctly. It looked pretty simple at first, but alas... 

See my previous mail. The main problem here is mixing backpressured streams (your data) and non-backpressured events (timer triggers) in a safe fashion. Well, the main problem is not how to implement it, but how to expose an API to users that is as safe as possible. We have groupedWithin, takeWithin and dropWithin as timer-based ops, but no customization for now.
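The second use case from the original question (some event does NOT follow another within a finite duration) can be approximated without timer callbacks in the same way, by comparing timestamps whenever an element or filler arrives. This is a hedged sketch in plain Scala; `AbsenceDetector` and its methods are hypothetical names, not Akka APIs, and timestamps are supplied by the caller.

```scala
import scala.concurrent.duration._

/** Hypothetical helper: detects that a follow-up event did not arrive
  * within `window` of a trigger event. */
final class AbsenceDetector(window: FiniteDuration) {
  // Time (ms) of the trigger that is still waiting for its follow-up.
  private var triggeredAt: Option[Long] = None

  /** The first event was seen: start waiting for the follow-up. */
  def trigger(nowMillis: Long): Unit = {
    triggeredAt = Some(nowMillis)
  }

  /** The follow-up was seen: returns true if it arrived in time.
    * Either way, the pending trigger is cleared. */
  def followUp(nowMillis: Long): Boolean = {
    val inTime = triggeredAt.exists(t => nowMillis - t <= window.toMillis)
    triggeredAt = None
    inTime
  }

  /** Call for any other element or filler: returns true exactly once
    * when the window has passed without a follow-up. */
  def expired(nowMillis: Long): Boolean = triggeredAt match {
    case Some(t) if nowMillis - t > window.toMillis =>
      triggeredAt = None
      true
    case _ => false
  }
}
```

As with the elapsed-time check for the first use case, detection is only as prompt as the rate of incoming elements, which is why injected filler elements matter.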

-Endre

Frank Sauer

Jan 22, 2015, 8:42:11 PM
to akka...@googlegroups.com
Thanks for the pointers, Endre. I'll explore those ideas.

Frank


Jakub Liska

May 27, 2015, 7:32:23 PM
to akka...@googlegroups.com
Hi,

By the way, can a Stage be stateful? Is reading from and writing to a field like this in a PushPullStage thread-safe?

var state : Map[A,Cancellable] = Map.empty

Thanks, Jakub

Endre Varga

May 28, 2015, 5:58:53 AM
to akka...@googlegroups.com
Hi Jakub, 

Any state that is encapsulated in the Stage is safe to access from any of the Stage callbacks (onPull, onPush, etc.). In this regard it is like an Actor, where you can safely access its state from the receive block.


-Endre