After upgrading to 2.0.0, cannot post data to REST Proxy at a high rate

Hao Ren

Jan 29, 2016, 9:13:56 AM
to Confluent Platform
Hi,

We have some code for benchmarking Kafka throughput.

The code is simple: it posts 300 messages to REST Proxy, sending one HTTP POST request every 20 ms.
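
For anyone trying to reproduce this, here is a minimal Java sketch (Java 11+, `java.net.http`) of a benchmark with the same shape. It is not the original client, which uses akka-http. Assumptions: REST Proxy listens on its default port 8082 (the port in the netstat output below), the topic name `benchmark-topic` is hypothetical, and records use the REST Proxy v1 binary embedded format (`application/vnd.kafka.binary.v1+json`, base64-encoded values). Requests are fired asynchronously, so they overlap whenever the server answers more slowly than the 20 ms send interval.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class RestProxyBenchmark {
    // Assumptions: default REST Proxy port and a hypothetical topic name.
    static final String URL = "http://localhost:8082/topics/benchmark-topic";
    // REST Proxy v1 binary embedded format: record values are base64-encoded.
    static final String CONTENT_TYPE = "application/vnd.kafka.binary.v1+json";
    static final int MESSAGES = 300;
    static final long INTERVAL_MS = 20;

    // Builds a produce request body containing one record with value "message-<i>".
    static String payload(int i) {
        String value = Base64.getEncoder()
                .encodeToString(("message-" + i).getBytes(StandardCharsets.UTF_8));
        return "{\"records\":[{\"value\":\"" + value + "\"}]}";
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        List<CompletableFuture<Integer>> statuses = new ArrayList<>();

        for (int i = 0; i < MESSAGES; i++) {
            HttpRequest request = HttpRequest.newBuilder(URI.create(URL))
                    .header("Content-Type", CONTENT_TYPE)
                    .POST(HttpRequest.BodyPublishers.ofString(payload(i)))
                    .build();
            // Send without waiting for earlier responses; ofString() reads each
            // response body fully so the connection can be reused.
            statuses.add(client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                    .thenApply(HttpResponse::statusCode));
            Thread.sleep(INTERVAL_MS); // roughly one request every 20 ms
        }

        // Blocks until every response has arrived; there is no timeout here.
        CompletableFuture.allOf(statuses.toArray(new CompletableFuture[0])).join();
        long ok = statuses.stream().filter(f -> f.join() == 200).count();
        System.out.println(ok + "/" + MESSAGES + " requests returned HTTP 200");
    }
}
```

The send loop takes about 300 × 20 ms = 6 s. Because the final `join()` has no timeout, this sketch would block silently if any response never arrived.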

This scenario works well on Confluent 1.0.1.

After upgrading to 2.0.0, we encountered the following problem:

At first, REST Proxy receives several messages, and we get the HTTP responses for them.

Then the client code hangs. No exception is thrown.
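
One way to turn a silent hang like this into a visible failure is to bound every wait. Below is a minimal sketch with hypothetical names (`StallDetector`, `awaitOrEmpty`). With `java.net.http`, `HttpRequest.Builder.timeout(Duration)` gives a similar per-request effect: `sendAsync` then completes exceptionally with `HttpTimeoutException`.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StallDetector {
    // Waits for a response future for at most `timeout`.
    // Returns Optional.empty() on timeout instead of blocking forever.
    static <T> Optional<T> awaitOrEmpty(CompletableFuture<T> future, Duration timeout) {
        try {
            return Optional.ofNullable(future.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty(); // no response in time: report a stall
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("request failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> answered = CompletableFuture.completedFuture(200);
        System.out.println(awaitOrEmpty(answered, Duration.ofSeconds(1))); // Optional[200]

        // A future nobody completes stands in for a request the server never answers.
        CompletableFuture<Integer> stalled = new CompletableFuture<>();
        System.out.println(awaitOrEmpty(stalled, Duration.ofMillis(100))); // Optional.empty
    }
}
```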

In the REST Proxy console, I could only find log entries for POST requests with status code 200. Nothing else.

However, if we set the interval to 200 ms, it works in 2.0.0 and the client code ends normally.

It seems that 2.0.0 cannot handle a high rate of HTTP requests.
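
It helps to spell out the arithmetic behind the two intervals. At 20 ms the client sends 50 requests/s, so 300 requests take about 6 s. At 200 ms it sends 5 requests/s, which takes about 60 s. By Little's law, the average number of requests in flight equals the send rate times the average latency. At 50 requests/s, any latency above 4 / 50 = 0.08 s (80 ms) therefore means more than four outstanding requests on average. Four is the number of connections visible in the netstat output below. The 100 ms latency in this sketch is a hypothetical value, so treat the sketch as a back-of-the-envelope aid, not a diagnosis.

```java
public class SendRate {
    // Requests per second when one request is sent every intervalMs milliseconds.
    static double requestsPerSecond(long intervalMs) {
        return 1000.0 / intervalMs;
    }

    // Time spent just issuing `count` requests at a fixed interval.
    static long totalSendMillis(int count, long intervalMs) {
        return count * intervalMs;
    }

    // Little's law: average requests in flight = arrival rate (req/s) x average latency (s).
    static double averageInFlight(long intervalMs, long latencyMs) {
        return requestsPerSecond(intervalMs) * latencyMs / 1000.0;
    }

    public static void main(String[] args) {
        long hypotheticalLatencyMs = 100; // assumed value, not measured in this thread
        for (long interval : new long[] {20, 200}) {
            System.out.printf("interval %d ms: %.1f req/s, %d ms to send 300, ~%.1f in flight%n",
                    interval, requestsPerSecond(interval), totalSendMillis(300, interval),
                    averageInFlight(interval, hypotheticalLatencyMs));
        }
    }
}
```

For the two intervals, this gives 50 vs 5 requests/s, 6 s vs 60 s of sending, and about 5 vs 0.5 requests in flight at the assumed latency.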

We turned on debug mode in kafka-rest.properties to get more information.

Here is what we got:

[2016-01-29 14:17:50,913] INFO Got user-level KeeperException when processing sessionid:0x1528d87f0e20000 type:setData cxid:0xab zxid:0x79 txntype:-1 reqpath:n/a Error Path:/config/topics/lbc.job.multiposting.input Error:KeeperErrorCode = NoNode for /config/topics/lbc.job.multiposting.input (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-01-29 14:17:50,915] INFO Got user-level KeeperException when processing sessionid:0x1528d87f0e20000 type:create cxid:0xac zxid:0x7a txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-01-29 14:17:50,921] INFO Topic creation {"version":1,"partitions":{"0":[0]}} (kafka.admin.AdminUtils$)
[2016-01-29 14:17:50,932] INFO [KafkaApi-0] Auto creation of topic lbc.job.multiposting.input with 1 partitions and replication factor 1 is successful! (kafka.server.KafkaApis)
[2016-01-29 14:17:50,934] INFO Got user-level KeeperException when processing sessionid:0x1528d87f0e20000 type:create cxid:0xb4 zxid:0x7d txntype:-1 reqpath:n/a Error Path:/brokers/topics/lbc.job.multiposting.input/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/lbc.job.multiposting.input/partitions/0 (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-01-29 14:17:50,936] INFO Got user-level KeeperException when processing sessionid:0x1528d87f0e20000 type:create cxid:0xb5 zxid:0x7e txntype:-1 reqpath:n/a Error Path:/brokers/topics/lbc.job.multiposting.input/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/lbc.job.multiposting.input/partitions (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-01-29 14:17:50,957] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [lbc.job.multiposting.input,0] (kafka.server.ReplicaFetcherManager)
[2016-01-29 14:17:50,960] INFO Completed load of log lbc.job.multiposting.input-0 with log end offset 0 (kafka.log.Log)
[2016-01-29 14:17:50,962] INFO Created log for partition [lbc.job.multiposting.input,0] in /tmp/kafka-logs with properties {compression.type -> producer, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)

FYI, the test runs against the full Confluent stack on localhost. The client code simply uses akka-http to send the HTTP requests.

$ netstat -pnt | grep 8082
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp6       0      0 127.0.0.1:8082          127.0.0.1:50227         TIME_WAIT   -               
tcp6       0      0 127.0.0.1:8082          127.0.0.1:50224         TIME_WAIT   -               
tcp6       0      0 127.0.0.1:8082          127.0.0.1:50226         TIME_WAIT   -               
tcp6       0      0 127.0.0.1:8082          127.0.0.1:50225         TIME_WAIT   -
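
When repeating this check while the client is stuck, it can help to tally connection states rather than read them by eye. Here is a small sketch that assumes the Linux `netstat -pnt` column layout shown above (Proto, Recv-Q, Send-Q, Local Address, Foreign Address, State, PID/Program). `NetstatSummary` and `countStates` are hypothetical names. The sketch only counts sockets whose local address is on the given port, which for port 8082 means the server side.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

public class NetstatSummary {
    // Counts TCP states for sockets whose local address ends with ":<port>".
    // Assumes Linux `netstat -pnt` columns: Proto Recv-Q Send-Q Local Foreign State [PID/Program].
    static Map<String, Integer> countStates(String netstatOutput, int port) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : netstatOutput.split("\n")) {
            String[] cols = line.trim().split("\\s+");
            // Skip header and warning lines, which do not start with a tcp protocol name.
            if (cols.length < 6 || !cols[0].startsWith("tcp")) {
                continue;
            }
            if (cols[3].endsWith(":" + port)) {
                counts.merge(cols[5], 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // Usage: netstat -pnt | java NetstatSummary.java
        String input = new String(System.in.readAllBytes(), StandardCharsets.UTF_8);
        System.out.println(countStates(input, 8082));
    }
}
```

Fed the output above, it prints `{TIME_WAIT=4}`.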

And here is the jstack output (sorry for the length):

--------------------------------------------------------------------------------------------------------------------------------------------------------

$ jstack 18896
2016-01-29 14:59:05
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.72-b15 mixed mode):

"Attach Listener" #34 daemon prio=9 os_prio=0 tid=0x00007fd5e4001000 nid=0x4add waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"DestroyJavaVM" #33 prio=5 os_prio=0 tid=0x00007fd628014000 nid=0x49d1 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"ForkJoinPool-2-worker-7" #31 daemon prio=5 os_prio=0 tid=0x00007fd58c004800 nid=0x4a11 waiting on condition [0x00007fd60c66d000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000775603f20> (a scala.concurrent.forkjoin.ForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"ForkJoinPool-2-worker-1" #30 daemon prio=5 os_prio=0 tid=0x0000000001a31800 nid=0x4a0f waiting on condition [0x00007fd60c76e000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000775603f20> (a scala.concurrent.forkjoin.ForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"default-akka.actor.default-dispatcher-14" #27 prio=5 os_prio=0 tid=0x00007fd5bc072800 nid=0x4a0c waiting on condition [0x00007fd60cc71000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"default-akka.actor.default-dispatcher-13" #26 prio=5 os_prio=0 tid=0x00007fd59c002800 nid=0x4a0b waiting on condition [0x00007fd60cd72000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"default-akka.actor.default-dispatcher-12" #25 prio=5 os_prio=0 tid=0x00007fd5c4033000 nid=0x4a0a waiting on condition [0x00007fd60ce73000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"default-akka.actor.default-dispatcher-11" #24 prio=5 os_prio=0 tid=0x00007fd5c4031800 nid=0x4a09 waiting on condition [0x00007fd60cf74000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"default-akka.actor.default-dispatcher-10" #23 prio=5 os_prio=0 tid=0x00007fd5a4002000 nid=0x4a08 waiting on condition [0x00007fd60d075000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"default-akka.actor.default-dispatcher-9" #22 prio=5 os_prio=0 tid=0x00007fd5c0004000 nid=0x4a07 waiting on condition [0x00007fd60d176000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"default-akka.actor.default-dispatcher-8" #21 prio=5 os_prio=0 tid=0x00007fd5c0001000 nid=0x4a06 waiting on condition [0x00007fd60d277000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"default-akka.io.pinned-dispatcher-7" #20 prio=5 os_prio=0 tid=0x00007fd5c401f000 nid=0x4a05 runnable [0x00007fd60d377000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x000000076d24e548> (a sun.nio.ch.Util$2)
- locked <0x000000076d24e4c0> (a java.util.Collections$UnmodifiableSet)
- locked <0x000000076d24e340> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
at akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.tryRun(SelectionHandler.scala:114)
at akka.io.SelectionHandler$ChannelRegistryImpl$Task.run(SelectionHandler.scala:215)
at akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.run(SelectionHandler.scala:147)
at akka.util.SerializedSuspendableExecutionContext.run$1(SerializedSuspendableExecutionContext.scala:68)
at akka.util.SerializedSuspendableExecutionContext.run(SerializedSuspendableExecutionContext.scala:72)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

"metrics-meter-tick-thread-2" #19 daemon prio=5 os_prio=0 tid=0x00007fd628fbb000 nid=0x4a04 waiting on condition [0x00007fd60e0fd000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006c6803b58> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

"metrics-meter-tick-thread-1" #18 daemon prio=5 os_prio=0 tid=0x00007fd628f61000 nid=0x4a02 waiting on condition [0x00007fd60e3fe000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006c6803b58> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

"default-akka.actor.default-dispatcher-5" #14 prio=5 os_prio=0 tid=0x00007fd5c8001000 nid=0x49f4 waiting on condition [0x00007fd60ec1a000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"default-akka.actor.default-dispatcher-4" #13 prio=5 os_prio=0 tid=0x00007fd5c4001000 nid=0x49f3 waiting on condition [0x00007fd60ed1b000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"default-akka.actor.default-dispatcher-3" #12 prio=5 os_prio=0 tid=0x00007fd628e46800 nid=0x49f2 waiting on condition [0x00007fd60ee1c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"default-akka.actor.default-dispatcher-2" #11 prio=5 os_prio=0 tid=0x00007fd628e34000 nid=0x49f1 waiting on condition [0x00007fd60f11d000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006c681ae68> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"default-scheduler-1" #10 prio=5 os_prio=0 tid=0x00007fd628da2000 nid=0x49f0 waiting on condition [0x00007fd60f21e000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at akka.actor.LightArrayRevolverScheduler.waitNanos(Scheduler.scala:226)
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:405)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)

"Monitor Ctrl-Break" #9 daemon prio=5 os_prio=0 tid=0x00007fd628b31000 nid=0x49ee runnable [0x00007fd60f736000]
   java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at com.intellij.rt.execution.application.AppMain$1.run(AppMain.java:90)
at java.lang.Thread.run(Thread.java:745)

"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007fd6280d3000 nid=0x49dd runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007fd6280c8000 nid=0x49dc waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007fd6280c6000 nid=0x49db waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007fd6280c3000 nid=0x49da waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007fd6280c1800 nid=0x49d9 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007fd62808e800 nid=0x49d8 in Object.wait() [0x00007fd615094000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000006c68138d0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x00000006c68138d0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007fd628089800 nid=0x49d7 in Object.wait() [0x00007fd615195000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000006c680ad40> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000006c680ad40> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"VM Thread" os_prio=0 tid=0x00007fd628082000 nid=0x49d6 runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007fd628029800 nid=0x49d2 runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007fd62802b000 nid=0x49d3 runnable 

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007fd62802d000 nid=0x49d4 runnable 

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007fd62802e800 nid=0x49d5 runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007fd6280d6000 nid=0x49de waiting on condition 

JNI global references: 310

--------------------------------------------------------------------------------------------------------------------------------------------------------
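
The usual way to see whether anything is making progress is to take several jstack dumps a few seconds apart and compare them. It may be easier to capture this from inside the benchmark when a stall is detected. The standard `java.lang.management` API exposes the same per-thread states. In the sketch below, the class and method names are hypothetical. It only sees the JVM it runs in, so the REST Proxy process would still need `jstack <pid>`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Map;
import java.util.TreeMap;

public class ThreadStateSummary {
    private static final ThreadMXBean MX = ManagementFactory.getThreadMXBean();

    // Counts this JVM's live threads by state (RUNNABLE, WAITING, TIMED_WAITING, ...).
    static Map<Thread.State, Integer> countByState() {
        Map<Thread.State, Integer> counts = new TreeMap<>();
        for (ThreadInfo info : MX.dumpAllThreads(false, false)) {
            if (info != null) {
                counts.merge(info.getThreadState(), 1, Integer::sum);
            }
        }
        return counts;
    }

    // True if the JVM finds threads deadlocked on monitors or ownable synchronizers.
    static boolean hasDeadlock() {
        return MX.findDeadlockedThreads() != null;
    }

    public static void main(String[] args) {
        System.out.println("threads by state: " + countByState());
        System.out.println("deadlock detected: " + hasDeadlock());
    }
}
```

In the client dump above, no thread is BLOCKED. The dispatcher threads are parked idle, and the Akka I/O selector sits in `epollWait`. This suggests that the client is waiting for network events rather than contending on a lock.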

It seems that Akka closes the connection while REST Proxy is still waiting. But this problem does not exist in 1.0.1.

Any help is highly appreciated.

Hao

Hao Ren

Feb 3, 2016, 12:12:39 PM
to Confluent Platform
Hi,

We have run the benchmark code on a cluster and hit the same problem with CP 2.0.0.
The connection stays in a waiting state.

Has anyone encountered this kind of problem?
Please let me know if you need any info.

Hao

Ewen Cheslack-Postava

Feb 4, 2016, 10:15:56 PM
to Confluent Platform
Hao,

I'm not aware of any issues. We run some benchmarks comparing the native and REST producers, and their throughput is still roughly comparable on the same hardware (distributed across multiple machines).

A couple of questions: when you say it doesn't happen with 1.0.1, is that on the exact same setup (i.e. the same test, literally just switching between the 1.0.1 and 2.0.0 directories)? Also, the jstack looks like it's for the client, since it has Akka in it. Do you have a stack trace for the REST proxy? That might make it clearer what's going on.

Does the REST proxy recover and simply accept data slowly, or does it get stuck indefinitely? TIME_WAIT isn't necessarily wrong; it just indicates that the connection was closed and is in a state meant to handle any extra packets that may be delivered late. Since there are only 4 connections there, I'd be surprised if it were getting stuck because of the TIME_WAIT state.

-Ewen

Hao Ren

Feb 5, 2016, 4:42:39 AM
to Confluent Platform
Ewen,

Thank you for the information.

I confirm that it was on the exact same setup: I ran the same code against both CP versions, and only v2.0.0 makes the client code hang.

Yes, the jstack was for the client side. Below is the jstack for kafka-rest (I have a few more remarks after it):

------------------------------------------------------------------------------------------------------------------------------------------------

$ jstack 4125
2016-02-05 10:15:12
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.72-b15 mixed mode):

"Attach Listener" #33 daemon prio=9 os_prio=0 tid=0x00007ff690001000 nid=0x2634 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Scheduler-337799666" #31 prio=5 os_prio=0 tid=0x00007ff65c004800 nid=0x1727 waiting on condition [0x00007ff647dfe000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000000f5c96ab0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

"org.eclipse.jetty.server.session.HashSessionManager@1338fb5Timer" #30 daemon prio=5 os_prio=0 tid=0x00007ff6d0b8b000 nid=0x104c waiting on condition [0x00007ff6b03ee000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000000f5a4b058> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

"qtp1119622337-29" #29 prio=5 os_prio=0 tid=0x00007ff6d0b7d800 nid=0x104b waiting on condition [0x00007ff6b04ef000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000000f56fb0d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:389)
at org.eclipse.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:531)
at org.eclipse.jetty.util.thread.QueuedThreadPool.access$700(QueuedThreadPool.java:47)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:590)
at java.lang.Thread.run(Thread.java:745)

"qtp1119622337-28" #28 prio=5 os_prio=0 tid=0x00007ff6d0b7b800 nid=0x104a waiting on condition [0x00007ff6b05f0000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000000f56fb0d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:389)
at org.eclipse.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:531)
at org.eclipse.jetty.util.thread.QueuedThreadPool.access$700(QueuedThreadPool.java:47)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:590)
at java.lang.Thread.run(Thread.java:745)

"qtp1119622337-27" #27 prio=5 os_prio=0 tid=0x00007ff6d0b7a000 nid=0x1049 waiting on condition [0x00007ff6b06f1000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000000f56fb0d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:389)
at org.eclipse.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:531)
at org.eclipse.jetty.util.thread.QueuedThreadPool.access$700(QueuedThreadPool.java:47)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:590)
at java.lang.Thread.run(Thread.java:745)

"qtp1119622337-26" #26 prio=5 os_prio=0 tid=0x00007ff6d0b78000 nid=0x1048 waiting on condition [0x00007ff6b07f2000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000000f56fb0d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:389)
at org.eclipse.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:531)
at org.eclipse.jetty.util.thread.QueuedThreadPool.access$700(QueuedThreadPool.java:47)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:590)
at java.lang.Thread.run(Thread.java:745)

"qtp1119622337-25" #25 prio=5 os_prio=0 tid=0x00007ff6d0b76800 nid=0x1047 waiting on condition [0x00007ff6b08f3000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000000f56fb0d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:389)
at org.eclipse.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:531)
at org.eclipse.jetty.util.thread.QueuedThreadPool.access$700(QueuedThreadPool.java:47)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:590)
at java.lang.Thread.run(Thread.java:745)

"qtp1119622337-24" #24 prio=5 os_prio=0 tid=0x00007ff6d0b74800 nid=0x1046 waiting on condition [0x00007ff6b09f4000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000000f56fb0d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:389)
at org.eclipse.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:531)
at org.eclipse.jetty.util.thread.QueuedThreadPool.access$700(QueuedThreadPool.java:47)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:590)
at java.lang.Thread.run(Thread.java:745)

"qtp1119622337-23-selector-ServerConnectorManager@6327d70d/1" #23 prio=5 os_prio=0 tid=0x00007ff6d0b71000 nid=0x1045 runnable [0x00007ff6b0af5000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000f5c96af8> (a sun.nio.ch.Util$2)
- locked <0x00000000f5c96ae0> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000f5cba300> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
at org.eclipse.jetty.io.SelectorManager$ManagedSelector.select(SelectorManager.java:600)
at org.eclipse.jetty.io.SelectorManager$ManagedSelector.run(SelectorManager.java:549)
at org.eclipse.jetty.util.thread.NonBlockingThread.run(NonBlockingThread.java:52)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
at java.lang.Thread.run(Thread.java:745)

"qtp1119622337-22-selector-ServerConnectorManager@6327d70d/0" #22 prio=5 os_prio=0 tid=0x00007ff6d0b70800 nid=0x1044 runnable [0x00007ff6b0df6000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000f5c96b58> (a sun.nio.ch.Util$2)
- locked <0x00000000f5c96b40> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000f5c57a98> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
at org.eclipse.jetty.io.SelectorManager$ManagedSelector.select(SelectorManager.java:600)
at org.eclipse.jetty.io.SelectorManager$ManagedSelector.run(SelectorManager.java:549)
at org.eclipse.jetty.util.thread.NonBlockingThread.run(NonBlockingThread.java:52)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
at java.lang.Thread.run(Thread.java:745)

"Consumer Expiration Thread" #19 daemon prio=5 os_prio=0 tid=0x00007ff6d0ae0800 nid=0x1039 in Object.wait() [0x00007ff6b0ef7000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000f583f878> (a io.confluent.kafkarest.ConsumerManager)
at io.confluent.kafkarest.ConsumerManager$ExpirationThread.run(ConsumerManager.java:400)
- locked <0x00000000f583f878> (a io.confluent.kafkarest.ConsumerManager)

"Thread-2" #18 prio=5 os_prio=0 tid=0x00007ff6d0a95800 nid=0x1038 in Object.wait() [0x00007ff6b0ff8000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000f5969858> (a io.confluent.kafkarest.ConsumerWorker)
at io.confluent.kafkarest.SystemTime.waitOn(SystemTime.java:22)
at io.confluent.kafkarest.ConsumerWorker.run(ConsumerWorker.java:76)
- locked <0x00000000f5969858> (a io.confluent.kafkarest.ConsumerWorker)

"kafka-producer-network-thread | producer-3" #17 daemon prio=5 os_prio=0 tid=0x00007ff6d0ac2000 nid=0x1037 runnable [0x00007ff6b10f9000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000f5a4b028> (a sun.nio.ch.Util$2)
- locked <0x00000000f5a4b040> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000f566b998> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:425)
at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
at java.lang.Thread.run(Thread.java:745)

"kafka-producer-network-thread | producer-2" #16 daemon prio=5 os_prio=0 tid=0x00007ff6d0a35800 nid=0x1036 runnable [0x00007ff6b13fa000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000f56fb100> (a sun.nio.ch.Util$2)
- locked <0x00000000f56fb118> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000f572f110> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:425)
at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
at java.lang.Thread.run(Thread.java:745)

"kafka-producer-network-thread | producer-1" #15 daemon prio=5 os_prio=0 tid=0x00007ff6d09ee800 nid=0x1035 runnable [0x00007ff6b14fa000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000f594e510> (a sun.nio.ch.Util$2)
- locked <0x00000000f594e528> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000f566bdd0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:425)
at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
at java.lang.Thread.run(Thread.java:745)

"main-EventThread" #14 daemon prio=5 os_prio=0 tid=0x00007ff6d083a800 nid=0x1034 waiting on condition [0x00007ff6b19fc000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000000f557d5f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:493)

"main-SendThread(localhost:2181)" #13 daemon prio=5 os_prio=0 tid=0x00007ff6d084c000 nid=0x1033 runnable [0x00007ff6b1afd000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000f5581c50> (a sun.nio.ch.Util$2)
- locked <0x00000000f5581c68> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000f557db90> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:274)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1035)

"ZkClient-EventThread-12-localhost:2181" #12 daemon prio=5 os_prio=0 tid=0x00007ff6d0830000 nid=0x1032 waiting on condition [0x00007ff6b1dfe000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000000f565d0f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67)

"RMI TCP Accept-0" #11 daemon prio=5 os_prio=0 tid=0x00007ff6d060c000 nid=0x1030 runnable [0x00007ff6c0260000]
   java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:52)
at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
at java.lang.Thread.run(Thread.java:745)

"Service Thread" #9 daemon prio=9 os_prio=0 tid=0x00007ff6d0140800 nid=0x102f runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread2" #8 daemon prio=9 os_prio=0 tid=0x00007ff6d013d800 nid=0x102e waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #7 daemon prio=9 os_prio=0 tid=0x00007ff6d013b800 nid=0x102d waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #6 daemon prio=9 os_prio=0 tid=0x00007ff6d0139000 nid=0x102c waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #5 daemon prio=9 os_prio=0 tid=0x00007ff6d0137000 nid=0x102b runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Surrogate Locker Thread (Concurrent GC)" #4 daemon prio=9 os_prio=0 tid=0x00007ff6d0136000 nid=0x102a waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007ff6d0102800 nid=0x1029 in Object.wait() [0x00007ff6d4752000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000f557d638> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x00000000f557d638> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007ff6d00fe000 nid=0x1028 in Object.wait() [0x00007ff6d4853000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000f557f150> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000000f557f150> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=0 tid=0x00007ff6d000e000 nid=0x1021 in Object.wait() [0x00007ff6d9770000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000f5bb1688> (a java.lang.Object)
at java.lang.Object.wait(Object.java:502)
at org.eclipse.jetty.util.thread.QueuedThreadPool.join(QueuedThreadPool.java:381)
- locked <0x00000000f5bb1688> (a java.lang.Object)
at org.eclipse.jetty.server.Server.join(Server.java:556)
at io.confluent.rest.Application.join(Application.java:224)
at io.confluent.kafkarest.KafkaRestMain.main(KafkaRestMain.java:39)

"VM Thread" os_prio=0 tid=0x00007ff6d00f6000 nid=0x1027 runnable 

"Gang worker#0 (Parallel GC Threads)" os_prio=0 tid=0x00007ff6d001f000 nid=0x1022 runnable 

"Gang worker#1 (Parallel GC Threads)" os_prio=0 tid=0x00007ff6d0021000 nid=0x1023 runnable 

"Gang worker#2 (Parallel GC Threads)" os_prio=0 tid=0x00007ff6d0022800 nid=0x1024 runnable 

"Gang worker#3 (Parallel GC Threads)" os_prio=0 tid=0x00007ff6d0024800 nid=0x1025 runnable 

"Concurrent Mark-Sweep GC Thread" os_prio=0 tid=0x00007ff6d0067000 nid=0x1026 runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007ff6d0621800 nid=0x1031 waiting on condition 

JNI global references: 368

------------------------------------------------------------------------------------------------------------------------------------------------

The REST Proxy does not accept any more data; it just stays stuck indefinitely.

I have recently found the problem. I think it is caused by Akka HTTP's back-pressure mechanism.

I suspected this because a script that calls `curl` in a loop, with no interval between requests, works fine with v2.0.0.

That made me re-check my client code, and I found the following:

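import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher // ExecutionContext used by map/flatMap

val http = Http()
def postRequest(request: HttpRequest, index: Int): Future[String] =
  http.singleRequest(request) flatMap {
    case HttpResponse(StatusCodes.OK, _, entity, _) =>
      entity.dataBytes.runWith(Sink.head).map(x => new String(x.toArray)) // not OK with CP 2.0.0
      // entity.dataBytes.runWith(Sink.last).map(x => new String(x.toArray)) // OK with CP 1.0.1 and 2.0.0
    case HttpResponse(statusCode, _, entity, _) =>
      entity.dataBytes.runWith(Sink.ignore) // drain the body so the connection can be reused
      Future.failed(new KafkaRestProxyException(statusCode.toString)) // our own exception class
  }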

For each response from the REST Proxy, I just used `Sink.head` to read the response entity, since I assumed each connection is a stream with a single element. This works with v1.0.1 but not with v2.0.0.
When I changed `Sink.head` to `Sink.last` (the commented-out line), it works with both CP versions.
It seems that there are multiple responses on one connection. If the entities of some responses on a connection (stream) are not consumed, Akka HTTP treats this as a back-pressure signal, as described at the end of this page of the Akka HTTP docs: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/scala/http/client-side/request-level.html
`Sink.last` forces the entire entity stream of each response to be consumed, so no back pressure builds up.
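To illustrate why the choice of sink matters, here is a small Scala analogue that uses only the standard library (no Akka). It assumes, without confirmation from this thread, that a response body can arrive as several chunks; the chunk contents and the names `ChunkedBodyDemo`, `firstChunkOnly`, `lastChunkOnly` and `wholeBody` are hypothetical. Keeping only the first chunk mirrors `Sink.head`, consuming every chunk but keeping only the last mirrors `Sink.last`, and folding mirrors reading the whole body.

```scala
// Standard-library analogue of reading an HTTP entity's dataBytes stream.
// Assumption: the body arrives in several chunks; the contents are made up.
object ChunkedBodyDemo {
  val chunks: List[Array[Byte]] =
    List("{\"offsets\":[", "{\"partition\":0,\"offset\":42}", "]}").map(_.getBytes("UTF-8"))

  // Like Sink.head: stops after the first chunk and keeps only that one.
  def firstChunkOnly(cs: List[Array[Byte]]): String = new String(cs.head, "UTF-8")

  // Like Sink.last: walks every chunk but keeps only the final one.
  def lastChunkOnly(cs: List[Array[Byte]]): String = new String(cs.last, "UTF-8")

  // Like runFold(ByteString.empty)(_ ++ _): consumes and concatenates every chunk.
  def wholeBody(cs: List[Array[Byte]]): String =
    new String(cs.foldLeft(Array.emptyByteArray)(_ ++ _), "UTF-8")

  def main(args: Array[String]): Unit = {
    println(firstChunkOnly(chunks)) // {"offsets":[
    println(lastChunkOnly(chunks))  // ]}
    println(wholeBody(chunks))      // {"offsets":[{"partition":0,"offset":42}]}
  }
}
```

If the body is split like this, both `Sink.head` and `Sink.last` return only a fragment. In Akka HTTP itself, reading the full body would look something like `entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map(_.utf8String)` or `entity.toStrict(timeout)`; this is a suggestion and was not tested against CP 2.0.0 in this thread.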

It makes me wonder whether REST Proxy 2.0.0 groups multiple requests on one connection (stream), and does the same for responses, while v1.0.1 does not.
I am still trying to understand the problem, so please correct me if anything here is wrong.

Thank you,
Hao







--
Thanks,
Ewen

Hao Ren

Feb 5, 2016, 4:46:50 AM
to Confluent Platform
The end of the previous post was trimmed; please expand it to read the whole message.


On Friday, January 29, 2016 at 3:13:56 PM UTC+1, Hao Ren wrote:

Hao Ren

Feb 5, 2016, 10:09:26 AM
to Confluent Platform
I can confirm that this was an old problem in akka-http-2.0.3_2.10, which is the latest Akka HTTP version for Scala 2.10.
Since our stack is based on Scala 2.10, we have no choice.
It was a coincidence that upgrading to CP 2.0.0 triggered the problem while CP 1.0.1 did not.

In short, there are two workarounds:
1) On Scala 2.10, use `Sink.last` instead of `Sink.head`.
2) Upgrade the stack to Scala 2.11 and use the latest Akka HTTP version for Scala 2.11.

Thank you for your help.