InProcessTransport Deadlock with gRPC Java 1.9


prod...@google.com

Jan 30, 2018, 8:30:05 AM
to grpc.io
I'm working on some unit tests for a bidirectional streaming service implementation. I'm trying to determine whether my design is the issue, or whether I just need to guard against the deadlock condition.

When the client makes the initial request, the StreamObserver implementation for my service starts a new thread that repeatedly polls the backend of my system until the stream is closed. Each time it finds new messages in the backend, it calls onNext on the response StreamObserver to send them to the client. In my unit tests this works most of the time, but the tests sporadically deadlock. I can't understand why, particularly since the print statements I added seem to show that the two threads never write to the stream concurrently.
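A minimal sketch of the design just described, written against plain JDK types so it runs without gRPC. `Backend`, `Sink` and `PollingResponder` are hypothetical names (with `Sink` standing in for the response `StreamObserver`), and the 10 ms poll interval is an assumption. The first request starts a single polling thread, and every message that thread finds is pushed from the polling thread itself, not from the thread that delivered the request.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PollingResponder {
  /** Hypothetical stand-in for the backend that the server polls. */
  public interface Backend {
    List<String> poll();
  }

  /** Hypothetical stand-in for the response StreamObserver. */
  public interface Sink {
    void onNext(String message);
  }

  private final Backend backend;
  private final Sink sink;
  private final AtomicBoolean started = new AtomicBoolean();
  private final AtomicBoolean closed = new AtomicBoolean();
  private final ExecutorService poller = Executors.newSingleThreadExecutor();

  public PollingResponder(Backend backend, Sink sink) {
    this.backend = backend;
    this.sink = sink;
  }

  /** Called for every client request; only the first one starts the polling thread. */
  public void onRequest() {
    if (started.compareAndSet(false, true)) {
      poller.execute(this::streamMessages);
    }
  }

  /** Polls until close(); every message found is pushed from the polling thread. */
  private void streamMessages() {
    while (!closed.get()) {
      for (String message : backend.poll()) {
        sink.onNext(message); // in the dump below, this path reaches InProcessServerStream.writeMessage
      }
      try {
        TimeUnit.MILLISECONDS.sleep(10); // assumed poll interval
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  /** Stops polling and waits briefly for the polling thread to exit. */
  public void close() {
    closed.set(true);
    poller.shutdown();
    try {
      poller.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Any later request (such as the ack) arrives on a different thread, so from that point on two threads are driving the same call.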

The log below shows the interaction pattern I expect: the client makes a request, the server finishes handling it, and then the polling thread sends the client a response. The client receives the response and acks it, and the server then receives the ack. All of this is as it should be, yet it still ends in the deadlock shown in the thread dump that follows.

1517317747082 Client makes initial request
1517317747082 Server receives request subscription: "subscription-1-to-test-topic-1"
1517317747092 Server finishes with client request
1517317747105 Server sends response with 6 messages
1517317747105 Client receives response with 6 messages
1517317747106 Client makes second request
1517317747106 Server receives request modify_deadline_seconds: 60
modify_deadline_ack_ids: "2-0"
1517317747110 Server finishes with client request


"subscription-1-to-test-topic-1-0":
  waiting to lock monitor 0x00007f2fa0005248 (object 0x00000005cf7e5ee0, a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream),
  which is held by "main"
"main":
  waiting to lock monitor 0x00007f2fa0003b98 (object 0x00000005cf7e61e0, a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream),
  which is held by "subscription-1-to-test-topic-1-0"
Java stack information for the threads listed above:
===================================================
"subscription-1-to-test-topic-1-0":
at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.request(InProcessTransport.java:516)
- waiting to lock <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
at io.grpc.internal.ClientCallImpl.request(ClientCallImpl.java:383)
at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
at io.grpc.ForwardingClientCall.request(ForwardingClientCall.java:37)
at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.request(ClientCalls.java:356)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:410)
at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:530)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.messagesAvailable(ClientCallImpl.java:547)
at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.writeMessage(InProcessTransport.java:378)
- locked <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
at io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:134)
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver.streamMessages(SubscriberImpl.java:372)
at com.google.cloud.partners.pubsub.kafka.SubscriberImpl$StreamingPullStreamObserver$$Lambda$13/179779934.run(Unknown Source)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"main":
at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.request(InProcessTransport.java:325)
- waiting to lock <0x00000005cf7e61e0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream)
at io.grpc.internal.ServerCallImpl.request(ServerCallImpl.java:78)
at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:252)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:252)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:626)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.messagesAvailable(ServerImpl.java:637)
at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.writeMessage(InProcessTransport.java:564)
- locked <0x00000005cf7e5ee0> (a io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream)
at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:438)
at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:320)
at com.google.cloud.partners.pubsub.kafka.SubscriberImplTest.streamingPullSingleClientPullModifyAndAck(SubscriberImplTest.java:548)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)

I have yet to see an example similar to mine, in which the streaming server handler uses a separate thread to generate and send messages back to the client, which is why I wonder whether that is my primary issue. I appreciate any help; thanks in advance.
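Read together, the two stacks in the dump form a lock-ordering cycle. The polling thread holds the `InProcessServerStream` monitor (taken in `writeMessage`), and because `SerializeReentrantCallsDirectExecutor` runs the client listener inline, it then waits for the `InProcessClientStream` monitor in `request`. The `main` thread holds the `InProcessClientStream` monitor (taken in `writeMessage`) and, through the server listener running inline, waits for the `InProcessServerStream` monitor in `request`. The JDK-only model below reproduces that ordering with two `ReentrantLock`s. It is a simplification, not gRPC's code; `LockOrderModel` is a hypothetical name, and it uses `tryLock` with a 200 ms timeout so it reports the cycle instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class LockOrderModel {
  /** Returns true only if neither thread got stuck waiting for the other's lock. */
  public static boolean run() {
    ReentrantLock serverStream = new ReentrantLock(); // models InProcessServerStream's monitor
    ReentrantLock clientStream = new ReentrantLock(); // models InProcessClientStream's monitor
    CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
    AtomicBoolean stuck = new AtomicBoolean(false);

    // Polling thread: server writeMessage -> inline client onMessage -> client request()
    Thread poller = new Thread(() -> writeThenCallInline(serverStream, clientStream, bothHoldFirstLock, stuck));
    // Test thread: client writeMessage -> inline server onMessage -> server request()
    Thread test = new Thread(() -> writeThenCallInline(clientStream, serverStream, bothHoldFirstLock, stuck));
    poller.start();
    test.start();
    joinQuietly(poller);
    joinQuietly(test);
    return !stuck.get();
  }

  // Holds `own` (as writeMessage does) while the direct executor runs the other side's
  // listener inline, which then needs `other` (as request() does).
  private static void writeThenCallInline(
      ReentrantLock own, ReentrantLock other, CountDownLatch latch, AtomicBoolean stuck) {
    own.lock();
    try {
      latch.countDown();
      latch.await(); // force the interleaving from the dump: both first locks are held
      if (other.tryLock(200, TimeUnit.MILLISECONDS)) {
        other.unlock();
      } else {
        stuck.set(true); // a Java monitor has no timeout: the real threads block forever here
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      own.unlock();
    }
  }

  private static void joinQuietly(Thread t) {
    try {
      t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

`LockOrderModel.run()` returns `false`: with both first locks held, at least one thread times out waiting for the other. This is also consistent with the print statements finding nothing, since the two threads never write to the same stream; each write triggers an inline callback that needs the other stream's lock.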

Spencer Fang

Jan 30, 2018, 1:04:06 PM
to prod...@google.com, grpc.io
A possible caveat is that StreamObserver is not thread-safe: if it is shared across threads, the application must provide its own synchronization. A workaround is to push messages from an onReady handler registered on the ServerCallStreamObserver with setOnReadyHandler.
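One way to act on the thread-safety caveat is to route every call on the response observer through a single monitor. The sketch below assumes a hypothetical `Observers.Observer<T>` interface that mirrors the three methods of `io.grpc.stub.StreamObserver`, so it runs without gRPC on the classpath; with gRPC, the wrapper would wrap the real response observer. It prevents overlapping `onNext` calls, but it does not by itself remove the cross-stream lock cycle in the dump, which involves the transport's own monitors.

```java
public final class Observers {
  private Observers() {}

  /** Hypothetical stand-in mirroring the methods of io.grpc.stub.StreamObserver. */
  public interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  /**
   * Returns an observer whose methods never run concurrently: each call takes the same
   * monitor before delegating, so the delegate itself need not be thread-safe.
   */
  public static <T> Observer<T> serializing(Observer<T> delegate) {
    Object lock = new Object();
    return new Observer<T>() {
      @Override
      public void onNext(T value) {
        synchronized (lock) {
          delegate.onNext(value);
        }
      }

      @Override
      public void onError(Throwable t) {
        synchronized (lock) {
          delegate.onError(t);
        }
      }

      @Override
      public void onCompleted() {
        synchronized (lock) {
          delegate.onCompleted();
        }
      }
    };
  }
}
```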

--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscribe@googlegroups.com.
To post to this group, send email to grp...@googlegroups.com.
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/d1df2ba7-fab1-4aa4-a8bd-95e40ded3217%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Spencer Fang

Kun Zhang

Jan 30, 2018, 5:20:53 PM
to grpc.io
Are you using the direct executor? If so, switching to a normal executor should avoid the deadlock.
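A plausible reading of why this helps: with a real executor, gRPC queues the other side's listener callback instead of running it inline, so `writeMessage` returns and releases its stream's monitor before that callback calls `request`. The JDK-only model below forces the same interleaving as the dump (both threads hold their first lock at once) but hands the second step to a hypothetical `callbackExecutor`. `HandoffModel` and its helpers are illustrative names, not gRPC code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class HandoffModel {
  /** Returns true if both handed-off "listener" callbacks obtained the other stream's lock. */
  public static boolean run() {
    ReentrantLock serverStream = new ReentrantLock(); // models InProcessServerStream's monitor
    ReentrantLock clientStream = new ReentrantLock(); // models InProcessClientStream's monitor
    CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
    ExecutorService writers = Executors.newFixedThreadPool(2);          // polling thread + test thread
    ExecutorService callbackExecutor = Executors.newFixedThreadPool(2); // the "normal" executor
    try {
      CompletableFuture<CompletableFuture<Boolean>> poller = CompletableFuture.supplyAsync(
          () -> writeThenHandOff(serverStream, clientStream, bothHoldFirstLock, callbackExecutor), writers);
      CompletableFuture<CompletableFuture<Boolean>> test = CompletableFuture.supplyAsync(
          () -> writeThenHandOff(clientStream, serverStream, bothHoldFirstLock, callbackExecutor), writers);
      return poller.join().join() && test.join().join();
    } finally {
      writers.shutdown();
      callbackExecutor.shutdown();
    }
  }

  // Same interleaving as the dump, but the other side's listener is queued on an executor
  // instead of run inline, so `own` is released before the callback needs it.
  private static CompletableFuture<Boolean> writeThenHandOff(
      ReentrantLock own, ReentrantLock other, CountDownLatch latch, ExecutorService callbackExecutor) {
    own.lock();
    try {
      latch.countDown();
      latch.await(); // both first locks are held, exactly as in the deadlock
      return CompletableFuture.supplyAsync(() -> tryBriefly(other), callbackExecutor);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return CompletableFuture.completedFuture(false);
    } finally {
      own.unlock();
    }
  }

  private static boolean tryBriefly(ReentrantLock lock) {
    try {
      if (lock.tryLock(1, TimeUnit.SECONDS)) {
        lock.unlock();
        return true;
      }
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

`HandoffModel.run()` returns `true`. In gRPC terms, the change is to build the in-process server and channel with `executor(...)` rather than `directExecutor()`; `GrpcServerRule` only uses the direct executor when its `directExecutor()` option is enabled.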

Jason Prodonovich

Jan 30, 2018, 5:31:51 PM
to grp...@googlegroups.com

Yes, I'm using the direct executor that comes with the GrpcServerRule @Rule for testing.

I also tried Spencer's suggestion, but the onReady handler only seemed to fire once, when the stream was initialized, so I couldn't get the repeated polling I need to send messages to the client.
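This matches the documented contract: per the `CallStreamObserver` Javadoc, the onReady handler runs when `isReady()` changes from `false` to `true`, so it does not fire again while the client keeps up. A common way to combine it with a polling thread is a drain loop: the poller enqueues messages, and both the poller and the onReady handler call a drain routine that sends while the stream is ready. The sketch below assumes a hypothetical `ReadyDrainer.Sink<T>` standing in for `isReady()` and `onNext()`, and uses an atomic work-in-progress counter so only one thread sends at a time. It addresses flow control and write serialization, not the direct-executor deadlock, since the poller may still be the thread that sends.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ReadyDrainer<T> {
  /** Hypothetical stand-in for isReady()/onNext() on a ServerCallStreamObserver. */
  public interface Sink<T> {
    boolean isReady();
    void send(T message);
  }

  private final Queue<T> pending = new ConcurrentLinkedQueue<>();
  private final AtomicInteger wip = new AtomicInteger();
  private final Sink<T> sink;

  public ReadyDrainer(Sink<T> sink) {
    this.sink = sink;
  }

  /** Called by the polling thread for each message it finds. */
  public void offer(T message) {
    pending.add(message);
    drain();
  }

  /** Intended as the onReady handler: runs each time the stream becomes ready again. */
  public void onReady() {
    drain();
  }

  // Only one thread drains at a time. A caller that loses the race just bumps `wip`,
  // so the active drainer loops again instead of missing newly queued messages.
  private void drain() {
    if (wip.getAndIncrement() != 0) {
      return;
    }
    int missed = 1;
    do {
      while (sink.isReady()) {
        T next = pending.poll();
        if (next == null) {
          break;
        }
        sink.send(next);
      }
      missed = wip.addAndGet(-missed);
    } while (missed != 0);
  }
}
```

In gRPC terms, `isReady()` and `onNext()` would come from the `ServerCallStreamObserver`, and `drainer::onReady` would be registered with `setOnReadyHandler`; that wiring is an assumption, not something shown in the thread.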
