Data pushed to Druid using Tranquility but no datasource is created.

mbenali...@gmail.com

Dec 5, 2016, 8:14:45 AM
to Druid User
Hello,

I am trying to push data to Druid using Tranquility in my Flink streaming application.

Here is the configuration:
- Flink version 1.1.2
- Tranquility version 0.8.2
- Druid version 0.9.1.1 


I create fake objects, convert each one into a HashMap, and then use a custom BeamFactory with Tranquility's BeamSink to send them to Druid.


Here is my Flink streaming code:

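import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.metamx.tranquility.flink.BeamSink;

public class SlimDruidStream {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Three fake events stamped with the current wall-clock time (epoch millis).
        List<SimpleObject> list = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list.add(new SimpleObject(System.currentTimeMillis(), i, "dimValue" + i));
        }

        DataStreamSource<SimpleObject> mapSource = env.fromCollection(list);

        // Convert each SimpleObject into the Map<String, Object> form the beam expects.
        DataStream<Map<String, Object>> mapStream = mapSource.map(new MapFunction<SimpleObject, Map<String, Object>>() {
            @Override
            public Map<String, Object> map(SimpleObject simpleObj) throws Exception {
                Map<String, Object> result = new HashMap<>();
                result.put("timestamp", simpleObj.getTimestamp());
                result.put("dimValue", simpleObj.getDimValue());
                result.put("numValue", simpleObj.getNumValue());
                return result;
            }
        });

        // Second argument is reportDropsAsExceptions: dropped events surface as errors.
        mapStream.addSink(new BeamSink<Map<String, Object>>(new SimpleBeamFactory("test-flink"), true));

        env.execute();
    }
}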


Here is my SimpleObject class:

public class SimpleObject {

    private Long timestamp;   // event time in epoch milliseconds
    private Integer numValue;
    private String dimValue;

    // Public no-argument constructor, required for Flink to treat this class as a POJO.
    public SimpleObject() {
    }

    public SimpleObject(Long timestamp, Integer numValue, String dimValue) {
        this.timestamp = timestamp;
        this.numValue = numValue;
        this.dimValue = dimValue;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Integer getNumValue() {
        return numValue;
    }

    public void setNumValue(Integer numValue) {
        this.numValue = numValue;
    }

    public String getDimValue() {
        return dimValue;
    }

    public void setDimValue(String dimValue) {
        this.dimValue = dimValue;
    }
}




Here is my SimpleBeamFactory code:

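import java.util.List;
import java.util.Map;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.joda.time.DateTime;
import org.joda.time.Period;

import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.Beam;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.druid.DruidDimensions;
import com.metamx.tranquility.druid.DruidLocation;
import com.metamx.tranquility.druid.DruidRollup;
import com.metamx.tranquility.flink.BeamFactory;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.metamx.tranquility.typeclass.Timestamper;

import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;

public class SimpleBeamFactory implements BeamFactory<Map<String, Object>> {

    final protected String datasource;
    final protected String druidZkHosts = "127.0.0.1:2181";
    final protected String druidZkDiscoveryPath = "/druid/discovery";
    final protected String druidIndexServiceName = "druid/overlord";
    final protected String druidFirehosePattern = "firehose:%s";
    final protected int numReplicants = 1;
    final protected int numPartitions = 1;

    public SimpleBeamFactory(String datasource) {
        this.datasource = datasource;
    }

    // Standalone Tranquilizer; BeamSink does not need this, it only calls makeBeam().
    public Tranquilizer<Map<String, Object>> tranquilizer() {
        Tranquilizer<Map<String, Object>> t = Tranquilizer.create(makeBeam());
        t.start();
        return t;
    }

    @Override
    public Beam<Map<String, Object>> makeBeam() {
        final List<String> dimensions = ImmutableList.of("dimValue");

        final List<AggregatorFactory> aggregators = ImmutableList.of(
            new CountAggregatorFactory("agg_count"),
            new HyperUniquesAggregatorFactory("agg_distinct_dimValue", "dimValue")
        );

        // Tranquility uses ZooKeeper (through Curator) for coordination.
        final CuratorFramework curator = CuratorFrameworkFactory
            .builder()
            .connectString(this.druidZkHosts)
            .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
            .build();
        curator.start();

        // Reads the event time from the "timestamp" field (epoch millis).
        // Note: if the key is missing, Joda's new DateTime(null) yields the current time.
        final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>() {
            @Override
            public DateTime timestamp(Map<String, Object> map) {
                return new DateTime(map.get("timestamp"));
            }
        };

        final Beam<Map<String, Object>> beam = DruidBeams
            .builder(timestamper)
            .curator(curator)
            .discoveryPath(this.druidZkDiscoveryPath)
            .location(DruidLocation.create(this.druidIndexServiceName, this.druidFirehosePattern, this.datasource))
            .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
            .tuning(
                ClusteredBeamTuning
                    .builder()
                    .segmentGranularity(Granularity.DAY)
                    .windowPeriod(new Period("PT30M"))
                    .partitions(this.numPartitions)
                    .replicants(this.numReplicants)
                    .build()
            )
            .buildBeam();

        return beam;
    }
}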
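To make the tuning above concrete, here is a minimal, stdlib-only sketch of how `segmentGranularity(Granularity.DAY)` and `windowPeriod(new Period("PT30M"))` relate to an event's timestamp. It rests on assumptions not confirmed in this thread: that DAY segments are bucketed on UTC midnight, that Tranquility only accepts events whose timestamp lies within windowPeriod of the current time, and that a realtime task hands its segment off no earlier than the end of the segment interval plus windowPeriod. `WindowCheck` and its methods are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Hypothetical helper mirroring segmentGranularity = DAY and windowPeriod = PT30M. */
final class WindowCheck {
    static final Duration WINDOW_PERIOD = Duration.ofMinutes(30);

    // Start of the DAY segment containing the event (assumed: UTC midnight).
    static Instant segmentStart(Instant eventTime) {
        return eventTime.truncatedTo(ChronoUnit.DAYS);
    }

    // Exclusive end of that DAY segment.
    static Instant segmentEnd(Instant eventTime) {
        return segmentStart(eventTime).plus(Duration.ofDays(1));
    }

    // Assumption: an event is accepted only if |now - eventTime| <= windowPeriod.
    static boolean withinWindow(Instant eventTime, Instant now) {
        return Duration.between(eventTime, now).abs().compareTo(WINDOW_PERIOD) <= 0;
    }

    // Assumption: handoff happens no earlier than segment end + windowPeriod.
    static Instant earliestHandoff(Instant eventTime) {
        return segmentEnd(eventTime).plus(WINDOW_PERIOD);
    }
}
```

For the timestamp 1480935856025 from the Druid log below (2016-12-05T11:04:16.025Z), `segmentStart` and `segmentEnd` give the interval [2016-12-05T00:00Z, 2016-12-06T00:00Z), and `earliestHandoff` returns 2016-12-06T00:30:00Z.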



Here is the log I get in my Flink application:

2016-12-05 12:24:10.667+01:00 DEBUG Tranquilizer:? - Sent[1], dropped[0], failed[0] out of 1 messages from batch #1. 1 batches still pending.
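The counters in that DEBUG line can be read mechanically. Below is a small sketch, assuming only the "Sent[n], dropped[n], failed[n] out of n messages" shape visible in the line above; `BatchSummary` is a hypothetical helper, not part of Tranquility. Here it reports that the one message was sent and none were dropped (Tranquility's "dropped" typically covers events it discards itself, for example timestamps outside windowPeriod) or failed.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for a Tranquilizer batch summary log line. */
final class BatchSummary {
    private static final Pattern SUMMARY = Pattern.compile(
        "Sent\\[(\\d+)\\], dropped\\[(\\d+)\\], failed\\[(\\d+)\\] out of (\\d+) messages");

    final long sent;
    final long dropped;
    final long failed;
    final long total;

    private BatchSummary(long sent, long dropped, long failed, long total) {
        this.sent = sent;
        this.dropped = dropped;
        this.failed = failed;
        this.total = total;
    }

    // Extracts the first summary found in the line; throws if there is none.
    static BatchSummary parse(String logLine) {
        Matcher m = SUMMARY.matcher(logLine);
        if (!m.find()) {
            throw new IllegalArgumentException("No Tranquilizer summary in: " + logLine);
        }
        return new BatchSummary(
            Long.parseLong(m.group(1)), Long.parseLong(m.group(2)),
            Long.parseLong(m.group(3)), Long.parseLong(m.group(4)));
    }

    // True when Tranquilizer reports every message in the batch as sent.
    boolean allSent() {
        return sent == total && dropped == 0 && failed == 0;
    }
}
```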


The indexing task is created, but the "test-flink" datasource never appears, and my Druid task log shows nothing of interest.

I am sure the data reaches Druid: if I deliberately introduce a typo and write "timestampppp" instead of "timestamp", I get the following error in my Druid task log:


2016-12-05T11:04:19,872 WARN [qtp332998175-48] org.eclipse.jetty.servlet.ServletHandler - /druid/worker/v1/chat/firehose:test-flink7-005-0000-0000/push-events
com.metamx.common.parsers.ParseException: Unparseable timestamp found!
	at io.druid.data.input.impl.MapInputRowParser.parse(MapInputRowParser.java:72) ~[druid-api-0.9.1.1.jar:0.9.1.1]
	at io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory$EventReceiverFirehose.addAll(EventReceiverFirehoseFactory.java:192) ~[druid-server-0.9.1.1.jar:0.9.1.1]
	at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source) ~[?:?]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_111]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_111]
	at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.server.impl.uri.rules.SubLocatorRule.accept(SubLocatorRule.java:137) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409) ~[jersey-server-1.19.jar:1.19]
	at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:409) ~[jersey-servlet-1.19.jar:1.19]
	at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:558) ~[jersey-servlet-1.19.jar:1.19]
	at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:733) ~[jersey-servlet-1.19.jar:1.19]
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) ~[javax.servlet-api-3.1.0.jar:3.1.0]
	at com.google.inject.servlet.ServletDefinition.doServiceImpl(ServletDefinition.java:278) ~[guice-servlet-4.0-beta.jar:?]
	at com.google.inject.servlet.ServletDefinition.doService(ServletDefinition.java:268) ~[guice-servlet-4.0-beta.jar:?]
	at com.google.inject.servlet.ServletDefinition.service(ServletDefinition.java:180) ~[guice-servlet-4.0-beta.jar:?]
	at com.google.inject.servlet.ManagedServletPipeline.service(ManagedServletPipeline.java:93) ~[guice-servlet-4.0-beta.jar:?]
	at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:120) ~[guice-servlet-4.0-beta.jar:?]
	at com.google.inject.servlet.GuiceFilter$1.call(GuiceFilter.java:132) ~[guice-servlet-4.0-beta.jar:?]
	at com.google.inject.servlet.GuiceFilter$1.call(GuiceFilter.java:129) ~[guice-servlet-4.0-beta.jar:?]
	at com.google.inject.servlet.GuiceFilter$Context.call(GuiceFilter.java:206) ~[guice-servlet-4.0-beta.jar:?]
	at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:129) ~[guice-servlet-4.0-beta.jar:?]
	at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652) ~[jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.servlets.UserAgentFilter.doFilter(UserAgentFilter.java:83) ~[jetty-servlets-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.servlets.GzipFilter.doFilter(GzipFilter.java:364) ~[jetty-servlets-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652) ~[jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
	at io.druid.server.initialization.jetty.ResponseHeaderFilterHolder$ResponseHeaderFilter.doFilter(ResponseHeaderFilterHolder.java:100) ~[druid-server-0.9.1.1.jar:0.9.1.1]
	at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652) ~[jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1125) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1059) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.server.Server.handle(Server.java:497) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:248) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540) [jetty-io-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:620) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
	at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:540) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
Caused by: java.lang.NullPointerException: Null timestamp in input: {dimValue=dimValue2, timestampppp=1480935856025, numValue=2}
	at io.druid.data.input.impl.MapInputRowParser.parse(MapInputRowParser.java:64) ~[druid-api-0.9.1.1.jar:0.9.1.1]
	... 53 more
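The "Caused by" line shows Druid's parser looking up "timestamp" and finding nothing. Tranquility still sent the event, presumably because the Timestamper's `new DateTime(map.get("timestamp"))` receives null, which Joda-Time treats as "now". A minimal pre-send check, assuming you want to catch such events in Flink before they reach Tranquility, could look like this; `EventCheck`, `hasTimestamp`, and `toEvent` are hypothetical names, and `toEvent` simply mirrors the MapFunction in SlimDruidStream.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical pre-send validation; not part of Tranquility or Druid. */
final class EventCheck {
    // Column that Druid's parser looked up in the error above.
    static final String TIMESTAMP_COLUMN = "timestamp";

    // True if the event carries a non-null value under the timestamp key.
    static boolean hasTimestamp(Map<String, Object> event) {
        return event.get(TIMESTAMP_COLUMN) != null;
    }

    // Builds an event map the same way the MapFunction in SlimDruidStream does.
    static Map<String, Object> toEvent(long timestamp, int numValue, String dimValue) {
        Map<String, Object> result = new HashMap<>();
        result.put(TIMESTAMP_COLUMN, timestamp);
        result.put("dimValue", dimValue);
        result.put("numValue", numValue);
        return result;
    }
}
```

In the Flink job, a `filter` (or a check that throws) using `hasTimestamp` would stop misspelled events before they are handed to the BeamSink.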



Any idea why the datasource is never created?

Do not hesitate to ask if you need more information or more logs.

Thank you in advance for your help.

Melissa Benali-Richard

Fangjin Yang

Dec 15, 2016, 7:00:43 PM
to Druid User
2016-12-05T11:04:19,872 WARN [qtp332998175-48] org.eclipse.jetty.servlet.ServletHandler - /druid/worker/v1/chat/firehose:test-flink7-005-0000-0000/push-events com.metamx.common.parsers.ParseException: Unparseable timestamp found! at io.druid.data.input.impl.MapInputRowParser.parse(MapInputRowParser.java:72) ~[druid-api-0.9.1.1.jar:0.9.1.1] at io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory$EventReceiverFirehose.addA

mbenali...@gmail.com

Dec 19, 2016, 3:54:12 AM
to Druid User
Hello,

Thank you very much for your answer. However, I introduced that error deliberately (misspelling "timestamp") to show that the data does reach Druid, as the end of the log shows:

Caused by: java.lang.NullPointerException: Null timestamp in input: {dimValue=dimValue2, timestampppp=1480935856025, numValue=2}


If I spell "timestamp" correctly in my main method, I get no error in the logs, but no datasource is created either.

Gurmohit Singh

Aug 5, 2017, 7:54:05 AM
to Druid User
Hello Melissa Benali-Richard,

Have you integrated Flink and Druid successfully?