Sure.
/**
 * Streams ten greeting responses back to the client, pausing one second after each one.
 *
 * <p>The stream is always completed, even if the calling thread is interrupted part-way
 * through; in that case the client simply receives fewer than ten responses.
 *
 * @param request          the incoming request; its greeting message is embedded in every response
 * @param responseObserver the observer used to push responses to the client and close the stream
 */
public void greetServerStream(GreetServerStreamRequest request, StreamObserver<GreetServerStreamResponse> responseObserver) {
    String message = request.getGreeting().getMessage();
    try {
        for (int i = 0; i < 10; i++) {
            String result = "Hello " + message + ", response number: " + i;
            GreetServerStreamResponse response = GreetServerStreamResponse.newBuilder()
                    .setResult(result)
                    .build();
            responseObserver.onNext(response);
            Thread.sleep(1000L);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so whoever owns this thread can still observe
        // the interruption after this method returns.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } finally {
        // Close the stream regardless of how the loop ended.
        responseObserver.onCompleted();
    }
}
/**
 * Streams greeting responses to the client from two worker threads that share one observer.
 *
 * <p>Each worker sends ten responses. The workers share a countdown so that the stream is
 * completed exactly once, by whichever worker finishes last.
 *
 * @param request          the incoming request; its greeting message (suffixed with "1" or "2")
 *                         is embedded in each worker's responses
 * @param responseObserver the observer shared by both workers
 */
public void greetServerStream(GreetServerStreamRequest request, StreamObserver<GreetServerStreamResponse> responseObserver) {
    String message = request.getGreeting().getMessage();
    // One count per worker; the worker that brings this to zero completes the stream.
    java.util.concurrent.atomic.AtomicInteger remainingWorkers =
            new java.util.concurrent.atomic.AtomicInteger(2);
    MultiRunnable runnable1 = new MultiRunnable("thread1", message + "1", responseObserver, remainingWorkers);
    runnable1.start();
    MultiRunnable runnable2 = new MultiRunnable("thread2", message + "2", responseObserver, remainingWorkers);
    runnable2.start();
}

/**
 * Worker that sends ten greeting responses, one per second, on a stream observer that may be
 * shared with other workers.
 *
 * <p>gRPC stream observers are not thread-safe, so every call made on the observer is
 * serialized by synchronizing on the observer itself. Workers that share an observer must
 * also share a {@code remainingWorkers} counter; only the worker that decrements it to zero
 * calls {@code onCompleted()}, so the stream is never closed while another worker is still
 * sending.
 */
public class MultiRunnable implements Runnable {
    private Thread thread;
    private final String threadName;
    private final String message;
    private final ServerCallStreamObserver<GreetServerStreamResponse> serverCallStreamObserver;
    private final java.util.concurrent.atomic.AtomicInteger remainingWorkers;

    /**
     * Creates a stand-alone worker that completes the stream itself when it finishes.
     *
     * @param threadName       name given to the worker thread and used in log output
     * @param message          text embedded in every response
     * @param responseObserver observer to send on; must be a {@code ServerCallStreamObserver}
     */
    public MultiRunnable(String threadName, String message, StreamObserver<GreetServerStreamResponse> responseObserver) {
        this(threadName, message, responseObserver, new java.util.concurrent.atomic.AtomicInteger(1));
    }

    /**
     * Creates a worker that cooperates with other workers on the same observer.
     *
     * @param threadName       name given to the worker thread and used in log output
     * @param message          text embedded in every response
     * @param responseObserver observer to send on; must be a {@code ServerCallStreamObserver}
     * @param remainingWorkers counter shared by all workers on this observer, initialised to
     *                         the number of workers
     */
    public MultiRunnable(String threadName, String message, StreamObserver<GreetServerStreamResponse> responseObserver,
                         java.util.concurrent.atomic.AtomicInteger remainingWorkers) {
        this.threadName = threadName;
        this.message = message;
        // Throws ClassCastException if the observer is not a ServerCallStreamObserver.
        this.serverCallStreamObserver = (ServerCallStreamObserver<GreetServerStreamResponse>) responseObserver;
        this.remainingWorkers = remainingWorkers;
    }

    /** Sends the responses, then completes the stream if this is the last worker to finish. */
    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                String result = "Hello " + message + ", response number: " + i;
                GreetServerStreamResponse response = GreetServerStreamResponse.newBuilder()
                        .setResult(result)
                        .build();
                // Serialize access: the observer must not be called from two threads at once.
                synchronized (serverCallStreamObserver) {
                    serverCallStreamObserver.onNext(response);
                }
                // Sleep outside the lock so other workers can send in the meantime.
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for anything further up the stack.
            Thread.currentThread().interrupt();
            System.out.println("Thread " + threadName + " interrupted.");
        } finally {
            // Only the last worker to finish closes the stream, under the same lock as onNext().
            if (remainingWorkers.decrementAndGet() == 0) {
                synchronized (serverCallStreamObserver) {
                    serverCallStreamObserver.onCompleted();
                }
            }
        }
    }

    /** Starts the worker thread; calling this again on the same instance does not start a second thread. */
    public void start() {
        System.out.println("Starting " + threadName);
        if (thread == null) {
            thread = new Thread(this, threadName);
            thread.start();
        }
    }
}
So each thread runs its own for loop and calls onNext() to send the response stream back to the client. I initially did not add the synchronized block above and got the following error:
"Stream 3 sent too many headers EOS"
After adding the block, I was able to have multiple threads call onNext() concurrently.
I am just curious whether this is the right way to do the synchronization. From a best-practice perspective, what is the best way to synchronize these calls? Is calling onNext() from multiple threads common or recommended?
Thanks,
Bill