Debugging avro decoder

Hubert Behaghel

Jul 10, 2016, 3:15:07 PM
to Druid Development
Hi,

I am trying to consume an Avro-encoded Kafka topic using the druid-avro-extensions. I am stuck on the error below and I don't have any idea how to debug it:

[...]
2016-07-10 18:31:22,750 [KafkaConsumer-1] INFO  i.d.initialization.Initialization - added URL[file:/app/imply-1.3.0/dist/druid/extensions/druid-avro-extensions/servlet-api-2.5-20081211.jar]
2016-07-10 18:31:22,750 [KafkaConsumer-1] INFO  i.d.initialization.Initialization - added URL[file:/app/imply-1.3.0/dist/druid/extensions/druid-avro-extensions/avro-1.7.7.jar]
2016-07-10 18:31:22,758 [KafkaConsumer-1] INFO  i.d.initialization.Initialization - Adding local file system extension module [io.druid.data.input.avro.AvroExtensionsModule] for class [io.druid.initialization.DruidModule]
2016-07-10 18:31:23,922 [KafkaConsumer-1] INFO  c.metamx.emitter.core.LoggingEmitter - Start: started [true]
2016-07-10 18:31:24,730 [KafkaConsumer-1] INFO  c.m.t.finagle.FinagleRegistry - Adding resolver for scheme[druidTask!druid:overlord].
2016-07-10 18:31:25,146 [KafkaConsumer-1] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:
java.lang.ArrayIndexOutOfBoundsException: 25
        at org.apache.avro.io.ResolvingDecoder.readEnum(ResolvingDecoder.java:257) ~[na:na]
        at org.apache.avro.generic.GenericDatumReader.readEnum(GenericDatumReader.java:199) ~[na:na]
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) ~[na:na]
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) ~[na:na]
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) ~[na:na]
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[na:na]
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) ~[na:na]
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) ~[na:na]
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[na:na]
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) ~[na:na]
        at io.druid.data.input.avro.SchemaRepoBasedAvroBytesDecoder.parse(SchemaRepoBasedAvroBytesDecoder.java:82) ~[na:na]
        at io.druid.data.input.AvroStreamInputRowParser.parse(AvroStreamInputRowParser.java:53) ~[na:na]
        at io.druid.data.input.AvroStreamInputRowParser.parse(AvroStreamInputRowParser.java:33) ~[na:na]
        at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:182) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:182) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:200) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:200) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2$$anonfun$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at com.twitter.util.Try$.apply(Try.scala:13) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at com.metamx.tranquility.beam.TransformingBeam.sendAll(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at com.metamx.tranquility.tranquilizer.Tranquilizer.com$metamx$tranquility$tranquilizer$Tranquilizer$$sendBuffer(Tranquilizer.scala:301) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$send$1.apply(Tranquilizer.scala:202) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$send$1.apply(Tranquilizer.scala:202) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at scala.Option.foreach(Option.scala:257) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at com.metamx.tranquility.tranquilizer.Tranquilizer.send(Tranquilizer.scala:202) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
        at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:76) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]
        at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_51]
        at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_51]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
        at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
2016-07-10 18:31:25,147 [KafkaConsumer-1] INFO  c.m.tranquility.kafka.KafkaConsumer - Shutting down - attempting to flush buffers and commit final offsets
2016-07-10 18:31:25,149 [KafkaConsumer-1] INFO  k.c.ZookeeperConsumerConnector - [druid-kafka_mysql.kafkas.prod.slu.faw.bskyb.com-1468175478305-6b28cf41], ZKConsumerConnector shutting down
2016-07-10 18:31:25,158 [KafkaConsumer-1] INFO  k.c.ZookeeperTopicEventWatcher - Shutting down topic event watcher.
2016-07-10 18:31:25,158 [KafkaConsumer-1] INFO  k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1468175478421] Stopping leader finder thread
[...]

One thing in my setup that may be a bit exotic is the Node.js proxy below, which lets me use the Confluent schema registry by making it "AVRO compliant" (I called this file registry-proxy.js):
var http = require('http');
var util = require('util'); // 'sys' is a deprecated alias of 'util'
var config = require('./proxy-config.json');

http.createServer(function(request, response) {
    util.log(">> " + request.method + " " + request.url);
    var subrequestPath, requestType;
    // schema lookup requests look like /<subject>/id/<id>
    var matches = request.url.match(/\/([a-z0-9-]+)\/id\/(\d+)$/);
    if (matches) {
        requestType = 'schema';
        // id 0 is treated as "latest"
        var version = matches[2] === '0' ? 'latest' : matches[2];
        subrequestPath = '/' + matches[1] + '/versions/' + version;
    } else {
        // everything else is a latest-version lookup
        requestType = 'latestVersion';
        subrequestPath = request.url + '/versions';
    }
    var options = {
        host: config.target.host,
        port: config.target.port,
        headers: request.headers,
        path: config.target.path + subrequestPath
    };
    var subrequest = http.request(options);
    util.log(">-> GET " + options.path);
    subrequest.addListener('response', function (subresponse) {
        util.log("<-< " + subresponse.statusCode);
        var answer = '';
        subresponse.addListener('data', function(chunk) {
            answer = answer + chunk.toString();
        });
        subresponse.addListener('end', function() {
            var body = 'unhandled request type';
            var json = JSON.parse(answer);
            if (requestType === 'latestVersion') {
                // Confluent returns a JSON array of version numbers;
                // reduce it to the single latest one.
                var latest = json.reduce(function(max, v) {
                    return v > max ? v : max;
                }, -1);
                body = '' + latest;
            } else if (requestType === 'schema') {
                // unwrap and unescape the schema string returned as {"schema": "..."}
                var schema = json.schema ? json.schema
                    .replace(/^"/, "")
                    .replace(/"$/, "")
                    .replace(/\\(.)/g, "$1") : 'schema not found';
                body = schema;
            }
            util.log('<< ' + body);
            // use byte length, not character length, for the header
            subresponse.headers['content-length'] = Buffer.byteLength(body);
            response.writeHead(subresponse.statusCode, subresponse.headers);
            response.end(body);
        });
    });
    request.addListener('data', function(chunk) {
        subrequest.write(chunk, 'binary');
    });
    request.addListener('end', function() {
        subrequest.end();
    });
}).listen(config.port);

util.log("Proxy listening on port " + config.port +
        " hitting http://" + config.target.host + ":" + config.target.port + config.target.path);

The config file proxy-config.json looks like this:
{
    "port": 8080,
    "target": {
        "host": "schemaregistry.example.com",
        "port": 8081,
        "path": "/subjects"
    }
}

When both files are in the working directory you can fire it with: $ node registry-proxy.js
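
As a sanity check, you can exercise the two request shapes the proxy handles directly (the subject name here is just an illustration):

$ curl http://localhost:8080/web-view-message/id/0
$ curl http://localhost:8080/web-view-message

The first is the schema lookup (id 0 resolves to the latest version); anything else falls through to the latest-version lookup.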

I have diffed the output of this endpoint against what https://github.com/schema-repo/schema-repo produces, and apart from whitespace the returned JSON is identical.

Apart from this, I am using the quickconf; here is my Kafka config for Tranquility:
{
  "dataSources" : [
    {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "web-kafka",
          "parser" : {
            "type" : "avro_stream",
            "avroBytesDecoder": {
              "type": "schema_repo",
              "subjectAndIdConverter": {
                "type": "avro_1124",
                "topic": "web-view-message"
              },
              "schemaRepository": {
                "type": "avro_1124_rest_client",
                "url": "http://localhost:8080"
              }
            },
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions" : [],
                "dimensionExclusions" : [
                  "timestamp"
                ]
              },
              "format" : "timeAndDims"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "hour",
            "queryGranularity" : "none"
          },
          "metricsSpec" : [
            {
              "type" : "count",
              "name" : "count"
            }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "web-view-message"
      }
    }
  ],
  "properties" : {
    "zookeeper.connect" : "localhost",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "myzookeeper.example.com:2181",
    "kafka.group.id" : "druid-kafka",
    "serialization.format" : "smile",
    "druidBeam.taskLocator": "overlord"
  }
}

Thanks for your ideas!

--
Hubert

Hubert Behaghel

Jul 10, 2016, 4:59:54 PM
to Druid Development
A typo in my message above: the Node.js proxy is there to make the Confluent schema registry AVRO-1124 compliant (only for the part of the API that's actually used by the extension).

Himanshu

Jul 12, 2016, 3:54:41 PM
to druid-de...@googlegroups.com
Hi Hubert,

I think schema-repo adds a fair bit of overhead here, and things can be simplified. Some questions...

1. Do all events have the same schema?
2. Do you put the schema identifier in the first 4 bytes of each event payload?
3. Are you building the node application only because of the way things currently are, and would you rather put a config file containing the schemas on all Tranquility nodes?

-- Himanshu

PS: I'm assuming that your custom node app is compliant with the REST API described at https://github.com/schema-repo/schema-repo/wiki/Service-Endpoints.



Hubert Behaghel

Jul 13, 2016, 4:26:25 AM
to druid-de...@googlegroups.com
Thanks for your reply, Himanshu!

1. Yes, all events share exactly the same schema.
2. Yes, the first 4 bytes of each event are the schema id (see the sketch below).
3. True, I would rather hardcode the schema on each Tranquility node.
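
For clarity, here is a minimal sketch of the payload layout I mean, in Node.js since that is what the proxy uses (the big-endian id is an assumption on my part; check the subjectAndIdConverter implementation if in doubt):

function splitPayload(buf) {
    // first 4 bytes: schema id; remainder: Avro-encoded record
    var schemaId = buf.readInt32BE(0);
    var avroBytes = buf.slice(4);
    return { schemaId: schemaId, avroBytes: avroBytes };
}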

Thanks again.

--
Hubert



Himanshu Gupta

Jul 15, 2016, 1:12:23 AM
to Druid Development
Can you use https://github.com/druid-io/druid/pull/3249 ?

That patch allows you to inline the reader schema in the task JSON itself (assuming all events have the same schema); you would write something like the following...

....
"parser" : {
  "type" : "avro_stream",
  "avroBytesDecoder": {
    "type": "schema_inline",
    "schema": {
      // your schema here
    }
  },
  "parseSpec" : {
....


With this, you wouldn't need the node app and all the doubts associated with it. Note that you would then not put a schema id in the first 4 bytes of the event at all.
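
Filled in, the decoder part would look something like this (the record schema is a made-up placeholder, not your actual schema):

"avroBytesDecoder": {
  "type": "schema_inline",
  "schema": {
    "type": "record",
    "name": "WebViewMessage",
    "fields": [
      { "name": "timestamp", "type": "string" },
      { "name": "url", "type": "string" }
    ]
  }
}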

If you still get a parsing error after this, you can dump the offending event by changing the exception catch clause in SimpleAvroBytesDecoder.java and see what is actually wrong with those events.

hth,
Himanshu