Framing.delimiter and supervisor

235 views
Skip to first unread message

Ivan Baisi

unread,
Sep 18, 2015, 4:36:55 AM9/18/15
to Akka User List
Hi All,

I'm having some problem with the Framing.delimiter when it exceeds the maximumFrameLength. In such case, I'm just expecting to fall into the decider but instead I just get a nasty exception and the termination of my stream processing.

class StreamToKafkaFlowBuilder(config: Config)(implicit val materializer: akka.stream.Materializer) {
 
implicit val actorSystem = ActorSystem("ReactiveKafka")
 
private val kafka = new ReactiveKafka()
 
private val log = Logger.getLogger(this.getClass.getSimpleName)

 
private val bufferSize = 25000
 
private val source = Source.actorRef[ByteString](
    bufferSize
,
   
OverflowStrategy.dropHead
 
)

  val decider
: Supervision.Decider = {
   
case e: DataFormatException =>
      log
.log(Level.SEVERE, "Non GZIP format", e)
      e
.getStackTrace.foreach(println)
     
Supervision.resume
   
case e =>
     
FalconStatsD.increment("insideSupervisionDecider")
     
Logger.getLogger("some").log(Level.INFO, "INTO THE DECIDER")
      e
.getMessage
      e
.getStackTrace.foreach(println)
     
Supervision.resume
 
}

 
private val runnableByteStringProcessingFlow =
   
FlowGraph.closed(source) { implicit builder =>
      byteStringSource
=>
        val tweetBroadCaster
= builder.add(Broadcast[String](2))

        val byteStringToTweetFlow
= Flow[ByteString].
          via
(Framing.delimiter(
           
ByteString("\r\n"), maximumFrameLength = bufferSize, allowTruncation = true))
         
.map(_.utf8String + "\r\n")

        val tweetPrintSink
= Sink.foreach[String] {
         
case "\r\n" => FalconStatsD.increment("keepAliveReceivedFromGnipCompliance")
         
case a => {
            log
.log(Level.INFO, a)
           
FalconStatsD.increment("activityReceivedFromGnipCompliance")
         
}
       
}

        val producerProperties
= ProducerProperties(
          brokerList
= config.getStringList("kafka.brokers").toList.mkString(","),
          topic
= config.getString("kafka.topic"),
          clientId
= config.getString("kafka.clientId"),
          encoder
= new StringEncoder())

        val kafkaSink
= Sink(kafka.publish(producerProperties))

        byteStringSource
.outlet ~> Gzip.decoderFlow ~> byteStringToTweetFlow ~> tweetBroadCaster.in
        tweetBroadCaster
.out(0) ~> kafkaSink
        tweetBroadCaster
.out(1) ~> tweetPrintSink
   
}

 
def build: ActorRef = runnableByteStringProcessingFlow.withAttributes(ActorAttributes.supervisionStrategy(decider)).run()
}

[ERROR] [09/18/2015 10:31:24.636] [ReactiveKafka-akka.actor.default-dispatcher-3] [akka://ReactiveKafka/user/$a] Read 6617 bytes which is more than 180 without seeing a line terminator
akka
.stream.io.Framing$FramingException: Read 6617 bytes which is more than 180 without seeing a line terminator
    at akka
.stream.io.Framing$DelimiterFramingStage.doParse(Framing.scala:172)
    at akka
.stream.io.Framing$DelimiterFramingStage.onPush(Framing.scala:147)
    at akka
.stream.io.Framing$DelimiterFramingStage.onPush(Framing.scala:138)
    at akka
.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter.scala:436)
    at akka
.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(Interpreter.scala:245)
    at akka
.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(Interpreter.scala:434)
    at akka
.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:580)
    at akka
.stream.impl.fusing.OneBoundedInterpreter$State$class.execute(Interpreter.scala:241)
    at akka
.stream.impl.fusing.OneBoundedInterpreter$EntryState.execute(Interpreter.scala:666)
    at akka
.stream.stage.AbstractStage.enterAndPush(Stage.scala:66)
    at akka
.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:157)
    at scala
.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka
.stream.impl.SubReceive.apply(Transfer.scala:16)
    at akka
.stream.impl.SubReceive.apply(Transfer.scala:12)
    at scala
.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka
.stream.impl.SubReceive.applyOrElse(Transfer.scala:12)
    at scala
.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at akka
.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka
.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:366)
    at akka
.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka
.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka
.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka
.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka
.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala
.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala
.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala
.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala
.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I've tried multiple things, from striping the flow in little pieces in which I'd throw exceptions at different stages with some:

Flow[someType].map{x => {throw new Exception; x}}


I always end up inside the decider but when the problem is with this FraminException thrown from Framing

Any ideas?

Endre Varga

unread,
Sep 18, 2015, 5:29:54 AM9/18/15
to akka...@googlegroups.com
Hi Ivan,

Supervision is not something that automatically works on any stage, unfortunately, it always needs custom work, and there are cases where it can't work at all (not even theoretically). I am not sure what should a delimiter framing do when a line-size has been exceeded for example? What do you expect to happen?

-Endre

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Ivan Baisi

unread,
Sep 18, 2015, 6:48:11 AM9/18/15
to Akka User List
Hey Endre,

Thanks for your reply.

Well, I would've expected that I could catch that FramingException and decide myself what to do instead of having my stream broken. There's probably something I'm missing with the supervisor. So far I've been using it inside my flows to deide on what to do with different exceptions within my stream. Things that go from DB exceptions to Json parse exceptions without any issue. My question is why doesn't this FramingException get 'caught' in the decider as well.

I'm going into the akka streams documentation again, to check if I'm not taking something into account. Is there any other resource that could be useful?

I think I've seen something similar with this FramingException when there's some exception with Gzip.decoderFlow due to some bad input. In this case, I'd expect the same, being able to decide myself what to do instead of having the stream terminated.

Cheers!

Endre Varga

unread,
Sep 18, 2015, 6:51:35 AM9/18/15
to akka...@googlegroups.com
Hi Ivan,

On Fri, Sep 18, 2015 at 12:48 PM, Ivan Baisi <iv...@falconsocial.com> wrote:
Hey Endre,

Thanks for your reply.

Well, I would've expected that I could catch that FramingException and decide myself what to do instead of having my stream broken. There's probably something I'm missing with the supervisor. So far I've been using it inside my flows to deide on what to do with different exceptions within my stream. Things that go from DB exceptions to Json parse exceptions without any issue. My question is why doesn't this FramingException get 'caught' in the decider as well.

The reason is that the DelimiterFraming has no support for supervision. Not all stages support supervision because sometimes it is not possible to implement them meaningfully. What would you expect the DelimiterFraming to do after a line happens to be too large?
 

I'm going into the akka streams documentation again, to check if I'm not taking something into account. Is there any other resource that could be useful? 

I think I've seen something similar with this FramingException when there's some exception with Gzip.decoderFlow due to some bad input. In this case, I'd expect the same, being able to decide myself what to do instead of having the stream terminated.

What do you mean by "decide what to do"? Can you give an example what you can do with a malformed gzip stream or a too large line?

-Endre

Ivan Baisi

unread,
Sep 18, 2015, 7:02:13 AM9/18/15
to Akka User List


On Friday, September 18, 2015 at 12:51:35 PM UTC+2, drewhk wrote:
Hi Ivan,

On Fri, Sep 18, 2015 at 12:48 PM, Ivan Baisi <iv...@falconsocial.com> wrote:
Hey Endre,

Thanks for your reply.

Well, I would've expected that I could catch that FramingException and decide myself what to do instead of having my stream broken. There's probably something I'm missing with the supervisor. So far I've been using it inside my flows to deide on what to do with different exceptions within my stream. Things that go from DB exceptions to Json parse exceptions without any issue. My question is why doesn't this FramingException get 'caught' in the decider as well.

The reason is that the DelimiterFraming has no support for supervision. Not all stages support supervision because sometimes it is not possible to implement them meaningfully. What would you expect the DelimiterFraming to do after a line happens to be too large?
 

I'm going into the akka streams documentation again, to check if I'm not taking something into account. Is there any other resource that could be useful? 

I think I've seen something similar with this FramingException when there's some exception with Gzip.decoderFlow due to some bad input. In this case, I'd expect the same, being able to decide myself what to do instead of having the stream terminated.

What do you mean by "decide what to do"? Can you give an example what you can do with a malformed gzip stream or a too large line?

Drop the element and continue processing the next elements. If I get a too large line, for example, I'd like to drop it and add a metric where later I can see how many too long lines I'm getting.
 
...

Endre Varga

unread,
Sep 18, 2015, 7:15:04 AM9/18/15
to akka...@googlegroups.com
Hi Ivan,


On Fri, Sep 18, 2015 at 1:02 PM, Ivan Baisi <iv...@falconsocial.com> wrote:


On Friday, September 18, 2015 at 12:51:35 PM UTC+2, drewhk wrote:
Hi Ivan,

On Fri, Sep 18, 2015 at 12:48 PM, Ivan Baisi <iv...@falconsocial.com> wrote:
Hey Endre,

Thanks for your reply.

Well, I would've expected that I could catch that FramingException and decide myself what to do instead of having my stream broken. There's probably something I'm missing with the supervisor. So far I've been using it inside my flows to deide on what to do with different exceptions within my stream. Things that go from DB exceptions to Json parse exceptions without any issue. My question is why doesn't this FramingException get 'caught' in the decider as well.

The reason is that the DelimiterFraming has no support for supervision. Not all stages support supervision because sometimes it is not possible to implement them meaningfully. What would you expect the DelimiterFraming to do after a line happens to be too large?
 

I'm going into the akka streams documentation again, to check if I'm not taking something into account. Is there any other resource that could be useful? 

I think I've seen something similar with this FramingException when there's some exception with Gzip.decoderFlow due to some bad input. In this case, I'd expect the same, being able to decide myself what to do instead of having the stream terminated.

What do you mean by "decide what to do"? Can you give an example what you can do with a malformed gzip stream or a too large line?

Drop the element and continue processing the next elements. If I get a too large line, for example, I'd like to drop it and add a metric where later I can see how many too long lines I'm getting.

You would map this behavior to the "Resume" directive? That might be possible, but I don't really like cramming all permutations of possible operations into the Akka Streams library (the similar Decoder in Netty also just fails on large frames). 

Btw where would the metric go? Should the framing materialize to some counter?

Currently, you can take the Stage implementation and add the skipping behavior yourself: https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/io/Framing.scala#L138

-Endre

Ivan Baisi

unread,
Sep 18, 2015, 7:26:29 AM9/18/15
to Akka User List
Thanks Endre,


On Friday, September 18, 2015 at 1:15:04 PM UTC+2, drewhk wrote:
Hi Ivan,


On Fri, Sep 18, 2015 at 1:02 PM, Ivan Baisi <iv...@falconsocial.com> wrote:


On Friday, September 18, 2015 at 12:51:35 PM UTC+2, drewhk wrote:
Hi Ivan,

On Fri, Sep 18, 2015 at 12:48 PM, Ivan Baisi <iv...@falconsocial.com> wrote:
Hey Endre,

Thanks for your reply.

Well, I would've expected that I could catch that FramingException and decide myself what to do instead of having my stream broken. There's probably something I'm missing with the supervisor. So far I've been using it inside my flows to deide on what to do with different exceptions within my stream. Things that go from DB exceptions to Json parse exceptions without any issue. My question is why doesn't this FramingException get 'caught' in the decider as well.

The reason is that the DelimiterFraming has no support for supervision. Not all stages support supervision because sometimes it is not possible to implement them meaningfully. What would you expect the DelimiterFraming to do after a line happens to be too large?
 

I'm going into the akka streams documentation again, to check if I'm not taking something into account. Is there any other resource that could be useful? 

I think I've seen something similar with this FramingException when there's some exception with Gzip.decoderFlow due to some bad input. In this case, I'd expect the same, being able to decide myself what to do instead of having the stream terminated.

What do you mean by "decide what to do"? Can you give an example what you can do with a malformed gzip stream or a too large line?

Drop the element and continue processing the next elements. If I get a too large line, for example, I'd like to drop it and add a metric where later I can see how many too long lines I'm getting.

You would map this behavior to the "Resume" directive? That might be possible, but I don't really like cramming all permutations of possible operations into the Akka Streams library (the similar Decoder in Netty also just fails on large frames). 

Btw where would the metric go? Should the framing materialize to some counter?

Currently, you can take the Stage implementation and add the skipping behavior yourself: https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/io/Framing.scala#L138

Yes, this is one of the things I tried and worked. I was just wondering if I was missing something that didn't let me use the akka Framing and then drop the element when encountering an exception so I could add the metric into de decider and do a simple Supervision.resume.

Thanks :)
 
...

Endre Varga

unread,
Sep 18, 2015, 7:32:48 AM9/18/15
to akka...@googlegroups.com
I created a ticket for the idea though: https://github.com/akka/akka/issues/18515

If you have a working modification of the stage you are welcome to submit a PR!

-Endre

--

Ivan Baisi

unread,
Sep 18, 2015, 8:12:13 AM9/18/15
to Akka User List
I made it work in the sense I just dropped the element and got the next chunk of bytes. the issue there is that the next part had an incomplete message which I didn't care for. But I still to learn more about how directives and supervisors work in order to have something fully in place. Id I manage to do it, of course I'll submit a PR.
...
Reply all
Reply to author
Forward
0 new messages