Hello,
I am trying to push data to Druid using Tranquility in my Flink streaming application.
Here is the configuration:
- Flink version 1.1.2
- Tranquility version 0.8.2
- Druid version 0.9.1.1
I create fake objects, transform each of them into a HashMap, and then use a custom Beam to send them to Druid via Tranquility.
Here is my Flink stream code:
public class SlimDruidStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SimpleObject o;
        List<SimpleObject> list = new ArrayList<SimpleObject>();
        for (int i = 0; i < 3; i++) {
            o = new SimpleObject(System.currentTimeMillis(), i, "dimValue" + i);
            list.add(o);
        }
        DataStreamSource<SimpleObject> mapSource = env.fromCollection(list);
        DataStream<Map<String,Object>> mapStream = mapSource.map(new MapFunction<SimpleObject, Map<String,Object>>() {
            public Map<String,Object> map(SimpleObject simpleObj) throws Exception {
                Map<String,Object> result = new HashMap<>();
                result.put("timestamp", simpleObj.getTimestamp());
                result.put("dimValue", simpleObj.getDimValue());
                result.put("numValue", simpleObj.getNumValue());
                return result;
            }
        });
        mapStream.addSink(new BeamSink(new SimpleBeamFactory("test-flink"), true));
        env.execute();
    }
}
Here is my SimpleObject class:
public class SimpleObject {
    private Long timestamp;
    private Integer numValue;
    private String dimValue;

    public SimpleObject() {
    }

    public SimpleObject(Long timestamp, Integer numValue, String dimValue) {
        this.timestamp = timestamp;
        this.numValue = numValue;
        this.dimValue = dimValue;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Integer getNumValue() {
        return numValue;
    }

    public void setNumValue(Integer numValue) {
        this.numValue = numValue;
    }

    public String getDimValue() {
        return dimValue;
    }

    public void setDimValue(String dimValue) {
        this.dimValue = dimValue;
    }
}
Here is my SimpleBeamFactory code:
public class SimpleBeamFactory implements BeamFactory {
    final protected String datasource;
    final protected String druidZkHosts = "127.0.0.1:2181";
    final protected String druidZkDiscoveryPath = "/druid/discovery";
    final protected String druidIndexServiceName = "druid/overlord";
    final protected String druidFirehosePattern = "firehose:%s";
    final protected int numReplicants = 1;
    final protected int numPartitions = 1;

    public SimpleBeamFactory(String datasource) {
        this.datasource = datasource;
    }

    public Tranquilizer<Map<String,Object>> tranquilizer() {
        Tranquilizer<Map<String,Object>> t = Tranquilizer.create(makeBeam());
        t.start();
        return t;
    }

    public Beam makeBeam() {
        final List<String> dimensions = ImmutableList.of(
            "dimValue"
        );
        final List<AggregatorFactory> aggregators = ImmutableList.of(
            new CountAggregatorFactory("agg_count"),
            new HyperUniquesAggregatorFactory("agg_distinct_dimValue", "dimValue")
        );
        // Tranquility uses ZooKeeper (through Curator) for coordination.
        final CuratorFramework curator = CuratorFrameworkFactory
            .builder()
            .connectString(this.druidZkHosts)
            .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
            .build();
        curator.start();
        final Timestamper<Map<String,Object>> timestamper = new Timestamper<Map<String,Object>>() {
            public DateTime timestamp(Map<String,Object> map) {
                return new DateTime(map.get("timestamp"));
            }
        };
        final Beam<Map<String,Object>> beam = DruidBeams
            .builder(timestamper)
            .curator(curator)
            .discoveryPath(this.druidZkDiscoveryPath)
            .location(
                DruidLocation.create(
                    this.druidIndexServiceName,
                    this.druidFirehosePattern,
                    this.datasource
                )
            )
            .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
            .tuning(
                ClusteredBeamTuning
                    .builder()
                    .segmentGranularity(Granularity.DAY)
                    .windowPeriod(new Period("PT30M"))
                    .partitions(this.numPartitions)
                    .replicants(this.numReplicants)
                    .build()
            ).buildBeam();
        return beam;
    }
}
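For context on the tuning above: with segmentGranularity DAY and windowPeriod PT30M, my understanding is that a realtime task only hands its segment off some time after the end of the day bucket plus the window period. The sketch below is a rough estimate of that earliest handoff time using only java.time. The class and method names are hypothetical, it assumes day buckets are aligned to UTC, and it ignores any extra delay Druid may add before the segment is published.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of Tranquility or Druid.
final class HandoffEstimate {

    // Assumption: the segment covers the UTC day containing the event,
    // and handoff cannot start before that day ends plus windowPeriod.
    static Instant earliestHandoff(Instant eventTime, Duration windowPeriod) {
        // Start of the UTC day bucket the event falls into.
        Instant dayStart = eventTime.truncatedTo(ChronoUnit.DAYS);
        // End of the bucket, then add the window period.
        return dayStart.plus(1, ChronoUnit.DAYS).plus(windowPeriod);
    }
}
```

Under these assumptions, an event at 2016-12-05T11:24:10Z with a 30-minute window gives 2016-12-06T00:30:00Z, so a segment written today would not be handed off before the following day.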
Here is the log I get in my Flink application:
2016-12-05 12:24:10.667+01:00 DEBUG Tranquilizer:? - Sent[1], dropped[0], failed[0] out of 1 messages from batch #1. 1 batches still pending.
The task is created, but the "test-flink" datasource is never created, and I see nothing useful in my Druid task logs.
I am sure the data reaches Druid, because if I deliberately introduce a typo and write "timestampppp" instead of "timestamp", I get the following in my Druid task logs:
2016-12-05T11:04:19,872 WARN [qtp332998175-48] org.eclipse.jetty.servlet.ServletHandler - /druid/worker/v1/chat/firehose:test-flink7-005-0000-0000/push-events
com.metamx.common.parsers.ParseException: Unparseable timestamp found!
    at io.druid.data.input.impl.MapInputRowParser.parse(MapInputRowParser.java:72) ~[druid-api-0.9.1.1.jar:0.9.1.1]
    at io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory$EventReceiverFirehose.addAll(EventReceiverFirehoseFactory.java:192) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_111]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_111]
    at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.uri.rules.SubLocatorRule.accept(SubLocatorRule.java:137) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:409) ~[jersey-servlet-1.19.jar:1.19]
    at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:558) ~[jersey-servlet-1.19.jar:1.19]
    at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:733) ~[jersey-servlet-1.19.jar:1.19]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) ~[javax.servlet-api-3.1.0.jar:3.1.0]
    at com.google.inject.servlet.ServletDefinition.doServiceImpl(ServletDefinition.java:278) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.ServletDefinition.doService(ServletDefinition.java:268) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.ServletDefinition.service(ServletDefinition.java:180) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.ManagedServletPipeline.service(ManagedServletPipeline.java:93) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:120) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.GuiceFilter$1.call(GuiceFilter.java:132) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.GuiceFilter$1.call(GuiceFilter.java:129) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.GuiceFilter$Context.call(GuiceFilter.java:206) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:129) ~[guice-servlet-4.0-beta.jar:?]
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652) ~[jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.servlets.UserAgentFilter.doFilter(UserAgentFilter.java:83) ~[jetty-servlets-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.servlets.GzipFilter.doFilter(GzipFilter.java:364) ~[jetty-servlets-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652) ~[jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
    at io.druid.server.initialization.jetty.ResponseHeaderFilterHolder$ResponseHeaderFilter.doFilter(ResponseHeaderFilterHolder.java:100) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652) ~[jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1125) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1059) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.Server.handle(Server.java:497) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:248) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540) [jetty-io-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:620) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:540) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
Caused by: java.lang.NullPointerException: Null timestamp in input: {dimValue=dimValue2, timestampppp=1480935856025, numValue=2}
    at io.druid.data.input.impl.MapInputRowParser.parse(MapInputRowParser.java:64) ~[druid-api-0.9.1.1.jar:0.9.1.1]
    ... 53 more
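The exception above is raised because the event map has no "timestamp" key. A minimal client-side sanity check for the maps produced in the Flink job could look like the sketch below. The class name EventCheck and its return values are hypothetical, and the window rule is an assumption on my part: as far as I understand, Tranquility only accepts events whose timestamp lies within windowPeriod of the current time.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical helper, not part of Tranquility or Druid.
final class EventCheck {

    // Returns "ok", "missing-timestamp", or "outside-window".
    static String check(Map<String, Object> event, long nowMillis, Duration windowPeriod) {
        Object ts = event.get("timestamp");
        // A misspelled key (e.g. "timestampppp") ends up here.
        if (!(ts instanceof Long)) {
            return "missing-timestamp";
        }
        // Assumption: events further than windowPeriod from "now" are not accepted.
        long distance = Math.abs(nowMillis - (Long) ts);
        if (distance > windowPeriod.toMillis()) {
            return "outside-window";
        }
        return "ok";
    }
}
```

Calling EventCheck.check(result, System.currentTimeMillis(), Duration.ofMinutes(30)) inside the map function before returning the map would flag the misspelled key locally instead of only in the Druid task log.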
Any idea why the datasource is never created?
Do not hesitate to ask if you need more information / more logs.
Thank you in advance for your help.
Melissa Benali-Richard