Hey !
I am new to Druid and still in the process of discovering the technologies...
So far, I have completed the Getting Started section, and in particular the
loading stream data section. This works so far !
Now I would like to use Tranquility so that I can also stream data from my own application. I have been trying to use the
sample code and reusing the same server configuration json file as in the getting started example.
server.json
{
"dataSources" : {
"pageviews" : {
"spec" : {
"dataSchema" : {
"dataSource" : "pageviews",
"parser" : {
"type" : "string",
"parseSpec" : {
"timestampSpec" : {
"column" : "time",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions" : ["url", "user"]
},
"format" : "json"
}
},
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "hour",
"queryGranularity" : "none"
},
"metricsSpec" : [
{"name": "views", "type": "count"},
{"name": "latencyMs", "type": "doubleSum", "fieldName": "latencyMs"}
]
},
"ioConfig" : {
"type" : "realtime"
},
"tuningConfig" : {
"type" : "realtime",
"maxRowsInMemory" : "100000",
"intermediatePersistPeriod" : "PT10M",
"windowPeriod" : "PT10M"
}
},
"properties" : {
"task.partitions" : "1",
"task.replicants" : "1"
}
}
},
"properties" : {
"zookeeper.connect" : "localhost",
"druid.discovery.curator.path" : "/druid/discovery",
"druid.selectors.indexing.serviceName" : "druid/overlord",
"http.port" : "8200",
"http.threads" : "8"
}
}
The test code:
public class Main {
private static final Logger log = new Logger(Main.class);
public static void main(String[] args) {
// Read config from "server.json" on the classpath.
final InputStream configStream = Main.class.getClassLoader().getResourceAsStream("server.json");
final TranquilityConfig<PropertiesBasedConfig> config = TranquilityConfig.read(configStream);
final DataSourceConfig<PropertiesBasedConfig> pageViewConf = config.getDataSource("pageviews");
final Tranquilizer<Map<String, Object>> sender = DruidBeams.fromConfig(pageViewConf)
.buildTranquilizer(pageViewConf.tranquilizerBuilder());
sender.start();
try {
// Send 10000 objects
for (int i = 0; i < 10000; i++) {
// Build a sample event to send; make sure we use a current date
final Map<String, Object> obj = ImmutableMap.<String, Object>of(
"time", new DateTime().toString(),
"url", "/foo/bar",
"user", "alice",
"latencyMs", i
);
// Asynchronously send event to Druid:
sender.send(obj).addEventListener(
new FutureEventListener<BoxedUnit>() {
public void onSuccess(BoxedUnit value) {
log.info("Sent message: %s", obj);
}
public void onFailure(Throwable e) {
if (e instanceof MessageDroppedException) {
log.warn(e, "Dropped message: %s", obj);
} else {
log.error(e, "Failed to send message: %s", obj);
}
}
}
);
}
} finally {
sender.flush();
sender.stop();
}
}
}
I have started all 5 nodes (historical, broker, coordinator, overlord and middleManager) on local (I am still doing everything local so far) and when starting the code, I get the following error:
Exception in thread "main" java.lang.IllegalArgumentException: Instantiation of [simple type, class io.druid.data.input.impl.StringInputRowParser] value failed: com.metamx.common.parsers.JSONPathParser
at com.fasterxml.jackson.databind.ObjectMapper._convert(ObjectMapper.java:2774)
at com.fasterxml.jackson.databind.ObjectMapper.convertValue(ObjectMapper.java:2700)
...
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Instantiation of [simple type, class io.druid.data.input.impl.StringInputRowParser] value failed: com.metamx.common.parsers.JSONPathParser
at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.wrapException(StdValueInstantiator.java:405)
at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromObjectWith(StdValueInstantiator.java:234)
...
Caused by: java.lang.ClassNotFoundException: com.metamx.common.parsers.JSONPathParser
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
From the error I suspect it is because the compiler does not find com.metamx.common.parsers.JSONPathParser? If this is the case, how can I include
this in my project?
Thank for your help !
Pascal