Kafka flow shutting down unexpectedly when brokers fail


Sean Rohead

Jan 12, 2018, 5:41:04 PM
to Akka User List
I am using akka-stream-kafka 0.18. I have a flow that reads from one Kafka topic, does some processing, and then writes to a different Kafka topic. The flow has been shutting down intermittently when Kafka brokers fail.

Sometimes the brokers fail repeatedly over a long period and the flow keeps running; other times it shuts down as soon as a broker fails for the first time. In the logs below, once the message 'Closing the Kafka producer' appears, we no longer receive any messages from the Kafka topic.

Here is the code:

  private val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId(requestGroup)
    .withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")

  private val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  private def decider(throwable: Throwable): Supervision.Directive = {
    logger.error("Received error in request consumer - restarting", throwable)
    Supervision.Restart
  }

  private implicit val materializer: Materializer = ActorMaterializer()

  private val parallelism: Int = parallelismFactor * getRuntime.availableProcessors

  private val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(requestTopic))
    .mapAsync(parallelism)(messageProcessor.processMessage)
    .withAttributes(supervisionStrategy(decider))
    .via(Producer.flow(producerSettings).withAttributes(supervisionStrategy(decider)))
    .map(_.message.passThrough)
    .groupedWithin(batchSize, DurationUtils.toFiniteDuration(batchDelay))
    .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
    .mapAsync(parallelism)(_.commitScaladsl())
  source.runWith(Sink.ignore)

Am I missing something in the code that is necessary to keep the flow running when errors occur?

Here's the config:

akka.kafka.consumer {
  wakeup-timeout = 10s
  max-wakeups = 8640
  kafka-clients {
    enable.auto.commit = false
  }
}

Here are the logs from just before things stop working:

2018-01-12 16:01:55.899 kafka-coordinator-heartbeat-thread | sherlock INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator Marking the coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) dead for group sherlock
2018-01-12 16:02:02.815 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator Discovered coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) for group sherlock.
2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Revoking previously assigned partitions [dlp_request-9, dlp_request-11, dlp_request-10] for group sherlock
2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator (Re-)joining group sherlock
2018-01-12 16:02:03.015 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator (Re-)joining group sherlock
2018-01-12 16:02:04.864 application-akka.actor.default-dispatcher-308541 INFO org.apache.kafka.clients.producer.KafkaProducer Closing the Kafka producer with timeoutMillis = 60000 ms.
2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634 WARN akka.kafka.KafkaConsumerActor Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 10000 milliseconds
2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634 WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than `commit-time-warning`: 22991676910 ms
2018-01-12 16:02:12.832 application-akka.actor.default-dispatcher-308541 WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than `commit-time-warning`: 10016185009 ms
2018-01-12 16:02:12.919 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator Successfully joined group sherlock with generation 257
2018-01-12 16:02:12.920 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Setting newly assigned partitions [dlp_request-9, dlp_request-11, dlp_request-10] for group sherlock

Sean Rohead

Jan 22, 2018, 4:13:49 PM
to Akka User List

Item 6 in the article says that applying a supervision strategy using withAttributes should be done at the END of the stream definition, unless you are changing async boundaries.

If I'm understanding this correctly, I should move the withAttributes() call to AFTER the map call containing the foldLeft. Do I then also need an additional withAttributes() AFTER the mapAsync?

I've also seen code that adds a supervision strategy to the materializer. If I add it to the materializer, do I also need to add it to the stages using withAttributes or do I just need it in one or the other? Is one approach preferred to the other?

Thanks!
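
For reference, the two ways of attaching a supervision strategy mentioned above can be sketched side by side. This is a minimal, Kafka-free sketch assuming Akka Streams 2.5.x; the object name, the decider, and the deliberately failing `map` stage are hypothetical and exist only for illustration. It does not show how the Kafka stages in the original flow react to broker failures.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object SupervisionSketch {
  // Hypothetical decider for this sketch: drop the failing element and keep going.
  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _                      => Supervision.Stop
  }

  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("supervision-sketch")

    // Option 1: the strategy becomes the default for streams run by this materializer.
    val supervisedMat = ActorMaterializer(
      ActorMaterializerSettings(system).withSupervisionStrategy(decider))
    // Option 2: a plain materializer; the strategy is attached through attributes.
    val plainMat = ActorMaterializer()

    // The element 0 makes the map stage throw an ArithmeticException.
    val risky = Source(List(1, 0, 2)).map(n => 10 / n)

    try {
      val viaMaterializer =
        Await.result(risky.runWith(Sink.seq)(supervisedMat), 10.seconds)
      val viaAttributes = Await.result(
        risky
          .withAttributes(ActorAttributes.supervisionStrategy(decider))
          .runWith(Sink.seq)(plainMat),
        10.seconds)
      (viaMaterializer, viaAttributes)
    } finally system.terminate()
  }
}
```

In this sketch both variants drop the failing element and return the remaining results, so either mechanism alone is enough for a stage that honors supervision. As far as I understand it, an attribute set with withAttributes takes precedence over the materializer-level default for the stages it covers.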

Michal Borowiecki

Jan 25, 2018, 10:37:35 AM
to Akka User List
I use RestartSource.withBackoff to recover from broker outages.
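
A minimal sketch of that approach, assuming Akka Streams 2.5.4 or later (where RestartSource is available). To stay self-contained it does not touch Kafka: the factory below is a hypothetical stand-in that fails twice to simulate a broker outage and then emits three elements. The object name, the backoff values, and the simulated failure are assumptions made for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink, Source}

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

object RestartSourceSketch {
  // Counts how many times the source factory has been invoked.
  val attempts = new AtomicInteger(0)

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("restart-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // The factory is called again, after a backoff delay, whenever the
    // wrapped source fails or completes.
    val restarting = RestartSource.withBackoff(
      minBackoff = 100.millis,
      maxBackoff = 1.second,
      randomFactor = 0.2
    ) { () =>
      if (attempts.incrementAndGet() < 3)
        Source.failed[Int](new RuntimeException("simulated broker outage"))
      else
        Source(1 to 3)
    }

    // take(3) ends the demo; a real consumer stream would run indefinitely.
    try Await.result(restarting.take(3).runWith(Sink.seq), 30.seconds)
    finally system.terminate()
  }
}
```

Applied to the flow in the original post, the factory would presumably build the whole chain starting at Consumer.committableSource, so that a fresh consumer and producer are created on every restart. Note that withBackoff also restarts when the wrapped source completes normally, not only when it fails.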