I am using akka-stream-kafka 0.18. I have a flow that reads from one Kafka topic, does some processing and then writes to a different Kafka topic. The flow has been shutting down intermittently when Kafka brokers fail.
Sometimes the brokers fail repeatedly over a long period and the flow does not shut down; other times it shuts down as soon as a broker fails for the first time. In the logs below, once the message 'Closing the Kafka producer' appears, we no longer receive any messages from the Kafka topic.
Here is the code:
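    import java.lang.Runtime.getRuntime
    import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
    import akka.kafka.ConsumerMessage.CommittableOffsetBatch
    import akka.kafka.scaladsl.{Consumer, Producer}
    import akka.stream.{ActorMaterializer, Materializer, Supervision}
    import akka.stream.ActorAttributes.supervisionStrategy
    import akka.stream.scaladsl.Sink
    import org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
    import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

    // actorSystem (assumed implicit), logger, bootstrapServers, requestGroup, requestTopic, parallelismFactor,
    // batchSize, batchDelay, messageProcessor and DurationUtils are defined elsewhere in the class.
    private val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId(requestGroup)
      .withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")

    private val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)

    // Log every stage failure and ask the stage to restart
    private def decider(throwable: Throwable): Supervision.Directive = {
      logger.error("Received error in request consumer - restarting", throwable)
      Supervision.Restart
    }

    private implicit val materializer: Materializer = ActorMaterializer()
    private val parallelism: Int = parallelismFactor * getRuntime.availableProcessors

    // Consume -> process -> produce -> commit offsets in batches
    private val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(requestTopic))
      .mapAsync(parallelism)(messageProcessor.processMessage)
      .withAttributes(supervisionStrategy(decider))
      .via(Producer.flow(producerSettings).withAttributes(supervisionStrategy(decider)))
      .map(_.message.passThrough)
      .groupedWithin(batchSize, DurationUtils.toFiniteDuration(batchDelay))
      .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
      .mapAsync(parallelism)(_.commitScaladsl())

    source.runWith(Sink.ignore)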
Am I missing something in the code that is necessary to keep the flow running when errors occur?
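A frequently suggested alternative to per-stage supervision, offered here only as a sketch and not verified against this exact setup, is to wrap the entire consume-process-produce-commit graph in Akka's `RestartSource.withBackoff` (Akka 2.5.4 or later). If any stage fails or completes, the whole inner graph, including a fresh consumer and producer, is recreated after an exponential backoff. The topic and group names come from the logs; `responseTopic`, the backoff values, the batch settings and the `processMessage` stand-in are hypothetical placeholders for the question's own values and `messageProcessor`.

```scala
import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset, CommittableOffsetBatch}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

object RestartingPipeline {
  // Hypothetical placeholders standing in for the question's own configuration.
  val bootstrapServers = "localhost:9092"
  val requestTopic = "dlp_request"
  val responseTopic = "dlp_response" // hypothetical output topic
  val requestGroup = "sherlock"
  val batchSize = 100
  val batchDelay: FiniteDuration = 5.seconds
  val parallelism = 4

  // Hypothetical stand-in for messageProcessor.processMessage: forwards the value
  // unchanged and carries the consumer offset through as the pass-through.
  def processMessage(msg: CommittableMessage[Array[Byte], String])
      : Future[ProducerMessage.Message[Array[Byte], String, CommittableOffset]] =
    Future.successful(
      ProducerMessage.Message(
        new ProducerRecord[Array[Byte], String](responseTopic, msg.record.value),
        msg.committableOffset))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("pipeline")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId(requestGroup)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)

    // The factory is invoked again after every failure or completion of the inner graph,
    // waiting between 3 s and 30 s (with 20% jitter) before each new attempt.
    val restarting = RestartSource.withBackoff(
      minBackoff = 3.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    ) { () =>
      Consumer.committableSource(consumerSettings, Subscriptions.topics(requestTopic))
        .mapAsync(parallelism)(processMessage)
        .via(Producer.flow(producerSettings))
        .map(_.message.passThrough)
        .groupedWithin(batchSize, batchDelay)
        .map(_.foldLeft(CommittableOffsetBatch.empty)((batch, offset) => batch.updated(offset)))
        .mapAsync(parallelism)(_.commitScaladsl())
    }

    restarting.runWith(Sink.ignore)
  }
}
```

Because offsets are committed only after the producer has succeeded, a restart resumes from the last committed offset, so some messages may be processed and produced twice (at-least-once delivery).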
Here's the config:
    akka.kafka.consumer {
      wakeup-timeout = 10s
      max-wakeups = 8640
      kafka-clients {
        enable.auto.commit = false
      }
    }
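With this config the consumer tolerates up to `max-wakeups` poll timeouts of `wakeup-timeout` each; according to the comments in akka-stream-kafka's `reference.conf`, exceeding the limit stops the consumer and fails the source. A quick check of that budget, sketched with the Scala standard library and assuming the limit applies to consecutive wakeups, gives roughly one day:

```scala
import scala.concurrent.duration._

object WakeupBudget {
  // Values copied from the akka.kafka.consumer block above.
  val wakeupTimeout: FiniteDuration = 10.seconds
  val maxWakeups: Int = 8640

  // Approximate time of consecutive poll timeouts tolerated before the consumer gives up.
  val total: FiniteDuration = wakeupTimeout * maxWakeups

  def main(args: Array[String]): Unit =
    println(s"${total.toSeconds} s = ${total.toHours} h")
}
```

In the logs that follow, the producer is closed (16:02:04) before the first wakeup warning is logged (16:02:12), so this limit alone seems unlikely to explain the shutdown.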
Here are the logs from just before things stop working:
    2018-01-12 16:01:55.899 kafka-coordinator-heartbeat-thread | sherlock INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator Marking the coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) dead for group sherlock
    2018-01-12 16:02:02.815 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator Discovered coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) for group sherlock.
    2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Revoking previously assigned partitions [dlp_request-9, dlp_request-11, dlp_request-10] for group sherlock
    2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator (Re-)joining group sherlock
    2018-01-12 16:02:03.015 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator (Re-)joining group sherlock
    2018-01-12 16:02:04.864 application-akka.actor.default-dispatcher-308541 INFO org.apache.kafka.clients.producer.KafkaProducer Closing the Kafka producer with timeoutMillis = 60000 ms.
    2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634 WARN akka.kafka.KafkaConsumerActor Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 10000 milliseconds
    2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634 WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than `commit-time-warning`: 22991676910 ms
    2018-01-12 16:02:12.832 application-akka.actor.default-dispatcher-308541 WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than `commit-time-warning`: 10016185009 ms
    2018-01-12 16:02:12.919 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator Successfully joined group sherlock with generation 257
    2018-01-12 16:02:12.920 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Setting newly assigned partitions [dlp_request-9, dlp_request-11, dlp_request-10] for group sherlock
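These logs show the producer closing but not why the stream stopped. One way to surface the cause, sketched here with a stand-in source rather than the real Kafka pipeline, is to inspect the `Future[Done]` that `runWith(Sink.ignore)` already returns: it completes successfully if the stream finished normally and fails with the terminating exception otherwise. `describeTermination` and the simulated failure are hypothetical names for illustration.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

object StreamTerminationLogging {
  // Turn the Future[Done] returned by runWith(Sink.ignore) into a readable
  // description of how the stream ended, including the failure cause.
  def describeTermination(done: Future[Done])(implicit ec: ExecutionContext): Future[String] =
    done
      .map(_ => "stream completed normally")
      .recover { case e: Throwable => s"stream failed: ${e.getMessage}" }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("diagnostics")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Hypothetical stand-in for the Kafka pipeline: throws on the third element,
    // which (with the default stopping supervision) fails the whole stream.
    val source = Source(1 to 3).map { i =>
      if (i == 3) throw new IllegalStateException("simulated broker failure") else i
    }

    val outcome = describeTermination(source.runWith(Sink.ignore))
    println(Await.result(outcome, 10.seconds))
    system.terminate()
  }
}
```

Running `main` prints `stream failed: simulated broker failure`; in the real application the same callback would log the exception, if any, that ended the Kafka stream.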