Getting io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write coul


Avinash Dongre

Sep 20, 2016, 10:01:01 AM
to grpc.io
I am getting the following exception on the server:

Server started, listening on 50051
Sep 20, 2016 7:24:07 PM io.grpc.netty.NettyServerHandler onStreamError
WARNING: Stream Error
io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
at io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:147)
at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:487)
at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:468)
at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:103)
at io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:343)
at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1151)
at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:1099)
at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:521)
at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:527)
at io.netty.handler.codec.http2.Http2ConnectionHandler.closeStream(Http2ConnectionHandler.java:522)
at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onRstStreamRead(DefaultHttp2ConnectionDecoder.java:396)
at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onRstStreamRead(Http2InboundFrameLogger.java:80)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readRstStreamFrame(DefaultHttp2FrameReader.java:490)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:253)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:155)
at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:113)
at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:333)
at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:393)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:411)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)

The following is my server code:


package io.test;

import com.google.protobuf.ByteString;
import io.test.generated.GreeterGrpc;
import io.test.generated.GreeterOuterClass;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.internal.ServerImpl;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

import java.util.ArrayList;
import java.util.List;

/**
 * Hello world!
 */
public class GrpcServer {

  /* The port on which the server should run */
  private int port = 50051;
  private Server server;

  private void start() throws Exception {
    server = ServerBuilder.forPort(port)
        .addService(new GreeterImpl())
        .build().start();
    System.out.println("Server started, listening on " + port);
  }

  private void stop() {
    if (server != null) {
      server.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    final GrpcServer server = new GrpcServer();
    server.start();
    System.in.read();
    server.stop();
  }

  private class GreeterImpl extends GreeterGrpc.GreeterImplBase {

    @Override
    public void sayHello(GreeterOuterClass.HelloRequest req,
        StreamObserver<GreeterOuterClass.HelloReply> responseObserver) {

      GreeterOuterClass.HelloReply.Builder builder = GreeterOuterClass.HelloReply.newBuilder();
      final ServerCallStreamObserver<GreeterOuterClass.HelloReply> scso =
          (io.grpc.stub.ServerCallStreamObserver<GreeterOuterClass.HelloReply>) responseObserver;

      final long BATCH_SIZE = 100;

      Runnable drain = new Runnable() {
        long remaining = 20_000_000L;

        @Override
        public void run() {
          if (remaining == 0L) return;
          for (; remaining > 0L && scso.isReady(); remaining--) {
            builder.addMessage(ByteString.copyFrom(new byte[32 * 1024]));
            scso.onNext(builder.build());
          }
          if (remaining == 0) {
            scso.onCompleted();
          }
        }
      };
      scso.setOnReadyHandler(drain);
      drain.run();
    }
  }
}

And the following is my client code:

package io.test;

import com.google.common.util.concurrent.UncaughtExceptionHandlers;
import io.test.generated.GreeterGrpc;
import io.test.generated.GreeterOuterClass;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * Created by adongre on 9/20/16.
 */
public class GrpcClient {

  public static void main(String[] args) throws InterruptedException {
    ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
        .usePlaintext(true)
        .executor(new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
            pool -> {
              ForkJoinWorkerThread thread =
                  ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
              thread.setDaemon(true);

              return thread;
            }, UncaughtExceptionHandlers.systemExit(), true /* async */))
        .build();
    GreeterGrpc.GreeterStub asyncStub = GreeterGrpc.newStub(channel);

    final CountDownLatch finishLatch = new CountDownLatch(1);
    final List<Integer> rowsReceived = new ArrayList<>();
    final long timeStart = System.nanoTime();
    StreamObserver<GreeterOuterClass.HelloReply> observer = new StreamObserver<GreeterOuterClass.HelloReply>() {
      @Override
      public void onNext(GreeterOuterClass.HelloReply helloReply) {
        rowsReceived.add(helloReply.getMessageList().size());
      }

      @Override
      public void onError(Throwable throwable) {

      }

      @Override
      public void onCompleted() {
        int rowsReceivedNum = rowsReceived.stream().mapToInt(i -> i.intValue()).sum();
        System.out.println("GrpcClient.onCompleted.Num Of Rows -> " + rowsReceivedNum + " In "
            + (System.nanoTime() - timeStart));
        finishLatch.countDown();
      }
    };
    asyncStub.sayHello(GreeterOuterClass.HelloRequest.newBuilder().build(), observer);
    try {
      finishLatch.await(30, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Paul Johnston

Sep 20, 2016, 12:26:33 PM
to grpc.io
What's the purpose of System.in.read() before stopping the server? The HelloWorld starter example uses something like the following:


  public void start() throws IOException {
    server = ServerBuilder.forPort(port)
        .addService(new GreeterImpl())
        .build()
        .start();
    logger.info("Server started, listening on " + port);
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        // Use stderr here since the logger may have been reset by its JVM shutdown hook.
        System.err.println("*** shutting down gRPC server since JVM is shutting down");
        HelloWorldServer.this.stop();
        System.err.println("*** server shut down");
      }
    });
  }

  public void stop() {
    if (server != null) {
      server.shutdown();
    }
  }

  /**
   * Await termination on the main thread since the grpc library uses daemon threads.
   */
  private void blockUntilShutdown() throws InterruptedException {
    if (server != null) {
      server.awaitTermination();
    }
  }

  /**
   * Main launches the server from the command line.
   */
  public static void main(String[] args) throws IOException, InterruptedException {
    final HelloWorldServer server = new HelloWorldServer();
    server.start();
    server.blockUntilShutdown();
  }

Avinash Dongre

Sep 20, 2016, 12:37:54 PM
to grpc.io
Thanks Paul,
I changed the code, but I am still getting the same exception.

Thanks
Avinash

Avinash Dongre

Sep 20, 2016, 1:11:13 PM
to grpc.io
Thanks Paul,
The mistake was on my side, in the code.
The issue is fixed now.

Thanks
Avinash
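
Note: the thread does not say what the coding mistake was. One possible candidate, offered purely as an assumption, is that the server loop reuses a single HelloReply builder: a protobuf builder is not reset by build(), so every addMessage() call makes the next reply larger than the one before. A client that receives ever-growing messages may eventually fail or cancel the call, which the server would see as an RST_STREAM like the one in the stack trace. The sketch below uses a hypothetical stand-in class (ReplyBuilder, not the generated gRPC code) to show only the accumulation effect in plain Java.

```java
import java.util.ArrayList;
import java.util.List;

final class ReusedBuilderDemo {

  /** Hypothetical stand-in for a generated builder with one repeated field. */
  static final class ReplyBuilder {
    private final List<byte[]> messages = new ArrayList<>();

    ReplyBuilder addMessage(byte[] chunk) {
      messages.add(chunk);
      return this;
    }

    /** Snapshots the current fields; like protobuf's build(), it does not reset the builder. */
    List<byte[]> build() {
      return new ArrayList<>(messages);
    }
  }

  /** One builder shared by all replies, as in the server loop from the question. */
  static List<Integer> chunksPerReplyWithSharedBuilder(int replies) {
    ReplyBuilder builder = new ReplyBuilder();
    List<Integer> counts = new ArrayList<>();
    for (int i = 0; i < replies; i++) {
      builder.addMessage(new byte[32 * 1024]);
      counts.add(builder.build().size());
    }
    return counts;
  }

  /** A fresh builder per reply, so every reply carries exactly one chunk. */
  static List<Integer> chunksPerReplyWithFreshBuilder(int replies) {
    List<Integer> counts = new ArrayList<>();
    for (int i = 0; i < replies; i++) {
      ReplyBuilder builder = new ReplyBuilder();
      builder.addMessage(new byte[32 * 1024]);
      counts.add(builder.build().size());
    }
    return counts;
  }

  public static void main(String[] args) {
    System.out.println(chunksPerReplyWithSharedBuilder(4)); // prints [1, 2, 3, 4]
    System.out.println(chunksPerReplyWithFreshBuilder(4));  // prints [1, 1, 1, 1]
  }
}
```

If this is indeed the cause, creating the builder inside the loop (or clearing the repeated field between replies) would keep each reply at a constant size. Logging the Throwable in the client's onError, which is empty in the code above, would also show the client-side reason for the cancellation.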