[grpc-java 0.14.0] How to catch an exception upon StreamObserver's onNext?


gold.d...@gmail.com

May 23, 2016, 12:06:51 AM
to grpc.io
Hello. I am using a StreamObserver to return a stream of responses. I found that calling StreamObserver's onNext doesn't throw an exception when it fails to send a response. Instead, the exception is thrown internally and a stack trace like the one below is printed.

java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_40]
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_40]
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_40]
        at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_40]
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_40]
        at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) ~[netty-buffer-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1054) ~[netty-buffer-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:245) ~[netty-transport-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:106) ~[netty-transport-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:527) [netty-transport-4.1.0.CR7.jar:4.1.0.CR7]
[error] i.g.n.NettyServerTransport in grpc-default-worker-ELG-14-5 - Transport failed
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_40]
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_40]
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_40]
        at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_40]
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_40]
        at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) ~[netty-buffer-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1054) ~[netty-buffer-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:245) ~[netty-transport-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:106) ~[netty-transport-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:527) [netty-transport-4.1.0.CR7.jar:4.1.0.CR7]


I was wondering if there is any way that I can catch this exception as soon as I call onNext. CancellationListener doesn't work in this case as CancellationListener.cancelled doesn't get triggered right away.

Eric Anderson

May 23, 2016, 5:19:23 PM
to Taehyun Park, grpc.io
On Sun, May 22, 2016 at 9:06 PM, <gold.d...@gmail.com> wrote:
Hello. I am using a StreamObserver to return a stream of responses.

Does that mean this was logged on the server?

I found out that calling StreamObserver's onNext doesn't throw an exception when it fails to send a response.

Yes. It is completely asynchronous. On the server side, the call will be cancelled. (You can cast the response StreamObserver to ServerCallStreamObserver (new!) to check whether the call has been cancelled.)

I was wondering if there is any way that I can catch this exception as soon as I call onNext.

On the server side, once the call is cancelled, onNext should throw. Note that this doesn't mean earlier messages were sent successfully: onNext returns immediately and the message is transmitted asynchronously.
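
As a rough sketch of that pattern: check the cancellation state before each onNext and stop producing once the call is cancelled. To keep it runnable without gRPC on the classpath, `CallObserver` and `FakeCall` below are hypothetical stand-ins, not gRPC types. They mirror the `isCancelled()` and `setOnCancelHandler(Runnable)` methods that `io.grpc.stub.ServerCallStreamObserver` offers in current grpc-java releases, and the exception type thrown by the fake is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CancelAwareStreaming {
    /** Hypothetical stand-in for the parts of ServerCallStreamObserver used here. */
    interface CallObserver<T> {
        void onNext(T value);
        boolean isCancelled();
        void setOnCancelHandler(Runnable handler);
    }

    /** Fake call (illustration only) that records messages and can be cancelled by hand. */
    static final class FakeCall implements CallObserver<String> {
        final List<String> sent = new ArrayList<>();
        private boolean cancelled;
        private Runnable onCancel = () -> {};

        @Override public void onNext(String value) {
            if (cancelled) {
                // Assumed behaviour for this fake: refuse writes after cancellation.
                throw new IllegalStateException("call already cancelled");
            }
            sent.add(value);
        }
        @Override public boolean isCancelled() { return cancelled; }
        @Override public void setOnCancelHandler(Runnable handler) { onCancel = handler; }

        /** Simulates the transport noticing that the client went away. */
        void cancel() {
            cancelled = true;
            onCancel.run();
        }
    }

    /** Hands messages to onNext until the call reports cancellation; returns the hand-off count. */
    static int sendAll(CallObserver<String> observer, List<String> messages) {
        int handedOff = 0;
        for (String m : messages) {
            if (observer.isCancelled()) {
                break; // stop producing; nobody is listening any more
            }
            observer.onNext(m); // a hand-off only, not a delivery acknowledgement
            handedOff++;
        }
        return handedOff;
    }

    public static void main(String[] args) {
        FakeCall call = new FakeCall();
        call.setOnCancelHandler(() -> System.out.println("cancel handler ran"));
        System.out.println(sendAll(call, Arrays.asList("a", "b")));
        call.cancel();
        System.out.println(sendAll(call, Arrays.asList("c")));
    }
}
```

With the real observer, the same loop would run against the object obtained by casting the response StreamObserver. Even then, the count only says how many messages were handed to the library, not how many reached the client.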
 
CancellationListener doesn't work in this case as CancellationListener.cancelled doesn't get triggered right away.

That doesn't sound right. It should. Maybe the listener is scheduled to an executor whose threads are busy? Or maybe it's a bug. How reproducible is this? 
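
To illustrate the "busy executor" possibility with plain Java and no gRPC involved: a callback submitted to a single-threaded executor cannot run until the task already occupying that thread finishes. The class and method names here are made up for the example, and the queued callback is only a stand-in for a cancellation listener being notified.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BusyExecutorDelay {
    /**
     * Returns roughly how many milliseconds a callback waited in the queue of a
     * single-threaded executor whose only thread was busy for busyMillis.
     */
    static long measureCallbackDelayMillis(long busyMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch busyStarted = new CountDownLatch(1);
        CountDownLatch callbackRan = new CountDownLatch(1);
        try {
            // Occupy the only worker thread, as a slow handler would.
            executor.execute(() -> {
                busyStarted.countDown();
                try {
                    Thread.sleep(busyMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            busyStarted.await();

            long submittedAt = System.nanoTime();
            // Stand-in for the listener notification; it queues behind the busy task.
            executor.execute(callbackRan::countDown);
            callbackRan.await();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - submittedAt);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while measuring", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback delayed by ~" + measureCallbackDelayMillis(300) + " ms");
    }
}
```

If the listener's executor has idle threads, as in a test with only a couple of clients, this kind of queuing would not explain a delay of several seconds.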

Taehyun Park

May 23, 2016, 8:58:22 PM
to grpc.io, gold.d...@gmail.com

Does that mean this was logged on the server?
Yes this was logged on the server.

That doesn't sound right. It should. Maybe the listener is scheduled to an executor whose threads are busy? Or maybe it's a bug. How reproducible is this?  
The listener was scheduled to an executor that is instantiated specifically to handle cancelled requests. I tested my server with only two clients, so busy threads were not the cause.
To reproduce this,
1. Client A connects to the gRPC server that returns a stream of responses. The server adds a CancellationListener to a given context at this time. 
2. Client A goes offline by turning on airplane mode. (At first, I expected the server to call cancelled as soon as Client A went offline, but that didn't happen. When I force-close my app, the server does call the cancelled method, so I'm not sure why losing the network connection doesn't trigger it right away.)
3. The server calls onNext on Client A's StreamObserver multiple times. (I cast my StreamObserver to ServerCallStreamObserver and isCancelled was still true.)
4. CancellationListener.cancelled is called after a few seconds.
5. Connection Error is printed out after a few seconds.

[warn] i.g.n.NettyServerHandler in grpc-default-worker-ELG-3-3 - Connection Error
java.io.IOException: Operation timed out
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_40]
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_40]
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_40]
        at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_40]
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_40]
        at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) ~[netty-buffer-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1054) ~[netty-buffer-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:245) ~[netty-transport-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:106) ~[netty-transport-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:527) [netty-transport-4.1.0.CR7.jar:4.1.0.CR7]
[error] i.g.n.NettyServerTransport in grpc-default-worker-ELG-3-3 - Transport failed
java.io.IOException: Operation timed out
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_40]
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_40]
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_40]
        at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_40]
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_40]
        at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) ~[netty-buffer-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1054) ~[netty-buffer-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:245) ~[netty-transport-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:106) ~[netty-transport-4.1.0.CR7.jar:4.1.0.CR7]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:527) [netty-transport-4.1.0.CR7.jar:4.1.0.CR7]

Above is the full stack trace logged on the server.