Index old data


Marcelo Oikawa

Jun 2, 2016, 10:46:37 AM
to druid...@googlegroups.com
Hi, List

I tried to set the window period to "P4Y". My data has a timestamp column with values from 2014 (a little old, but sometimes I need to re-index data), and I'm using it to test my application with Tranquility. However, I'm getting this error:
2016-06-02T14:22:30,429 INFO [task-runner-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Running task: index_realtime_telecom_2014-08-19T11:55:00.000Z_0_0
2016-06-02T14:22:30,441 INFO [task-runner-0] io.druid.segment.realtime.plumber.RealtimePlumber - Creating plumber using rejectionPolicy[io.druid.segment.realtime.plumber.NoopRejectionPolicyFactory$1@621295db]
2016-06-02T14:22:30,454 ERROR [task-runner-0] io.druid.indexing.common.task.RealtimeIndexTask - Exception aborted realtime processing[telecom]: {class=io.druid.indexing.common.task.RealtimeIndexTask, exceptionType=class java.lang.UnsupportedOperationException, exceptionMessage=Cannot convert to Duration as this period contains years and years vary in length}
java.lang.UnsupportedOperationException: Cannot convert to Duration as this period contains years and years vary in length
	at org.joda.time.Period.checkYearsAndMonths(Period.java:1570) ~[joda-time-2.8.2.jar:2.8.2]
	at org.joda.time.Period.toStandardDuration(Period.java:1549) ~[joda-time-2.8.2.jar:2.8.2]
	at io.druid.segment.realtime.plumber.RealtimePlumber.startPersistThread(RealtimePlumber.java:845) ~[druid-server-0.8.3.jar:0.8.3]
	at io.druid.segment.realtime.plumber.RealtimePlumber.startJob(RealtimePlumber.java:209) ~[druid-server-0.8.3.jar:0.8.3]
	at io.druid.indexing.common.task.RealtimeIndexTask.run(RealtimeIndexTask.java:302) [druid-indexing-service-0.8.3.jar:0.8.3]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:285) [druid-indexing-service-0.8.3.jar:0.8.3]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:265) [druid-indexing-service-0.8.3.jar:0.8.3]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_91]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_91]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_91]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Does anyone know how to solve this? Do I need some extra configuration?


Gian Merlino

Jun 2, 2016, 12:03:40 PM
to druid...@googlegroups.com
Hey Marcelo,

That should work with PT35040H rather than P4Y, since the windowPeriod needs to be a fixed duration.
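
For reference, PT35040H is four fixed 365-day years expressed in hours (4 × 365 × 24 = 35040). The stack trace above shows Joda-Time refusing to convert a period that contains years. The sketch below only checks the arithmetic, using java.time as a stand-in rather than Joda-Time; the class and method names are made up for illustration and are not part of Druid or Tranquility.

```java
import java.time.Duration;

// Illustrative helper only; not part of Druid, Tranquility, or Joda-Time.
final class WindowPeriodDemo {

    // Number of hours in `years` fixed-length years of 365 days each.
    static long hoursInFixedYears(int years) {
        return Duration.ofDays(365L * years).toHours();
    }

    // The same value written as an hour-only ISO-8601 period string.
    static String asHourPeriod(int years) {
        return "PT" + hoursInFixedYears(years) + "H";
    }

    public static void main(String[] args) {
        System.out.println(asHourPeriod(4)); // prints PT35040H
    }
}
```

Note that this ignores leap days: the window covers exactly 1460 days, which is slightly less than four calendar years.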

BUT, Tranquility is not really designed to handle ingestion of historical data like this. You will have issues with handoff, as tasks will stay around for four years before exiting!

A better approach would be to use batch indexing (local batch indexing for small amounts of data, a remote Hadoop cluster for large amounts), or, if you are brave, the new Kafka supervisor in 0.9.1 (http://druid.io/docs/0.9.1-rc1/development/extensions-core/kafka-ingestion.html). 0.9.1 has not been released yet, but there is a release candidate, 0.9.1-rc1, that you can try out.

Gian


Marcelo Oikawa

Jun 2, 2016, 3:26:04 PM
to druid...@googlegroups.com
Hi, Gian.

> That should work with PT35040H rather than P4Y, since the windowPeriod needs to be a fixed duration.

This works for now.
 

> BUT, Tranquility is not really designed to handle ingestion of historical data like this. You will have issues with handoff, as tasks will stay around for four years before exiting!

Thanks for the tip. I'm not using this in production; it's just a test for now. Now I'm getting this error:
java.lang.IllegalStateException: Not started
	at com.metamx.tranquility.tranquilizer.Tranquilizer.requireStarted(Tranquilizer.scala:265)
	at com.metamx.tranquility.tranquilizer.Tranquilizer.send(Tranquilizer.scala:161)
and my code is below:

rdd.foreachPartition(partition -> {

    partition.forEachRemaining(message -> {

        if (sender == null || !sender.isAvailable()) {
            sender = createTranquilizer(message);
        }

        druidMessages.stream().forEach(message -> {
            Future<BoxedUnit> future = sender.send(message);
            future.addEventListener(createListener(message));
        });
        sender.flush();
        sender.close();
    });
});
Note that I'm running a Spark application to send data to Druid. The error seems strange because some messages are sent and others are not: the sender object raises the exception after sending some messages. Do I need some extra configuration before sending messages?



 

Gian Merlino

Jun 2, 2016, 6:41:35 PM
to druid...@googlegroups.com
Hey Marcelo,

You should avoid calling "close" on the sender (it's not usable after that). From Scala, the easiest way to integrate is to use the Tranquility Spark module (https://github.com/druid-io/tranquility/blob/master/docs/spark.md). It works with RDDs as well as DStreams.

From Java, you can at least do something similar to what it's doing. Check out the source here:


Gian
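
The advice above about not closing the sender can be illustrated with a minimal sketch. It assumes one long-lived sender per JVM that is started once, reused for every partition, flushed after each partition, and never closed on the send path. FakeSender, SenderHolder, and PartitionSendDemo are hypothetical stand-ins written for this example; they are not the Tranquility API and only mimic a sender that throws "Not started" once it has been closed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a sender with a start/send/flush/close lifecycle.
// NOT the Tranquility API; it only mimics "unusable after close".
final class FakeSender {
    private boolean started = false;
    final List<String> sent = new ArrayList<>();

    void start() { started = true; }

    void send(String message) {
        if (!started) {
            throw new IllegalStateException("Not started");
        }
        sent.add(message);
    }

    void flush() { /* a real sender would wait here for pending sends */ }

    void close() { started = false; }
}

// Hypothetical holder: creates and starts one sender lazily, then reuses it.
final class SenderHolder {
    private static FakeSender instance;

    static synchronized FakeSender get() {
        if (instance == null) {
            instance = new FakeSender();
            instance.start();
        }
        return instance;
    }
}

final class PartitionSendDemo {
    // Sends every message in the partition through the shared sender and
    // flushes at the end. The sender is deliberately not closed here.
    static int sendPartition(Iterator<String> partition) {
        FakeSender sender = SenderHolder.get();
        int count = 0;
        while (partition.hasNext()) {
            sender.send(partition.next());
            count++;
        }
        sender.flush();
        return count;
    }

    public static void main(String[] args) {
        int first = sendPartition(Arrays.asList("a", "b").iterator());
        int second = sendPartition(Arrays.asList("c", "d", "e").iterator());
        System.out.println(first + second); // prints 5
    }
}
```

In the posted code, by contrast, close() is called inside the per-message loop, so a later send can hit a sender that has already been closed.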
