class StreamToKafkaFlowBuilder(config: Config)(implicit val materializer: akka.stream.Materializer) {
  implicit val actorSystem = ActorSystem("ReactiveKafka")
  private val kafka = new ReactiveKafka()
  private val log = Logger.getLogger(this.getClass.getSimpleName)
  private val bufferSize = 25000
  private val source = Source.actorRef[ByteString](
    bufferSize,
    OverflowStrategy.dropHead
  )
  val decider: Supervision.Decider = {
    case e: DataFormatException =>
      // Malformed GZIP input: log it (the logger prints the stack trace) and keep going
      log.log(Level.SEVERE, "Non GZIP format", e)
      Supervision.resume
    case e =>
      // Any other exception raised by a stage that supports supervision
      FalconStatsD.increment("insideSupervisionDecider")
      log.log(Level.INFO, s"INTO THE DECIDER: ${e.getMessage}", e)
      Supervision.resume
  }
  private val runnableByteStringProcessingFlow =
    FlowGraph.closed(source) { implicit builder =>
      byteStringSource =>
        val tweetBroadCaster = builder.add(Broadcast[String](2))
        val byteStringToTweetFlow = Flow[ByteString]
          .via(Framing.delimiter(
            ByteString("\r\n"), maximumFrameLength = bufferSize, allowTruncation = true))
          .map(_.utf8String + "\r\n")
        val tweetPrintSink = Sink.foreach[String] {
          case "\r\n" => FalconStatsD.increment("keepAliveReceivedFromGnipCompliance")
          case a =>
            log.log(Level.INFO, a)
            FalconStatsD.increment("activityReceivedFromGnipCompliance")
        }
        val producerProperties = ProducerProperties(
          brokerList = config.getStringList("kafka.brokers").toList.mkString(","),
          topic = config.getString("kafka.topic"),
          clientId = config.getString("kafka.clientId"),
          encoder = new StringEncoder())
        val kafkaSink = Sink(kafka.publish(producerProperties))
        byteStringSource.outlet ~> Gzip.decoderFlow ~> byteStringToTweetFlow ~> tweetBroadCaster.in
        tweetBroadCaster.out(0) ~> kafkaSink
        tweetBroadCaster.out(1) ~> tweetPrintSink
    }

  def build: ActorRef =
    runnableByteStringProcessingFlow
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .run()
}

[ERROR] [09/18/2015 10:31:24.636] [ReactiveKafka-akka.actor.default-dispatcher-3] [akka://ReactiveKafka/user/$a] Read 6617 bytes which is more than 180 without seeing a line terminator
akka.stream.io.Framing$FramingException: Read 6617 bytes which is more than 180 without seeing a line terminator
at akka.stream.io.Framing$DelimiterFramingStage.doParse(Framing.scala:172)
at akka.stream.io.Framing$DelimiterFramingStage.onPush(Framing.scala:147)
at akka.stream.io.Framing$DelimiterFramingStage.onPush(Framing.scala:138)
at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter.scala:436)
at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(Interpreter.scala:245)
at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(Interpreter.scala:434)
at akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:580)
at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.execute(Interpreter.scala:241)
at akka.stream.impl.fusing.OneBoundedInterpreter$EntryState.execute(Interpreter.scala:666)
at akka.stream.stage.AbstractStage.enterAndPush(Stage.scala:66)
at akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:157)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:366)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Flow[someType].map{x => {throw new Exception; x}}

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
Hey Endre,
Thanks for your reply.
Well, I would have expected to be able to catch that FramingException and decide myself what to do, instead of having my stream broken. There's probably something I'm missing about supervision. So far I've been using the decider inside my flows to decide what to do with different exceptions within my stream, from DB exceptions to JSON parse exceptions, without any issue. My question is why this FramingException doesn't get 'caught' by the decider as well.
I'm going through the Akka Streams documentation again to check whether I'm missing something. Is there any other resource that could be useful?
I think I've seen something similar to this FramingException when Gzip.decoderFlow fails on bad input. In that case, too, I'd expect to be able to decide myself what to do instead of having the stream terminated.
Hi Ivan,
On Fri, Sep 18, 2015 at 12:48 PM, Ivan Baisi <iv...@falconsocial.com> wrote:
> My question is why this FramingException doesn't get 'caught' by the decider as well.

The reason is that DelimiterFraming has no support for supervision. Not all stages support supervision, because sometimes it is not possible to implement it meaningfully. What would you expect DelimiterFraming to do after a line turns out to be too large?

> In that case, too, I'd expect to be able to decide myself what to do instead of having the stream terminated.

What do you mean by "decide what to do"? Can you give an example of what you could do with a malformed gzip stream or a too-large line?
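The distinction Endre draws can be illustrated without Akka. In a per-element stage such as `map`, a resume directive has an obvious meaning: drop the element that failed and carry on with the next one. The sketch below is a hypothetical, dependency-free model of that idea; `SupervisedMap`, `Resume`, `Stop`, and `mapSupervised` are illustrative names and are not Akka's `Supervision` API. A framing stage differs in that it keeps a byte buffer spanning many upstream elements, so when it fails there is no single element to drop.

```scala
import scala.util.control.NonFatal

// Hypothetical, dependency-free model of per-element supervision.
// These names are illustrative only; they are NOT Akka's Supervision API.
object SupervisedMap {
  sealed trait Directive
  case object Resume extends Directive // drop the failing element, keep going
  case object Stop extends Directive   // rethrow, ending the whole run

  type Decider = Throwable => Directive

  // Applies f to every element; on failure the decider picks the directive.
  // Resume is meaningful here because elements are independent: skipping
  // one leaves no half-processed state behind for the next element.
  def mapSupervised[A, B](in: Seq[A])(f: A => B)(decider: Decider): Vector[B] = {
    val out = Vector.newBuilder[B]
    in.foreach { a =>
      try out += f(a)
      catch {
        case NonFatal(e) =>
          decider(e) match {
            case Resume => ()      // skip only this element
            case Stop   => throw e // fail the whole "stream"
          }
      }
    }
    out.result()
  }
}
```

With a decider that always resumes, `SupervisedMap.mapSupervised(Seq("1", "x", "3"))(_.toInt)(_ => SupervisedMap.Resume)` returns `Vector(1, 3)`; with one that always stops, the `NumberFormatException` raised for `"x"` propagates to the caller.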
On Friday, September 18, 2015 at 12:51:35 PM UTC+2, drewhk wrote:
> What do you mean by "decide what to do"? Can you give an example of what you could do with a malformed gzip stream or a too-large line?

Drop the element and continue processing the next elements. If I get a too-long line, for example, I'd like to drop it and record a metric so that I can later see how many too-long lines I'm getting.
Hi Ivan,

On Fri, Sep 18, 2015 at 1:02 PM, Ivan Baisi <iv...@falconsocial.com> wrote:
> Drop the element and continue processing the next elements. If I get a too-long line, for example, I'd like to drop it and record a metric so that I can later see how many too-long lines I'm getting.

You would map this behavior to the "Resume" directive? That might be possible, but I don't really like cramming every permutation of possible operations into the Akka Streams library (the similar Decoder in Netty also just fails on large frames).

Btw, where would the metric go? Should the framing stage materialize to some counter?

Currently, you can take the Stage implementation and add the skipping behavior yourself: https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/io/Framing.scala#L138
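Endre's suggestion of copying the framing stage and adding skipping behavior comes down to a small state machine: buffer bytes, emit a frame at each delimiter, and, once a frame grows past the limit, discard bytes up to the next delimiter instead of failing. The sketch below models only that state machine in plain Scala, with no Akka dependency, so the logic can be checked in isolation. `SkippingDelimiterFramer`, `push`, and `dropped` are hypothetical names, not Akka API, and wrapping the logic in an actual Akka Streams stage is not shown.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper (NOT part of Akka): splits a chunked byte stream on a
// delimiter. Instead of failing on an oversized frame, it discards bytes up to
// the next delimiter and counts how many frames were dropped.
final class SkippingDelimiterFramer(delimiter: Array[Byte], maxFrameLength: Int) {
  require(delimiter.nonEmpty, "delimiter must not be empty")
  require(maxFrameLength > 0, "maxFrameLength must be positive")

  private val delim: IndexedSeq[Byte] = delimiter.toIndexedSeq
  private val buffer = ArrayBuffer.empty[Byte]
  private var skipping = false // true while discarding an oversized frame
  private var droppedCount = 0

  /** Number of oversized frames discarded so far (the "metric"). */
  def dropped: Int = droppedCount

  /** Feeds one chunk; returns the complete frames (delimiter stripped) it yields. */
  def push(chunk: Array[Byte]): Vector[Array[Byte]] = {
    buffer ++= chunk
    val frames = Vector.newBuilder[Array[Byte]]
    var searching = true
    while (searching) {
      val idx = buffer.indexOfSlice(delim)
      if (idx >= 0) {
        val frame = buffer.take(idx).toArray
        buffer.remove(0, idx + delim.length)
        if (skipping) skipping = false // reached the end of a dropped frame
        else if (frame.length <= maxFrameLength) frames += frame
        else droppedCount += 1 // complete frame, but too long
      } else {
        // Up to delim.length - 1 trailing bytes may be the start of a delimiter.
        val keep = delim.length - 1
        if (!skipping && buffer.length - keep > maxFrameLength) {
          droppedCount += 1 // frame is already too long: count it once, then skip
          skipping = true
        }
        if (skipping && buffer.length > keep) buffer.remove(0, buffer.length - keep)
        searching = false
      }
    }
    frames.result()
  }
}
```

Feeding `"abc\r\ntoolongline\r\nok\r\n"` with a `"\r\n"` delimiter and `maxFrameLength = 5` yields the frames `abc` and `ok` and leaves `dropped == 1`; an oversized frame is also detected when it arrives split across several chunks. Inside a real stage, `dropped` could be reported through whatever metrics call the application already uses (the code earlier in the thread uses `FalconStatsD.increment`), which is one possible answer to where the metric should go.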