I'm trying to create a Storm bolt that uses Tranquility to send data to Druid. My bolt looks as follows:
class DruidBeamBolt implements com.metamx.tranquility.storm.BeamFactory<Map<String, Object>> {
    @Override
    public Beam<Map<String, Object>> makeBeam(Map<?, ?> map, IMetricsContext imc) {
        try {
            final CuratorFramework curator = CuratorFrameworkFactory.newClient(
                    Configuration.ZOOKEEPER_SERVERS_CONFIG, new BoundedExponentialBackoffRetry(100, 1000, 5));
            curator.start();
            final String dataSource = Configuration.KAFKA_SERVERS_TOPIC;
            final List<String> dimensions = ImmutableList.of("timestamp, series[0].tags, series[0].host, series[0].device_name, series[0].interval, series[0].metric, series[0].points[0][0], series[0].points[0][1], series[0].type, series[1].tags[0], series[1].host, series[1].device_name, series[1].interval, series[1].metric, series[1].points[0][0], series[1].points[0][1], series[1].type");
            final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
                    new CountAggregatorFactory("events"),
                    new DoubleSumAggregatorFactory("sum_value", "value"),
                    new MaxAggregatorFactory("max_value", "value"),
                    new MinAggregatorFactory("min_value", "value"));
            final DruidBeams.Builder<Map<String, Object>> builder = DruidBeams
                    .builder(
                            new Timestamper<Map<String, Object>>() {
                                @Override
                                public DateTime timestamp(Map<String, Object> theMap) {
                                    try {
                                        Long date = Long.parseLong(theMap.get("timestamp").toString());
                                        date = date * 1000;
                                        return new DateTime(date.longValue());
                                    } catch (NumberFormatException e) {
                                        System.out.println(e);
                                    }
                                    return DateTime.now();
                                }
                            }
                    )
                    .curator(curator)
                    .discoveryPath("/druid/discovery")
                    .location(
                            new DruidLocation(
                                    new DruidEnvironment(
                                            "druid/overlord",
                                            "druid:local:firehose:%s"
                                    ), dataSource
                            )
                    )
                    .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularity.MINUTE))
                    .tuning(ClusteredBeamTuning.create(Granularity.HOUR, new Period("PT0M"), new Period("PT10M"), 1, 1));
            final Beam<Map<String, Object>> beam = builder.buildBeam();
            return beam;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }
}
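A side note on the snippet above: `ImmutableList.of(...)` receives a single string literal, so `dimensions` holds one element whose value is the whole comma-separated text. If the intent was one dimension per field name, each name would have to be its own list element. The following is only a plain-Java sketch of that difference, using a shortened list; `DimensionListSketch` and `splitDimensions` are hypothetical names for illustration and are not part of Tranquility or Druid.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class DimensionListSketch {
    // Hypothetical helper: splits a comma-separated string into trimmed, non-empty names.
    static List<String> splitDimensions(String commaSeparated) {
        List<String> names = new ArrayList<String>();
        for (String part : commaSeparated.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String raw = "timestamp, series[0].host, series[0].metric";

        // One string literal gives a one-element list, as in the snippet above.
        List<String> single = Collections.singletonList(raw);
        System.out.println(single.size());   // prints 1

        // Splitting gives one element per name.
        List<String> separate = splitDimensions(raw);
        System.out.println(separate.size()); // prints 3
        System.out.println(separate);        // prints [timestamp, series[0].host, series[0].metric]
    }
}
```

Whether the bracketed names are the right dimension names for the incoming events is a separate question that this sketch does not address.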
-------------------------------------------------Overlord/runtime.properties-----------------------------------------
30403 [ClusteredBeam-ZkFuturePool-e9a72a07-d141-4ba8-a9f1-2c02d30d6900] INFO com.metamx.common.scala.net.finagle.DiscoResolver - Updating instances for service[druid:overlord] to Set(ServiceInstance{name='druid:overlord', id='cebfe02b-6593-49c4-866f-90e884847256', address='localhost', port=8090, sslPort=null, payload=null, registrationTimeUTC=1460677871525, serviceType=DYNAMIC, uriSpec=null})
30510 [ClusteredBeam-ZkFuturePool-e9a72a07-d141-4ba8-a9f1-2c02d30d6900] INFO com.metamx.tranquility.finagle.FinagleRegistry - Created client for service: druid:overlord
31708 [finagle/netty3-17] WARN com.metamx.tranquility.finagle.FutureRetry$ - Transient error, will try again in 12,079 ms
com.twitter.finagle.ChannelWriteException: java.net.ConnectException: Connection refused: no further information: localhost/127.0.0.1:8090 from service: druid:overlord
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_79]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_79]
at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) [netty-3.10.5.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) [netty-3.10.5.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) [netty-3.10.5.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [netty-3.10.5.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) [netty-3.10.5.Final.jar:na]
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.10.5.Final.jar:na]
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [netty-3.10.5.Final.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_79]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_79]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.net.ConnectException: Connection refused: no further information: localhost/127.0.0.1:8090
----------------------------------------------------------------------------------------------------------------
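The `Connection refused` in the trace means that nothing accepted a TCP connection at `localhost:8090`, the address the overlord announced through discovery, as seen from the machine running the topology. A minimal sketch for checking this from that same machine, assuming plain `java.net` sockets; `PortCheck` and `canConnect` are hypothetical names for illustration, not part of Tranquility or Druid.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    // Returns true if a TCP connection to host:port can be opened within the timeout.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused", timeouts and unresolved hosts.
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do if closing fails.
            }
        }
    }

    public static void main(String[] args) {
        // Host and port are taken from the log output above.
        System.out.println("localhost:8090 reachable: " + canConnect("localhost", 8090, 2000));
    }
}
```

If this prints `false` on the machine running the topology, either the overlord is not listening on that port or `localhost` there does not refer to the overlord's host.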