How to ensure messages in stream are sent in sequence?


yoges nsamy

Aug 27, 2020, 1:58:05 PM
to grpc.io

I'm calling a bi-directional streaming service with five messages that need to be sent in sequence.
For example pbx.ClientMsg_Sub (Id=3) should be sent and completed before pbx.ClientMsg_Set(Id=4).

I notice that if I don't delay the sending of each message, I don't get the expected response.
    for _, req := range requests {
        fmt.Printf("Sending message: %v\n", req)
        stream.Send(req)
        time.Sleep(500 * time.Millisecond) // doesn't run in sequence if removed
    }

My full code is shared below.

func getUserByCUID(w http.ResponseWriter, req *http.Request) {
   enableCors(&w)

   crtFile := "/home/yogesnsamy/Coding/MHub/prod_cert/cert.crt"
   creds, err := credentials.NewClientTLSFromFile(crtFile, "")
   if err != nil {
       log.Fatal("Error loading cert", err)
   }

   conn, err := grpc.Dial("dev.mhub.my:16060", grpc.WithTransportCredentials(creds))
   if err != nil {
       log.Fatal("Error dialing", err)
   }

   c := pbx.NewNodeClient(conn)
   // we create a stream by invoking the client
   stream, err := c.MessageLoop(context.Background())
   if err != nil {
       log.Fatalf("Error while creating stream: %v", err)
       return
   }

   // get user's cognito user id
   cuID, ok := req.URL.Query()["cid"]

   if !ok || len(cuID[0]) < 1 {
       log.Println("Url Param 'cid' is missing")
       return
   }
   tag := fmt.Sprintf(`"%v"`, cuID[0]) // mhub
   resp := Response{"n/a"}

   requests := []*pbx.ClientMsg{
       &pbx.ClientMsg{
           Message: &pbx.ClientMsg_Hi{
               Hi: &pbx.ClientHi{
                   Id:        "1",
                   UserAgent: "Golang_Spider_Bot/3.0",
                   Ver:       "0.15",
                   Lang:      "EN",
               }},
       },
       &pbx.ClientMsg{
           Message: &pbx.ClientMsg_Login{
               Login: &pbx.ClientLogin{
                   Id:     "2",
                   Scheme: "basic",
                   Secret: []byte("carol:carol123"),
               }},
       },
       &pbx.ClientMsg{
           Message: &pbx.ClientMsg_Sub{
               Sub: &pbx.ClientSub{
                   Id:    "3",
                   Topic: "fnd",
                   GetQuery: &pbx.GetQuery{
                       What: "sub",
                   },
               },
           },
       },
       &pbx.ClientMsg{
           Message: &pbx.ClientMsg_Set{
               Set: &pbx.ClientSet{
                   Id:    "4",
                   Topic: "fnd",
                   Query: &pbx.SetQuery{
                       Desc: &pbx.SetDesc{
                           Public: []byte(tag),
                       },
                   },
               },
           },
       },
       &pbx.ClientMsg{
           Message: &pbx.ClientMsg_Get{
               Get: &pbx.ClientGet{
                   Id:    "5",
                   Topic: "fnd",
                   Query: &pbx.GetQuery{
                       What: "sub",
                   },
               }},
       },
   }

   waitc := make(chan struct{})
   // we send a bunch of messages to the server (goroutine)
   go func() {
       // function to send a bunch of messages
       for _, req := range requests {
           fmt.Printf("Sending message: %v\n", req)
           stream.Send(req)
           time.Sleep(500 * time.Millisecond) // doesn't run in sequence if removed
       }
       stream.CloseSend()
   }()
   // we receive a bunch of messages from the server (goroutine)
   // count := 0
   go func() {
       // function to receive a bunch of messages
       uid := "n/a"
       for {
           res, err := stream.Recv()
           if err == io.EOF {
               resp.TinodeUserID = uid
               js, err := json.Marshal(resp)
               if err != nil {
                   http.Error(w, err.Error(), http.StatusInternalServerError)
                   return
               }

                w.Header().Set("Content-Type", "application/json")
               w.Write(js)
               break
           }
           if err != nil {
               log.Fatalf("Error while receiving: %v", err)
               break
           }
           if res.GetMeta() != nil {
               if res.GetMeta().Id == "5" {
                   sub := res.GetMeta().GetSub()
                   for _, elem := range sub {
                       uid = elem.UserId
                   }
               }
           }
       }
       close(waitc)
   }()

   // block until everything is done
   <-waitc
}



Response when operation is paused before sending the next message:
{"tinodeuserid":"usrNlWi3eOr74w"} // getting the expected result

Response when operation is not paused before sending the next message:
{"tinodeuserid":"n/a"}


What is the right way to make sure the messages are sent in sequence?
My objective is to send message n+1 only after message n completes (something like using await in JavaScript).

Thank you.


Easwar Swaminathan

Sep 16, 2020, 4:52:07 PM
to grpc.io
In a bidirectional streaming gRPC call, the two streams (sending and receiving) are completely independent of each other. Messages sent by one end are received by the other end in the order in which they were sent, but there is no ordering guarantee across the two streams. If you want to impose an ordering between sending and receiving, you have to do it at the application layer. For example, instead of sending all five messages from one goroutine and reading all responses in another, you could use a single goroutine that sends and receives the messages in order.
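A minimal sketch of the single-goroutine approach, for illustration only. The `msg` type, the `stream` interface and `fakeStream` are hypothetical stand-ins (not the generated `pbx` types), and the sketch assumes the server echoes the request's ID on its reply, as the `res.GetMeta().Id == "5"` check in the question suggests:

```go
package main

import (
	"fmt"
	"io"
)

// msg is a hypothetical stand-in for the generated pbx request/response types.
type msg struct {
	ID   string
	Body string
}

// stream lists the three methods this sketch needs. Generated gRPC client
// streams expose methods with these names, but with concrete protobuf types.
type stream interface {
	Send(*msg) error
	Recv() (*msg, error)
	CloseSend() error
}

// sendAndAwait sends one request, then reads until a response carrying the
// same ID arrives. Responses with other IDs are skipped.
func sendAndAwait(s stream, req *msg) (*msg, error) {
	if err := s.Send(req); err != nil {
		return nil, fmt.Errorf("send %q: %w", req.ID, err)
	}
	for {
		res, err := s.Recv()
		if err != nil {
			return nil, fmt.Errorf("recv for %q: %w", req.ID, err)
		}
		if res.ID == req.ID {
			return res, nil
		}
	}
}

// sendInOrder sends request n+1 only after the reply to request n has arrived.
func sendInOrder(s stream, reqs []*msg) ([]*msg, error) {
	replies := make([]*msg, 0, len(reqs))
	for _, req := range reqs {
		res, err := sendAndAwait(s, req)
		if err != nil {
			return replies, err
		}
		replies = append(replies, res)
	}
	return replies, s.CloseSend()
}

// fakeStream is an in-memory test double: every Send queues one unsolicited
// notification (empty ID) followed by an acknowledgement with the request's ID.
type fakeStream struct{ queue []*msg }

func (f *fakeStream) Send(m *msg) error {
	f.queue = append(f.queue, &msg{Body: "notice"}, &msg{ID: m.ID, Body: "ack " + m.Body})
	return nil
}

func (f *fakeStream) Recv() (*msg, error) {
	if len(f.queue) == 0 {
		return nil, io.EOF
	}
	m := f.queue[0]
	f.queue = f.queue[1:]
	return m, nil
}

func (f *fakeStream) CloseSend() error { return nil }

func main() {
	reqs := []*msg{{ID: "3", Body: "sub"}, {ID: "4", Body: "set"}, {ID: "5", Body: "get"}}
	replies, err := sendInOrder(&fakeStream{}, reqs)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, r := range replies {
		fmt.Println(r.ID, r.Body)
	}
}
```

Because Send and Recv alternate in one goroutine, no sleep is needed. Whether every request really gets a reply with a matching ID depends on the server, so that part would need checking against the actual service.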

Russell Wu

Sep 17, 2020, 9:22:40 AM
to Easwar Swaminathan, grpc.io
I think the problem in this code is that the sender finishes too soon. When the sender calls CloseSend, the server ends its side of the stream as well, so the receiver gets an EOF before it receives the desired response. So you don't have to pause in between requests; just pause at the end of sending, before calling CloseSend.

Also, messages in a single direction are received in order. This is guaranteed by TCP.


yoges nsamy

Oct 5, 2020, 3:34:52 AM
to grpc.io
>> So you don't have to pause in between requests, just pause at the end of sending before calling CloseSend.

Thank you, this solved my issue.

>> Also, messages in a single direction are received in order. This is guaranteed by TCP.

My grpc server has been defined as bi-directional.
Does your statement mean I can still send messages in a single direction even though the service is defined as bi-di? I assumed it had to be a stream.
I would appreciate some guidance.

Russell Wu

Oct 5, 2020, 4:00:01 AM
to yoges nsamy, grpc.io
I meant that in a bi-di stream, messages in one direction are received in the order they were sent. For example, the requests a client sends to a server will always be received in the order they were sent, and vice versa. However, there's no ordering guarantee between requests and responses. For example, if the client sends 3 requests and the server sends 3 responses, there's no guarantee at the TCP or gRPC level that the client will receive the 1st response before it sends the 2nd request. If such a guarantee is needed, it has to be implemented by the application.
