Hi,
I am trying to send data to Druid using Tranquility. My Tranquility BeamFactory (used by the Storm bolt) is as follows:
-----------------------------------------------------------------------------------------------------------------------
class DruidBeamBolt implements com.metamx.tranquility.storm.BeamFactory<String> {

    // Shared mapper; creating a new ObjectMapper per tuple is needlessly expensive.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public Beam<String> makeBeam(Map<?, ?> conf, IMetricsContext metricsContext) {
        try {
            final CuratorFramework curator = CuratorFrameworkFactory.newClient(
                Configuration.ZOOKEEPER_SERVERS_CONFIG,
                new BoundedExponentialBackoffRetry(100, 1000, 5));
            curator.start();

            final String dataSource = "testBolt";
            final List<String> dimensions = ImmutableList.of("test");
            final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
                new DoubleSumAggregatorFactory("interval", "value"));

            return DruidBeams
                .builder(
                    new Timestamper<String>() {
                        @Override
                        public DateTime timestamp(String event) {
                            try {
                                Map<String, Object> map = MAPPER.readValue(event, Map.class);
                                return new DateTime(map.get("timestamp"));
                            } catch (IOException ex) {
                                // Previously this only printed the exception and then
                                // dereferenced a null map (NPE); fail loudly instead.
                                throw new RuntimeException("Unparseable event: " + event, ex);
                            }
                        }
                    }
                )
                .curator(curator)
                .discoveryPath("/druid/discovery")
                .location(
                    new DruidLocation(
                        new DruidEnvironment("druid:overlord", "firehose:druid:overlord:%s"),
                        dataSource))
                .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularity.MINUTE))
                .tuning(ClusteredBeamTuning.create(
                    Granularity.HOUR, new Period("PT0M"), new Period("PT10M"), 1, 1))
                .buildBeam();
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }
}
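For reference, the Timestamper above requires each tuple to be a JSON string carrying an ISO `timestamp` field (matching the `timestampSpec` in the generated task spec). A minimal stdlib sketch of a well-formed event and of the timestamp extraction, with `java.time.Instant` standing in for Joda's `DateTime` and a naive string scan standing in for Jackson (illustration only, not the real parse):

```java
import java.time.Instant;

public class EventShape {
    // Hypothetical event matching the dataSource above:
    // dimension "test", metric field "value", ISO "timestamp".
    static final String EVENT =
        "{\"timestamp\":\"2016-04-20T02:30:00.000Z\",\"test\":\"a\",\"value\":1.0}";

    // Naive stand-in for the Jackson parse in the Timestamper: pull out
    // the "timestamp" field and parse it as an ISO instant.
    static Instant eventTimestamp(String json) {
        int i = json.indexOf("\"timestamp\":\"") + "\"timestamp\":\"".length();
        return Instant.parse(json.substring(i, json.indexOf('"', i)));
    }

    public static void main(String[] args) {
        System.out.println(eventTimestamp(EVENT));  // 2016-04-20T02:30:00Z
    }
}
```

Events whose timestamp falls outside the segment + window period are silently dropped by the beam, which is worth ruling out before chasing server-side errors.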
-------------------------------------------------------------logs--------------------------------------------------------------------------------
91342 [ClusteredBeam-ZkFuturePool-48674289-81bf-4943-9e4e-1d7a89bb9bc8] INFO com.metamx.common.scala.control$ - Creating druid indexing task (service = druid:overlord): {
"type" : "index_realtime",
"id" : "index_realtime_testBolt_2016-04-20T02:00:00.000Z_0_0",
"resource" : {
"availabilityGroup" : "testBolt-02-0000",
"requiredCapacity" : 1
},
"spec" : {
"dataSchema" : {
"dataSource" : "testBolt",
"parser" : {
"type" : "map",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "timestamp",
"format" : "iso",
"missingValue" : null
},
"dimensionsSpec" : {
"dimensions" : [ "test" ],
"spatialDimensions" : [ ]
}
}
},
"metricsSpec" : [ {
"type" : "doubleSum",
"name" : "interval",
"fieldName" : "value"
} ],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "HOUR",
"queryGranularity" : {
"type" : "duration",
"duration" : 60000,
"origin" : "1969-12-31T18:00:00.000-06:00"
}
}
},
"ioConfig" : {
"type" : "realtime",
"plumber" : null,
"firehose" : {
"type" : "clipped",
"interval" : "2016-04-20T02:00:00.000/2016-04-20T03:00:00.000",
"delegate" : {
"type" : "timed",
"shutoffTime" : "2016-04-20T03:15:00.000Z",
"delegate" : {
"type" : "receiver",
"serviceName" : "firehose:druid:overlord:testBolt-02-0000-0000",
"bufferSize" : 100000
}
}
}
},
"tuningConfig" : {
"shardSpec" : {
"type" : "linear",
"partitionNum" : 0
},
"rejectionPolicy" : {
"type" : "none"
},
"buildV9Directly" : false,
"maxPendingPersists" : 0,
"intermediatePersistPeriod" : "PT10M",
"windowPeriod" : "PT10M",
"type" : "realtime",
"maxRowsInMemory" : 75000
}
}
176024 [finagle/netty3-2] WARN com.metamx.tranquility.beam.ClusteredBeam - Emitting alert: [anomaly] Failed to propagate events: druid:overlord/testBolt
{
"eventCount" : 1,
"timestamp" : "2016-04-20T02:00:00.000Z",
"beams" : "MergingPartitioningBeam(DruidBeam(interval = 2016-04-20T02:00:00.000/2016-04-20T03:00:00.000, partition = 0, tasks = [index_realtime_testBolt_2016-04-20T02:00:00.000Z_0_0/testBolt-02-0000-0000]))"
}
java.io.IOException: Failed to send request to task[index_realtime_testBolt_2016-04-20T02:00:00.000Z_0_0]: 500 Internal Server Error
---------------------------------------------------------------------- task logs -----------------------------------------------------------------
2016-04-20T02:35:31,237 INFO [main] io.druid.indexing.worker.executor.ExecutorLifecycle - Attempting to lock file[/tmp/persistent/task/index_realtime_testBolt_2016-04-20T02:00:00.000Z_0_0/lock].
2016-04-20T02:35:31,238 INFO [main] io.druid.indexing.worker.executor.ExecutorLifecycle - Acquired lock file[/tmp/persistent/task/index_realtime_testBolt_2016-04-20T02:00:00.000Z_0_0/lock] in 1ms.
2016-04-20T02:35:31,243 INFO [task-runner-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Running task: index_realtime_testBolt_2016-04-20T02:00:00.000Z_0_0
2016-04-20T02:35:31,251 INFO [task-runner-0] io.druid.segment.realtime.plumber.RealtimePlumber - Creating plumber using rejectionPolicy[io.druid.segment.realtime.plumber.NoopRejectionPolicyFactory$1@3c92121e]
2016-04-20T02:35:31,254 INFO [task-runner-0] io.druid.segment.realtime.plumber.RealtimePlumber - Expect to run at [2016-04-20T03:10:00.000Z]
2016-04-20T02:35:31,255 INFO [task-runner-0] io.druid.segment.realtime.plumber.RealtimePlumber - Starting merge and push.
2016-04-20T02:35:31,255 INFO [task-runner-0] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] segments. Attempting to hand off segments that start before [1970-01-01T00:00:00.000Z].
2016-04-20T02:35:31,255 INFO [task-runner-0] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] sinks to persist and merge
2016-04-20T02:35:31,289 INFO [task-runner-0] io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory - Connecting firehose: firehose:druid:overlord:testBolt-02-0000-0000
2016-04-20T02:35:31,290 INFO [task-runner-0] io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory - Found chathandler of class[io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider]
2016-04-20T02:35:31,290 INFO [task-runner-0] io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider - Registering Eventhandler[firehose:druid:overlord:testBolt-02-0000-0000]
2016-04-20T02:35:31,294 INFO [task-runner-0] io.druid.curator.discovery.CuratorServiceAnnouncer - Announcing service[DruidNode{serviceName='firehose:druid:overlord:testBolt-02-0000-0000', host='192.168.100.150', port=8101}]
2016-04-20T02:35:31,323 INFO [task-runner-0] io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider - Registering Eventhandler[testBolt-02-0000-0000]
2016-04-20T02:35:31,324 INFO [task-runner-0] io.druid.curator.discovery.CuratorServiceAnnouncer - Announcing service[DruidNode{serviceName='testBolt-02-0000-0000', host='192.168.100.150', port=8101}]
2016-04-20T02:35:31,334 WARN [task-runner-0] org.apache.curator.utils.ZKPaths - The version of ZooKeeper being used doesn't support Container nodes. CreateMode.PERSISTENT will be used instead.
2016-04-20T02:35:31,336 INFO [task-runner-0] io.druid.server.metrics.EventReceiverFirehoseRegister - Registering EventReceiverFirehoseMetric for service [firehose:druid:overlord:testBolt-02-0000-0000]
2016-04-20T02:35:31,337 INFO [task-runner-0] io.druid.data.input.FirehoseFactory - Firehose created, will shut down at: 2016-04-20T03:15:00.000Z
-------------------------------------------------------------------------
I do not see any exceptions or parsing errors in the task logs, yet Tranquility reports a 500 Internal Server Error. Any pointers on what I might be doing wrong?
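One way I would try to narrow this down (a sketch, not verified against your cluster): Tranquility pushes events to the task's event-receiver chat handler, which Druid exposes on the peon at `/druid/worker/v1/chat/<serviceName>/push-events` (check the exact path for your Druid version). The host, port, and service name are all in the "Announcing service" line of your task log, so you can POST a single event there by hand and read the 500 response body, which should contain the actual error message. A small helper to build the (URL-encoded) endpoint from those log values:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class PushEventsUrl {
    // Builds the chat-handler URL for a task's event receiver. Host/port/service
    // name come from the "Announcing service" line in the task log; the path is
    // the Druid event-receiver endpoint (verify against your Druid version).
    static String pushEventsUrl(String host, int port, String serviceName)
            throws UnsupportedEncodingException {
        return "http://" + host + ":" + port + "/druid/worker/v1/chat/"
            + URLEncoder.encode(serviceName, "UTF-8") + "/push-events";
    }

    public static void main(String[] args) throws Exception {
        // Values taken from the task log above.
        System.out.println(pushEventsUrl(
            "192.168.100.150", 8101, "firehose:druid:overlord:testBolt-02-0000-0000"));
        // Then, e.g.:
        //   curl -i -X POST -H 'Content-Type: application/json' \
        //        -d '[{"timestamp":"2016-04-20T02:30:00.000Z","test":"a","value":1.0}]' \
        //        "<printed URL>"
        // and inspect the response body for the real cause of the 500.
    }
}
```

If the manual POST succeeds, the problem is on the Tranquility side (event serialization or timestamps outside the window); if it also returns a 500, the response body plus the peon's request log should say why.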