ProtoBuf Parser


Herman Lintvelt

May 14, 2015, 9:36:53 PM
to druid...@googlegroups.com
Hi,

I see a ProtoBuf parser is mentioned in the docs as an option (instead of the JSON/CSV/TSV parsers), but no further details are given. I also looked at the code on GitHub, at the ProtoBufInputRowParser class, but somewhere I am missing something. If I send messages via Kafka (and the KafkaFirehose) in JSON, it works fine and the events get ingested, but if I switch to data serialized using protobuf and set up the Realtime node to use the ProtoBuf parser, then it looks like the Realtime node does not get any events (or ignores them if it does; there is no output in the Realtime node's logger).

Any help will be appreciated. 

My spec for the realtime node:

[
  {
    "dataSchema" : {
      "dataSource" : "lqtest2",
      "parser" : {
        "type" : "protoBuf",
        "descriptor" : "MetricsRecord.proto",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "millis"
          },
          "dimensionsSpec" : {
            "dimensions": ["userId","sourceId","deviceId"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [{
        "type" : "count",
        "name" : "count"
      }, {
        "type" : "max",
        "name" : "maxValue",
        "fieldName" : "testValue"
      }, {
        "type" : "min",
        "name" : "minValue",
        "fieldName" : "testValue"
      }],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "SECOND"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "localhost:2181",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id": "druid-example",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset": "largest",
          "auto.commit.enable": "false"
        },
        "feed": "test"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 500000,
      "intermediatePersistPeriod": "PT5m",
      "windowPeriod": "PT5m",
      "basePersistDirectory": "\/tmp\/realtime\/basePersist",
      "rejectionPolicy": {
        "type": "messageTime"
      }
    }
  }
]
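
For context, a minimal MetricsRecord.proto consistent with the spec above might look like the sketch below. This is hypothetical: the field names (timestamp, userId, sourceId, deviceId, testValue) are inferred from the timestampSpec, dimensions, and metric fieldName in the spec, not taken from the actual file.

```proto
// Hypothetical proto2 schema inferred from the spec above; the real
// MetricsRecord.proto may differ in names, types, and field numbers.
message MetricsRecord {
  optional int64  timestamp = 1; // epoch millis, matching timestampSpec format "millis"
  optional string userId    = 2; // dimension
  optional string sourceId  = 3; // dimension
  optional string deviceId  = 4; // dimension
  optional double testValue = 5; // feeds the min/max aggregators
}
```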

Fangjin Yang

May 15, 2015, 1:57:47 AM
to druid...@googlegroups.com, herman....@gmail.com
Hi Herman, are there any exceptions in the logs?

If you include the RealtimeMetricsMonitor, are there any metrics about events being ingested, thrownAway, etc?

Herman Lintvelt

May 15, 2015, 4:51:42 AM
to druid...@googlegroups.com, herman....@gmail.com
I've managed to get the RealtimeMetricsMonitor going (I'm still a noob with Druid, but really impressed so far).

From the log (see below) I can see that it struggles to parse the data. I'll switch on debug logging and see if I get more information to trace it down.

2015-05-15T08:43:05,019 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - New Server[DruidServerMetadata{name='10.0.2.36:8084', host='10.0.2.36:8084', maxSize=0, tier='_default_tier', type='realtime', priority='0'}]

2015-05-15T08:44:02,505 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-05-15T08:44:02.492Z","service":"realtime","host":"10.0.2.36:8084","metric":"events/thrownAway","value":0,"user2":"lqtest2"}]

2015-05-15T08:44:02,505 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [89,999] Unparseable events! Turn on debug logging to see exception stack trace.

2015-05-15T08:44:02,506 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-05-15T08:44:02.506Z","service":"realtime","host":"10.0.2.36:8084","metric":"events/unparseable","value":89999,"user2":"lqtest2"}]

2015-05-15T08:44:02,506 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-05-15T08:44:02.506Z","service":"realtime","host":"10.0.2.36:8084","metric":"events/processed","value":0,"user2":"lqtest2"}]

2015-05-15T08:44:02,506 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-05-15T08:44:02.506Z","service":"realtime","host":"10.0.2.36:8084","metric":"rows/output","value":0,"user2":"lqtest2"}]


Thanks
Herman

Herman Lintvelt

May 15, 2015, 4:58:46 AM
to druid...@googlegroups.com
OK, with debug-level logging (see below) I can see that the Realtime node is still using Jackson, i.e. expecting JSON-format data.

How can I configure it to use the ProtoBuf parser? My attempt looks like this, but it does not seem to work:

"dataSchema" : {
      "dataSource" : "lqtest2",
      "parser" : {
        "type" : "protoBuf",
        "descriptor" : "MetricsRecord.proto",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "millis"
          },
          "dimensionsSpec" : {
            "dimensions": ["userId","sourceId","deviceId"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },

Please note: if I set parseSpec.format to "protoBuf", then Druid expects a TSV file and complains that the rest of the configuration is not correct for TSV.

2015-05-15T08:54:17,400 DEBUG [chief-lqtest2] io.druid.segment.realtime.RealtimeManager - thrown away line due to exception, considering unparseable

Pbitmap$0IcommoheartRateIserializedSizeJ UnabletimestampdeviceIdtLjava/lang/String;sourceIdq~LuserIdq~xpCMTU(t$42dd9e99-5ca6-4484-a6b4-8081a861f162t$1a4a19ce-71f1-4cf3-b11b-dfbdb4ab372bt$cbdac087-31a5-4ff3-814f-1b2875252a86]

at com.metamx.common.parsers.JSONParser.parse(JSONParser.java:147) ~[java-util-0.26.15.jar:?]

at io.druid.data.input.impl.StringInputRowParser.parseString(StringInputRowParser.java:86) ~[druid-api-0.3.5.jar:0.3.5]

at io.druid.data.input.impl.StringInputRowParser.buildStringKeyMap(StringInputRowParser.java:73) ~[druid-api-0.3.5.jar:0.3.5]

at io.druid.data.input.impl.StringInputRowParser.parse(StringInputRowParser.java:40) ~[druid-api-0.3.5.jar:0.3.5]

at io.druid.data.input.impl.StringInputRowParser.parse(StringInputRowParser.java:19) ~[druid-api-0.3.5.jar:0.3.5]

at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.nextRow(KafkaEightFirehoseFactory.java:118) ~[?:?]

at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:239) [druid-server-0.7.1.1.jar:0.7.1.1]

Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('' (code 65533 / 0xfffd)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')

Pbitmap$0I: sheartRateIserializedSizeJcsRecortimestampdeviceIdtLjava/lang/String;sourceIdq~LuserIdq~xpCMTU(t$42dd9e99-5ca6-4484-a6b4-8081a861f162t$1a4a19ce-71f1-4cf3-b11b-dfbdb4ab372bt$cbdac087-31a5-4ff3-814f-1b2875252a86; line: 1, column: 2]

at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1419) ~[jackson-core-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:508) ~[jackson-core-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:437) ~[jackson-core-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1462) ~[jackson-core-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:683) ~[jackson-core-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3105) ~[jackson-databind-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3051) ~[jackson-databind-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:1833) ~[jackson-databind-2.4.4.jar:2.4.4]

at com.metamx.common.parsers.JSONParser.parse(JSONParser.java:115) ~[java-util-0.26.15.jar:?]

... 6 more

2015-05-15T08:54:17,401 DEBUG [chief-lqtest2] kafka.consumer.PartitionTopicInfo - reset consume offset of test:0: fetched offset = 71431: consumed offset = 64836 to 64836

Himanshu

May 15, 2015, 11:49:06 AM
to Herman Lintvelt, druid...@googlegroups.com
Can you try "protobuf" instead of "protoBuf"? From the code, it looks like the doc is wrong.

-- Himanshu


Herman Lintvelt

May 16, 2015, 5:37:54 AM
to druid...@googlegroups.com, herman....@gmail.com
Hi, I changed the relevant part of my spec file to:

"dataSchema" : {
      "dataSource" : "lqtest2",
      "parser" : {
        "type" : "protobuf",
        "descriptor" : "MetricsRecord.proto",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "millis"
          },
          "dimensionsSpec" : {
            "dimensions": ["userId","sourceId","deviceId"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },

(i.e. using protobuf instead of protoBuf, but I still had to keep parseSpec.format == json, otherwise it expects TSV params in the parseSpec)

Now there is progress: for each event (which is a message object built from a .proto file and then sent as Array[Byte] over Kafka) I get the exception below, so at least ProtoBufInputRowParser is being used. Any advice on how to tackle this InvalidProtocolBufferException? Might it have to do with the descriptor file I'm setting in the spec file? What format does it expect the "descriptor" in? The .proto definition file, or the class file(s) that protoc has compiled?

The exception I get for each event I want to ingest:

2015-05-16T09:30:30,351 DEBUG [chief-lqtest2] io.druid.segment.realtime.RealtimeManager - thrown away line due to exception, considering unparseable

java.lang.RuntimeException: com.google.protobuf.InvalidProtocolBufferException: Protocol message tag had invalid wire type.

at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]

at io.druid.data.input.ProtoBufInputRowParser.getDescriptor(ProtoBufInputRowParser.java:127) ~[druid-processing-0.7.1.1.jar:0.3.5]

at io.druid.data.input.ProtoBufInputRowParser.buildStringKeyMap(ProtoBufInputRowParser.java:85) ~[druid-processing-0.7.1.1.jar:0.3.5]

at io.druid.data.input.ProtoBufInputRowParser.parse(ProtoBufInputRowParser.java:78) ~[druid-processing-0.7.1.1.jar:0.3.5]

at io.druid.data.input.ProtoBufInputRowParser.parse(ProtoBufInputRowParser.java:41) ~[druid-processing-0.7.1.1.jar:0.3.5]

at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.nextRow(KafkaEightFirehoseFactory.java:118) ~[?:?]

at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:239) [druid-server-0.7.1.1.jar:0.7.1.1]

Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message tag had invalid wire type.

at com.google.protobuf.InvalidProtocolBufferException.invalidWireType(InvalidProtocolBufferException.java:99) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:498) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:461) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:579) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:280) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.CodedInputStream.readGroup(CodedInputStream.java:240) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:488) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:461) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:579) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:280) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.CodedInputStream.readGroup(CodedInputStream.java:240) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:488) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.GeneratedMessage.parseUnknownField(GeneratedMessage.java:193) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.DescriptorProtos$FileDescriptorSet.<init>(DescriptorProtos.java:89) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.DescriptorProtos$FileDescriptorSet.<init>(DescriptorProtos.java:47) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.DescriptorProtos$FileDescriptorSet$1.parsePartialFrom(DescriptorProtos.java:136) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.DescriptorProtos$FileDescriptorSet$1.parsePartialFrom(DescriptorProtos.java:131) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:200) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:217) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:223) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.DescriptorProtos$FileDescriptorSet.parseFrom(DescriptorProtos.java:253) ~[protobuf-java-2.5.0.jar:?]

at io.druid.data.input.ProtoBufInputRowParser.getDescriptor(ProtoBufInputRowParser.java:119) ~[druid-processing-0.7.1.1.jar:0.3.5]

... 5 more

Himanshu

May 16, 2015, 10:02:24 AM
to Herman Lintvelt, druid...@googlegroups.com
OK, we'll fix the documentation regarding "protobuf". I found some more stuff missing in the doc. I don't think protobuf is well documented, and we'll fix that once things work for you and I know that it works... because I've never used it :)

Please try adding the "descriptor" file in the "parser". Here is what you should try:

  "parser" : {
    "type" : "protobuf",
    "descriptor" : "descriptorFileName", // this file is expected to be present on the classpath

    "parseSpec" : {
      "format" : "json",
      "timestampSpec" : {
        "column" : "timestamp",
        "format" : "auto"
      },
..

More of a "dev" detail, but the "descriptor" file is later read using the following code, which seems to be failing for you right now:
 InputStream fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFileInClassPath);
 FileDescriptorSet set = FileDescriptorSet.parseFrom(fin);
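
Since the file is parsed with FileDescriptorSet.parseFrom, the "descriptor" must be a compiled descriptor set, not the .proto source text. Assuming protoc is available, such a file can be generated like this (a sketch; file names follow the thread):

```shell
# Compile the .proto definition into a serialized FileDescriptorSet.
# The resulting .desc file is what FileDescriptorSet.parseFrom(fin) reads;
# place it on the Realtime node's classpath and point "descriptor" at it.
protoc --descriptor_set_out=MetricsRecord.desc MetricsRecord.proto
```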


-- Himanshu


