2016-07-10 18:31:22,750 [KafkaConsumer-1] INFO i.d.initialization.Initialization - added URL[file:/app/imply-1.3.0/dist/druid/extensions/druid-avro-extensions/servlet-api-2.5-20081211.jar]
2016-07-10 18:31:22,750 [KafkaConsumer-1] INFO i.d.initialization.Initialization - added URL[file:/app/imply-1.3.0/dist/druid/extensions/druid-avro-extensions/avro-1.7.7.jar]
2016-07-10 18:31:22,758 [KafkaConsumer-1] INFO i.d.initialization.Initialization - Adding local file system extension module [io.druid.data.input.avro.AvroExtensionsModule] for class [io.druid.initialization.DruidModule]
2016-07-10 18:31:23,922 [KafkaConsumer-1] INFO c.metamx.emitter.core.LoggingEmitter - Start: started [true]
2016-07-10 18:31:24,730 [KafkaConsumer-1] INFO c.m.t.finagle.FinagleRegistry - Adding resolver for scheme[druidTask!druid:overlord].
2016-07-10 18:31:25,146 [KafkaConsumer-1] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:
java.lang.ArrayIndexOutOfBoundsException: 25
at org.apache.avro.io.ResolvingDecoder.readEnum(ResolvingDecoder.java:257) ~[na:na]
at org.apache.avro.generic.GenericDatumReader.readEnum(GenericDatumReader.java:199) ~[na:na]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) ~[na:na]
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) ~[na:na]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) ~[na:na]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[na:na]
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) ~[na:na]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) ~[na:na]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[na:na]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) ~[na:na]
at io.druid.data.input.avro.SchemaRepoBasedAvroBytesDecoder.parse(SchemaRepoBasedAvroBytesDecoder.java:82) ~[na:na]
at io.druid.data.input.AvroStreamInputRowParser.parse(AvroStreamInputRowParser.java:53) ~[na:na]
at io.druid.data.input.AvroStreamInputRowParser.parse(AvroStreamInputRowParser.java:33) ~[na:na]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:182) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:182) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:200) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:200) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2$$anonfun$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
at com.twitter.util.Try$.apply(Try.scala:13) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at com.metamx.tranquility.beam.TransformingBeam.sendAll(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$send$1.apply(Tranquilizer.scala:202) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$send$1.apply(Tranquilizer.scala:202) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
at scala.Option.foreach(Option.scala:257) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at com.metamx.tranquility.tranquilizer.Tranquilizer.send(Tranquilizer.scala:202) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:76) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]
at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_51]
at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_51]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
2016-07-10 18:31:25,147 [KafkaConsumer-1] INFO c.m.tranquility.kafka.KafkaConsumer - Shutting down - attempting to flush buffers and commit final offsets
2016-07-10 18:31:25,149 [KafkaConsumer-1] INFO k.c.ZookeeperConsumerConnector - [druid-kafka_mysql.kafkas.prod.slu.faw.bskyb.com-1468175478305-6b28cf41], ZKConsumerConnector shutting down
2016-07-10 18:31:25,158 [KafkaConsumer-1] INFO k.c.ZookeeperTopicEventWatcher - Shutting down topic event watcher.
2016-07-10 18:31:25,158 [KafkaConsumer-1] INFO k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1468175478421] Stopping leader finder thread
[...]
One thing in my setup that may be a bit exotic is the Node.js proxy below, which lets me use the Confluent schema registry by making it look "AVRO-1124 compliant" to the schema_repo decoder (I called this file registry-proxy.js):
var http = require('http');
var util = require('util'); // was require('sys'); 'sys' is a deprecated alias of 'util'
var config = require('./proxy-config.json');

http.createServer(function (request, response) {
    util.log(">> " + request.method + " " + request.url);

    var subrequestPath, requestType;
    // The AVRO-1124 client asks for /<subject>/id/<schema-id>; the Confluent
    // registry serves schemas at /subjects/<subject>/versions/<version>.
    var matches = request.url.match(/\/([a-z-0-9]+)\/id\/(\d+)$/);
    if (matches) {
        requestType = 'schema';
        var version = matches[2] === '0' ? 'latest' : matches[2]; // id 0 means "latest"
        subrequestPath = '/' + matches[1] + '/versions/' + version;
    } else {
        // Anything else is treated as a "what is the latest version?" lookup.
        requestType = 'latestVersion';
        subrequestPath = request.url + '/versions';
    }

    var options = {
        host: config.target.host,
        port: config.target.port,
        headers: request.headers,
        path: config.target.path + subrequestPath
    };
    var subrequest = http.request(options);
    util.log(">-> GET " + options.path);

    subrequest.addListener('response', function (subresponse) {
        util.log("<-< " + subresponse.statusCode);
        var answer = '';
        subresponse.addListener('data', function (chunk) {
            answer = answer + chunk.toString();
        });
        subresponse.addListener('end', function () {
            var body = 'unhandled request type';
            var json = JSON.parse(answer);
            if (requestType === 'latestVersion') {
                // The registry returns an array of version numbers; reply with the highest.
                var latest = json.reduce(function (max, v) {
                    return v > max ? v : max;
                }, -1);
                body = '' + latest;
            } else if (requestType === 'schema') {
                // The registry wraps the schema in a JSON string; strip the
                // surrounding quotes and backslash escapes before replying.
                var schema = json.schema ? json.schema
                    .replace(/^"/, "")
                    .replace(/"$/, "")
                    .replace(/\\(.)/g, "$1") : 'schema not found';
                body = schema;
            }
            util.log('<< ' + body);
            // Use the byte length, not the character count, for the header.
            subresponse.headers['content-length'] = Buffer.byteLength(body);
            response.writeHead(subresponse.statusCode, subresponse.headers);
            response.end(body);
        });
    });

    // Forward any request body to the registry.
    request.addListener('data', function (chunk) {
        subrequest.write(chunk, 'binary');
    });
    request.addListener('end', function () {
        subrequest.end();
    });
}).listen(config.port);

util.log("Proxy listening on port " + config.port +
    " hitting http://" + config.target.host + ":" + config.target.port + config.target.path);
The config file proxy-config.json looks like this:
{
  "port": 8080,
  "target": {
    "host": "localhost",
    "port": 8081,
    "path": "/subjects"
  }
}
With both files in the working directory, you can start the proxy with: $ node registry-proxy.js
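To illustrate the mapping, assuming the subject is named after the topic (web-view-message), the proxy rewrites requests like this:

$ curl http://localhost:8080/web-view-message/id/3
  # proxied to http://localhost:8081/subjects/web-view-message/versions/3, answered with the unescaped schema (id 0 maps to "latest")
$ curl http://localhost:8080/web-view-message
  # proxied to http://localhost:8081/subjects/web-view-message/versions, answered with just the highest version number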
Apart from this, I am using the quickstart configuration, and here is my Kafka config for Tranquility:
{
  "dataSources" : [
    {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "web-kafka",
          "parser" : {
            "type" : "avro_stream",
            "avroBytesDecoder" : {
              "type" : "schema_repo",
              "subjectAndIdConverter" : {
                "type" : "avro_1124",
                "topic" : "web-view-message"
              },
              "schemaRepository" : {
                "type" : "avro_1124_rest_client"
              }
            },
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions" : [],
                "dimensionExclusions" : [
                  "timestamp"
                ]
              },
              "format" : "timeAndDims"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "hour",
            "queryGranularity" : "none"
          },
          "metricsSpec" : [
            {
              "type" : "count",
              "name" : "count"
            }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "web-view-message"
      }
    }
  ],
  "properties" : {
    "zookeeper.connect" : "localhost",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "serialization.format" : "smile",
    "druidBeam.taskLocator" : "overlord"
  }
}
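As an end-to-end check of the schema lookup path, a small script along these lines should print the raw Avro schema for the topic (a sketch, assuming the proxy runs locally on port 8080 and using the id-0-means-latest convention from the proxy above):

var http = require('http');
// Fetch the latest schema for the topic through the proxy, using the same
// /<subject>/id/<schema-id> URL shape that the proxy's schema branch handles.
http.get('http://localhost:8080/web-view-message/id/0', function (res) {
    var body = '';
    res.on('data', function (chunk) { body += chunk; });
    res.on('end', function () {
        console.log('status: ' + res.statusCode);
        console.log(body); // should be the plain Avro schema JSON
    });
});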
Thanks for your ideas!
--
Hubert