'Streaming' messages (say over a socket)

Alex Black

Jun 14, 2009, 7:14:10 PM
to Protocol Buffers
Is there a way to start sending a message before it's fully composed?

Say we have messages like this:

message Entity
{
required int32 id = 1;
required string name = 2;
}

message Entities
{
repeated Entity entity = 1;
}

If we're sending a message Entities with 1,000 Entity objects in it,
is there a way to avoid composing the entire message in memory,
serializing it, and then sending it out?

I'd like to avoid allocating RAM for the entire message, and just send
it out as I compose it...
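A sketch of one way this can work with the Entity/Entities definitions above. On the wire, each element of a repeated message field is written as its own key, length prefix and payload. That means the Entity records can be written out one at a time, and the concatenated bytes still parse as a single Entities message. The helpers (AppendVarint, EncodeEntity, StreamEntity) and the `sink` callback are hypothetical stand-ins for generated code and a socket; a real sender would more likely call SerializeToString() on each Entity than hand-encode it. The catch is that the receiver must read to the end of the stream (or learn the total length some other way) to know where the Entities message ends.

```cpp
#include <cstdint>
#include <string>

// Append a base-128 varint: 7 bits per byte, high bit set on every byte but the last.
void AppendVarint(std::string* out, uint64_t value) {
  while (value >= 0x80) {
    out->push_back(static_cast<char>((value & 0x7F) | 0x80));
    value >>= 7;
  }
  out->push_back(static_cast<char>(value));
}

// Hand-encode one Entity { required int32 id = 1; required string name = 2; }.
std::string EncodeEntity(int32_t id, const std::string& name) {
  std::string out;
  out.push_back(0x08);  // key for field 1, wire type 0 (varint): (1 << 3) | 0
  // int32 values are sign-extended to 64 bits, so a negative id takes 10 bytes.
  AppendVarint(&out, static_cast<uint64_t>(static_cast<int64_t>(id)));
  out.push_back(0x12);  // key for field 2, wire type 2 (length-delimited): (2 << 3) | 2
  AppendVarint(&out, name.size());
  out += name;
  return out;
}

// Emit one element of Entities.entity (field 1, length-delimited) straight to `sink`.
// Calling this once per Entity, in order, produces the same bytes as serializing a
// fully built Entities message, without ever holding the whole list in memory.
template <typename Sink>
void StreamEntity(Sink& sink, int32_t id, const std::string& name) {
  const std::string body = EncodeEntity(id, name);
  std::string record;
  record.push_back(0x0A);  // key for field 1, wire type 2: (1 << 3) | 2
  AppendVarint(&record, body.size());
  record += body;
  sink(record);  // e.g. write to the socket immediately
}
```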

thx,

- Alex

Christopher Smith

Jun 15, 2009, 2:58:19 PM
to Alex Black, Protocol Buffers
The normal way to do it is to send each Entity as a separate message. CodedInputStream/CodedOutputStream are handy for that kind of thing.
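A minimal sketch of that per-message framing, assuming each Entity has already been serialized to a byte string (for example with SerializeAsString() in C++): write a varint length prefix, then the bytes. This matches what CodedOutputStream produces with WriteVarint32() followed by the message bytes, but the helpers below (AppendVarint32, AppendFrame) are hypothetical and hand-written so the example needs no library.

```cpp
#include <cstdint>
#include <string>

// Append `value` as a base-128 varint: 7 bits per byte, high bit set on all but the last byte.
void AppendVarint32(std::string* out, uint32_t value) {
  while (value >= 0x80) {
    out->push_back(static_cast<char>((value & 0x7F) | 0x80));
    value >>= 7;
  }
  out->push_back(static_cast<char>(value));
}

// Frame one already-serialized message: varint length prefix followed by the payload.
// A receiver reads the prefix, then exactly that many bytes, then the next prefix.
void AppendFrame(std::string* out, const std::string& payload) {
  AppendVarint32(out, static_cast<uint32_t>(payload.size()));
  out->append(payload);
}
```

Each call appends one self-contained record, so a sender can serialize, frame and write one Entity at a time instead of building the whole list first.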

--Chris

Kenton Varda

Jun 15, 2009, 3:10:57 PM
to Alex Black, Protocol Buffers

Alex Black

Jun 16, 2009, 11:48:08 PM
to Christopher Smith, Protocol Buffers
Thanks, I got something working... how does this look? (Ugly, I'm sure...)

On the C++ server side I am looping, sending many messages (all of the same type), and on the Java side I am looping, parsing them out.
 
C++ Server:
 
     int size = message.ByteSize(); 
     EnsureBufferIsAtLeastSize(size + 4);
 
     char* pBuffer = (char*) google::protobuf::io::CodedOutputStream::WriteVarint32ToArray(size, (google::protobuf::uint8*) m_pBuffer);
 
     // Serialize the message
     bool result = message.SerializeToArray(pBuffer, size);  
 
     // Calculate how many bytes the 'size' took
     int sizeSize = pBuffer - m_pBuffer;
 
     // Write the message to the stream
     m_Stream.Write(m_pBuffer,size + sizeSize);
 
Java client:
 
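   com.google.protobuf.CodedInputStream stream = com.google.protobuf.CodedInputStream.newInstance(url.openStream());

   while (!stream.isAtEnd())
   {
     Foo.Bar.Builder builder = Foo.Bar.newBuilder();
     // readMessage() reads a varint size prefix, then parses exactly that many bytes
     stream.readMessage(builder, com.google.protobuf.ExtensionRegistryLite.getEmptyRegistry());

     Foo.Bar message = builder.build();
   }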
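For comparison, here is the receiving side of the same framing sketched in C++ (the language of the server code): read a varint size, then exactly that many bytes, until the input runs out. ReadVarint32 and SplitFrames are hypothetical helpers working on an in-memory buffer; with the real library the usual pattern is CodedInputStream::ReadVarint32(), PushLimit(), ParseFromCodedStream() and PopLimit().

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Decode a varint32 starting at data[*pos], advancing *pos. Returns false if the input
// ends mid-varint or the varint is longer than the 5 bytes a 32-bit value can need.
bool ReadVarint32(const std::string& data, std::size_t* pos, uint32_t* value) {
  uint32_t result = 0;
  for (int i = 0; i < 5; ++i) {
    if (*pos >= data.size()) return false;
    const uint8_t byte = static_cast<uint8_t>(data[(*pos)++]);
    result |= static_cast<uint32_t>(byte & 0x7F) << (7 * i);
    if ((byte & 0x80) == 0) {
      *value = result;
      return true;
    }
  }
  return false;  // more than 5 bytes: malformed for a 32-bit length
}

// Split a buffer of length-delimited frames into payloads, mirroring the Java loop:
// while not at end, read a size prefix, then exactly that many bytes.
bool SplitFrames(const std::string& data, std::vector<std::string>* payloads) {
  std::size_t pos = 0;
  while (pos < data.size()) {
    uint32_t size = 0;
    if (!ReadVarint32(data, &pos, &size)) return false;
    if (size > data.size() - pos) return false;  // truncated frame
    payloads->push_back(data.substr(pos, size));
    pos += size;
  }
  return true;
}
```

One caveat that may apply to the Java loop above: CodedInputStream enforces a limit on the total number of bytes it reads (64 MB by default in protobuf 2.x), so a long-lived stream of many small messages may need resetSizeCounter() between messages.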

Kenton Varda

Jun 17, 2009, 4:41:49 PM
to Alex Black, Christopher Smith, Protocol Buffers
Mostly looks fine.

Note that a 32-bit varint can take up to 5 bytes. You should probably just use CodedOutputStream::Varint32Size() to compute the exact size so that you can allocate a buffer that is exactly large enough.
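To make the 5-byte point concrete: a varint stores 7 bits of the value per byte, so a uint32 needs between 1 and 5 bytes. The function below is a hypothetical hand-written equivalent of what CodedOutputStream::Varint32Size() computes, not the library's own code.

```cpp
#include <cstdint>

// Number of bytes a uint32 occupies as a varint: values below 2^7 take 1 byte,
// below 2^14 take 2, below 2^21 take 3, below 2^28 take 4, and anything larger takes 5.
int ComputeVarint32Size(uint32_t value) {
  int bytes = 1;
  while (value >= 0x80) {
    value >>= 7;
    ++bytes;
  }
  return bytes;
}
```

So a buffer of ComputeVarint32Size(size) + size bytes is always exactly large enough. A fixed `size + 4` only falls short once a message reaches 2^28 bytes (256 MiB), because every smaller size fits in 4 varint bytes.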

Also note that if your message is large (say, 10k or more), allocating a single large buffer may make the memory allocator unhappy. I'm not sure what type m_Stream is, but you might consider wrapping it in a ZeroCopyOutputStream and wrapping that in a CodedOutputStream, then writing to that, so that it doesn't have to buffer the whole message all at once. Note that CopyingOutputStreamAdaptor makes it pretty easy to adapt traditional output streams to ZeroCopyOutputStream.
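The point of that layering is that serialized bytes pass through one fixed-size block instead of a buffer as large as the whole message. The standalone ChunkedWriter below only illustrates that buffering idea; it is hypothetical and is not how CopyingOutputStreamAdaptor is implemented (roughly, the adaptor hands CodedOutputStream a block to write into and passes each full block to a CopyingOutputStream).

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <functional>
#include <utility>
#include <vector>

// Collects bytes in one fixed-size block and hands each full block to `sink`, so the
// extra memory needed is one block no matter how large the serialized message is.
class ChunkedWriter {
 public:
  ChunkedWriter(std::size_t block_size, std::function<bool(const char*, std::size_t)> sink)
      : block_(std::max<std::size_t>(block_size, 1)), sink_(std::move(sink)) {}

  // Copies `size` bytes into the block, sending the block whenever it fills up.
  bool Append(const char* data, std::size_t size) {
    while (size > 0) {
      const std::size_t n = std::min(size, block_.size() - used_);
      std::memcpy(block_.data() + used_, data, n);
      used_ += n;
      data += n;
      size -= n;
      if (used_ == block_.size() && !Flush()) return false;
    }
    return true;
  }

  // Sends any partially filled block; call once after the last Append.
  bool Flush() {
    if (used_ == 0) return true;
    const bool ok = sink_(block_.data(), used_);
    used_ = 0;
    return ok;
  }

 private:
  std::vector<char> block_;
  std::size_t used_ = 0;
  std::function<bool(const char*, std::size_t)> sink_;
};
```

With a block of a few KB, the outgoing data never needs more than one block of buffer space, whatever the message size.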

Alex Black

Jun 17, 2009, 4:49:10 PM
to Kenton Varda, Christopher Smith, Protocol Buffers
Thanks for pointing out CodedOutputStream::Varint32Size(); I'll use that.
 
My messages are lists of messages, and I am breaking them into batches of say 1,000 deliberately, so that each is a reasonable size that I can allocate a buffer for and send over the wire. In one scenario I have 100,000 things to send, so I send them in batches, to avoid allocating memory for all 100,000 at once.
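The batching approach described here can be sketched as a loop that builds and sends one batch at a time, so only batch_size items are held in memory at once. SendInBatches and its two callbacks are hypothetical; in this setup build_item would fill in one Entity and send_batch would serialize an Entities message for the batch and write it as one length-prefixed message.

```cpp
#include <algorithm>
#include <cstddef>
#include <type_traits>
#include <vector>

// Builds and sends items [0, total) in batches of at most batch_size, reusing one
// vector so only a single batch is in memory at a time. Returns the number of batches.
// build_item(i) -> Item and send_batch(const std::vector<Item>&) are caller-supplied hooks.
template <typename BuildItem, typename SendBatch>
std::size_t SendInBatches(std::size_t total, std::size_t batch_size,
                          BuildItem build_item, SendBatch send_batch) {
  using Item = std::decay_t<decltype(build_item(std::size_t{0}))>;
  if (batch_size == 0) batch_size = 1;  // avoid an infinite loop on a bad argument
  std::vector<Item> batch;
  batch.reserve(batch_size);
  std::size_t batches = 0;
  for (std::size_t start = 0; start < total; start += batch_size) {
    const std::size_t end = std::min(total, start + batch_size);
    batch.clear();  // reuse the same storage for every batch
    for (std::size_t i = start; i < end; ++i) batch.push_back(build_item(i));
    send_batch(batch);
    ++batches;
  }
  return batches;
}
```

For 100,000 items with batch_size 1,000 this sends 100 messages, and peak memory is one batch rather than the full list.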
 
m_Stream is a light wrapper around a socket, not a C++ stream; I need to get my head around how to implement a C++-style stream wrapper around a socket.
 
- Alex

Kenton Varda

Jun 17, 2009, 6:12:12 PM
to Alex Black, Christopher Smith, Protocol Buffers
Don't bother trying to implement your own std::ostream; it's a horrible mess.  Using CopyingOutputStreamAdaptor only requires that you implement the CopyingOutputStream interface, which is extremely simple.
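As a rough idea of how small that interface is: CopyingOutputStream has a single pure virtual method, bool Write(const void* buffer, int size). The class below mirrors that shape for a socket file descriptor. It is standalone (not derived from the protobuf class) so it compiles without the library, and the name FdOutputStream is hypothetical. The main subtlety for sockets is that write() may accept fewer bytes than requested, so Write() loops.

```cpp
#include <cerrno>
#include <unistd.h>

// Same shape as google::protobuf::io::CopyingOutputStream::Write(), but standalone.
class FdOutputStream {
 public:
  explicit FdOutputStream(int fd) : fd_(fd) {}

  // Writes all `size` bytes, looping because write() on a socket may send fewer
  // bytes than requested. Returns false on an unrecoverable error.
  bool Write(const void* buffer, int size) {
    const char* p = static_cast<const char*>(buffer);
    while (size > 0) {
      const ssize_t n = ::write(fd_, p, static_cast<size_t>(size));
      if (n < 0) {
        if (errno == EINTR) continue;  // interrupted by a signal: retry
        return false;
      }
      p += n;
      size -= static_cast<int>(n);
    }
    return true;
  }

 private:
  int fd_;
};
```

In real code the class would derive from google::protobuf::io::CopyingOutputStream and be passed to a CopyingOutputStreamAdaptor, which a CodedOutputStream then writes into. For a plain file descriptor, the library's google::protobuf::io::FileOutputStream may already do the job.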

But yeah, since your messages are small, what you have now is fine.