Kafka Streams Performance Issue


Sicheng Liu

Sep 24, 2017, 2:55:44 PM
to Confluent Platform
Hi Confluent Platform Team,

Currently we are using Kafka Streams to do aggregations on our metrics data, which has 96 partitions. The topology is as simple as one source and two aggregations (using the DSL). The aggregation logic itself is very basic: just math operations like sum and max. We have already gotten rid of serialization/deserialization by operating directly on bytes. Key size is about 50 bytes after compression and value size is about 20 bytes. We run our stream workers on EC2 instances of type m4.xlarge (8 cores, 16 GB memory), and the throughput we achieve is only about 4k metrics per core per second.
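
To give a concrete picture, the byte-level aggregation is roughly of this shape (illustrative sketch only; the real StreamAggregation packs more fields and uses a different layout):

import java.nio.ByteBuffer;

// Illustrative only: values are a fixed layout of [sum: double][max: double],
// merged directly on bytes so no serde round-trip is needed.
public final class ByteAggregationSketch {

    // Initializer: zero sum, max starts at negative infinity.
    public static byte[] init() {
        return ByteBuffer.allocate(16)
                .putDouble(0.0)
                .putDouble(Double.NEGATIVE_INFINITY)
                .array();
    }

    // Aggregator: fold one incoming metric value into the running aggregate.
    public static byte[] aggregate(byte[] agg, double value) {
        ByteBuffer in = ByteBuffer.wrap(agg);
        double sum = in.getDouble() + value;
        double max = Math.max(in.getDouble(), value);
        return ByteBuffer.allocate(16).putDouble(sum).putDouble(max).array();
    }
}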

The CPU is already at 100%, so increasing the number of threads would not help. During profiling we saw one time-consuming method, "rocksIterator.seek" (about 20 microseconds per call on average), but tuning RocksDB with a hash index and the plain table format brought no improvement. I'm not sure what else I can do to improve the performance. Could you give some help?
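
In case it matters, the RocksDB tuning was applied through a RocksDBConfigSetter, roughly like this (sketch only; the exact Options calls depend on the rocksdbjni version bundled with your Streams release):

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;
import org.rocksdb.PlainTableConfig;

// Applied to every RocksDB-backed store in the application.
public class TuningConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // This is where we switched the table format / index type, for example:
        options.setTableFormatConfig(new PlainTableConfig());
    }
}

// registered in the Streams properties via:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TuningConfigSetter.class);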

Thanks,
Sicheng

Damian Guy

Sep 25, 2017, 9:14:00 AM
to Confluent Platform
Hi,

Are these windowed operations? Can you provide the topology that you are running?

Have you had a look here for some sizing guidelines: https://docs.confluent.io/current/streams/sizing.html#stateful-applications

Do you have any I/O stats?

Thanks,
Damian 


Sicheng Liu

Sep 25, 2017, 2:50:08 PM
to Confluent Platform
Hi Damian,

Thanks for your reply. The topology is defined as follows:

KGroupedStream<byte[], byte[]> inputStream = builder.stream(byteArraySerde, byteArraySerde, inputTopic)
        .filter((key, value) -> ...)
        .groupByKey(byteArraySerde, byteArraySerde);

// 5-minute aggregation on `inputStream`
KStream<Windowed<byte[]>, byte[]> kStream1 = inputStream.aggregate(
        () -> StreamAggregation.build(),
        (aggKey, value, aggregation) -> StreamAggregation.aggregate(aggregation, StreamValue.getValue(value)),
        TimeWindows.of(TimeUnit.MINUTES.toMillis(5)).until(TimeUnit.HOURS.toMillis(3)),
        byteArraySerde,
        "metrics-store-name-5m")
        .toStream();

// 1-hour aggregation, also on `inputStream`
KStream<Windowed<byte[]>, byte[]> kStream2 = inputStream.aggregate(
        () -> StreamAggregation.build(),
        (aggKey, value, aggregation) -> StreamAggregation.aggregate(aggregation, StreamValue.getValue(value)),
        TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.HOURS.toMillis(3)),
        byteArraySerde,
        "metrics-store-name-1h")
        .toStream()
        .filter((key, value) -> ...);

kStream1.map((key, value) -> ...).filter((key, value) -> filter1).to(byteArraySerde, byteArraySerde, outputTopic);

kStream1.map((key, value) -> ...).filter((key, value) -> filter2).to(byteArraySerde, byteArraySerde, outputTopic);

kStream2.map((key, value) -> ...).filter((key, value) -> filter1).to(byteArraySerde, byteArraySerde, outputTopic);

kStream2.map((key, value) -> ...).filter((key, value) -> filter2).to(byteArraySerde, byteArraySerde, outputTopic);


For the stats: our CPU is always at 100%, memory usage is about 80%, and network in is about 1 MiB/sec for the stream workers. The message ingestion rate is about 0.3 million messages per second.

Let me know if there is anything else I can provide.

Thanks,
Sicheng Liu



Damian Guy

Sep 26, 2017, 5:01:14 AM
to Confluent Platform
Do you have disk stats, too?


Sicheng Liu

Sep 26, 2017, 2:00:15 PM
to Confluent Platform
Hi Damian,

For the disk, we are using SSDs and there is almost no disk activity, for either reads or writes.

Thanks,
Sicheng Liu


Matthias J. Sax

Sep 26, 2017, 5:37:09 PM
to confluent...@googlegroups.com
What is your cache size? Are there any configs that are non-default?

Also, it seems that you are doing the same aggregation over two time
windows of different sizes. You might want to consider computing the
second one as a roll-up of the first one. With caching enabled, this
would reduce the load on the second operator. You can find a sketch of
this idea here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputewindowedaggregationsoversuccessivelyincreasingtimedwindows?
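
Roughly, the roll-up would look like this (sketch only, reusing the names from your topology; StreamAggregation.merge is hypothetical and would need to combine two partial aggregates instead of folding in a raw value, and as the wiki page discusses you also have to account for the 5-minute results being updated incrementally):

// 5-minute aggregation as before, but keep it as a KTable for the roll-up.
KTable<Windowed<byte[]>, byte[]> fiveMinAgg = inputStream.aggregate(
        () -> StreamAggregation.build(),
        (aggKey, value, aggregation) -> StreamAggregation.aggregate(aggregation, StreamValue.getValue(value)),
        TimeWindows.of(TimeUnit.MINUTES.toMillis(5)).until(TimeUnit.HOURS.toMillis(3)),
        byteArraySerde,
        "metrics-store-name-5m");

// 1-hour aggregation consumes the 5-minute results instead of re-reading the raw input.
KTable<Windowed<byte[]>, byte[]> oneHourAgg = fiveMinAgg
        .toStream()
        .map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg))   // drop the 5-minute window from the key
        .groupByKey(byteArraySerde, byteArraySerde)
        .aggregate(
                () -> StreamAggregation.build(),
                (aggKey, fiveMinBytes, oneHourBytes) -> StreamAggregation.merge(oneHourBytes, fiveMinBytes), // hypothetical merge of partial aggregates
                TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.HOURS.toMillis(3)),
                byteArraySerde,
                "metrics-store-name-1h");

The downstream map/filter/to steps can then hang off fiveMinAgg.toStream() and oneHourAgg.toStream() as before.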


-Matthias

Damian Guy

Sep 27, 2017, 5:08:26 AM
to confluent...@googlegroups.com
Hi, it could be that caching is actually the cause of the RocksDB seeks. You might want to try using the overloaded aggregate methods that take a StateStoreSupplier as a param and turn caching off. You can create the StateStoreSupplier like so:
Stores.create(storeName)
    .withKeys(Serdes.Bytes())
    .withValues(Serdes.ByteArray())
    .persistent()
    .windowed(windowSize, windowRetention, 3, false)
    .build();

The windowSize should be the same as what is specified in your TimeWindows, and the windowRetention is the until() part of the TimeWindows.
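
For the 5-minute aggregation it would look something like this (sketch only, reusing your topology's names; note the .build() at the end, and that the serdes here are chosen to match your byte[] keys and values):

KStream<Windowed<byte[]>, byte[]> kStream1 = inputStream.aggregate(
        () -> StreamAggregation.build(),
        (aggKey, value, aggregation) -> StreamAggregation.aggregate(aggregation, StreamValue.getValue(value)),
        TimeWindows.of(TimeUnit.MINUTES.toMillis(5)).until(TimeUnit.HOURS.toMillis(3)),
        Stores.create("metrics-store-name-5m")
                .withKeys(Serdes.ByteArray())                 // keys and values are byte[] in your topology
                .withValues(Serdes.ByteArray())
                .persistent()
                .windowed(TimeUnit.MINUTES.toMillis(5),       // windowSize: same as TimeWindows.of(...)
                          TimeUnit.HOURS.toMillis(3),         // windowRetention: same as .until(...)
                          3, false)                           // numSegments, retainDuplicates
                .build())                                     // no .enableCaching(), so caching stays off for this store
        .toStream();

You would do the same for the 1-hour store with the 1-hour windowSize and the same retention.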


Sicheng Liu

Sep 27, 2017, 6:44:44 PM
to Confluent Platform
Hi Matthias and Damian,

Thanks for your helpful suggestions. The only customizations we made are increasing COMMIT_INTERVAL_MS_CONFIG to 5 minutes and CACHE_MAX_BYTES_BUFFERING_CONFIG to 2 GB. After we followed the "usage patterns" approach Matthias provided, our message throughput almost doubled. We still need some time to verify the performance improvement and the correctness. We do see one issue where the aggregators keep rejoining the group after every commit; we are still investigating. I will report our final result later.
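
For reference, those two overrides look like this in our StreamsConfig properties (sketch; the application id and bootstrap servers shown here are placeholders, everything else is left at the defaults):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregation");            // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");                // placeholder
// The two non-default settings mentioned above:
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.MINUTES.toMillis(5));
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 2L * 1024 * 1024 * 1024);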

Thanks,
Sicheng Liu
