We have an Alpakka Kafka consumer that listens to a topic with multiple partitions. We have found two issues.
2. When we replace the above way of creating the source with a committable partitioned source, all partitions are assigned when the consumer starts, but it still reads from only one partition at a time until the lag on that partition reaches zero. This happens even if we use mapAsync together with flatMapMerge on the source. (A single thread consumes from all partitions, so only one partition is read at a time.)
    Source<Pair<TopicPartition, Source<CommittableMessage<String, String>, NotUsed>>, Control> source =
        Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(TOPIC_NAME));

    source.flatMapMerge(10, Pair::second)
        .mapAsync(10, msg -> print(msg).thenApply(done -> msg.committableOffset()))
        .toMat(Committer.sink(committerSettings.withMaxBatch(100)), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);
If we don't use flatMapMerge and instead run one stream per partition source, we are able to use multiple threads and process all partitions in parallel.
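For reference, the one-stream-per-partition variant looks roughly like the sketch below. It follows the per-partition pattern from the Alpakka Kafka documentation rather than our exact production code. The class name `PerPartitionStreams`, the `maxPartitions` parameter and the body of `print` are placeholders assumed for illustration. The explicit `<Committable>` type witness is only there so the snippet should compile regardless of whether `Committer.sink` is generic in the Alpakka Kafka version in use.

```java
import akka.Done;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerMessage.Committable;
import akka.kafka.ConsumerMessage.CommittableMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.Materializer;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public final class PerPartitionStreams {

  // Starts one inner stream per assigned partition instead of merging them.
  static Consumer.DrainingControl<Done> run(
      ConsumerSettings<String, String> consumerSettings,
      CommitterSettings committerSettings,
      String topic,
      int maxPartitions, // assumed upper bound on concurrently assigned partitions
      Materializer materializer) {
    return Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
        .mapAsyncUnordered(
            maxPartitions,
            pair ->
                // pair.second() is the source for a single partition; it is
                // materialized as its own stream with its own committer sink.
                pair.second()
                    .mapAsync(
                        10,
                        msg ->
                            print(msg)
                                .<Committable>thenApply(done -> msg.committableOffset()))
                    .runWith(Committer.sink(committerSettings.withMaxBatch(100)), materializer))
        .toMat(Sink.ignore(), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);
  }

  // Placeholder for the real processing step (assumption): it hands the work
  // to another thread pool and completes the stage when done.
  static CompletionStage<Done> print(CommittableMessage<String, String> msg) {
    return CompletableFuture.supplyAsync(
        () -> {
          System.out.println(
              msg.record().partition() + "@" + msg.record().offset() + ": " + msg.record().value());
          return Done.getInstance();
        });
  }
}
```

The inner `runWith` completes when a partition's source completes (for example when the partition is revoked), which frees a slot in `mapAsyncUnordered` for a newly assigned partition.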
Why is the default Alpakka consumer behaviour single-threaded?
Another interesting behaviour:
Why do Kafka consumers that share a group id across topics get rebalanced if one of those topics is reset?
This is similar to the issue below, but I don't think the shovel from the delete topic to the compact topic is causing it; rather, I think it is a bug. Alpakka/Kafka - Partitions consumed faster than others