Cannot query new realtime data when ingesting data using the Kafka indexing service.

Zha Rui

Oct 10, 2016, 6:22:16 AM
to sparklinedata
Hi Harish and all experts,

I cannot query newly ingested data when the data is ingested in real time using the Kafka indexing service.

For example, I ingest data at "MINUTE" granularity in real time. After I start the Spark thriftserver with the accelerator jar, I can query the current minute's data, but once a new minute of data is ingested I cannot see it unless I restart the thriftserver.

I saw a note in the 0.3.0 release notes that said "fix bugs in Metadata caching: react to Druid Segment changes", and I think the case I described above falls into this situation.

Please help, thank you so much!


Zha Rui

harish

Oct 10, 2016, 11:14:39 AM
to sparklinedata
Hi Zha,

Currently, we don't query real-time servers. If you look at line 261 in DruidMetadataCache, we only account for historical servers.
We haven't enabled querying real-time servers because we haven't had the time to set up a test environment for it.
For now, can you try changing line 261 to include real-time servers?
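For illustration only, the change might look roughly like the sketch below; the names here (serverInfos, serverType) are assumptions for the sketch, not the actual fields in DruidMetadataCache:

    // Hypothetical sketch: widen the server filter so that segments served
    // by realtime servers are also registered in the metadata cache.
    val servers = serverInfos.filter { s =>
      s.serverType == "historical" || s.serverType == "realtime"
    }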

Thanks,
Harish.

Zha Rui

Oct 10, 2016, 10:15:14 PM
to sparklinedata
Thank you Harish, I'll try it and post my feedback here.


On Monday, October 10, 2016 at 11:14:39 PM UTC+8, harish wrote:

Zha Rui

Oct 11, 2016, 10:43:23 PM
to sparklinedata
Hi Harish,

I just fixed this issue. It has nothing to do with realtime servers, because the Kafka indexing service doesn't need realtime nodes. I added and modified some code in the getDataSourceInfo method of the HistoricalServerAssignment class (plus some other relevant code) to reload the metadata every time a query is submitted, so that the newest intervals are picked up.

The main code I added to the getDataSourceInfo method is below:

if (dCI.druidDataSources.contains(dRName.druidDataSource)) {
  if (!options.realtimeJob)
    // batch case: serve the cached DruidDataSourceInfo as before
    dCI.druidDataSources(dRName.druidDataSource)
  else {
    // realtime case: evict the cached entry and re-fetch the segment
    // metadata from Druid so the newest intervals become visible
    val i: DruidDataSourceInfo = dCI.druidDataSources(dRName.druidDataSource)
    dCI.druidDataSources.remove(i._1.name)
    var dds: DruidDataSource = i._2.getDruidClient.metadata(dRName.druidDataSource,
      options.loadMetadataFromAllSegments)
    dds = dds.copy(druidVersion = dCI.coordinatorStatus.version)
    // put the refreshed entry back into the cache
    val j: DruidDataSourceInfo = (i._1, dds)
    val m = dCI.druidDataSources.asInstanceOf[MMap[String, DruidDataSourceInfo]]
    m(i._1.name) = j
    i
  }
} else {
...
}
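The idea is that when options.realtimeJob is set, the cached DruidDataSourceInfo is evicted and the segment metadata is fetched from Druid again, so every query sees the newest intervals without restarting the thriftserver. The cost is one extra metadata call per query.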



I also found two bugs in the query granularity implementation, and I'll send PRs this weekend.

Thank you,
Zha Rui


On Monday, October 10, 2016 at 11:14:39 PM UTC+8, harish wrote:

harish

Oct 11, 2016, 11:26:19 PM
to sparklinedata
OK, please send a PR for this and the other two issues.