We have an Alpakka Kafka consumer that listens to a topic with multiple partitions. We have found two issues.
1. If we use committableSource as shown below, the consumer starts by listening to only one partition. Once the first batch commit happens, it starts reading from another partition. This continues until every partition has been read once; after that it round-robins between all partitions until all lags are caught up. (A single thread consumes from all partitions, so only one partition is read at a time.)
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName))
2. When we replace the above with committablePartitionedSource, the consumer is assigned all partitions at startup, but it still reads from only one partition at a time until the lag on that partition reaches zero. This is the behaviour even if we use mapAsync with flatMapMerge on the source. (A single thread consumes from all partitions, so only one partition is read at a time.)
// Each assigned partition arrives as its own sub-source
Source<Pair<TopicPartition, Source<CommittableMessage<String, String>, NotUsed>>, Control> source =
    Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(TOPIC_NAME));
Consumer.DrainingControl<Done> control =
    source
        // merge up to 10 partition sub-sources into one stream
        .flatMapMerge(10, Pair::second)
        // print(msg) is our own async handler returning a CompletionStage
        .mapAsync(10, msg -> print(msg).thenApply(done -> msg.committableOffset()))
        .toMat(Committer.sink(committerSettings.withMaxBatch(100)), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);
If we skip flatMapMerge and instead run one stream per partition source, we are able to use multiple threads and process all partitions in parallel.
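To make the shape of that workaround concrete without a broker, here is a plain-JDK analogue. No Kafka or Alpakka is involved: the class and method names are hypothetical, and an in-memory map from partition number to records stands in for the topic. Each partition is drained sequentially (preserving per-partition order), while different partitions run concurrently on a thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical plain-JDK analogue of "one stream per partition" (not Alpakka code).
class PerPartitionDemo {

    // recordsByPartition is an assumed in-memory stand-in for a Kafka topic.
    static <R> Map<Integer, List<R>> processPerPartition(
            Map<Integer, List<String>> recordsByPartition,
            Function<String, R> handler) {
        // One worker thread per partition, so partitions do not wait on each other.
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.max(1, recordsByPartition.size()));
        try {
            Map<Integer, CompletableFuture<List<R>>> pending = new TreeMap<>();
            for (Map.Entry<Integer, List<String>> e : recordsByPartition.entrySet()) {
                List<String> records = e.getValue();
                // Independent "stream" for this partition.
                pending.put(e.getKey(), CompletableFuture.supplyAsync(() -> {
                    List<R> out = new ArrayList<>();
                    for (String record : records) {
                        out.add(handler.apply(record)); // sequential within the partition
                    }
                    return out;
                }, pool));
            }
            // Wait for every partition to finish and collect results by partition.
            Map<Integer, List<R>> results = new TreeMap<>();
            for (Map.Entry<Integer, CompletableFuture<List<R>>> e : pending.entrySet()) {
                results.put(e.getKey(), e.getValue().join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, mapping partition 0 to [a, b, c] and partition 1 to [d, e] with String::toUpperCase returns {0=[A, B, C], 1=[D, E]}: order is kept inside each partition even though the two partitions may be processed on different threads.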
Why is the default Alpakka consumer behaviour single-threaded?
Another interesting behaviour:
Why does a Kafka consumer that shares a group id across topics get rebalanced when one of those topics is reset?
This is similar to the issue below, but I don't think it is caused by the shovel from the delete topic to the compact topic; rather, I think it is a bug. Alpakka/Kafka - Partitions consumed faster than others