Reading ProtoBuf Kafka-Messages

Jan Rudert

Jul 12, 2013, 10:34:54 AM
to druid-de...@googlegroups.com
Hi,

I'd like to be able to consume Kafka messages that contain events encoded as protocol buffers. The current KafkaFirehose is made for strings (csv, ...); it uses the StringInputRowParser.

I started an attempt at integrating a ProtoBufInputRowParser. It relies on the DynamicMessage API of protocol buffers; one has to provide a .desc descriptor file for the message format.
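
For context, here is a minimal sketch of the DynamicMessage idea, assuming a .desc file produced by protoc --descriptor_set_out for a single .proto without imports. This is illustrative only, not the code on the branch, and the class and method names are made up:

import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;

import java.io.FileInputStream;
import java.util.HashMap;
import java.util.Map;

public class DynamicMessageSketch
{
  // Turns one serialized protobuf message into a flat field-name -> value map,
  // which is the shape a Map-based InputRow parser can work with.
  public static Map<String, Object> toMap(String descriptorFile, String messageType, byte[] messageBytes)
      throws Exception
  {
    FileDescriptorSet descriptorSet = FileDescriptorSet.parseFrom(new FileInputStream(descriptorFile));
    // assumes the .desc contains a single file with no imports (flat structure only)
    FileDescriptor file = FileDescriptor.buildFrom(descriptorSet.getFile(0), new FileDescriptor[]{});
    Descriptor descriptor = file.findMessageTypeByName(messageType);

    DynamicMessage message = DynamicMessage.parseFrom(descriptor, messageBytes);

    Map<String, Object> row = new HashMap<String, Object>();
    for (Map.Entry<FieldDescriptor, Object> field : message.getAllFields().entrySet()) {
      row.put(field.getKey().getName(), field.getValue());
    }
    return row;
  }
}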

You can see what I did on branch "kafka-protobuf" here: https://github.com/zanox/druid.git The code is still very much work in progress, but before going on I would like to get some feedback from you on the following questions:
  • Do you want protobuf support?
  • The approach allows only for flat protobuf structures, which is a limitation; parsing nested messages can only be done by custom implementations, IMHO. Is there another way of letting the user inject his custom message parser code other than having a separate firehose factory?
  • Is it the right way to not create a new KafkaFirehoseFactory and do a switch in the existing one?
  • I haven't completely understood the Jackson-based wiring yet. What I did works, but it doesn't feel very good, especially the instanceof for instantiating the actual firehose instances. Can you give me a hint here?
  • Regarding configuration: The parser part in realtime.spec would look like this with my implementation:
"parser" : { "type" : "protobuf",
                              "timestampSpec" : { "column" : "ts", "format" : "iso" },
                              "data" : { "descriptor" : "MyEvent.desc",
                                         "format" : "protobuf",
                                         "dimensions" : ["dim1","dim2","dim3"]  }
                            }},

    • Note that "protobuf" is mentioned twice. How can I avoid this? (Related to the wiring, I guess.)
    • Is dataspec the best place to configure the descriptor file?
  • Any other thoughts?
Thanks a lot!
-- Jan

Eric Tschetter

Jul 12, 2013, 5:28:38 PM
to druid-de...@googlegroups.com
Jan,

Awesome!  Thanks for diving in and making some changes.  I looked over your changes and I can see where your confusion is coming from.  I'll answer your questions and hopefully we'll be able to get something a bit cleaner and without instanceof checks :).

  • Do you want protobuf support?
Yes, absolutely, that will be awesome!
 
  • The approach allows only for flat protobuf structures, which is a limitation; parsing nested messages can only be done by custom implementations, IMHO. Is there another way of letting the user inject his custom message parser code other than having a separate firehose factory?
I don't know enough about protobuf to really be able to say one way or the other.  I am a firm believer in walking before running though, so let's start with the limited view and then let someone who knows protobuf and has a nested message come along and help figure out what needs to be done there :).
 
  • Is it the right way to not create a new KafkaFirehoseFactory and do a switch in the existing one?
I believe your high-level approach is correct; I think you are getting caught up on some poor interface choices made by me, but we can fix them :).
 
  • I haven't completely understood the Jackson-based wiring yet. What I did works, but it doesn't feel very good, especially the instanceof for instantiating the actual firehose instances. Can you give me a hint here?
I completely agree about the instanceof check; I'll elaborate on how to fix that below.
 
  • Regarding configuration: The parser part in realtime.spec would look like this with my implementation:
"parser" : { "type" : "protobuf",
                              "timestampSpec" : { "column" : "ts", "format" : "iso" },
                              "data" : { "descriptor" : "MyEvent.desc",
                                         "format" : "protobuf",
                                         "dimensions" : ["dim1","dim2","dim3"]  }
                            }},

    • Note that "protobuf" is mentioned twice. How can I avoid this? (Related to the wiring, I guess.)
    • Is dataspec the best place to configure the descriptor file?

Yeah, mentioning it twice is pretty ugly.  Looking back at the code, I think there's some extra stuff that doesn't really need to be in there.  If you are willing to refactor some stuff, I think we can make it a lot more elegant.

1) MapInputRowParser doesn't need the full DataSpec, it looks like it just needs a "List<String> customDimensions" and if that is null, it can fall back onto the dimensionExclusions like it does now when it does the hasCustomDimension() check
2) With that change, you shouldn't need to even have a DataSpec in your protobuf parser anymore.  You can elevate the descriptor and dimensions up to fields on the protobuf parser and that should make it a bit cleaner.
3) The current KafkaFirehose is hard-coding that the messages have Strings and your instanceof check is working around that and saying that if the InputRowParser is the Protobuf one, then it should expect bytes.  Instead, we should
a) Make StringInputRowParser be an InputRowParser<ByteBuffer>
b) Do all of the current conversion into a String inside the StringInputRowParser, do the ByteString conversion inside the Protobuf parser
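
A rough sketch of what (a) and (b) could look like on the String side; the helper body is a placeholder and the names are assumed, not the actual Druid code:

import java.nio.ByteBuffer;
import java.nio.charset.Charset;

public class StringInputRowParser implements InputRowParser<ByteBuffer>
{
  private static final Charset UTF8 = Charset.forName("UTF-8");

  @Override
  public InputRow parse(ByteBuffer input)
  {
    // (a)+(b): the firehose only hands over bytes; the bytes -> String
    // conversion now lives inside the String parser itself
    final byte[] bytes = new byte[input.remaining()];
    input.get(bytes);
    return parse(new String(bytes, UTF8));
  }

  // the existing String entry point stays, so line-oriented firehoses keep working
  public InputRow parse(String input)
  {
    // placeholder for the existing delimited/JSON handling
    throw new UnsupportedOperationException("existing String parsing goes here");
  }
}

The protobuf parser would do the equivalent with its own payload, e.g. ByteString.copyFrom(input), and hand the result to DynamicMessage.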

Does that all make sense?

--Eric

Jan Rudert

Jul 19, 2013, 8:26:33 AM
to druid-de...@googlegroups.com
Hi Eric,

thanks for motivating and giving me advice in that.

1) MapInputRowParser doesn't need the full DataSpec, it looks like it just needs a "List<String> customDimensions" and if that is null, it can fall back onto the dimensionExclusions like it does now when it does the hasCustomDimension() check
2) With that change, you shouldn't need to even have a DataSpec in your protobuf parser anymore.  You can elevate the descriptor and dimensions up to fields on the protobuf parser and that should make it a bit cleaner.
3) The current KafkaFirehose is hard-coding that the messages have Strings and your instanceof check is working around that and saying that if the InputRowParser is the Protobuf one, then it should expect bytes.  Instead, we should
a) Make StringInputRowParser be an InputRowParser<ByteBuffer>
b) Do all of the current conversion into a String inside the StringInputRowParser, do the ByteString conversion inside the Protobuf parser

Does that all make sense?

Yes, I made the changes that way. It looks much better now and seems to work in the first simple tests. There are still some points I want to clarify with you before testing more intensively:

  • Configuration of the protobuf and string parsers in realtime.spec is now slightly different: "string" takes a dataspec, "protobuf" does not. Is that OK, and should we rename the class DataSpec to StringDataSpec or something like that?
  • After changing the parameterization of StringInputRowParser I found myself changing a lot of client code outside the realtime node. I had to insert ByteBuffer.wrap(line.getBytes()) in S3Firehose, for instance. Also affected are e.g. HadoopDruidIndexer, IndexGeneratorJob, FlightsFirehose, etc. This introduces intermediate object creation and lots of places where I would have to test that behaviour hasn't changed. I would like to have another delegate parser implementation only for the realtime Kafka firehose and keep using the old StringInputRowParser in the other places. But there I got stuck on how to interpret the realtime.spec for cases where (type == kafka && parser == string) vs. (type != kafka && parser == string). You see my problem? Any hint here?
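
For illustration, this is the kind of call-site change described above (a fragment with assumed names, in the style of the snippets in this thread, not the actual S3 firehose code):

  private InputRow parseLine(StringInputRowParser parser, String line)
  {
    // previously: return parser.parse(line);
    return parser.parse(ByteBuffer.wrap(line.getBytes())); // extra byte[] + ByteBuffer per line
  }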
BTW.  The company CLA is almost signed.

--Jan

Eric Tschetter

Jul 19, 2013, 7:42:06 PM
to druid-de...@googlegroups.com
1) MapInputRowParser doesn't need the full DataSpec, it looks like it just needs a "List<String> customDimensions" and if that is null, it can fall back onto the dimensionExclusions like it does now when it does the hasCustomDimension() check
2) With that change, you shouldn't need to even have a DataSpec in your protobuf parser anymore.  You can elevate the descriptor and dimensions up to fields on the protobuf parser and that should make it a bit cleaner.
3) The current KafkaFirehose is hard-coding that the messages have Strings and your instanceof check is working around that and saying that if the InputRowParser is the Protobuf one, then it should expect bytes.  Instead, we should
a) Make StringInputRowParser be an InputRowParser<ByteBuffer>
b) Do all of the current conversion into a String inside the StringInputRowParser, do the ByteString conversion inside the Protobuf parser

Does that all make sense?

Yes, I made the changes that way. It looks much better now and seems to work in the first simple tests. There are still some points I want to clarify with you before testing more intensively:

  • Configuration of the protobuf and string parsers in realtime.spec is now slightly different: "string" takes a dataspec, "protobuf" does not. Is that OK, and should we rename the class DataSpec to StringDataSpec or something like that?
I think it's fine that protobuf doesn't take it. I'm fine with renaming or keeping it as is; do you find it confusing that it's not used?
 
  • After changing the parameterization of StringInputRowParser I found myself changing a lot of client code outside the realtime node. I had to insert ByteBuffer.wrap(line.getBytes()) in S3Firehose, for instance. Also affected are e.g. HadoopDruidIndexer, IndexGeneratorJob, FlightsFirehose, etc. This introduces intermediate object creation and lots of places where I would have to test that behaviour hasn't changed. I would like to have another delegate parser implementation only for the realtime Kafka firehose and keep using the old StringInputRowParser in the other places. But there I got stuck on how to interpret the realtime.spec for cases where (type == kafka && parser == string) vs. (type != kafka && parser == string). You see my problem? Any hint here?
Hrm, I think I understand.  I think that if you keep the method

  public InputRow parse(String input) throws FormattedException
  {
    return inputRowCreator.parse(parser.parse(input));
  }

on StringInputRowParser, then you can keep StringInputRowParser in all of the other places that it is currently used.

Then, you can make an interface

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
@JsonSubTypes(value = {
    @JsonSubTypes.Type(name = "protobuf", value = ProtobufInputRowParser.class),
    @JsonSubTypes.Type(name = "string", value = StringInputRowParser.class)
})
public interface ByteBufferInputRowParser extends InputRowParser<ByteBuffer>
{
}

And finally we can make the KafkaFirehoseFactory depend on a ByteBufferInputRowParser instead of a StringInputRowParser.

It's a little hairy, but it should maintain full compatibility with the old stuff (i.e. if "type" isn't specified it'll default to StringInputRowParser) and get us moving in the right direction towards having protobuf as an option.
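
For what it's worth, a sketch of what the factory side might then look like; the constructor and method here are assumptions, and the real KafkaFirehoseFactory obviously has the Kafka consumer plumbing around it:

import java.nio.ByteBuffer;

public class KafkaFirehoseFactorySketch
{
  // Jackson picks the implementation based on "type": StringInputRowParser by
  // default, ProtobufInputRowParser when "type": "protobuf" is configured
  private final ByteBufferInputRowParser parser;

  public KafkaFirehoseFactorySketch(ByteBufferInputRowParser parser)
  {
    this.parser = parser;
  }

  public InputRow nextRow(byte[] messagePayload)
  {
    // no instanceof check: every parser accepts raw bytes now
    return parser.parse(ByteBuffer.wrap(messagePayload));
  }
}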

--Eric

 


Jan Rudert

Jul 29, 2013, 7:04:36 AM
to druid-de...@googlegroups.com
Hi Eric,

I have no problem with the different realtime.spec configs of the parsers.

I introduced the ByteBufferInputRowParser as you suggested. Looks good.

Anything else I should do before issuing a pull request?

--Jan



Eric Tschetter

Jul 29, 2013, 12:30:45 PM
to druid-de...@googlegroups.com
Nope, go ahead and do it.

I believe we are still waiting on the CLA as well :).

--Eric


Jan Rudert

Jul 30, 2013, 3:02:02 AM
to druid-de...@googlegroups.com
Hi Eric,

I just sent the CLA to you. 

What about documentation for this new feature? Where should I put it?

Thx
Jan



Eric Tschetter

Jul 30, 2013, 2:10:40 PM
to druid-de...@googlegroups.com
Right now, the documentation is all the stuff on the wiki.

This is unfortunate because we can't actually version it with the code...  I've been playing around with the idea of pushing the docs out to a github.io page, but haven't figured out if we could leverage the maven site plugin or something for that.

--Eric


Jan Rudert

Aug 2, 2013, 4:40:09 AM
to druid-de...@googlegroups.com
Hi Eric,

I would suggest that as soon as the protobuf stuff is in master, I update the wiki, provided I am allowed to and you tell me where to put the documentation.

But before I do the pull request I have a question about the Hadoop version that Druid should depend on. I am currently testing with the following dependencies:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.0.0-mr1-cdh4.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.0.0-cdh4.1.1</version>
</dependency>


It would take a bit of effort to get my changes tested with 0.20.2 because I would have to set up such a cluster first. So before doing that, I would like to know which Hadoop version should be supported by default. I know that madvertise maintains a Druid fork which works with Hadoop 1.1.1.

Thanks
Jan




Eric Tschetter

Aug 2, 2013, 5:44:50 PM
to druid-de...@googlegroups.com
We are still stuck on 0.20.205 because of issues with AWS EMR.  We are in the process of upgrading our own internal Hadoop version, at which point we will be able to increase the version that Druid depends on by default.

So, until that happens, 0.20.205 is what we generally depend on and would need stuff to be tested with. That said, I'm not really sure why your changes to add protobuf support would touch Hadoop, so I'm pretty sure they would be fine as-is without upping the Hadoop version. If you can put together a pull request without the pom changes that set your Hadoop version, we can incorporate that into master, and then whenever we do our next internal deployment it'll end up going through some testing to make sure things continue to work.

--Eric


Jan Rudert

Aug 5, 2013, 3:24:44 AM
to druid-de...@googlegroups.com
Hi,

there is no obvious connection between my protobuf changes and the Hadoop version, but the new Hadoop version I tested with uses protocol buffers internally as well, so I had to align the protocol buffers version to 2.4.0; otherwise it wouldn't compile. I generally feel more comfortable checking in stuff exactly as I have tested it, but this time it should be OK. I made a pull request with the Hadoop version from master. Please tell me if anything breaks.

Cheers
--Jan



Eric Tschetter

Aug 5, 2013, 2:29:27 PM
to druid-de...@googlegroups.com
Great, thanks. I will take a look at that soon.

--Eric

