Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 4
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
Caused by: java.lang.Exception: Consumer actor terminated
at akka.kafka.internal.SubSourceLogic$SubSourceStage$$anon$1$$anonfun$preStart$2.apply(SubSourceLogic.scala:150)
At that point there is no way to skip the element by resuming, and the stream is stuck because it is stopped (and restarted by our host actor) with no offset committed. The "Consumer actor terminated" exception has no cause value.
I've tried attaching a Supervision.Decider to the partition source, the intervals source and the merge thereof.
val mergedSource: Source[CommittableNormalizedInterval, Consumer.Control] =
sources.withAttributes(attributesForPartitionSource)
.flatMapMerge(breadth = consumerParallelism, a => a._2.withAttributes(attributesForIntervalSource))
val result = mergedSource.withAttributes(attributesForIntervalSource)
.via(partitionFlow)
.toMat(Sink.ignore)(Keep.both)
I'm using akka-kafka with committablePartitionedSource, meaning I have a source of (partition, innersource) tuples.I'm currently using the flatMapMerge recipe fromval done = Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1")) .flatMapMerge(maxPartitions, _._2) .via(business) .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) => batch.updated(elem.committableOffset) } .mapAsync(3)(_.commitScaladsl()) .runWith(Sink.ignore)
If I have a default supervision strategy which resumes, set at the materializer level, but want to stop the stream (which in my case restarts a host actor with backoff) at the source level, do I need to do a withAttributes(attributesForStoppingDecider) at both the outer source and inner source leves?i.e.val done = Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1")) .withAttributes(attributesForStoppingDecider) .flatMapMerge(maxPartitions, _._2.withAttributes(
attributesForStoppingDecider)) .via(business)or can I do it after the flatMapMerge? Question motivated by this open ticket which I'm not sure I fully understand yet.