Configure MAX_CONCURRENT_STREAMS for HTTP/2


Bill Li

May 18, 2021, 6:36:38 PM
to grpc.io
Hi,

Does anyone know or have an example for configuring the parameter MAX_CONCURRENT_STREAMS for gRPC server written in Java?

Thanks,
Bill

Sanjay Pujare

May 18, 2021, 7:28:43 PM
to Bill Li, grpc.io
With NettyServerBuilder you can use maxConcurrentCallsPerConnection(int maxCalls) 

This is the same as setting MAX_CONCURRENT_STREAMS per connection.
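For example (a sketch only: the port, the limit of 100, and the GreetServiceImpl service name are placeholders, not from this thread):

```java
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;

// maxConcurrentCallsPerConnection caps the number of concurrent streams
// (calls) the server will allow on each HTTP/2 connection.
Server server = NettyServerBuilder.forPort(8080)       // placeholder port
        .maxConcurrentCallsPerConnection(100)          // placeholder limit
        .addService(new GreetServiceImpl())            // placeholder service impl
        .build()
        .start();
```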


Bill Li

May 18, 2021, 9:56:00 PM
to grpc.io
Got it, thanks!

I am currently implementing a server-side streaming application. Can one ResponseObserver be shared by multiple threads that send the response stream back to the client through onNext()? I just want to confirm whether there is a race condition when onNext() is called concurrently.

Sanjay Pujare

May 19, 2021, 1:47:54 AM
to Bill Li, grpc.io
Please include a code snippet of what you want to do. Show how you intend to share "one ResponseObserver".

Bill Li

May 19, 2021, 5:32:21 PM
to grpc.io
Sure.

Here is the code without multithreading:

public void greetServerStream(GreetServerStreamRequest request, StreamObserver<GreetServerStreamResponse> responseObserver) {
    String message = request.getGreeting().getMessage();
    try {
        for (int i = 0; i < 10; i++) {
            String result = "Hello " + message + ", response number: " + i;
            GreetServerStreamResponse response = GreetServerStreamResponse.newBuilder()
                    .setResult(result)
                    .build();
            responseObserver.onNext(response);
            Thread.sleep(1000L);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        responseObserver.onCompleted();
    }
}

From the code above, I implemented a multithreaded version as shown below:

public void greetServerStream(GreetServerStreamRequest request, StreamObserver<GreetServerStreamResponse> responseObserver) {
    String message = request.getGreeting().getMessage();
    MultiRunnable runnable1 = new MultiRunnable("thread1", message + "1", responseObserver);
    runnable1.start();
    MultiRunnable runnable2 = new MultiRunnable("thread2", message + "2", responseObserver);
    runnable2.start();
}

public class MultiRunnable implements Runnable {
    private Thread thread;
    private final String threadName;
    private final String message;
    private final ServerCallStreamObserver<GreetServerStreamResponse> serverCallStreamObserver;

    public MultiRunnable(String threadName, String message, StreamObserver<GreetServerStreamResponse> responseObserver) {
        this.threadName = threadName;
        this.message = message;
        this.serverCallStreamObserver = (ServerCallStreamObserver<GreetServerStreamResponse>)responseObserver;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                String result = "Hello " + message + ", response number: " + i;
                GreetServerStreamResponse response = GreetServerStreamResponse.newBuilder()
                        .setResult(result)
                        .build();
                synchronized (serverCallStreamObserver) {
                    serverCallStreamObserver.onNext(response);
                }
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            System.out.println("Thread " + threadName + " interrupted.");
        } finally {
            serverCallStreamObserver.onCompleted();
        }
    }

    public void start() {
        System.out.println("Starting " + threadName);
        if (thread == null) {
            thread = new Thread(this, threadName);
            thread.start();
        }
    }
}

So each thread will run its own for loop and call onNext() to send the response stream back to the client. I initially did not add the synchronized block above and got the following error:

"Stream 3 sent too many headers EOS"

After adding the block, I was able to have multiple threads execute onNext() concurrently.

I am just curious about whether this is the right way of doing synchronization. From the best practice perspective, what is the best way of doing synchronization? Is multithreading a common thing to do or recommended when calling onNext()?

Thanks,
Bill

Sanjay Pujare

May 20, 2021, 4:34:55 PM
to Bill Li, grpc.io
On Wed, May 19, 2021 at 2:32 PM Bill Li <zhl...@gmail.com> wrote:
....

Upon adding the block, I was able to make multiple threads executing onNext() concurrently.

I am just curious about whether this is the right way of doing synchronization. From the best practice perspective, what is the best way of doing synchronization?

Your code should work since you are synchronizing on serverCallStreamObserver, which is used by multiple threads. I can't think of anything better in this particular case.

Is multithreading a common thing to do or recommended when calling onNext()?

Most non-trivial applications use multiple threads, and they should be able to use gRPC streams with appropriate synchronization in place (as you have done above). Another (and perhaps better) way to do this would be to use a Queue: have your producer threads feed the Queue, and a single thread read from the Queue to feed the responseObserver.
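That queue pattern might look roughly like this. It is a self-contained sketch: the Observer interface below is a stand-in for gRPC's StreamObserver (so the example runs without the grpc-java dependency), and all names are illustrative, not the real API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueFeeder {
    // Stand-in for gRPC's StreamObserver<T> (hypothetical, for illustration).
    interface Observer<T> { void onNext(T value); void onCompleted(); }

    static final String POISON = "__DONE__"; // sentinel telling the drainer to finish

    static List<String> runDemo() throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> sent = new ArrayList<>();
        Observer<String> observer = new Observer<String>() {
            public void onNext(String v) { sent.add(v); }
            public void onCompleted() { sent.add("<completed>"); }
        };

        // Single drainer thread: the only caller of onNext()/onCompleted(),
        // so the observer itself needs no synchronization.
        Thread drainer = new Thread(() -> {
            try {
                String item;
                while (!(item = queue.take()).equals(POISON)) {
                    observer.onNext(item);
                }
                observer.onCompleted();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        drainer.start();

        // Producer threads only enqueue; they never touch the observer.
        ExecutorService producers = Executors.newFixedThreadPool(2);
        for (int t = 1; t <= 2; t++) {
            final int id = t;
            producers.submit(() -> {
                for (int i = 0; i < 3; i++) queue.put("thread" + id + "-" + i);
                return null;
            });
        }
        producers.shutdown();
        producers.awaitTermination(5, TimeUnit.SECONDS);
        queue.put(POISON); // all producers done: complete the stream exactly once
        drainer.join();
        return sent;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo().size()); // prints 7: 6 messages + 1 completion marker
    }
}
```

A side benefit of this design is that onCompleted() is naturally called exactly once, after all producers have finished, which the per-thread finally blocks in the code above do not guarantee.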
 

Bill Li

May 20, 2021, 8:35:14 PM
to grpc.io
Cool thank you!

One additional question, given the application and scenario I have above, will setting a higher value for MAX_CONCURRENT_STREAMS help in terms of the throughput from server to the client?

Thanks,
Bill

Sanjay Pujare

May 21, 2021, 2:11:31 AM
to Bill Li, grpc.io
On Thu, May 20, 2021 at 5:35 PM Bill Li <zhl...@gmail.com> wrote:
Cool thank you!

One additional question, given the application and scenario I have above, will setting a higher value for MAX_CONCURRENT_STREAMS help in terms of the throughput from server to the client?

Hard to say. Increasing the concurrent streams (calls) on a single connection can result in higher throughput if there is spare capacity in the connection that can be utilized by the concurrent streams.

On the other hand it could have an adverse impact on the throughput if the streams are bottlenecked on that single connection when there is no spare capacity and/or the connection has some network capacity/connectivity issues.

It would be nice if you could share your results after experimenting with MAX_CONCURRENT_STREAMS.
 

Bill Li

May 21, 2021, 11:59:12 AM
to grpc.io
Okay, sure. I will share that once I have some experimental results.