We have some code for benchmarking Kafka throughput.
The code is simple: it posts 300 messages to the REST Proxy, sending one HTTP POST request every 20 ms.
This scenario works well on Confluent 1.0.1.
On 2.0.0, the first several messages are received by the REST Proxy, and we get an HTTP response for each of them.
Then the client code hangs. No exception is thrown.
In the REST Proxy console, I can only find log entries for POST requests with status code 200. Nothing else.
However, if we set the interval to 200 ms, it works in 2.0.0.
The client code ends normally.
It seems that 2.0.0 cannot handle a high rate of HTTP requests.
We enabled debug mode in kafka-rest.properties to get more information.
FYI, the test runs against the full Confluent stack on localhost. The client code just uses akka-http to send the HTTP requests.
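For anyone who wants to reproduce the load pattern without akka-http, here is a minimal sketch of it in Python. It is not our actual client. The base URL, topic name, and payload are assumptions, and it uses the v1 binary embedded format (base64-encoded values). It is also sequential, so unlike an asynchronous client it never has more than one request in flight. The explicit timeout is there so that a stalled server shows up as an exception rather than a silent hang.

```python
import base64
import json
import time
import urllib.request


def build_request(base_url, topic, value):
    """Build a produce request using the v1 binary embedded format."""
    encoded = base64.b64encode(value).decode("ascii")
    body = json.dumps({"records": [{"value": encoded}]})
    return urllib.request.Request(
        url=f"{base_url}/topics/{topic}",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/vnd.kafka.binary.v1+json"},
        method="POST",
    )


def send(request, timeout_s=5.0):
    """Send one request; raises on timeout instead of waiting forever."""
    with urllib.request.urlopen(request, timeout=timeout_s) as response:
        return response.status


def run_benchmark(send_one, count=300, interval_s=0.020):
    """Call send_one(i) `count` times, sleeping interval_s after each call."""
    statuses = []
    for i in range(count):
        statuses.append(send_one(i))
        time.sleep(interval_s)
    return statuses
```

Assuming the REST Proxy listens on `http://localhost:8082` and a topic named `test` exists (both hypothetical here), a run would look like `run_benchmark(lambda i: send(build_request("http://localhost:8082", "test", b"msg")))`.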
--------------------------------------------------------------------------------------------------------------------------------------------------------
$ jstack 18896
2016-01-29 14:59:05
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.72-b15 mixed mode):
"Attach Listener" #34 daemon prio=9 os_prio=0 tid=0x00007fd5e4001000 nid=0x4add waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"DestroyJavaVM" #33 prio=5 os_prio=0 tid=0x00007fd628014000 nid=0x49d1 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"ForkJoinPool-2-worker-7" #31 daemon prio=5 os_prio=0 tid=0x00007fd58c004800 nid=0x4a11 waiting on condition [0x00007fd60c66d000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000775603f20> (a scala.concurrent.forkjoin.ForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
"ForkJoinPool-2-worker-1" #30 daemon prio=5 os_prio=0 tid=0x0000000001a31800 nid=0x4a0f waiting on condition [0x00007fd60c76e000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000775603f20> (a scala.concurrent.forkjoin.ForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
"default-akka.actor.default-dispatcher-14" #27 prio=5 os_prio=0 tid=0x00007fd5bc072800 nid=0x4a0c waiting on condition [0x00007fd60cc71000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
"default-akka.actor.default-dispatcher-13" #26 prio=5 os_prio=0 tid=0x00007fd59c002800 nid=0x4a0b waiting on condition [0x00007fd60cd72000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
"default-akka.actor.default-dispatcher-12" #25 prio=5 os_prio=0 tid=0x00007fd5c4033000 nid=0x4a0a waiting on condition [0x00007fd60ce73000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
"default-akka.actor.default-dispatcher-11" #24 prio=5 os_prio=0 tid=0x00007fd5c4031800 nid=0x4a09 waiting on condition [0x00007fd60cf74000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
"default-akka.actor.default-dispatcher-10" #23 prio=5 os_prio=0 tid=0x00007fd5a4002000 nid=0x4a08 waiting on condition [0x00007fd60d075000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
"default-akka.actor.default-dispatcher-9" #22 prio=5 os_prio=0 tid=0x00007fd5c0004000 nid=0x4a07 waiting on condition [0x00007fd60d176000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
"default-akka.actor.default-dispatcher-8" #21 prio=5 os_prio=0 tid=0x00007fd5c0001000 nid=0x4a06 waiting on condition [0x00007fd60d277000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
"default-akka.io.pinned-dispatcher-7" #20 prio=5 os_prio=0 tid=0x00007fd5c401f000 nid=0x4a05 runnable [0x00007fd60d377000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x000000076d24e548> (a sun.nio.ch.Util$2)
- locked <0x000000076d24e4c0> (a java.util.Collections$UnmodifiableSet)
- locked <0x000000076d24e340> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
at akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.tryRun(SelectionHandler.scala:114)
at akka.io.SelectionHandler$ChannelRegistryImpl$Task.run(SelectionHandler.scala:215)
at akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.run(SelectionHandler.scala:147)
at akka.util.SerializedSuspendableExecutionContext.run$1(SerializedSuspendableExecutionContext.scala:68)
at akka.util.SerializedSuspendableExecutionContext.run(SerializedSuspendableExecutionContext.scala:72)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"metrics-meter-tick-thread-2" #19 daemon prio=5 os_prio=0 tid=0x00007fd628fbb000 nid=0x4a04 waiting on condition [0x00007fd60e0fd000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c6803b58> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"metrics-meter-tick-thread-1" #18 daemon prio=5 os_prio=0 tid=0x00007fd628f61000 nid=0x4a02 waiting on condition [0x00007fd60e3fe000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c6803b58> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"default-akka.actor.default-dispatcher-5" #14 prio=5 os_prio=0 tid=0x00007fd5c8001000 nid=0x49f4 waiting on condition [0x00007fd60ec1a000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
"default-akka.actor.default-dispatcher-4" #13 prio=5 os_prio=0 tid=0x00007fd5c4001000 nid=0x49f3 waiting on condition [0x00007fd60ed1b000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
"default-akka.actor.default-dispatcher-3" #12 prio=5 os_prio=0 tid=0x00007fd628e46800 nid=0x49f2 waiting on condition [0x00007fd60ee1c000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
"default-akka.actor.default-dispatcher-2" #11 prio=5 os_prio=0 tid=0x00007fd628e34000 nid=0x49f1 waiting on condition [0x00007fd60f11d000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
"default-scheduler-1" #10 prio=5 os_prio=0 tid=0x00007fd628da2000 nid=0x49f0 waiting on condition [0x00007fd60f21e000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at akka.actor.LightArrayRevolverScheduler.waitNanos(Scheduler.scala:226)
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:405)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)
"Monitor Ctrl-Break" #9 daemon prio=5 os_prio=0 tid=0x00007fd628b31000 nid=0x49ee runnable [0x00007fd60f736000]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at com.intellij.rt.execution.application.AppMain$1.run(AppMain.java:90)
at java.lang.Thread.run(Thread.java:745)
"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007fd6280d3000 nid=0x49dd runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007fd6280c8000 nid=0x49dc waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007fd6280c6000 nid=0x49db waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007fd6280c3000 nid=0x49da waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007fd6280c1800 nid=0x49d9 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007fd62808e800 nid=0x49d8 in Object.wait() [0x00007fd615094000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000006c68138d0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x00000006c68138d0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007fd628089800 nid=0x49d7 in Object.wait() [0x00007fd615195000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000006c680ad40> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000006c680ad40> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
"VM Thread" os_prio=0 tid=0x00007fd628082000 nid=0x49d6 runnable
"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007fd628029800 nid=0x49d2 runnable
"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007fd62802b000 nid=0x49d3 runnable
"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007fd62802d000 nid=0x49d4 runnable
"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007fd62802e800 nid=0x49d5 runnable
"VM Periodic Task Thread" os_prio=0 tid=0x00007fd6280d6000 nid=0x49de waiting on condition
JNI global references: 310
--------------------------------------------------------------------------------------------------------------------------------------------------------
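The dump above is long, so a quick tally of thread states can make it easier to read. The helper below is only a sketch. The function name is made up for illustration, and it assumes the dump has been saved as plain text with one `java.lang.Thread.State:` line per thread, as in the output above.

```python
from collections import Counter


def tally_thread_states(dump_text):
    """Count how many threads are in each java.lang.Thread.State."""
    prefix = "java.lang.Thread.State:"
    states = Counter()
    for line in dump_text.splitlines():
        line = line.strip()
        if line.startswith(prefix):
            # Keep the qualifier, e.g. "WAITING (parking)".
            states[line[len(prefix):].strip()] += 1
    return states
```

If the dump were saved to a file (say `dump.txt`, a hypothetical name), `tally_thread_states(open("dump.txt").read())` would return the per-state counts.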
It seems that akka closes the connection while the REST Proxy is still waiting. However, this problem does not exist in 1.0.1.
Any help is highly appreciated.