grpcServer := grpc.NewServer(grpc.MaxConcurrentStreams(math.MaxUint32)) pb.RegisterProtocolServer(grpcServer, NewMyProtocolServer())
grpcServer.Serve(listener)
var wg sync.WaitGroup wg.Add(n) for i := 0; i < n; i++ {
defer wg.Done()
go runMyProtocolClient() } wg.Wait()
2017/05/15 13:27:02 Registering services2017/05/15 13:27:02 Instantiating new protocol server2017/05/15 13:27:02 Starting GRPC server on port 70002017/05/15 13:27:03 [Server 0xc82022e000] spawned, stream=0xc8201ba1202017/05/15 13:27:03 [Server 0xc8200340e0] spawned, stream=0xc8201743e02017/05/15 13:27:03 Recieved request from the stream schema:SCHEMA_TYPE_A empty:<> 2017/05/15 13:27:03 Client requested SCHEMA_TYPE_A variant 12017/05/15 13:27:03 Recieved request from the stream schema:SCHEMA_TYPE_A empty:<> 2017/05/15 13:27:03 Client requested SCHEMA_TYPE_A variant 12017/05/15 13:27:03 Obtained H: &{16199448777604385719635926144695029257912338135202671482767928883512 6396801387809980950520705369354328071365311907783138908138286589481}2017/05/15 13:27:03 Successfully sent response:%!(EXTRA *comm.Message=ec_group_element:<X:"\231\322\255\271=\2307#\314\271\314j\276\213\366\3429\317\355i\313\350\n\224\334zy8" Y:"<\275\303F\315\331\002\201\353\346\277;W\344\263\267^\363H\353g\016\356\255\031\267\246)" > )2017/05/15 13:27:03 [Server 0xc820284000] spawned, stream=0xc8201e20c02017/05/15 13:27:03 Recieved request from the stream schema:SCHEMA_TYPE_A empty:<> 2017/05/15 13:27:03 Client requested SCHEMA_TYPE_A variant 12017/05/15 13:27:03 Obtained H: &{5222284638485064219491810334813365272080065672847296816805697342382 2275113287299095758274364943442341731168296655544534119203197860101}2017/05/15 13:27:03 Successfully sent response:%!(EXTRA *comm.Message=ec_group_element:<X:"1\226\253\265\205*\243j\035\276\016\262\300P\033z\244K\312\204&\247\037s\203\320\313\256" Y:"\025\232~y}\230\365\345\256]\331\303\204H1[\033\207}\255\023[\333\257V2\271\005" > )2017/05/15 13:27:03 Obtained H: &{6305095417828729319307152541529557580493743402421550520609646844543 5483979098490912922176532492129495518788718140012781136961875522954}2017/05/15 13:27:03 Successfully sent response:%!(EXTRA 
*comm.Message=ec_group_element:<X:";\336\326\202\347\367\272\345\022TSk\023_\315R\223\246\005\002\2314\300\301\321<\032\177" Y:"4\022\320\222C5=O{\320\000\005:\223\305\253\247\227\233\232{\203\343\346c\264\265\212" > )2017/05/15 13:27:03 Recieved request from the stream ec_group_element:<X:"\356U1\036\nw\243\361\275\307-\353\243\265\376\234\377\0173@\231s\211\r\205\211\362\336" Y:"\310\311\227)2\004\362\355h\366m\334&I\374\240\244\024>W\343\035\264D\312\340\251B" > 2017/05/15 13:27:03 Successfully sent response:%!(EXTRA *comm.Message=empty:<> ) ******* this message indicates the last message to be sent in the "protocol" when communicating with one of the clients
2017/05/15 13:27:03 An error ocurred: rpc error: code = Canceled desc = context canceledexit status 1
Will start clients concurrentlyRunning client #0Running client #1Running client #22017/05/15 13:27:03 schemaTypeVariant [SCHEMA_TYPE_A], variant 12017/05/15 13:27:03 schemaTypeVariant [SCHEMA_TYPE_A], variant 12017/05/15 13:27:03 schemaTypeVariant [SCHEMA_TYPE_A], variant 12017/05/15 13:27:03 NewProtocol client spawned: [client=0xc820180300, conn=0xc8200340a8, stream=0xc8200340c0]2017/05/15 13:27:03 Instantiated [Generic client 0xc8200340a0]2017/05/15 13:27:03 Started client SCHEMA_TYPE_A (variant 1)2017/05/15 13:27:03 NewProtocol client spawned: [client=0xc8201804a0, conn=0xc8200340d0, stream=0xc8200340e0]2017/05/15 13:27:03 Instantiated [Generic client 0xc8200340c8]2017/05/15 13:27:03 Started client SCHEMA_TYPE_A (variant 1)2017/05/15 13:27:03 Trying to send message: schema:SCHEMA_TYPE_A empty:<> 2017/05/15 13:27:03 NewProtocol client spawned: [client=0xc820190110, conn=0xc8201ba008, stream=0xc8201ba018]2017/05/15 13:27:03 Instantiated [Generic client 0xc8201ba000]2017/05/15 13:27:03 Started client SCHEMA_TYPE_A (1)2017/05/15 13:27:03 Successfully sent request:%!(EXTRA *comm.Message=schema:SCHEMA_TYPE_A empty:<> )2017/05/15 13:27:03 Trying to send message: schema:SCHEMA_TYPE_A empty:<> 2017/05/15 13:27:03 Successfully sent request:%!(EXTRA *comm.Message=schema:SCHEMA_TYPE_A empty:<> )2017/05/15 13:27:03 Trying to send message: schema:SCHEMA_TYPE_A empty:<> 2017/05/15 13:27:03 Successfully sent request:%!(EXTRA *comm.Message=schema:SCHEMA_TYPE_A empty:<> )2017/05/15 13:27:03 [Client 0xc8201f4030] Recieved response from the stream: ec_group_element:<X:"\231\322\255\271=\2307#\314\271\314j\276\213\366\3429\317\355i\313\35..."> 2017/05/15 13:27:03 [Client 0xc8201f4020] Recieved response from the stream: ec_group_element:<X:"1\226\253\265\205*\243j\035\276\016\262\300P\033z\244K\312\204&\247\"..."> 2017/05/15 13:27:03 Trying to send message: ec_group_element:<X:"\356U1\036\nw\243\361\275\307-\353\243\265\376\234\377\0173@\231s\211\r\205\211\362\336" 
Y:"\310\311\227)2\004\362\355h\366m\334&I\374\240\244\024>W\343\035\264D\312\340\251B" > 2017/05/15 13:27:03 Successfully sent request:%!(EXTRA *comm.Message=ec_group_element:<X:"\356U1\036\nw\243\361\275\307-\353\243\265\376\234\377\0173@\231s\211\r\205\211\362\336" Y:"\310\311\227)2\004\362\355h\366m\334&I\374\240\244\024>W\343\035\264D\312\340\251B" > ) <----- this is the last request of the protocol for this client2017/05/15 13:27:03 [Client 0xc8201f4028] EOF error2017/05/15 13:27:03 [Client 0xc82020e010] ERROR: EOF
var wg sync.WaitGroupwg.Add(n)for i := 0; i < n; i++ {
go func() {
defer wg.Done()
runMyProtocolClient() }()}wg.Wait()
Is each client an independent ClientConn, or is each one a stream on the same ClientConn? (I assume independent ClientConns, because you said each client maintains its own connection.)
Can you provide more details about what is done in the RPC service handler? How do you deal with the stream when the RPC finishes?
// Server is the receiver type for the Protocol service's Run method.
type Server struct {
	// handler holds protocol state; Run stores the Pedersen EC receiver in it.
	handler ServerHandler

	// stream is overwritten by every Run call with the stream of that call.
	stream pb.Protocol_RunServer
}
// Run serves a single client stream of the Protocol service. It loops over
// incoming messages until receiving or sending fails, and returns that error.
//
// All reads and writes go through the stream argument of this call rather than
// through the s.stream field. Every call to Run overwrites that field, so when
// one *Server serves several clients at once, I/O routed through the field
// ends up on whichever stream was stored last.
func (s *Server) Run(stream pb.Protocol_RunServer) error {
	// NOTE(review): kept only for code outside this method that may still read
	// s.stream; Run itself never reads it back. Confirm and remove if unused.
	s.stream = stream
	log.Printf("[Server] New Client connected")

	for {
		req, err := stream.Recv()
		if err != nil {
			// Stop serving: req is not usable after a failed receive.
			// NOTE(review): a clean half-close by the client presumably
			// arrives here as io.EOF; if so, map it to a nil return once the
			// io package is imported in this file.
			log.Println("Got error when trying to receive from stream", err)
			return err
		}

		switch req.Content.(type) {
		case *pb.Message_Empty:
			log.Println("Got empty message, indicating start of a protocol")

			reqSchemaType := req.GetSchema()
			reqSchemaVariant := req.GetSchemaVariant()
			reqSchemaTypeStr := pb.SchemaType_name[int32(reqSchemaType)]
			reqSchemaVariantStr := pb.SchemaVariant_name[int32(reqSchemaVariant)]
			reqClientID := req.GetClientId()
			log.Println("Client [", reqClientID, "] requested", reqSchemaTypeStr, "variant", reqSchemaVariantStr)

			switch reqSchemaType {
			case pb.SchemaType_PEDERSEN_EC:
				// NOTE(review): s.handler is shared by every call to Run on
				// this *Server, so concurrent clients overwrite each other's
				// receiver here; consider keeping it local to the call.
				s.handler.pedersenECReciever = commitments.NewPedersenECReceiver()
				h := s.handler.pedersenECReciever.GetH()

				ecge := pb.ECGroupElement{X: h.X.Bytes(), Y: h.Y.Bytes()}
				resp := &pb.Message{Content: &pb.Message_EcGroupElement{&ecge}}
				if err := stream.Send(resp); err != nil {
					return err
				}
				log.Printf("[Server] Sent response to initial message")
			default:
				log.Println("The requested protocol is currently unsupported.")
			}
		default:
			// The format verb is required; without it Printf appends
			// "%!(EXTRA ...)" to the message.
			log.Printf("[Server] Received an intermediate request: %v", req)
		}
	}
}
If you could come up with a small example to reproduce this, it would also be very helpful.
I have found that calling `stream.CloseSend` on one client closes the stream for all other clients as well: other clients that have not yet received responses from the server get the EOF error during `stream.Recv()`.