reading/writing multiple messages to a stream with Java

668 views
Skip to first unread message

James Strachan

unread,
Jul 8, 2008, 10:23:36 AM7/8/08
to Protocol Buffers
I tried out using PB on the ActiveMQ project; I've created a little
spike using PB as the underlying wire format...

http://svn.apache.org/repos/asf/activemq/trunk/activemq-protocol-buffer/

I hacked up a little test case to write a number of messages to a
file, then read them back again...

http://svn.apache.org/repos/asf/activemq/trunk/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/PerformanceTest.java

this seems to act strangely, whether I use the CodedOutputStream or
OutputStream approach. It kinda seems like the message.writeTo() kinda
moves to the start of the stream and writes there, rather than writing
continuously.

This is kinda odd; I wondered if there was a good reason for this?

e.g. the test program tries writing 10 messages to an output stream,
then tries reading them back in - and kinda barfs as the first message
it reads in is the last one written.

Is there some magic ninja way to do such a thing - treating streams as
stream; or do you have to avoid using streams entirely and work with
byte arrays?

It seems like CodedOutputStream.refreshBuffer() always sets the
position to the start of the stream, not where it finished writing,
which just seems odd.

Maybe I just need more coffee? :)

James


James Strachan

unread,
Jul 8, 2008, 10:47:20 AM7/8/08
to Protocol Buffers
BTW I managed to hack the test case to actually work by using explicit
byte[] objects instead and then reading/writing them to the underlying
streams (along with a size first).

http://svn.apache.org/repos/asf/activemq/trunk/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/Performance2Test.java

I just wondered if there was some kinda way to do this kinda thing
using either streams directly or using some helper class or the Coded
stream classes?

It just seems counterintuitive to not be able to write more than one
message to a stream :)

On Jul 8, 3:23 pm, James Strachan <james.strac...@gmail.com> wrote:
> I tried out using PB on the ActiveMQ project; I've created a little
> spike using PB as the underlying wire format...
>
> http://svn.apache.org/repos/asf/activemq/trunk/activemq-protocol-buffer/
>
> I hacked up a little test case to write a number of messages to a
> file, then read them back again...
>
> http://svn.apache.org/repos/asf/activemq/trunk/activemq-protocol-buff...

Kenton Varda

unread,
Jul 8, 2008, 2:13:53 PM7/8/08
to James Strachan, Protocol Buffers
2008/7/8 James Strachan <james.s...@gmail.com>:

this seems to act strangely, whether I use the CodedOutputStream or
OutputStream approach. It kinda seems like the message.writeTo() kinda
moves to the start of the stream and writes there, rather than writing
continuously.

Hi James,

Your problem here is that the protocol buffer format is not self-delimiting.  If you concatenate multiple messages and then parse that, the parser cannot tell where one message ends and another one starts.  Instead, it just reads the whole thing as if it were one message, and the effect you get is as if you had combined all the messages using mergeFrom().

This can be solved pretty easily, though.  If you use CodedOutputStream directly, you can simply use writeRawVarint() to write the size of each message before writing the message itself.  Then, when reading with a CodedInputStream, read the varint first, then use CodedInputStream.pushLimit() to limit parsing to only that number of bytes.

Another, easier solution would be to just create a protocol message type representing a list of you type, e.g.:

message MyTypeList {
  repeated MyType element = 1;
}

Then serialize your set of messages as a single MyTypeList.

James Strachan

unread,
Jul 8, 2008, 2:43:40 PM7/8/08
to Kenton Varda, Protocol Buffers
Awesome thanks for the heads up!

2008/7/8 Kenton Varda <ken...@google.com>:

--
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://open.iona.com

James Strachan

unread,
Jul 8, 2008, 3:05:18 PM7/8/08
to Protocol Buffers
BTW it might be worth adding to the wiki docs somewhere this little
pattern of using pushLimit() / popLimit(). It took me a little bit of
experimentation to figure out the correct usage of these 2 methods.
e.g.

when writing a number of messages....

CodedOutputStream cout = CodedOutputStream.newInstance(out);
...
// in some loop...
int size = message.getSerializedSize();
cout.writeRawVarint32(size);
message.writeTo(cout);

then reading them...

CodedInputStream cin = CodedInputStream.newInstance(in);
...
// in some loop...
int size = cin.readRawVarint32();
int previous = cin.pushLimit(size);
message = Foo.Bar.parseFrom(cin);
cin.popLimit(previous);

it was the use of pushLimit() / popLimit() that took a little while to
figure out.

Am really liking Protocol Buffers - good stuff! :)

2008/7/8 James Strachan <james.s...@gmail.com>:

Reply all
Reply to author
Forward
0 new messages