client StreamObserver is always getting messages in sequence

mayank kulshreshtha

Feb 9, 2022, 1:15:11 AM
to grpc.io
I created a managed channel with a thread pool executor:
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
        .executor(Executors.newFixedThreadPool(7))
        .usePlaintext()
        .build();
this.clientStub = StockQuoteProviderGrpc.newStub(channel);


I expected the messages received in the observer to be out of order, since ClientCallImpl<ReqT, RespT> has:
callExecutor.execute(new MessagesAvailable());

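import io.grpc.stub.StreamObserver;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicInteger;

public class StockResponseStreamObserver implements StreamObserver<StockQuote> {

    private final AtomicInteger atom = new AtomicInteger(0);

    @Override
    public void onNext(StockQuote stockQuote) {
        // Delay every 5th message to try to force out-of-order delivery.
        if (atom.incrementAndGet() % 5 == 0) {
            System.out.println("T sleeping for 1000 ms " + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
        System.out.println(LocalDateTime.now() + " : " + stockQuote.getPrice()
                + " description:" + stockQuote.getDescription()
                + " T " + Thread.currentThread().getName());
    }

    // onError and onCompleted were not shown in the original post;
    // empty bodies are added here only so that the class compiles.
    @Override
    public void onError(Throwable t) {}

    @Override
    public void onCompleted() {}
}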

But the output is always printed in sequence, even though Thread.sleep delays whichever thread handles every 5th message.

Is this expected behaviour?

Thanks
Mayank


Yuri Golobokov

Feb 9, 2022, 1:26:16 PM
to grpc.io
When the executor runs
callExecutor.execute(new MessagesAvailable());
there is a loop inside MessagesAvailable:

It looks like one thread may process multiple messages from the producer, so sleeping in that thread does not change the order if a bunch of messages are available at the same time.
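
To illustrate that point, here is a minimal sketch of a single executor task that drains several queued messages in one loop. It is not the actual gRPC code: DrainLoopSketch, deliverAll, and the pending queue are hypothetical names, and the 50 ms sleep stands in for the 1000 ms delay in the observer above. Because one thread handles every message in the loop, the sleep only delays delivery and cannot reorder it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DrainLoopSketch {

    // Hypothetical stand-in for the task gRPC schedules; not the real MessagesAvailable.
    static List<Integer> deliverAll(int count) throws Exception {
        Queue<Integer> pending = new ConcurrentLinkedQueue<>();
        for (int i = 1; i <= count; i++) {
            pending.add(i);
        }
        List<Integer> delivered = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(7);
        try {
            // A single task drains every pending message on one pool thread.
            pool.submit(() -> {
                Integer msg;
                while ((msg = pending.poll()) != null) {
                    if (msg % 5 == 0) {
                        sleepQuietly(50); // delays this message, but nothing can overtake it
                    }
                    delivered.add(msg);
                }
            }).get(); // wait for the task; also makes 'delivered' safely visible here
        } finally {
            pool.shutdown();
        }
        return delivered;
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        // Prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        System.out.println(deliverAll(10));
    }
}
```

Even with seven threads in the pool, only one of them runs the loop, so the messages come out in the order they were queued.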

mayank kulshreshtha

Feb 9, 2022, 5:44:25 PM
to grpc.io
The sleep on every 5th message was meant to delay that message so that the order would be lost, but that did not happen.
Interestingly, I am getting the messages one by one. On the server side I can see that all messages are printed immediately, so it is not as if the client is requesting them one by one.

Eric Anderson

Feb 11, 2022, 3:32:45 PM
to mayank kulshreshtha, grpc.io
On Tue, Feb 8, 2022 at 10:15 PM mayank kulshreshtha <mayan...@gmail.com> wrote:
I expected the messages received in the observer to be out of order

StreamObserver (and ClientCall.Listener) are not thread-safe, so gRPC serializes callbacks into them.

since ClientCallImpl<ReqT, RespT> has:
callExecutor.execute(new MessagesAvailable());


this.callExecutor = new SerializingExecutor(executor);

Is this expected behaviour?

Yes, it is. It is very hard for applications to manage the events if they arrive out-of-order. It is much easier for users to implement a non-thread-safe class. 
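
A minimal sketch of the serialization idea, for illustration only: SimpleSerializingExecutor below is a hypothetical, simplified wrapper and is not gRPC's SerializingExecutor. It assumes tasks do not throw, and it uses a 20 ms sleep in place of the 1000 ms delay from the question. Tasks are queued and drained one at a time on the underlying pool, so a slow callback delays the ones behind it instead of letting them overtake.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SerializingSketch {

    /** Simplified illustration; NOT gRPC's SerializingExecutor. Assumes tasks do not throw. */
    static final class SimpleSerializingExecutor implements Executor {
        private final Executor delegate;
        private final Queue<Runnable> queue = new ArrayDeque<>();
        private boolean draining; // guarded by 'queue'

        SimpleSerializingExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable task) {
            synchronized (queue) {
                queue.add(task);
                if (draining) {
                    return; // the active drain loop will pick this task up
                }
                draining = true;
            }
            delegate.execute(this::drain);
        }

        // Runs queued tasks one at a time, in FIFO order, on a single pool thread.
        private void drain() {
            while (true) {
                Runnable next;
                synchronized (queue) {
                    next = queue.poll();
                    if (next == null) {
                        draining = false;
                        return;
                    }
                }
                next.run();
            }
        }
    }

    static List<Integer> run(int count) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(7);
        Executor serial = new SimpleSerializingExecutor(pool);
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(count);
        for (int i = 1; i <= count; i++) {
            final int n = i;
            serial.execute(() -> {
                if (n % 5 == 0) {
                    try {
                        Thread.sleep(20); // slow callback, like the sleep in onNext
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                seen.add(n);
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        System.out.println(run(10));
    }
}
```

Passing the pool directly instead of the wrapper would let up to seven callbacks run concurrently, which is the situation a non-thread-safe observer cannot handle.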