Kafka consumer flow control


Manikanta G

Apr 11, 2018, 12:51:53 PM
to vert.x
Hi, 

I'm new to Vert.x and have a question about my current requirement: 1) read records from Kafka, 2) process them, 3) send the processed records to the web, and 4) update the status of each processed record in the DB.

I have 1 verticle for the Kafka consumer and 4 processor verticles (as my laptop has 4 cores).

I'm reading records in the consumer verticle and sending them to the processor verticles (through the event bus, using service proxies), where the processing and the HTTP client requests happen. Once a record is processed, the consumer verticle commits the offset in Kafka.

Problem: the consumer verticle reads all the messages in the topic very rapidly. For example, I created 1 lakh (100,000) records in Kafka, and the consumer verticle read all of them in 2-3 seconds, created 1 lakh events on the Vert.x event bus, and then expected a 'reply' from the processor verticles within 30 seconds (the default setting). Many records are not processed correctly, because not all of them can be processed within 30 seconds.

I'm thinking of implementing flow control using the pause() and resume() methods, maybe with batchHandler() instead of the current handler() usage.
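
A minimal sketch of the pause()/resume() idea, assuming the consumer is paused once too many records are awaiting a reply and resumed once the backlog drains. The class name `InFlightLimiter`, its method names, and the watermark values are hypothetical; the two `Runnable` callbacks stand in for the consumer's pause() and resume() calls so the example runs without Vert.x or Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Counts records that were dispatched to processors but not yet acknowledged.
 * Runs onPause when the count reaches highWaterMark and onResume when it falls
 * back to lowWaterMark. Not thread-safe: meant to be called from one thread only,
 * such as the event loop of a single verticle.
 */
final class InFlightLimiter {
    private final int highWaterMark;
    private final int lowWaterMark;
    private final Runnable onPause;   // hypothetical hook, e.g. () -> consumer.pause()
    private final Runnable onResume;  // hypothetical hook, e.g. () -> consumer.resume()
    private int inFlight = 0;
    private boolean paused = false;

    InFlightLimiter(int highWaterMark, int lowWaterMark, Runnable onPause, Runnable onResume) {
        if (lowWaterMark < 0 || lowWaterMark >= highWaterMark) {
            throw new IllegalArgumentException("need 0 <= lowWaterMark < highWaterMark");
        }
        this.highWaterMark = highWaterMark;
        this.lowWaterMark = lowWaterMark;
        this.onPause = onPause;
        this.onResume = onResume;
    }

    /** Call when a record is sent to a processor. */
    void recordDispatched() {
        inFlight++;
        if (!paused && inFlight >= highWaterMark) {
            paused = true;
            onPause.run();
        }
    }

    /** Call when a processor replies, whether it succeeded or failed. */
    void recordCompleted() {
        if (inFlight > 0) {
            inFlight--;
        }
        if (paused && inFlight <= lowWaterMark) {
            paused = false;
            onResume.run();
        }
    }

    int inFlight() { return inFlight; }

    boolean isPaused() { return paused; }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        InFlightLimiter limiter = new InFlightLimiter(3, 1,
                () -> events.add("pause"), () -> events.add("resume"));
        for (int i = 0; i < 3; i++) {
            limiter.recordDispatched();   // the third dispatch triggers pause
        }
        limiter.recordCompleted();        // 2 in flight, still paused
        limiter.recordCompleted();        // 1 in flight, triggers resume
        System.out.println(events + " inFlight=" + limiter.inFlight());
        // prints: [pause, resume] inFlight=1
    }
}
```

Using two thresholds rather than one avoids toggling pause and resume on every single reply. With a cap like this, the number of pending event bus requests stays bounded, so each request has a realistic chance of being answered within the reply timeout.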

What is the best way to implement my current requirement? Also, does the Reactive Streams API, when used with Project Reactor (projectreactor.io), support back pressure (which could then solve the rate control problem)?

Thanks for any help.

Manikanta G

Manikanta G

Apr 14, 2018, 9:30:45 AM
to vert.x


With 4 instances, I'm getting event bus timeout exceptions. So, to improve performance, I deployed more than 4 instances of MessageProcessorVerticle: first 16, and then 8. But the overall performance of the Kafka consumer and the processors dropped a lot compared to 4 instances. Is this expected?

I tried to implement rate control using Kafka's pause()/resume() methods, but the performance degraded even further (the rate control logic is here: https://github.com/manikantag/vertxtest/blob/master/src/main/java/com/manikanta/vertxtest/core/agent/MessagePollerVerticle.java#L69). I'm using the Kafka consumer.handler(), with auto.commit=false.


Can you please give me at least a clue about what could be wrong in my implementation? CPU utilization is only around 40-50%, and 50% of RAM is free. But when I deploy more processor verticles than the number of cores, the overall TPS gets worse.

Thanks,
Manikanta G

Manikanta G

Apr 21, 2018, 8:58:35 AM
to vert.x
Hi,

Can anyone give me a clue as to why the flow control logic is slowing down the consumer like that?

Thanks,
Manikanta G



Gadi Eichhorn

May 10, 2018, 4:56:24 AM
to vert.x
Hi,

I didn't review your code, sorry. Can you answer some simple questions first?
1. Did you use a kafka-client consumer group?
2. Do you have partitions on your Kafka topics?
3. How important is ordering for your messages?

It sounds like a better idea to let Kafka control the flow and process the data in a few consumers instead of using your processor verticles.
Kafka does what you want out of the box; I would not reinvent the wheel for that in code. It is also recommended to use many Kafka partitions for scalability and resilience, so you can scale Kafka up too.

It sounds like you don't care about ordering, so just make a "fake" partition strategy and let this be your scaling property.
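
One way to read the "fake" partition strategy, as a sketch only: when ordering does not matter, records can be spread evenly over the partitions in round-robin fashion, so each consumer in the group gets a similar share. The class `RoundRobinPartitionChooser` is hypothetical and is not Kafka's own partitioner; it only illustrates how a partition number could be chosen on the producing side.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Picks partitions 0..numPartitions-1 in turn, ignoring the record content. */
final class RoundRobinPartitionChooser {
    private final int numPartitions;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinPartitionChooser(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    int nextPartition() {
        // floorMod keeps the result non-negative even after the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinPartitionChooser chooser = new RoundRobinPartitionChooser(4);
        int[] recordsPerPartition = new int[4];
        for (int i = 0; i < 8; i++) {
            recordsPerPartition[chooser.nextPartition()]++;
        }
        System.out.println(Arrays.toString(recordsPerPartition));
        // prints: [2, 2, 2, 2]
    }
}
```

With an even spread like this, adding partitions (and consumers in the same group) becomes the scaling knob, which is the point made above.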

Manikanta G

May 12, 2018, 10:47:09 AM
to vert.x
Thanks for the reply. Excuse me for the delayed reply.


1. Did you use a kafka-client consumer group?

Yes. I have one consumer verticle and 4 processor verticles (= number of cores). In my case, when the app is deployed on multiple nodes, each consumer will have the same consumer group name to avoid duplicate processing.


2. Do you have partitions on your Kafka topics?

Yes. The number of partitions will depend on the number of nodes.


3. How important is ordering for your messages?
 
Ordering is not important.


It sounds like a better idea to let Kafka control the flow and process the data in a few consumers instead of using your processor verticles.

Are you saying I should not use processor verticles for processing, thus avoiding the event bus altogether, and have multiple consumer verticles instead?


Kafka does what you want out of the box; I would not reinvent the wheel for that in code.

Yes, I read this in the Kafka docs: the consumer can poll only when it can handle more messages.
 

It is also recommended to use many Kafka partitions for scalability and resilience, so you can scale Kafka up too.

Yes. We'll definitely use multiple partitions.
 

One more question: I'm using batchHandler(), and sometimes the event loop thread is blocked for more than 2 seconds (sometimes even 13-15 seconds). I need batching because, once I read a message from Kafka, I need to get related data from the DB. If I query the DB for each record one by one, I get much worse performance from the DB than if I use a bulk query (using the MySQL driver's rewriteBatchedStatements=true option). The same goes for insertion after processing: individual insert queries are way too slow.
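
As an illustration of the bulk lookup described here, a sketch that builds one parameterized query for a whole batch of keys instead of one query per record. The helper `BulkQueryBuilder` and the table and column names are hypothetical; the values themselves would still be bound through a `PreparedStatement`, and the table and column arguments are assumed to be trusted identifiers, never user input.

```java
import java.util.Collections;

/** Builds a single SELECT with one "?" placeholder per key in the batch. */
final class BulkQueryBuilder {

    static String selectByKeys(String table, String column, int keyCount) {
        if (keyCount <= 0) {
            throw new IllegalArgumentException("keyCount must be positive");
        }
        // One placeholder per key, e.g. "?, ?, ?" for three keys.
        String placeholders = String.join(", ", Collections.nCopies(keyCount, "?"));
        return "SELECT * FROM " + table + " WHERE " + column + " IN (" + placeholders + ")";
    }

    public static void main(String[] args) {
        // Hypothetical table and column names, used only for the demo.
        System.out.println(selectByKeys("record_details", "record_id", 3));
        // prints: SELECT * FROM record_details WHERE record_id IN (?, ?, ?)
    }
}
```

A blocking JDBC call like this would still hold the event loop if run directly inside the batch handler, so it is probably best kept off the event loop thread.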

Can I do any better here? One way I thought of is to use handler() instead of batchHandler() and use a List to do the batching manually. Any thoughts on this approach, or a better one?

Also, batchHandler() does not work unless handler() is also set on the consumer. Is this expected?
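
The manual batching idea could be sketched roughly as follows, assuming each record arrives through a per-record handler and is collected in a List until a size limit is reached. The class `RecordBatcher` and the batch size are hypothetical; `flush()` is exposed separately so that a periodic timer could also call it and a partial batch would not wait forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Collects records one at a time and hands them over in batches. Not thread-safe. */
final class RecordBatcher<T> {
    private final int maxSize;
    private final Consumer<List<T>> onBatch;
    private List<T> buffer = new ArrayList<>();

    RecordBatcher(int maxSize, Consumer<List<T>> onBatch) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
        this.onBatch = onBatch;
    }

    /** Call from the per-record handler. */
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= maxSize) {
            flush();
        }
    }

    /** Emits whatever is buffered; does nothing when the buffer is empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();   // start a fresh buffer before handing the batch over
        onBatch.accept(batch);
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        RecordBatcher<Integer> batcher = new RecordBatcher<>(3, batches::add);
        for (int i = 1; i <= 7; i++) {
            batcher.add(i);
        }
        batcher.flush();              // emit the trailing partial batch
        System.out.println(batches);
        // prints: [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

This keeps the batch size under the application's control rather than tying it to however many records one poll happens to return.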


Thanks,
Manikanta G