Delay within GraphStageLogic

154 views
Skip to first unread message

Gary Struthers

unread,
Aug 26, 2016, 1:28:46 AM8/26/16
to Akka User List
Hi,

I'm handling exceptions in a custom GraphStage, with some exceptions I want to retry after a delay. Is there a preferred way to do this? Do I just call Thread.sleep?

Gary

Konrad Malawski

unread,
Aug 26, 2016, 2:35:19 AM8/26/16
to akka...@googlegroups.com, Gary Struthers
Hi there,
you'd extend TimerGraphStageLogic instead and call scheduleOnce or schedulePeriodicallyWithInitialDelay.
The timer will call your onTimer method then.

Please read all the GraphStage docs to get a feel for it :-)

-- 
Konrad `ktoso` Malawski
Akka @ Lightbend
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Gary Struthers

unread,
Aug 26, 2016, 12:17:32 PM8/26/16
to Akka User List, agil...@earthlink.net
Thanks Konrad, but I posted because I tried scheduleOnce and got no delay

scheduleOnce(logger.debug("1 currentTimeMillis {}", System.currentTimeMillis()), FiniteDuration(100, MICROSECONDS))

scheduleOnce(logger.debug("2 currentTimeMillis {}", System.currentTimeMillis()), FiniteDuration(100, MILLISECONDS))

scheduleOnce(logger.debug("3 currentTimeMillis {}", System.currentTimeMillis()), FiniteDuration(100, SECONDS))

scheduleOnce(logger.debug("4 currentTimeMillis {}", System.currentTimeMillis()), FiniteDuration(100, MINUTES))

scheduleOnce(logger.debug("5 currentTimeMillis {}", System.currentTimeMillis()), FiniteDuration(100, DAYS))

Produced

2016-08-26 09:04:15,111 DEBUG ... akka.actor.default-dispatcher-3 - 1 currentTimeMillis 1472227455110

2016-08-26 09:04:15,115 DEBUG ... akka.actor.default-dispatcher-3 - 2 currentTimeMillis 1472227455114

2016-08-26 09:04:15,115 DEBUG ... akka.actor.default-dispatcher-3 - 3 currentTimeMillis 1472227455114

2016-08-26 09:04:15,116 DEBUG ... akka.actor.default-dispatcher-3 - 4 currentTimeMillis 1472227455114

2016-08-26 09:04:15,116 DEBUG ... akka.actor.default-dispatcher-3 - 5 currentTimeMillis 1472227455114

Akka version 2.4.9


Gary

Endre Varga

unread,
Aug 26, 2016, 1:13:49 PM8/26/16
to akka...@googlegroups.com, Gary Struthers
Hi Gary,

On Fri, Aug 26, 2016 at 6:17 PM, Gary Struthers <agil...@earthlink.net> wrote:
Thanks Konrad, but I posted because I tried scheduleOnce and got no delay

scheduleOnce(logger.debug("1 currentTimeMillis {}", System.currentTimeMillis()), FiniteDuration(100, MICROSECONDS))

scheduleOnce(logger.debug("2 currentTimeMillis {}", System.currentTimeMillis()), FiniteDuration(100, MILLISECONDS))

scheduleOnce(logger.debug("3 currentTimeMillis {}", System.currentTimeMillis()), FiniteDuration(100, SECONDS))

scheduleOnce(logger.debug("4 currentTimeMillis {}", System.currentTimeMillis()), FiniteDuration(100, MINUTES))

scheduleOnce(logger.debug("5 currentTimeMillis {}", System.currentTimeMillis()), FiniteDuration(100, DAYS))


Since logger.debug(...) returns Unit, you just scheduled the same timer with the key of the value "Unit" 5 times and not doing anything on the onTimer callback. 

Proper use of this API is

   scheduleOnce("myTimer", FiniteDuration(100, MILLISECONDS))

Then override onTimer where you will get the key so you can do multiple things depending on which timer fired. You can use the same key to cancel or reschedule timers.

-Endre
 

Produced

2016-08-26 09:04:15,111 DEBUG ... akka.actor.default-dispatcher-3 - 1 currentTimeMillis 1472227455110

2016-08-26 09:04:15,115 DEBUG ... akka.actor.default-dispatcher-3 - 2 currentTimeMillis 1472227455114

2016-08-26 09:04:15,115 DEBUG ... akka.actor.default-dispatcher-3 - 3 currentTimeMillis 1472227455114

2016-08-26 09:04:15,116 DEBUG ... akka.actor.default-dispatcher-3 - 4 currentTimeMillis 1472227455114

2016-08-26 09:04:15,116 DEBUG ... akka.actor.default-dispatcher-3 - 5 currentTimeMillis 1472227455114

Akka version 2.4.9


Gary

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.

Gary Struthers

unread,
Aug 26, 2016, 6:44:39 PM8/26/16
to Akka User List, agil...@earthlink.net
Sorry, my onTimer() doesn't fire. It looks like all I need to do is 1. use TimerGraphStageLogic 2. call scheduleOnce 3. override onTimer. I am misusing Resume for retries but I've tried scheduleOnce in other places and it still doesn't fire. Here's an example, 

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {

    new TimerGraphStageLogic(shape) {


      private def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).

          getOrElse(Supervision.stoppingDecider)


      var retries = 1

      var duration = 100

      

      def myHandler(): Unit = {

          try {

            if(testException != null) throw testException

            if(iter.hasNext) {

              push(out, iter.next())

            }

          } catch {

            case NonFatal(e) => decider(e) match {

              case Supervision.Stop => {

                failStage(e)

              }

              case Supervision.Resume => {

                if(retries > 0) {

                  logger.debug("before scheduleOnce retries {} duration {}", retries, duration)

                  scheduleOnce(None, FiniteDuration(duration, MILLISECONDS))

                } else {

                  failStage(e) // too many retries

                }

              }

            }

          }        

      }

      setHandler(out, new OutHandler {

        override def onPull(): Unit = {

          myHandler()

        }

      })


      override protected def onTimer(timerKey: Any): Unit = {

        retries -= 1

        duration *= 2

        myHandler()

      }

    } 

  }

Endre Varga

unread,
Aug 27, 2016, 3:23:59 AM8/27/16
to akka...@googlegroups.com, Gary Struthers
On Sat, Aug 27, 2016 at 12:44 AM, Gary Struthers <agil...@earthlink.net> wrote:
Sorry, my onTimer() doesn't fire.

That would be surprising given that all of the built-in time based operators are implemented in terms of this.
 
It looks like all I need to do is 1. use TimerGraphStageLogic 2. call scheduleOnce 3. override onTimer. I am misusing Resume for retries but I've tried scheduleOnce in other places and it still doesn't fire. Here's an example, 

Is this a Source? You omitted the shape... Anyway, it might be that your stage is shut down earlier than the timer for some reason (for example downstream cancelled). Override postStop() and print something there to see.

Here are the implementations of various built-in time based stages for examples: https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/impl/Timers.scala

-Endre
 

--

Viktor Klang

unread,
Aug 27, 2016, 4:58:27 AM8/27/16
to Akka User List, Gary Struthers

I'd probably start by implementing the behavior through the built in combinators before venturing into creating custom stages.

--
Cheers,

Gary Struthers

unread,
Aug 27, 2016, 6:18:36 PM8/27/16
to Akka User List, agil...@earthlink.net

Thanks, It fires now. This was a MockSource just for working out how to do error handling. The problem was the tests completed before the timer fired.

Gary

On Saturday, August 27, 2016 at 12:23:59 AM UTC-7, drewhk wrote:


Is this a Source? You omitted the shape... Anyway, it might be that your stage is shut down earlier than the timer for some reason (for example downstream cancelled). Override postStop() and print something there to see.


-Endre


Reply all
Reply to author
Forward
0 new messages