Question about Reactive Kafka performance

Kilic Ali-Firat

Jun 22, 2017, 1:03:27 PM
to Akka User List
Hi everyone, 

I have been using the classic kafka-clients from Apache, and I wrote a small program to compare consumer throughput using reactive-kafka (the latest release).

My current use case is to listen to a Kafka topic, group the data into batches, process them, and then commit. Here is my code:

    import java.sql.Timestamp

    import scala.concurrent.Future
    import scala.concurrent.duration._

    import akka.actor.ActorSystem
    import akka.kafka.ConsumerMessage.CommittableOffsetBatch
    import akka.kafka.scaladsl.Consumer
    import akka.kafka.{ConsumerSettings, Subscriptions}
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Sink
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

    implicit val actorSystem = ActorSystem("benchmark-kafka")
    implicit val actorMaterializer = ActorMaterializer()

    val consumerSettings =
      ConsumerSettings(actorSystem, new StringDeserializer, new ByteArrayDeserializer)
        .withBootstrapServers("mybroker:9094")
        .withGroupId("kafka-producer-bench")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
        .withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "2000000")

    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("benchmark-producer"))
      // Group up to 5000 records, or whatever arrived within 1 second
      .groupedWithin(5000, 1.second)
      .mapAsync(1) { group =>
        println(new Timestamp(System.currentTimeMillis()) + " : Fetch " + group.size + " records")
        Future.successful(group)
      }
      // Fold the group's offsets into a single batch so we commit once per group
      .map { group =>
        group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) =>
          batch.updated(elem.committableOffset)
        }
      }
      .mapAsync(1) { batch =>
        println(new Timestamp(System.currentTimeMillis()) + " Will commit : " + batch.getOffsets())
        batch.commitScaladsl()
      }
      .runWith(Sink.ignore)


To run a small benchmark, I'm using a script shipped with the Kafka distribution called kafka-producer-perf-test.sh.

There is something I cannot understand: I cannot get the same throughput as the Kafka producer.

Kafka perf trace:

bin/kafka-producer-perf-test.sh --topic benchmark-producer --producer.config config/producer.properties --record-size 1322 --num-records 300000 --throughput 10000
[2017-06-22 18:29:47,554] WARN The configuration 'key.deserializer' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2017-06-22 18:29:47,555] WARN The configuration 'value.deserializer' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
28153 records sent, 5630,6 records/sec (7,10 MB/sec), 1187,7 ms avg latency, 2187,0 max latency.
29748 records sent, 5949,6 records/sec (7,50 MB/sec), 3279,2 ms avg latency, 4076,0 max latency.
30528 records sent, 6099,5 records/sec (7,69 MB/sec), 3946,9 ms avg latency, 4043,0 max latency.
28476 records sent, 5694,1 records/sec (7,18 MB/sec), 4278,0 ms avg latency, 4425,0 max latency.
29112 records sent, 5811,9 records/sec (7,33 MB/sec), 4168,8 ms avg latency, 4303,0 max latency.
28404 records sent, 5680,8 records/sec (7,16 MB/sec), 4486,7 ms avg latency, 4603,0 max latency.
29220 records sent, 5837,0 records/sec (7,36 MB/sec), 4081,9 ms avg latency, 4243,0 max latency.
28728 records sent, 5745,6 records/sec (7,24 MB/sec), 4381,9 ms avg latency, 4477,0 max latency.
29088 records sent, 5816,4 records/sec (7,33 MB/sec), 4089,1 ms avg latency, 4238,0 max latency.
28080 records sent, 5614,9 records/sec (7,08 MB/sec), 4472,6 ms avg latency, 4627,0 max latency.
300000 records sent, 5798,446016 records/sec (7,31 MB/sec), 3852,98 ms avg latency, 4627,00 ms max latency, 4103 ms 50th, 4585 ms 95th, 4615 ms 99th, 4623 ms 99.9th.


Reactive Kafka stream trace:

[info] 2017-06-22 18:38:28.456 : Fetch 12 records
[info] 2017-06-22 18:38:28.46 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22970806}
[info] 2017-06-22 18:38:29.456 : Fetch 372 records
[info] 2017-06-22 18:38:29.459 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22971178}
[info] 2017-06-22 18:38:30.456 : Fetch 773 records
[info] 2017-06-22 18:38:30.458 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22971951}
[info] 2017-06-22 18:38:31.456 : Fetch 773 records
[info] 2017-06-22 18:38:31.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22972724}
[info] 2017-06-22 18:38:32.456 : Fetch 773 records
[info] 2017-06-22 18:38:32.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22973497}
[info] 2017-06-22 18:38:33.456 : Fetch 1546 records
[info] 2017-06-22 18:38:33.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22975043}
[info] 2017-06-22 18:38:34.456 : Fetch 1546 records
[info] 2017-06-22 18:38:34.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22976589}
[info] 2017-06-22 18:38:35.456 : Fetch 1546 records
[info] 2017-06-22 18:38:35.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22978135}
[info] 2017-06-22 18:38:36.456 : Fetch 2319 records
[info] 2017-06-22 18:38:36.458 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22980454}
[info] 2017-06-22 18:38:37.456 : Fetch 1546 records
[info] 2017-06-22 18:38:37.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22982000}
[info] 2017-06-22 18:38:38.455 : Fetch 2383 records
[info] 2017-06-22 18:38:38.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22984383}
[info] 2017-06-22 18:38:39.456 : Fetch 1546 records
[info] 2017-06-22 18:38:39.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22985929}
[info] 2017-06-22 18:38:40.456 : Fetch 2319 records
[info] 2017-06-22 18:38:40.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22988248}
[info] 2017-06-22 18:38:41.456 : Fetch 1546 records
[info] 2017-06-22 18:38:41.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22989794}
[info] 2017-06-22 18:38:42.456 : Fetch 1546 records
[info] 2017-06-22 18:38:42.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22991340}
[info] 2017-06-22 18:38:43.455 : Fetch 2319 records
[info] 2017-06-22 18:38:43.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22993659}
[info] 2017-06-22 18:38:44.456 : Fetch 1546 records
[info] 2017-06-22 18:38:44.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22995205}
[info] 2017-06-22 18:38:45.456 : Fetch 2319 records
[info] 2017-06-22 18:38:45.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22997524}
[info] 2017-06-22 18:38:46.456 : Fetch 1546 records
[info] 2017-06-22 18:38:46.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22999070}
[info] 2017-06-22 18:38:47.456 : Fetch 1546 records
[info] 2017-06-22 18:38:47.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23000616}
[info] 2017-06-22 18:38:48.456 : Fetch 2319 records
[info] 2017-06-22 18:38:48.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23002935}
[info] 2017-06-22 18:38:49.456 : Fetch 1546 records
[info] 2017-06-22 18:38:49.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23004481}
[info] 2017-06-22 18:38:50.456 : Fetch 2319 records
[info] 2017-06-22 18:38:50.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23006800}
[info] 2017-06-22 18:38:51.456 : Fetch 1546 records
[info] 2017-06-22 18:38:51.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23008346}
[info] 2017-06-22 18:38:52.455 : Fetch 1546 records
[info] 2017-06-22 18:38:52.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23009892}
[info] 2017-06-22 18:38:53.455 : Fetch 2046 records

I cannot understand the throughput difference between the Akka Streams Kafka consumer and the Kafka benchmark. Could my processing of the input source be the cause of this latency?

If you have advice on how to increase the throughput of my consumer, I'm listening :)

Patrik Nordwall

Jun 22, 2017, 1:17:29 PM
to Akka User List
Around 2000 msg/s sounds very slow. Committing is typically the slowest part, but you are doing the right batching as far as I can see. You can compare with these benchmarks: http://akka.io/blog/2016/09/10/akka-stream-kafka

The sources for them are in the GitHub repo.

/Patrik

Kilic Ali-Firat

Jun 22, 2017, 6:15:05 PM
to Akka User List
As you said, 2000 msg/sec is really, really slow. The reactive Kafka consumer should consume at the same speed as the producer.

Reading the benchmark examples on GitHub plus the benchmark results in your link didn't help me catch what I am doing wrong. I follow the intended way of using the reactive API, and I also tested with a consumer that does not commit at all (sketched below); I get the same results.
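
The commit-free variant looks essentially like this (a minimal sketch, reusing the consumerSettings from my first message; plainSource emits raw ConsumerRecords, so there is nothing to commit):

    // Sketch of the no-commit test: plainSource has no committableOffset,
    // so the stream only reads and counts records.
    Consumer
      .plainSource(consumerSettings, Subscriptions.topics("benchmark-producer"))
      .groupedWithin(5000, 1.second)
      .map(group => println("Fetched " + group.size + " records"))
      .runWith(Sink.ignore)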

I suspected the server on which I run the code, but on the same kind of machine we reached a rate of 5000 msg/sec with a Kafka Python library.

Really strange to get such results...

Patrik Nordwall

Jun 23, 2017, 4:51:09 AM
to akka...@googlegroups.com
I'd recommend that you create a minimized example without additional external dependencies. Then I or someone else can run it and perhaps spot the culprit.

/Patrik

Kilic Ali-Firat

Jun 23, 2017, 6:13:45 AM
to Akka User List
Hi Patrik,

I attached a tarball with only the reactive-kafka lib and a Main class showing how I'm using the reactive API.

There is a README explaining how to reproduce the throughput issue that I'm seeing in my code.

I can reproduce the issue on a local Kafka too.

Thanking you in advance!
reactive-kafka-benchmark.tar.gz

Kilic Ali-Firat

Jun 23, 2017, 9:43:18 AM
to Akka User List
Hi Patrik,

Running against a local Kafka, I can get the same throughput as the Kafka producer.

But running against a hosted Kafka, I cannot get the same throughput, even with the same parameters as locally.

The main difference between the hosted and local setups is that I'm using a secured connection to the hosted Kafka.
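
For context, the secured connection means the consumer settings carry SSL properties roughly like this (a hypothetical sketch; the paths and passwords are placeholders, not my actual config):

    import org.apache.kafka.clients.CommonClientConfigs
    import org.apache.kafka.common.config.SslConfigs

    // Placeholder SSL settings for the hosted broker.
    val secureSettings = consumerSettings
      .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
      .withProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks")
      .withProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit")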

Kilic Ali-Firat

Jun 23, 2017, 12:34:51 PM
to Akka User List
It's me again!

I reached a better throughput (though still not the producer's).

My Kafka broker version is kafka_211-0.10.0.0 and, using reactive-kafka (0.12), I can reach a throughput between 3000 and 4000 messages/sec.

It seems that the reactive-kafka version should be aligned with (or close to) the Kafka broker version.

Kilic Ali-Firat

Jun 23, 2017, 1:10:20 PM
to Akka User List
*kafka broker is kafka_2.11-0.10.0.0

Alex Cozzi

Jun 23, 2017, 4:24:21 PM
to Akka User List
We observed a similar slowdown when using reactive-kafka (which pulls in Kafka 0.10.2.0 as a dependency) to connect to a 0.10.0.0 server, but we solved it by forcing compilation against the 0.10.0.1 release of the Kafka libraries (kafka-clients and kafka_2.11), which gave us the same speedup. I think it has to do with the 0.10.2.0 client libraries running in compatibility mode.

Kilic Ali-Firat

Jun 23, 2017, 4:50:33 PM
to Akka User List
Oh, so I'm not the only one to observe such throughput.

Maybe a stupid question, but how did you force compilation against the 0.10.0.1 release of the Kafka libs?

Alex Cozzi

Jun 23, 2017, 5:27:43 PM
to Akka User List
Yes, in our dependencies we not only include reactive-kafka but also explicitly declare the Kafka libs, so that we can force 0.10.0.1.

Alex Cozzi

Jun 23, 2017, 5:30:06 PM
to Akka User List
Essentially you put this in your build.sbt:

val kafkaV = "0.10.0.1"
val reactiveKafkaV = "0.13"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % kafkaV,
  "org.apache.kafka" % "kafka_2.11" % kafkaV intransitive,
  "com.typesafe.akka" %% "akka-stream-kafka" % reactiveKafkaV
  // ...
)

and so on...

Kilic Ali-Firat

Jun 23, 2017, 5:30:54 PM
to Akka User List
Did you do something like this?

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream-kafka" % "0.16"
)

dependencyOverrides += (
  "org.apache.kafka" % "kafka-clients" % "0.10.0.1"
)

Using dependencyOverrides on kafka-clients alone should not be enough to get the same speedup...

Kilic Ali-Firat

Jun 23, 2017, 5:38:17 PM
to Akka User List
Thank you Alex,

I will give it a try :)

Kilic Ali-Firat

Jun 23, 2017, 6:03:39 PM
to Akka User List
Pretty weird, I have the same build.sbt as you:

scalaVersion := "2.11.7"

val kafkaV = "0.10.0.1"
val reactiveKafkaV = "0.13"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % kafkaV,
  "org.apache.kafka" % "kafka_2.11" % kafkaV intransitive,
  "com.typesafe.akka" %% "akka-stream-kafka" % reactiveKafkaV
)

publishMavenStyle in ThisBuild := false

But I get fatal exceptions when running my consumer:

[info] Loading project definition from /home/admin/reactive-hosted-kafka-benchmark/project
[info] Updating {file:/home/admin/reactive-hosted-kafka-benchmark/project/}reactive-hosted-kafka-benchmark-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] downloading https://repo1.maven.org/maven2/org/apache/commons/commons-compress/1.4.1/commons-compress-1.4.1.jar ...
[info] [SUCCESSFUL ] org.apache.commons#commons-compress;1.4.1!commons-compress.jar (110ms)
[info] downloading https://repo1.maven.org/maven2/org/apache/ant/ant/1.9.6/ant-1.9.6.jar ...
[info] [SUCCESSFUL ] org.apache.ant#ant;1.9.6!ant.jar (278ms)
[info] downloading https://repo1.maven.org/maven2/org/apache/ant/ant-launcher/1.9.6/ant-launcher-1.9.6.jar ...
[info] [SUCCESSFUL ] org.apache.ant#ant-launcher;1.9.6!ant-launcher.jar (47ms)
[info] Done updating.
[info] Set current project to reactive-hosted-kafka-benchmark (in build file:/home/admin/reactive-hosted-kafka-benchmark/)
[info] Updating {file:/home/admin/reactive-hosted-kafka-benchmark/}reactive-hosted-kafka-benchmark...
[info] Resolving jline#jline;2.12.1 ...
[info] downloading https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.0.1/kafka-clients-0.10.0.1.jar ...
[info] [SUCCESSFUL ] org.apache.kafka#kafka-clients;0.10.0.1!kafka-clients.jar (103ms)
[info] downloading https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.11/0.10.0.1/kafka_2.11-0.10.0.1.jar ...
[info] [SUCCESSFUL ] org.apache.kafka#kafka_2.11;0.10.0.1!kafka_2.11.jar (267ms)
[info] downloading https://repo1.maven.org/maven2/com/typesafe/ssl-config-akka_2.11/0.2.1/ssl-config-akka_2.11-0.2.1.jar ...
[info] [SUCCESSFUL ] com.typesafe#ssl-config-akka_2.11;0.2.1!ssl-config-akka_2.11.jar(bundle) (54ms)
[info] downloading https://repo1.maven.org/maven2/com/typesafe/config/1.3.0/config-1.3.0.jar ...
[info] [SUCCESSFUL ] com.typesafe#config;1.3.0!config.jar(bundle) (58ms)
[info] downloading https://repo1.maven.org/maven2/com/typesafe/ssl-config-core_2.11/0.2.1/ssl-config-core_2.11-0.2.1.jar ...
[info] [SUCCESSFUL ] com.typesafe#ssl-config-core_2.11;0.2.1!ssl-config-core_2.11.jar(bundle) (56ms)
[info] Done updating.
[warn] Scala version was updated by one of library dependencies:
[warn] * org.scala-lang:scala-library:(2.11.7, 2.11.6) -> 2.11.8
[warn] To force scalaVersion, add the following:
[warn] ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = true) }
[warn] Run 'evicted' to see detailed eviction warnings
[info] Compiling 4 Scala sources to /home/admin/reactive-hosted-kafka-benchmark/target/scala-2.11/classes...
[info] Running bioserenity_kafka_benchmark.Main
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[WARN] [06/23/2017 21:56:41.413] [toto-akka.kafka.default-dispatcher-12] [akka://toto/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
[WARN] [06/23/2017 21:56:44.500] [toto-akka.kafka.default-dispatcher-14] [akka://toto/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000007b4100000, 139460608, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 139460608 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/admin/reactive-hosted-kafka-benchmark/hs_err_pid1000.log

Did you run into the same issue as me?

Kilic Ali-Firat

Jun 23, 2017, 6:05:52 PM
to Akka User List
Sometimes I get:

[WARN] [06/23/2017 21:56:44.500] [toto-akka.kafka.default-dispatcher-14] [akka://toto/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000007b4100000, 139460608, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 139460608 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/admin/reactive-hosted-kafka-benchmark/hs_err_pid1000.log


Other times I get:

java.lang.OutOfMemoryError: Direct buffer memory
 at java.nio.Bits.reserveMemory(Bits.java:693)
 at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
 at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
 at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
 at sun.nio.ch.IOUtil.read(IOUtil.java:195)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
 at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
 at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
 at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
 at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
 at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)


Alex Cozzi

Jun 24, 2017, 6:29:55 PM
to Akka User List
I am actually using Scala 2.12. My build is a bit more complex, but here is an extract of it:

scalaVersion := "2.12.2",

val akkaV = "2.4.17"
val akkaHttpV = "10.0.4"
val scalaTestV = "3.0.3"
val scalaCheckV = "1.13.5"
val kafkaV = "0.10.0.1"
val reactiveKafkaV = "0.13"
val logbackV = "1.2.3"
val prometheusV = "0.0.23"

lazy val dyno = (project in file(".")).aggregate(
  common,
  analyzer,
  aggregator,
  inspector
).settings(CommonSettings.buildSettings)

lazy val common = (project in file("common"))
  .settings(CommonSettings.buildSettings)
  .settings(
    libraryDependencies ++= Seq(
      "org.apache.avro" % "avro" % "1.8.1",
      "com.typesafe" % "config" % "1.3.1",
      "com.typesafe.scala-logging" %% "scala-logging" % "3.5.0",
      "io.spray" %% "spray-json" % "1.3.3",
      "org.json4s" %% "json4s-jackson" % "3.5.2",
      "org.apache.kafka" % "kafka-clients" % kafkaV,
      "org.apache.kafka" % "kafka_2.11" % kafkaV intransitive,
      "io.confluent" % "kafka-avro-serializer" % "3.2.0" excludeAll(ExclusionRule(organization = "ch.qos.logback"), ExclusionRule(organization = "org.slf4j")),
      "nl.grons" %% "metrics-scala" % "3.5.6",
      "io.prometheus" % "simpleclient" % prometheusV,
      "org.scalatest" %% "scalatest" % scalaTestV % "test",
      "org.scalacheck" %% "scalacheck" % scalaCheckV % "test"
    ),
    fork in run := true,
    specificAvroSettings
  )

lazy val analyzer = (project in file("analyzer"))
  .settings(CommonSettings.buildSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor" % akkaV,
      "com.typesafe.akka" %% "akka-stream" % akkaV,
      "com.typesafe.akka" %% "akka-stream-kafka" % reactiveKafkaV,
      "com.typesafe.akka" %% "akka-http" % akkaHttpV,
      "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpV,
      "com.typesafe.akka" %% "akka-slf4j" % akkaV,
      "org.json4s" %% "json4s-jackson" % "3.5.2",
      "ch.qos.logback" % "logback-classic" % logbackV,
      "ch.qos.logback" % "logback-core" % logbackV,
      "com.storm-enroute" %% "scalameter" % "0.8.2" excludeAll ExclusionRule(organization = "org.mongodb"), // exclusion necessary to avoid build error on 2.12
      "io.prometheus" % "simpleclient" % prometheusV,
      "io.prometheus" % "simpleclient_common" % prometheusV,
      "io.prometheus" % "simpleclient_hotspot" % prometheusV,
      "org.scalatest" %% "scalatest" % scalaTestV % "test",
      "org.scalacheck" %% "scalacheck" % scalaCheckV % "test",
      "com.typesafe.akka" %% "akka-stream-testkit" % akkaV % "test"
    )
  )
  .enablePlugins(SbtNativePackager, DockerPlugin, JavaAppPackaging)
  .settings(
    javaOptions in Universal ++= Seq(
      "-J-Xmn4g",
      "-J-Xms16g",
      "-J-Xmx16g"
    )
  )
  .dependsOn(common)

Kilic Ali-Firat

Jun 24, 2017, 7:19:11 PM
to Akka User List
Keeping only the minimal dependencies from your build.sbt didn't change the error that I get:

scalaVersion := "2.12.2"

val akkaV = "2.4.17"
val kafkaV = "0.10.0.1"
val reactiveKafkaV = "0.13"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % kafkaV,
  "org.apache.kafka" % "kafka_2.11" % kafkaV intransitive,
  "com.typesafe.akka" %% "akka-actor" % akkaV,
  "com.typesafe.akka" %% "akka-stream" % akkaV,
  "com.typesafe.akka" %% "akka-stream-kafka" % reactiveKafkaV
)

fork in run := true

publishMavenStyle in ThisBuild := false



[error] SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[info] [WARN] [06/24/2017 23:15:22.403] [toto-akka.kafka.default-dispatcher-12] [akka://toto/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
[info] [WARN] [06/24/2017 23:15:25.492] [toto-akka.kafka.default-dispatcher-14] [akka://toto/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
[error] Uncaught error from thread [toto-akka.kafka.default-dispatcher-16] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[toto]
[info] [ERROR] [SECURITY][06/24/2017 23:15:26.299] [toto-akka.kafka.default-dispatcher-16] [akka.actor.ActorSystemImpl(toto)] Uncaught error from thread [toto-akka.kafka.default-dispatcher-16] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
[error] java.lang.OutOfMemoryError: Direct buffer memory
[error] at java.nio.Bits.reserveMemory(Bits.java:693)
[error] at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
[error] at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
[error] at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
[error] at sun.nio.ch.IOUtil.read(IOUtil.java:195)

The error appears when I produce data to the Kafka topic.
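
Since the failure is in direct buffer memory, one thing worth trying is giving the forked JVM an explicit memory budget from build.sbt (a hypothetical sketch; the sizes are placeholders I have not validated):

fork in run := true

javaOptions in run ++= Seq(
  "-Xmx2g",                       // max heap for the forked run
  "-XX:MaxDirectMemorySize=512m"  // explicit cap for NIO direct buffers
)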

Alex Cozzi

Jun 25, 2017, 12:44:05 PM
to Akka User List
Your problem seems related to running out of memory. Have you tried removing the following?

    .withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "2000000")

That said, if you are running on Java 8 you should not hit the out-of-memory error.

Kilic Ali-Firat

Jun 25, 2017, 2:50:06 PM
to Akka User List
I tried removing this bound on the fetch bytes, but that didn't work either.

I'm using Java 1.8.0_121, and I tested something else: I'm listening to a topic to which no data has been sent, and I get the same error!

I also tried with my old consumer on Scala 2.11.7, and I get the same error.

Kilic Ali-Firat

Jun 26, 2017, 6:03:31 AM
to Akka User List
Hi Alex,

I had forgotten to include the security files needed to connect to my (hosted) Kafka broker, which would explain the direct-buffer OOM: a plaintext client that hits an SSL endpoint can misread handshake bytes as a huge message size and try to allocate a matching buffer.

Using your build.sbt (adapted to Scala 2.11 and reactive-kafka 0.12), I can reach a throughput between 4000 and 5000 msg/sec.

For now, that's enough for me. Many thanks for sharing your problems and how you fixed them!

Alex Cozzi

Jun 26, 2017, 1:40:27 PM
to Akka User List
Great!

About performance: I tried tweaking a lot of parameters, but what I found has the most influence on throughput on the reading side is "receive.buffer.bytes". The optimum varies depending on your image size and other factors, so try a bit of parameter search; in my case this seems to be the best:

receive.buffer.bytes = 131072  # (128 * 1024)

Also, if it is acceptable for your application, using auto-commit speeds things up quite a bit, but you must accept some uncertainty as to when an offset is committed.
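
In akka-stream-kafka terms, both knobs can go straight on the ConsumerSettings; a minimal sketch, reusing the broker address, group id, and deserializers from the first message (with auto-commit enabled, a plainSource suffices since nothing needs committing downstream):

    // Sketch: larger socket receive buffer + client-side auto-commit.
    val tunedSettings =
      ConsumerSettings(actorSystem, new StringDeserializer, new ByteArrayDeserializer)
        .withBootstrapServers("mybroker:9094")
        .withGroupId("kafka-producer-bench")
        .withProperty(ConsumerConfig.RECEIVE_BUFFER_CONFIG, "131072")    // 128 * 1024
        .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")  // offsets committed by the client

    // No explicit commits needed, so the plain (non-committable) source is enough.
    Consumer
      .plainSource(tunedSettings, Subscriptions.topics("benchmark-producer"))
      .runWith(Sink.ignore)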

Kilic Ali-Firat

Jun 27, 2017, 7:20:01 AM
to Akka User List

Alex,

You're right about the parameter receive.buffer.bytes. With its default value, my consumer throughput is about 3000 msg/sec while my producer sends about 5000 msg/sec.

When I set receive.buffer.bytes to a higher value, I get the same throughput as the producer. This is good news for my team and me; many thanks for sharing your experience with reactive-kafka!

In my case, I also set "fetch.max.wait.ms" to "1000", as in the sketch below.
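
That property maps onto the settings the same way (sketch):

    .withProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000") // let the broker wait up to 1 s to fill a fetch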