// Benchmark consumer: reads the "benchmark-producer" topic, groups records
// into 1-second windows, logs each window's size, and commits one offset
// batch per window.
implicit val actorSystem = ActorSystem("benchmark-kafka")
implicit val actorMaterializer = ActorMaterializer()

// Consumer configuration: String keys / byte-array values, start from the
// latest offset, allow fetch responses of up to 2 MB.
val consumerSettings =
ConsumerSettings(actorSystem, new StringDeserializer, new ByteArrayDeserializer)
.withBootstrapServers("mybroker:9094")
.withGroupId("kafka-producer-bench")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "2000000")

Consumer
.committableSource(consumerSettings, Subscriptions.topics("benchmark-producer"))
// Emit up to 5000 records, or whatever arrived within 1 second.
.groupedWithin(5000, 1.seconds)
// The logging and offset folding are fully synchronous, so a plain `map`
// replaces the original mapAsync(1) + Future.successful stage, which added
// an async boundary for no benefit.
.map { group =>
println(new java.sql.Timestamp(System.currentTimeMillis()) + " : Fetch " + group.size + " records")
// Fold all offsets of the window into one batch: a single commit per
// window instead of one commit per record.
group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) =>
batch.updated(elem.committableOffset)
}
}
// Commit batches strictly one at a time, in order (parallelism = 1).
.mapAsync(1) { batch =>
println(new java.sql.Timestamp(System.currentTimeMillis()) + " Will commit : " + batch.getOffsets())
batch.commitScaladsl()
}
.runWith(Sink.ignore)
bin/kafka-producer-perf-test.sh --topic benchmark-producer --producer.config config/producer.properties --record-size 1322 --num-records 300000 --throughput 10000
[2017-06-22 18:29:47,554] WARN The configuration 'key.deserializer' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2017-06-22 18:29:47,555] WARN The configuration 'value.deserializer' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
28153 records sent, 5630,6 records/sec (7,10 MB/sec), 1187,7 ms avg latency, 2187,0 max latency.
29748 records sent, 5949,6 records/sec (7,50 MB/sec), 3279,2 ms avg latency, 4076,0 max latency.
30528 records sent, 6099,5 records/sec (7,69 MB/sec), 3946,9 ms avg latency, 4043,0 max latency.
28476 records sent, 5694,1 records/sec (7,18 MB/sec), 4278,0 ms avg latency, 4425,0 max latency.
29112 records sent, 5811,9 records/sec (7,33 MB/sec), 4168,8 ms avg latency, 4303,0 max latency.
28404 records sent, 5680,8 records/sec (7,16 MB/sec), 4486,7 ms avg latency, 4603,0 max latency.
29220 records sent, 5837,0 records/sec (7,36 MB/sec), 4081,9 ms avg latency, 4243,0 max latency.
28728 records sent, 5745,6 records/sec (7,24 MB/sec), 4381,9 ms avg latency, 4477,0 max latency.
29088 records sent, 5816,4 records/sec (7,33 MB/sec), 4089,1 ms avg latency, 4238,0 max latency.
28080 records sent, 5614,9 records/sec (7,08 MB/sec), 4472,6 ms avg latency, 4627,0 max latency.
300000 records sent, 5798,446016 records/sec (7,31 MB/sec), 3852,98 ms avg latency, 4627,00 ms max latency, 4103 ms 50th, 4585 ms 95th, 4615 ms 99th, 4623 ms 99.9th.
[info] 2017-06-22 18:38:28.456 : Fetch 12 records
[info] 2017-06-22 18:38:28.46 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22970806}
[info] 2017-06-22 18:38:29.456 : Fetch 372 records
[info] 2017-06-22 18:38:29.459 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22971178}
[info] 2017-06-22 18:38:30.456 : Fetch 773 records
[info] 2017-06-22 18:38:30.458 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22971951}
[info] 2017-06-22 18:38:31.456 : Fetch 773 records
[info] 2017-06-22 18:38:31.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22972724}
[info] 2017-06-22 18:38:32.456 : Fetch 773 records
[info] 2017-06-22 18:38:32.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22973497}
[info] 2017-06-22 18:38:33.456 : Fetch 1546 records
[info] 2017-06-22 18:38:33.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22975043}
[info] 2017-06-22 18:38:34.456 : Fetch 1546 records
[info] 2017-06-22 18:38:34.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22976589}
[info] 2017-06-22 18:38:35.456 : Fetch 1546 records
[info] 2017-06-22 18:38:35.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22978135}
[info] 2017-06-22 18:38:36.456 : Fetch 2319 records
[info] 2017-06-22 18:38:36.458 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22980454}
[info] 2017-06-22 18:38:37.456 : Fetch 1546 records
[info] 2017-06-22 18:38:37.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22982000}
[info] 2017-06-22 18:38:38.455 : Fetch 2383 records
[info] 2017-06-22 18:38:38.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22984383}
[info] 2017-06-22 18:38:39.456 : Fetch 1546 records
[info] 2017-06-22 18:38:39.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22985929}
[info] 2017-06-22 18:38:40.456 : Fetch 2319 records
[info] 2017-06-22 18:38:40.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22988248}
[info] 2017-06-22 18:38:41.456 : Fetch 1546 records
[info] 2017-06-22 18:38:41.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22989794}
[info] 2017-06-22 18:38:42.456 : Fetch 1546 records
[info] 2017-06-22 18:38:42.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22991340}
[info] 2017-06-22 18:38:43.455 : Fetch 2319 records
[info] 2017-06-22 18:38:43.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22993659}
[info] 2017-06-22 18:38:44.456 : Fetch 1546 records
[info] 2017-06-22 18:38:44.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22995205}
[info] 2017-06-22 18:38:45.456 : Fetch 2319 records
[info] 2017-06-22 18:38:45.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22997524}
[info] 2017-06-22 18:38:46.456 : Fetch 1546 records
[info] 2017-06-22 18:38:46.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22999070}
[info] 2017-06-22 18:38:47.456 : Fetch 1546 records
[info] 2017-06-22 18:38:47.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23000616}
[info] 2017-06-22 18:38:48.456 : Fetch 2319 records
[info] 2017-06-22 18:38:48.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23002935}
[info] 2017-06-22 18:38:49.456 : Fetch 1546 records
[info] 2017-06-22 18:38:49.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23004481}
[info] 2017-06-22 18:38:50.456 : Fetch 2319 records
[info] 2017-06-22 18:38:50.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23006800}
[info] 2017-06-22 18:38:51.456 : Fetch 1546 records
[info] 2017-06-22 18:38:51.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23008346}
[info] 2017-06-22 18:38:52.455 : Fetch 1546 records
[info] 2017-06-22 18:38:52.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23009892}
[info] 2017-06-22 18:38:53.455 : Fetch 2046 records
Reading the benchmark examples on GitHub plus the benchmark results in your link didn't help me to see what I am doing wrong. I follow the recommended way of using the Reactive API, and I also tested with a consumer that does not commit; I get the same results.
I suspected the server on which I run the code, but on the same kind of machine we reached a rate of 5000 msg/sec with a Kafka Python library.
It is really strange to get such results.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
*kafka broker is kafka_2.11-0.10.0.0
val kafkaV = "0.10.0.1"
val reactiveKafkaV = "0.13"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-clients" % kafkaV,
"org.apache.kafka" % "kafka_2.11" % kafkaV intransitive,
"com.typesafe.akka" %% "akka-stream-kafka" % reactiveKafkaV,
and so on...
// NOTE(review): this looks like two alternative build configurations pasted
// together — the first pins akka-stream-kafka 0.16 with a kafka-clients
// override, the second uses akka-stream-kafka 0.13; confirm which one was
// actually used for the benchmark run.
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream-kafka" % "0.16"
)
// Force the Kafka client version to match the 0.10.0.x broker.
dependencyOverrides += (
"org.apache.kafka" % "kafka-clients" % "0.10.0.1"
)
scalaVersion := "2.11.7"
// Shared version definitions for the second variant.
val kafkaV = "0.10.0.1"
val reactiveKafkaV = "0.13"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-clients" % kafkaV,
// intransitive: pull in the broker jar without its transitive dependencies.
"org.apache.kafka" % "kafka_2.11" % kafkaV intransitive,
"com.typesafe.akka" %% "akka-stream-kafka" % reactiveKafkaV
)
// This benchmark project is never published.
publishMavenStyle in ThisBuild := false
[info] Loading project definition from /home/admin/reactive-hosted-kafka-benchmark/project
[info] Updating {file:/home/admin/reactive-hosted-kafka-benchmark/project/}reactive-hosted-kafka-benchmark-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] downloading https://repo1.maven.org/maven2/org/apache/commons/commons-compress/1.4.1/commons-compress-1.4.1.jar ...
[info] [SUCCESSFUL ] org.apache.commons#commons-compress;1.4.1!commons-compress.jar (110ms)
[info] downloading https://repo1.maven.org/maven2/org/apache/ant/ant/1.9.6/ant-1.9.6.jar ...
[info] [SUCCESSFUL ] org.apache.ant#ant;1.9.6!ant.jar (278ms)
[info] downloading https://repo1.maven.org/maven2/org/apache/ant/ant-launcher/1.9.6/ant-launcher-1.9.6.jar ...
[info] [SUCCESSFUL ] org.apache.ant#ant-launcher;1.9.6!ant-launcher.jar (47ms)
[info] Done updating.
[info] Set current project to reactive-hosted-kafka-benchmark (in build file:/home/admin/reactive-hosted-kafka-benchmark/)
[info] Updating {file:/home/admin/reactive-hosted-kafka-benchmark/}reactive-hosted-kafka-benchmark...
[info] Resolving jline#jline;2.12.1 ...
[info] downloading https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.0.1/kafka-clients-0.10.0.1.jar ...
[info] [SUCCESSFUL ] org.apache.kafka#kafka-clients;0.10.0.1!kafka-clients.jar (103ms)
[info] downloading https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.11/0.10.0.1/kafka_2.11-0.10.0.1.jar ...
[info] [SUCCESSFUL ] org.apache.kafka#kafka_2.11;0.10.0.1!kafka_2.11.jar (267ms)
[info] downloading https://repo1.maven.org/maven2/com/typesafe/ssl-config-akka_2.11/0.2.1/ssl-config-akka_2.11-0.2.1.jar ...
[info] [SUCCESSFUL ] com.typesafe#ssl-config-akka_2.11;0.2.1!ssl-config-akka_2.11.jar(bundle) (54ms)
[info] downloading https://repo1.maven.org/maven2/com/typesafe/config/1.3.0/config-1.3.0.jar ...
[info] [SUCCESSFUL ] com.typesafe#config;1.3.0!config.jar(bundle) (58ms)
[info] downloading https://repo1.maven.org/maven2/com/typesafe/ssl-config-core_2.11/0.2.1/ssl-config-core_2.11-0.2.1.jar ...
[info] [SUCCESSFUL ] com.typesafe#ssl-config-core_2.11;0.2.1!ssl-config-core_2.11.jar(bundle) (56ms)
[info] Done updating.
[warn] Scala version was updated by one of library dependencies:
[warn] * org.scala-lang:scala-library:(2.11.7, 2.11.6) -> 2.11.8
[warn] To force scalaVersion, add the following:
[warn] ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = true) }
[warn] Run 'evicted' to see detailed eviction warnings
[info] Compiling 4 Scala sources to /home/admin/reactive-hosted-kafka-benchmark/target/scala-2.11/classes...
[info] Running bioserenity_kafka_benchmark.Main
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[WARN] [06/23/2017 21:56:41.413] [toto-akka.kafka.default-dispatcher-12] [akka://toto/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
[WARN] [06/23/2017 21:56:44.500] [toto-akka.kafka.default-dispatcher-14] [akka://toto/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000007b4100000, 139460608, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 139460608 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/admin/reactive-hosted-kafka-benchmark/hs_err_pid1000.log
[WARN] [06/23/2017 21:56:44.500] [toto-akka.kafka.default-dispatcher-14] [akka://toto/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000007b4100000, 139460608, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 139460608 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/admin/reactive-hosted-kafka-benchmark/hs_err_pid1000.log
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:693)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.read(IOUtil.java:195)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)[ERROR] [06/23/2017 22]
[WARN] [06/23/2017 21:56:44.500] [toto-akka.kafka.default-dispatcher-14] [akka://toto/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
// NOTE(review): fragment of a multi-project sbt build; the trailing comma on
// the next line suggests it was cut out of a larger settings block.
scalaVersion := "2.12.2",
// Dependency versions shared across all modules.
val akkaV = "2.4.17"
val akkaHttpV = "10.0.4"
val scalaTestV = "3.0.3"
val scalaCheckV = "1.13.5"
val kafkaV = "0.10.0.1"
val reactiveKafkaV = "0.13"
val logbackV = "1.2.3"
val prometheusV = "0.0.23"
// Root project: aggregates the modules so sbt tasks run across all of them.
lazy val dyno = (project in file(".")).aggregate(
common,
analyzer,
aggregator,
inspector
).settings(CommonSettings.buildSettings)
// Shared module: serialization (Avro/JSON), Kafka clients, metrics.
lazy val common = (project in file("common"))
.settings(CommonSettings.buildSettings)
.settings(
libraryDependencies ++= Seq(
"org.apache.avro" % "avro" % "1.8.1",
"com.typesafe" % "config" % "1.3.1",
"com.typesafe.scala-logging" %% "scala-logging" % "3.5.0",
"io.spray" %% "spray-json" % "1.3.3",
"org.json4s" %% "json4s-jackson" % "3.5.2",
"org.apache.kafka" % "kafka-clients" % kafkaV,
// intransitive: broker jar only, without its transitive dependencies.
"org.apache.kafka" % "kafka_2.11" % kafkaV intransitive,
// Exclude logback/slf4j brought in by Confluent to avoid duplicate SLF4J bindings.
"io.confluent" % "kafka-avro-serializer" % "3.2.0" excludeAll(ExclusionRule(organization = "ch.qos.logback"), ExclusionRule(organization = "org.slf4j")),
"nl.grons" %% "metrics-scala" % "3.5.6",
"io.prometheus" % "simpleclient" % prometheusV,
"org.scalatest" %% "scalatest" % scalaTestV % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckV % "test"
),
// Run in a forked JVM so runtime flags don't leak from the sbt process.
fork in run := true,
specificAvroSettings
)
// Stream-processing module: Akka Streams + reactive-kafka + HTTP endpoints.
lazy val analyzer = (project in file("analyzer"))
.settings(CommonSettings.buildSettings)
.settings(
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaV,
"com.typesafe.akka" %% "akka-stream" % akkaV,
"com.typesafe.akka" %% "akka-stream-kafka" % reactiveKafkaV,
"com.typesafe.akka" %% "akka-http" % akkaHttpV,
"com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpV,
"com.typesafe.akka" %% "akka-slf4j" % akkaV,
"org.json4s" %% "json4s-jackson" % "3.5.2",
"ch.qos.logback" % "logback-classic" % logbackV,
"ch.qos.logback" % "logback-core" % logbackV,
"com.storm-enroute" %% "scalameter" % "0.8.2" excludeAll ExclusionRule(organization = "org.mongodb"), // exclusion necessary to avoid build error on 2.12
"io.prometheus" % "simpleclient" % prometheusV,
"io.prometheus" % "simpleclient_common" % prometheusV,
"io.prometheus" % "simpleclient_hotspot" % prometheusV,
"org.scalatest" %% "scalatest" % scalaTestV % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckV % "test",
"com.typesafe.akka" %% "akka-stream-testkit" % akkaV % "test"
)
)
.enablePlugins(SbtNativePackager, DockerPlugin, JavaAppPackaging)
.settings(
// NOTE(review): a 16 GB heap (-Xmx16g) leaves little room for native and
// direct-buffer memory on a constrained host — the logs in this thread show
// os::commit_memory failures ("Cannot allocate memory") and
// "OutOfMemoryError: Direct buffer memory"; consider a smaller heap and/or
// an explicit -XX:MaxDirectMemorySize. TODO confirm host RAM size.
javaOptions in Universal ++= Seq(
"-J-Xmn4g",
"-J-Xms16g",
"-J-Xmx16g"
)
)
.dependsOn(common)
// Minimal sbt build for the benchmark reproduction.
scalaVersion := "2.12.2"

// Dependency versions, named once and reused below.
val akkaVersion          = "2.4.17"
val kafkaVersion         = "0.10.0.1"
val reactiveKafkaVersion = "0.13"

libraryDependencies ++= Seq(
  "org.apache.kafka"  %  "kafka-clients"     % kafkaVersion,
  "org.apache.kafka"  %  "kafka_2.11"        % kafkaVersion intransitive,
  "com.typesafe.akka" %% "akka-actor"        % akkaVersion,
  "com.typesafe.akka" %% "akka-stream"       % akkaVersion,
  "com.typesafe.akka" %% "akka-stream-kafka" % reactiveKafkaVersion
)

// Run the app in a forked JVM; this project is never published.
fork in run := true
publishMavenStyle in ThisBuild := false
[error] SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[info] [WARN] [06/24/2017 23:15:22.403] [toto-akka.kafka.default-dispatcher-12] [akka://toto/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
[info] [WARN] [06/24/2017 23:15:25.492] [toto-akka.kafka.default-dispatcher-14] [akka://toto/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
[error] Uncaught error from thread [toto-akka.kafka.default-dispatcher-16] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[toto]
[info] [ERROR] [SECURITY][06/24/2017 23:15:26.299] [toto-akka.kafka.default-dispatcher-16] [akka.actor.ActorSystemImpl(toto)] Uncaught error from thread [toto-akka.kafka.default-dispatcher-16] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
[error] java.lang.OutOfMemoryError: Direct buffer memory
[error] at java.nio.Bits.reserveMemory(Bits.java:693)
[error] at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
[error] at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
[error] at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
[error] at sun.nio.ch.IOUtil.read(IOUtil.java:195)
receive.buffer.bytes = 131072 # (128 * 1024)