Alpakka Kafka consumer reads from one partition at a time and round-robins between partitions

Hareesh Jagannathan

Mar 8, 2020, 4:17:38 PM
to Akka User List

We have an Alpakka Kafka consumer that listens to a topic with multiple partitions. We have found two issues.

  1. If we use a committable source as shown below, the consumer starts by reading from only one partition. Once the first batch commit happens, it starts reading from another partition. This continues until every partition has been read once, and then it round-robins between all partitions until all lags are caught up. (A single thread consumes from all partitions, so only one partition is read at a time.)

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName));

  2. When we replace the above with a committable partitioned source, all partitions are picked up when the consumer starts, but it still reads from only one partition at a time until the lag on that partition reaches zero. This happens even if we use flatMapMerge followed by mapAsync on the source. (A single thread consumes from all partitions, so only one partition is read at a time.)

    Source<Pair<TopicPartition, Source<CommittableMessage<String, String>, NotUsed>>, Control> source =
        Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(TOPIC_NAME));

    source.flatMapMerge(10,
                        Pair::second)
          .mapAsync(10,
                    msg -> print(msg).thenApply(done -> msg.committableOffset()))
          .toMat(Committer.sink(committerSettings.withMaxBatch(100)),
                 Keep.both())
          .mapMaterializedValue(Consumer::createDrainingControl)
          .run(materializer);

If we don't use flatMapMerge and instead run one stream per partition source, we are able to use multiple threads and process all partitions in parallel.
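For comparison, a minimal sketch of that one-stream-per-partition variant, modelled on the "source per partition" pattern in the Alpakka Kafka documentation. The class name, the topic name, `MAX_PARTITIONS`, and the `handle` method (standing in for `print` above) are hypothetical placeholders; the consumer settings, committer settings, and materializer are assumed to be created elsewhere, as in the snippets above. It needs the Alpakka Kafka dependency and a running broker.

```java
import akka.Done;
import akka.NotUsed;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerMessage.CommittableMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.Materializer;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public final class PerPartitionConsumer {

  // Hypothetical values, not taken from our real setup.
  static final String TOPIC_NAME = "my-topic";
  static final int MAX_PARTITIONS = 10; // assumed upper bound on assigned partitions

  // Placeholder for the real processing step (print(msg) in the snippet above).
  static CompletionStage<Done> handle(CommittableMessage<String, String> msg) {
    System.out.println(msg.record().partition() + ": " + msg.record().value());
    return CompletableFuture.completedFuture(Done.getInstance());
  }

  static Consumer.DrainingControl<Done> run(
      ConsumerSettings<String, String> consumerSettings,
      CommitterSettings committerSettings,
      Materializer materializer) {
    return Consumer.committablePartitionedSource(
            consumerSettings, Subscriptions.topics(TOPIC_NAME))
        // Each emitted pair carries the sub-source for one assigned partition.
        // Instead of merging the sub-sources, run each one as its own stream.
        .mapAsyncUnordered(
            MAX_PARTITIONS,
            pair -> {
              Source<CommittableMessage<String, String>, NotUsed> partitionSource =
                  pair.second();
              return partitionSource
                  .mapAsync(10, msg -> handle(msg).thenApply(done -> msg.committableOffset()))
                  // Offsets are committed separately for each partition stream.
                  .runWith(Committer.sink(committerSettings), materializer);
            })
        .toMat(Sink.ignore(), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);
  }
}
```

Here each partition gets its own processing and commit pipeline, so a slow or lagging partition does not hold back the others, while ordering within a partition is preserved.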

Why is the default Alpakka consumer behaviour single-threaded?

Another interesting behaviour:

  • If there are multiple consumers, each consuming from a different Kafka topic but all using the same group id, then when one of the listeners is reset using manual assignment, all the other consumers also get rebalanced and reset.

Why do Kafka consumers that share a group id across topics get rebalanced when one of those topics is reset?

This is similar to the issue below, but I don't think the shovel between the delete and compact topics is causing it; rather, I think it is a bug. Alpakka/Kafka - Partitions consumed faster than others