Akka Kafka supervision hell

Richard Rodseth

Oct 12, 2017, 1:31:03 PM
to akka...@googlegroups.com
Apologies for the alarmist subject, but we're having a very difficult time getting supervision solid for a committablePartitionedSource.

Latest issue is that someone used the Kafka REST Proxy to post an invalid message (we *are* using the schema registry). When running our consumer, that resulted in

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 4

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403

This then manifested in the stream's onFailure handler as 

Caused by: java.lang.Exception: Consumer actor terminated

at akka.kafka.internal.SubSourceLogic$SubSourceStage$$anon$1$$anonfun$preStart$2.apply(SubSourceLogic.scala:150) 

At that point there is no way to skip the element by resuming, and the stream is stuck because it is stopped (and restarted by our host actor) with no offset committed. The "Consumer actor terminated" exception has no cause value.

I've tried attaching a Supervision.Decider to the partition source, the intervals source and the merge thereof.

    val mergedSource: Source[CommittableNormalizedInterval, Consumer.Control] =
      sources.withAttributes(attributesForPartitionSource)
        .flatMapMerge(breadth = consumerParallelism, a => a._2.withAttributes(attributesForIntervalSource))

    val result = mergedSource.withAttributes(attributesForIntervalSource)
      .via(partitionFlow)
      .toMat(Sink.ignore)(Keep.both)

But those deciders are not reached, so I appear to have no way to detect the serialization problem and skip the element.
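For reference, the decider attributes are built roughly like this (a sketch; the exact decider logic in our code may differ, and the attribute name matches the snippet above):

```scala
import akka.stream.{ActorAttributes, Attributes, Supervision}
import org.apache.kafka.common.errors.SerializationException

// Resume (i.e. drop the failing element) on deserialization problems,
// stop the stream on anything else.
val skipOnSerializationError: Supervision.Decider = {
  case _: SerializationException => Supervision.Resume
  case _                         => Supervision.Stop
}

val attributesForIntervalSource: Attributes =
  ActorAttributes.supervisionStrategy(skipOnSerializationError)
```

The problem is that the SerializationException is thrown inside the consumer actor rather than in a stream stage, so by the time the failure surfaces it has become "Consumer actor terminated" and the decider never sees the original cause.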

On Fri, Oct 6, 2017 at 10:30 AM, Richard Rodseth <rrod...@gmail.com> wrote:
I'm using akka-kafka with committablePartitionedSource, meaning I have a source of (partition, innersource) tuples.

I'm currently using the flatMapMerge recipe from the documentation:

val done = Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .flatMapMerge(maxPartitions, _._2)
  .via(business)
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) =>
    batch.updated(elem.committableOffset)
  }
  .mapAsync(3)(_.commitScaladsl())
  .runWith(Sink.ignore)

If I have a default supervision strategy that resumes, set at the materializer level, but want to stop the stream at the source level (which in my case restarts a host actor with backoff), do I need to do a withAttributes(attributesForStoppingDecider) at both the outer source and inner source levels?
i.e.
val done = Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .withAttributes(attributesForStoppingDecider)
  .flatMapMerge(maxPartitions, _._2.withAttributes(attributesForStoppingDecider))
  .via(business)

Or can I do it after the flatMapMerge? The question is motivated by an open ticket which I'm not sure I fully understand yet.
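For context, the resuming default at the materializer level mentioned above is set like this (a sketch of the Akka 2.5-era API; the actor system name is arbitrary):

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}

implicit val system: ActorSystem = ActorSystem("consumer")

// Materializer-level default: resume past any failure unless a stage
// overrides it via withAttributes(...).
implicit val materializer: ActorMaterializer = ActorMaterializer(
  ActorMaterializerSettings(system).withSupervisionStrategy(Supervision.resumingDecider)
)
```

Any withAttributes(attributesForStoppingDecider) applied to a stage takes precedence over this default for that stage.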

