[grpc-go] Multiple concurrent clients and bidirectional streaming


mbi

May 15, 2017, 7:56:15 AM5/15/17
to grpc.io

Hi,

We are using protocol buffers and the Go version of gRPC to implement the communication layer of a library we are building. Communication-wise, we have multiple (possibly concurrent) clients and a single server. We use bidirectional streams between the clients and the server because we want "ping-pong"-like behavior, where a series of requests and responses has to flow between a specific client and the server for the RPC to complete successfully; we can think of it as a kind of session or transaction.

The RPC service is defined as follows:

service MyProtocol {
  rpc ExecuteProtocol (stream Message) returns (stream Message) {}
}

For the above service, a series of 6 messages of type Message is transferred, something like this: client_request_1, server_response_1, client_request_2, server_response_2, client_request_3, server_response_3.
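
For a concrete picture, a minimal client-side loop for this ping-pong exchange might look like the sketch below. The pb.MyProtocolClient interface and stream methods follow from the service definition above (standard grpc-go codegen), while the empty pb.Message literals stand in for the real request contents; this needs "context" and the generated pb package imported.

// runPingPong drives the three request/response rounds of the
// protocol over a single bidirectional stream.
func runPingPong(client pb.MyProtocolClient) error {
    stream, err := client.ExecuteProtocol(context.Background())
    if err != nil {
        return err
    }
    for round := 1; round <= 3; round++ {
        if err := stream.Send(&pb.Message{}); err != nil { // client_request_<round>
            return err
        }
        if _, err := stream.Recv(); err != nil { // server_response_<round>
            return err
        }
    }
    // Note: our current code never calls stream.CloseSend or closes
    // the connection; see the discussion below.
    return nil
}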

We fire up a gRPC server like this:

grpcServer := grpc.NewServer(grpc.MaxConcurrentStreams(math.MaxUint32))
pb.RegisterProtocolServer(grpcServer, NewMyProtocolServer())
grpcServer.Serve(listener)

And then we want to test with several concurrent clients with the help of goroutines, like this:
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
   defer wg.Done()
   go runMyProtocolClient()
}
wg.Wait()

When we run only one client, the whole "protocol" (i.e., our sequence of requests and responses in a given order) executes perfectly and without errors. If we run several clients sequentially, everything is fine too: all clients finish successfully, and the server stays alive listening for further requests. However, when testing with several concurrent clients, this is what happens: at first the server seems able to handle requests from all of the clients, but as soon as one client finishes successfully, the server exits with the error "An error ocurred: rpc error: code = Canceled desc = context canceled". All other clients subsequently get an EOF error and exit.

Each of our clients maintains its own connection to the gRPC server (on purpose, as we want to simulate clients on different physical nodes), as well as its own client stream. We pass context.Background() to the client's ExecuteProtocol call, and the client never explicitly closes its stream or its connection to the gRPC server.
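
A per-client setup along these lines might look like the following sketch (sketch only: localhost:7000 follows the server log below, grpc.WithInsecure is assumed, and runPingPong is the sketch from earlier in the thread):

// runMyProtocolClient dials its own ClientConn and runs the whole
// protocol over its own stream, simulating a client on a separate
// physical node.
func runMyProtocolClient() {
    conn, err := grpc.Dial("localhost:7000", grpc.WithInsecure())
    if err != nil {
        log.Fatalf("dial failed: %v", err)
    }
    // Note: we currently never close conn explicitly.

    if err := runPingPong(pb.NewMyProtocolClient(conn)); err != nil {
        log.Printf("protocol failed: %v", err)
    }
}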

Can anyone help us with this? Could this be due to an issue in our design of the RPC service? We assumed that the server can maintain several concurrent streams for separate clients. We're all fairly new to Go and gRPC, and I hope this question is not too generic; the implementations of MyProtocolServer and MyProtocolClient were intentionally left out here. I will, of course, provide more details and code if you think it will help.
 
Thanks in advance!
Best regards

mancab...@gmail.com

May 15, 2017, 8:01:05 AM5/15/17
to grpc.io, mancab...@gmail.com
I'm also pasting logs from the server and concurrent clients:

Server log:

2017/05/15 13:27:02 Registering services
2017/05/15 13:27:02 Instantiating new protocol server
2017/05/15 13:27:02 Starting GRPC server on port 7000
2017/05/15 13:27:03 [Server 0xc82022e000] spawned, stream=0xc8201ba120
2017/05/15 13:27:03 [Server 0xc8200340e0] spawned, stream=0xc8201743e0
2017/05/15 13:27:03 Recieved request from the stream schema:SCHEMA_TYPE_A empty:<> 
2017/05/15 13:27:03 Client requested SCHEMA_TYPE_A variant 1
2017/05/15 13:27:03 Recieved request from the stream schema:SCHEMA_TYPE_A empty:<> 
2017/05/15 13:27:03 Client requested SCHEMA_TYPE_A variant 1
2017/05/15 13:27:03 Obtained H: &{16199448777604385719635926144695029257912338135202671482767928883512 6396801387809980950520705369354328071365311907783138908138286589481}
2017/05/15 13:27:03 Successfully sent response:%!(EXTRA *comm.Message=ec_group_element:<X:"\231\322\255\271=\2307#\314\271\314j\276\213\366\3429\317\355i\313\350\n\224\334zy8" Y:"<\275\303F\315\331\002\201\353\346\277;W\344\263\267^\363H\353g\016\356\255\031\267\246)" > )
2017/05/15 13:27:03 [Server 0xc820284000] spawned, stream=0xc8201e20c0
2017/05/15 13:27:03 Recieved request from the stream schema:SCHEMA_TYPE_A empty:<> 
2017/05/15 13:27:03 Client requested SCHEMA_TYPE_A variant 1
2017/05/15 13:27:03 Obtained H: &{5222284638485064219491810334813365272080065672847296816805697342382 2275113287299095758274364943442341731168296655544534119203197860101}
2017/05/15 13:27:03 Successfully sent response:%!(EXTRA *comm.Message=ec_group_element:<X:"1\226\253\265\205*\243j\035\276\016\262\300P\033z\244K\312\204&\247\037s\203\320\313\256" Y:"\025\232~y}\230\365\345\256]\331\303\204H1[\033\207}\255\023[\333\257V2\271\005" > )
2017/05/15 13:27:03 Obtained H: &{6305095417828729319307152541529557580493743402421550520609646844543 5483979098490912922176532492129495518788718140012781136961875522954}
2017/05/15 13:27:03 Successfully sent response:%!(EXTRA *comm.Message=ec_group_element:<X:";\336\326\202\347\367\272\345\022TSk\023_\315R\223\246\005\002\2314\300\301\321<\032\177" Y:"4\022\320\222C5=O{\320\000\005:\223\305\253\247\227\233\232{\203\343\346c\264\265\212" > )
2017/05/15 13:27:03 Recieved request from the stream ec_group_element:<X:"\356U1\036\nw\243\361\275\307-\353\243\265\376\234\377\0173@\231s\211\r\205\211\362\336" Y:"\310\311\227)2\004\362\355h\366m\334&I\374\240\244\024>W\343\035\264D\312\340\251B" > 
2017/05/15 13:27:03 Successfully sent response:%!(EXTRA *comm.Message=empty:<> ) ******* this message indicates the last message to be sent in
the "protocol" when communicating with one of the clients

2017/05/15 13:27:03 An error ocurred: rpc error: code = Canceled desc = context canceled
exit status 1

Client log:

Will start clients concurrently
Running client #0
Running client #1
Running client #2
2017/05/15 13:27:03 schemaTypeVariant [SCHEMA_TYPE_A], variant 1
2017/05/15 13:27:03 schemaTypeVariant [SCHEMA_TYPE_A], variant 1
2017/05/15 13:27:03 schemaTypeVariant [SCHEMA_TYPE_A], variant 1
2017/05/15 13:27:03 NewProtocol client spawned: [client=0xc820180300, conn=0xc8200340a8, stream=0xc8200340c0]
2017/05/15 13:27:03 Instantiated [Generic client 0xc8200340a0]
2017/05/15 13:27:03 Started client SCHEMA_TYPE_A (variant 1)
2017/05/15 13:27:03 NewProtocol client spawned: [client=0xc8201804a0, conn=0xc8200340d0, stream=0xc8200340e0]
2017/05/15 13:27:03 Instantiated [Generic client 0xc8200340c8]
2017/05/15 13:27:03 Started client SCHEMA_TYPE_A (variant 1)
2017/05/15 13:27:03 Trying to send message: schema:SCHEMA_TYPE_A empty:<> 
2017/05/15 13:27:03 NewProtocol client spawned: [client=0xc820190110, conn=0xc8201ba008, stream=0xc8201ba018]
2017/05/15 13:27:03 Instantiated [Generic client 0xc8201ba000]
2017/05/15 13:27:03 Started client SCHEMA_TYPE_A (1)
2017/05/15 13:27:03 Successfully sent request:%!(EXTRA *comm.Message=schema:SCHEMA_TYPE_A empty:<> )
2017/05/15 13:27:03 Trying to send message: schema:SCHEMA_TYPE_A empty:<> 
2017/05/15 13:27:03 Successfully sent request:%!(EXTRA *comm.Message=schema:SCHEMA_TYPE_A empty:<> )
2017/05/15 13:27:03 Trying to send message: schema:SCHEMA_TYPE_A empty:<> 
2017/05/15 13:27:03 Successfully sent request:%!(EXTRA *comm.Message=schema:SCHEMA_TYPE_A empty:<> )
2017/05/15 13:27:03 [Client 0xc8201f4030] Recieved response from the stream: ec_group_element:<X:"\231\322\255\271=\2307#\314\271\314j\276\213\366\3429\317\355i\313\35...">  
2017/05/15 13:27:03 [Client 0xc8201f4020] Recieved response from the stream: ec_group_element:<X:"1\226\253\265\205*\243j\035\276\016\262\300P\033z\244K\312\204&\247\"..."> 
2017/05/15 13:27:03 Trying to send message: ec_group_element:<X:"\356U1\036\nw\243\361\275\307-\353\243\265\376\234\377\0173@\231s\211\r\205\211\362\336" Y:"\310\311\227)2\004\362\355h\366m\334&I\374\240\244\024>W\343\035\264D\312\340\251B" > 
2017/05/15 13:27:03 Successfully sent request:%!(EXTRA *comm.Message=ec_group_element:<X:"\356U1\036\nw\243\361\275\307-\353\243\265\376\234\377\0173@\231s\211\r\205\211\362\336" Y:"\310\311\227)2\004\362\355h\366m\334&I\374\240\244\024>W\343\035\264D\312\340\251B" > ) <----- this is the last request of the protocol for this client
2017/05/15 13:27:03 [Client 0xc8201f4028] EOF error
2017/05/15 13:27:03 [Client 0xc82020e010] ERROR: EOF


mbi

May 15, 2017, 8:08:03 AM5/15/17
to grpc.io, mancab...@gmail.com
I am also correcting the error in my original post: the actual code we're using for spawning concurrent clients is this:

var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
    go func() {
        defer wg.Done()
        runMyProtocolClient()
    }()
}
wg.Wait()

Menghan Li

May 16, 2017, 7:32:09 PM5/16/17
to grpc.io, mancab...@gmail.com
Everything you described about the design of the RPC service looks reasonable, but we may need more information to find out why all the RPCs are unexpectedly canceled.

Is each client an independent ClientConn, or a stream on the same ClientConn? (I assume independent ClientConns, because you said each client maintains its own connection.)

Can you provide more details about what's done in the RPC service handler? How do you deal with the stream when the RPC finishes?

If you could come up with a small example to reproduce this, it would also be very helpful.

Thanks,
Menghan

Manca Bizjak

May 22, 2017, 6:13:01 AM5/22/17
to grp...@googlegroups.com
Hi,

Thank you for your reply. I am working on a minimal example to reproduce this. In the meantime, I will try to provide some further clarifications below.

Is each client an independent ClientConn, or a stream on the same ClientConn? (I assume independent ClientConns, because you said each client maintains its own connection.)

Every client maintains its own ClientConn and a corresponding stream. Here is my Client struct; the gRPC-related fields are conn, protocolClient, and stream. This Client struct wraps the gRPC client stub. Several such clients are instantiated concurrently, each starting its own RPC.

type Client struct {
    id             int32
    conn           *grpc.ClientConn
    protocolClient *pb.ProtocolClient
    stream         pb.Protocol_RunClient // obtained via protocolClient.Run(context.Background())
    schema         pb.SchemaType
    variant        pb.SchemaVariant
    handler        *ClientHandler
}

Can you provide more details about what's done in the RPC service handler? How do you deal with the stream when the RPC finishes?
 
I have found that calling stream.CloseSend on one Client closes the stream for all the other clients as well (other clients that have not yet received responses from the server get an EOF error during stream.Recv()). This is why I'm currently not explicitly closing the stream from either the client side or the server side. As for the server-side logic, for now it is just supposed to 1) receive an initialization message from the client, 2) read some data from this message, and 3) send a response to the client. I am pasting part of my implementation below; the relevant parts are the stream field and the Run method.

type Server struct {
    handler ServerHandler
    stream  pb.Protocol_RunServer
}

func (s *Server) Run(stream pb.Protocol_RunServer) error {
    s.stream = stream
    log.Printf("[Server] New Client connected")
    for {
        req, err := s.recieve()
        if err != nil {
            log.Println("Got error when trying to receive from stream")
            return err
        }
        switch req.Content.(type) {
        case *pb.Message_Empty:
            log.Println("Got empty message, indicating start of a protocol")
            reqSchemaType := req.GetSchema()
            reqSchemaVariant := req.GetSchemaVariant()
            reqSchemaTypeStr := pb.SchemaType_name[int32(reqSchemaType)]
            reqSchemaVariantStr := pb.SchemaVariant_name[int32(reqSchemaVariant)]
            reqClientId := req.GetClientId()
            log.Println("Client [", reqClientId, "] requested", reqSchemaTypeStr, "variant", reqSchemaVariantStr)
            switch reqSchemaType {
            case pb.SchemaType_PEDERSEN_EC:
                s.handler.pedersenECReciever = commitments.NewPedersenECReceiver()
                h := s.handler.pedersenECReciever.GetH()
                ecge := pb.ECGroupElement{X: h.X.Bytes(), Y: h.Y.Bytes()}
                resp := &pb.Message{Content: &pb.Message_EcGroupElement{&ecge}}
                if err := s.send(resp); err != nil {
                    return err
                }
                log.Printf("[Server] Sent response to initial message")
            default:
                log.Println("The requested protocol is currently unsupported.")
            }
        default:
            log.Printf("[Server] Received an intermediate request: %v", req)
        }
    }
}

In the meantime, I did some further debugging, and I found that a new RPC is created for each concurrent client (so far so good). I mentioned that every "RPC transaction" between any of my clients and the server involves 3 consecutive request-response pairs. To simplify, I am now testing with only the first request-response pair (as the server-side code above illustrates), and here is what I found: let's say I send a request from 2 clients. As a result, 2 RPCs are started on the server, and the server gets the request from each client in its own RPC. However, the server then sends both responses to a single RPC, the RPC of the first client. So the first client always succeeds, but the second client hangs waiting for a response from the server that never arrives, since both responses from the server are associated with the first RPC only. I'm attaching an image that demonstrates this.

I was wondering: is there any way I can associate a response with a certain RPC?

If you could come up with a small example to reproduce this, it would also be very helpful.
 
Sure, I will provide such an example shortly. 

Thanks!
Best regards


grpc_debg_log.png

Menghan Li

May 23, 2017, 1:57:17 PM5/23/17
to grpc.io, mancab...@gmail.com

I have found that calling stream.CloseSend on one Client closes the stream for all the other clients as well (other clients that have not yet received responses from the server get an EOF error during stream.Recv()).

This sounds really strange to me. Is it possible that you accidentally used the same stream the whole time, e.g. overwrote the value of some variable by mistake?
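
In the handler posted above, one variable with exactly that risk is the stream field on Server: if a single Server value handles all clients, every new Run invocation executes s.stream = stream and overwrites the stream of any RPC still in flight, so any send that goes through s.stream can land on another client's RPC. grpc-go invokes the handler once per incoming stream, each on its own goroutine, so keeping the stream local to Run ties every Send to the right client. A minimal sketch of that shape, using the same generated types (s.handle is a hypothetical stand-in for the per-request logic):

func (s *Server) Run(stream pb.Protocol_RunServer) error {
    // Use only the stream parameter; never store it on the shared
    // Server value, where a concurrent client would overwrite it.
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return nil // the client called CloseSend; end this RPC cleanly
        }
        if err != nil {
            return err
        }
        resp, err := s.handle(req) // hypothetical per-request logic
        if err != nil {
            return err
        }
        if err := stream.Send(resp); err != nil {
            return err
        }
    }
}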

xdrag...@gmail.com

Nov 6, 2017, 1:36:36 AM11/6/17
to grpc.io
Hi,

I wonder if you have resolved this problem, because I ran into a similar one.

aloisio

Oct 21, 2021, 10:43:54 PM10/21/21
to grpc.io
I ran into a similar problem. I was also using bidirectional RPC streaming (similar to full-duplex communication over a long-lived connection), with multiple clients and a single server: each client opens a new connection to the server, sends one control command (a Send-Recv pair), and then closes the connection.

If you look at this part of the grpc-go source code, you can see that the server initializes a separate stream RPC object for each new client connection; pairing each Recv with a Send inside the handler function is enough to guarantee that messages sent and received on the same connection match up.

As for the "An error ocurred: rpc error: code = Canceled desc = context canceled." error the original poster mentioned, my guess is that, across multiple goroutines, the Sends and Recvs of the various stream RPC objects were mismatched: for example, A's reply was sent to B's stream RPC object (whereupon B's client received a reply and closed its connection), and when B's handler then called Recv, it got this error.
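
A sketch of that connect, single Send-Recv, close lifecycle (sketch only; the service and message names are borrowed from the original post's definition):

// sendControlCommand opens a fresh connection, performs one
// request/response exchange on a bidirectional stream, and closes
// everything again.
func sendControlCommand(addr string, req *pb.Message) (*pb.Message, error) {
    conn, err := grpc.Dial(addr, grpc.WithInsecure())
    if err != nil {
        return nil, err
    }
    defer conn.Close()

    stream, err := pb.NewMyProtocolClient(conn).ExecuteProtocol(context.Background())
    if err != nil {
        return nil, err
    }
    if err := stream.Send(req); err != nil {
        return nil, err
    }
    resp, err := stream.Recv()
    if err != nil {
        return nil, err
    }
    return resp, stream.CloseSend()
}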

Balakumar Subramani

Sep 30, 2022, 5:44:07 AM9/30/22
to grpc.io
For anyone visiting this page in the future: I had a similar problem, and it turned out to be caused by a high buffer size (a bug) used on the stream. This led to high I/O usage (we write the received chunks to disk), and concurrent execution caused processes to hang and terminate with "rpc error: code = Canceled desc = context canceled".