Command to read directly from kafka topic not from a file


nandumal...@gmail.com

Aug 20, 2018, 1:48:21 PM
to Pinot Users
Hello,

I'm trying to set up a Kafka topic and have Pinot consume the data from the topic directly. But I only see StreamAvroIntoKafkaCommand, which streams data from an Avro file into a Kafka topic. Can someone please tell me how to have Pinot read directly from the topic?

Thanks,
Nandini.

shle...@gmail.com

Aug 20, 2018, 8:29:26 PM
to Pinot Users
Hi Nandini,

You can specify the Kafka topic name ("stream.kafka.topic.name") under "streamConfigs" in the table config. You can refer to the following page for a table config example: https://github.com/linkedin/pinot/wiki/Pinot-Realtime-Workflow

Best,
Seunghyun
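As a rough sketch of where that setting lives, the snippet below builds a minimal realtime table config as a Python dict and prints it as JSON. Only "streamConfigs" and "stream.kafka.topic.name" come from this thread; the "tableIndexConfig" nesting, the "streamType" key and the topic name "panelView-events" are assumptions, and `build_realtime_table_config` is a hypothetical helper. Check the wiki page above for the full set of keys your Pinot version requires.

```python
import json


def build_realtime_table_config(table_name: str, topic: str) -> dict:
    """Return a minimal, hypothetical realtime table config fragment."""
    return {
        "tableName": table_name,
        "tableType": "REALTIME",
        "tableIndexConfig": {  # assumed nesting; verify against the wiki
            "streamConfigs": {
                "streamType": "kafka",  # assumed key
                "stream.kafka.topic.name": topic,  # key named in the thread
                # Consumer type, decoder and Kafka/ZooKeeper connection
                # properties are also needed in practice; omitted here.
            }
        },
    }


config = build_realtime_table_config("panelView", "panelView-events")
print(json.dumps(config, indent=2))
```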

nandumal...@gmail.com

Aug 23, 2018, 5:31:42 PM
to Pinot Users
I'm able to link the topic to Pinot. But as soon as I add the realtime config to Pinot and try to query, the count is 0, even though totalDocs keeps increasing. Am I missing something here?


Before adding the realtime config:
Result: {"aggregationResults":[{"function":"count_star","value":"3823021"}],"exceptions":[],"numServersQueried":1,"numServersResponded":1,"numDocsScanned":3823021,"numEntriesScannedInFilter":3836544,"numEntriesScannedPostFilter":0,"totalDocs":4480984,"timeUsedMs":607,"segmentStatistics":[],"traceInfo":{}}

After adding the realtime config:

Result: {"aggregationResults":[{"function":"count_star","value":"0"}],"exceptions":[],"numServersQueried":2,"numServersResponded":2,"numDocsScanned":0,"numEntriesScannedInFilter":0,"numEntriesScannedPostFilter":0,"totalDocs":4480987,"timeUsedMs":6,"segmentStatistics":[],"traceInfo":{}}





Thanks,
Nandini.
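The two responses above differ in a telling way: with the realtime table added, the broker queries two servers instead of one and totalDocs keeps growing, yet numDocsScanned drops to 0. A small sketch for pulling those fields out of a broker response, using the exact JSON posted above (`summarize` is a hypothetical helper), is:

```python
import json

# Broker responses copied verbatim from the post above.
BEFORE = '{"aggregationResults":[{"function":"count_star","value":"3823021"}],"exceptions":[],"numServersQueried":1,"numServersResponded":1,"numDocsScanned":3823021,"numEntriesScannedInFilter":3836544,"numEntriesScannedPostFilter":0,"totalDocs":4480984,"timeUsedMs":607,"segmentStatistics":[],"traceInfo":{}}'
AFTER = '{"aggregationResults":[{"function":"count_star","value":"0"}],"exceptions":[],"numServersQueried":2,"numServersResponded":2,"numDocsScanned":0,"numEntriesScannedInFilter":0,"numEntriesScannedPostFilter":0,"totalDocs":4480987,"timeUsedMs":6,"segmentStatistics":[],"traceInfo":{}}'


def summarize(raw: str) -> dict:
    """Extract the count and the scan/server statistics from a broker response."""
    resp = json.loads(raw)
    agg = resp["aggregationResults"][0]
    return {
        "count": int(agg["value"]),  # aggregation values arrive as strings
        "servers": resp["numServersQueried"],
        "docsScanned": resp["numDocsScanned"],
        "totalDocs": resp["totalDocs"],
    }


for label, raw in (("before", BEFORE), ("after", AFTER)):
    print(label, summarize(raw))
```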

Mayank Shrivastava

Aug 23, 2018, 5:56:57 PM
to nandumal...@gmail.com, Pinot Users
It seems like the time boundary may be set up incorrectly. Can you make sure the time columns of your offline and realtime tables use the same unit, and that the data is consistent with that unit?

-mayank

From: pinot...@googlegroups.com <pinot...@googlegroups.com> on behalf of nandumal...@gmail.com <nandumal...@gmail.com>
Sent: Thursday, August 23, 2018 2:31 PM
To: Pinot Users
Subject: Re: Command to read directly from kafka topic not from a file
 
--
You received this message because you are subscribed to the Google Groups "Pinot Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to pinot_users...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/pinot_users/54d6c60e-2465-44d4-9d9f-f2e89cc7b9f0%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

kishore g

Aug 23, 2018, 6:27:59 PM
to Mayank Shrivastava, nandumal...@gmail.com, Pinot Users
Also, check whether the schema matches across the two. Run the following queries:
- select count(*) from T_OFFLINE
- select count(*) from T_REALTIME
- select max(time_column) from T_OFFLINE
- select min(time_column) from T_OFFLINE
- select max(time_column) from T_REALTIME
- select min(time_column) from T_REALTIME

where T is your table name

thanks,
Kishore G
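For a given table name T and time column, these six checks can be generated mechanically. The sketch below pairs count, min and max for each physical table, using the _OFFLINE/_REALTIME suffixes from the thread; `diagnostic_queries` is a hypothetical helper, and sending the queries to the broker is left out because it depends on the setup.

```python
from typing import List


def diagnostic_queries(table: str, time_column: str) -> List[str]:
    """Build count/min/max queries for both halves of a hybrid table."""
    queries = []
    for suffix in ("OFFLINE", "REALTIME"):
        physical = f"{table}_{suffix}"
        queries.append(f"select count(*) from {physical}")
        queries.append(f"select min({time_column}) from {physical}")
        queries.append(f"select max({time_column}) from {physical}")
    return queries


for q in diagnostic_queries("panelView", "DaysSinceEpoch"):
    print(q)
```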


nandumal...@gmail.com

Aug 23, 2018, 7:06:16 PM
to Pinot Users
I ran the queries:

OFFLINE:


Result: {"aggregationResults":[{"function":"count_star","value":"4480984"}],"exceptions":[],"numServersQueried":1,"numServersResponded":1,"numDocsScanned":4480984,"numEntriesScannedInFilter":0,"numEntriesScannedPostFilter":0,"totalDocs":4480984,"timeUsedMs":9,"segmentStatistics":[],"traceInfo":{}}


REALTIME:

Result: {"aggregationResults":[{"function":"count_star","value":"4"}],"exceptions":[],"numServersQueried":1,"numServersResponded":1,"numDocsScanned":4,"numEntriesScannedInFilter":0,"numEntriesScannedPostFilter":0,"totalDocs":4,"timeUsedMs":6,"segmentStatistics":[],"traceInfo":{}}

For the time column, I created a dummy column with the same value for every row: the data type is INT and the value is 365. So all the min/max queries return 365.

However, when I query the overall count, it returns 0.

Thanks,

Nandini




Jialiang Li

Aug 25, 2018, 11:20:40 PM
to Pinot Users
Currently Pinot uses the "timeFieldSpec" to determine the time boundary for a hybrid table. Below is an example of a timeFieldSpec:

"timeFieldSpec": {
    "incomingGranularitySpec": {
      "name": "daysSinceEpoch",
      "dataType": "INT",
      "timeType": "DAYS"
    }
}

Pinot merges the offline and realtime tables by keeping track of the maximum value of the time column in the offline table, then splitting the query into two based on that value.

For example, if you run select count(*) from myTable, the Pinot broker rewrites it into two queries (where ? is the maximum value of daysSinceEpoch in myTable_OFFLINE):
- select count(*) from myTable_OFFLINE where daysSinceEpoch < ?
- select count(*) from myTable_REALTIME where daysSinceEpoch >= ?
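The split can be sketched in a few lines of Python. This models the rewrite only as described above (boundary = maximum offline time, `<` on the offline side, `>=` on the realtime side); the broker's actual boundary computation may differ between versions, and `split_hybrid_query` is a hypothetical helper, not Pinot code.

```python
from typing import Tuple


def split_hybrid_query(table: str, time_column: str, offline_max: int,
                       select: str = "count(*)") -> Tuple[str, str]:
    """Split a hybrid-table query at the time boundary described in the thread."""
    boundary = offline_max  # maximum time value found in <table>_OFFLINE
    offline_q = f"select {select} from {table}_OFFLINE where {time_column} < {boundary}"
    realtime_q = f"select {select} from {table}_REALTIME where {time_column} >= {boundary}"
    return offline_q, realtime_q


off_q, rt_q = split_hybrid_query("myTable", "daysSinceEpoch", offline_max=365)
print(off_q)  # select count(*) from myTable_OFFLINE where daysSinceEpoch < 365
print(rt_q)   # select count(*) from myTable_REALTIME where daysSinceEpoch >= 365
```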

So in your case, since every time value is 365, your original query will only hit the realtime table.
If possible, could you try using different time values for the two tables, e.g. [0 - 364] for the offline table and [365 - 400] for the realtime table?
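To see what the suggested ranges change, the sketch below counts toy rows under the same split rule. The row values are made up for illustration and `hybrid_count` is a hypothetical helper that models the rule as described in this thread, not the broker's code.

```python
def hybrid_count(offline_times, realtime_times):
    """Return (offline rows counted, realtime rows counted) under the split
    rule: boundary = max offline time; offline keeps < boundary,
    realtime keeps >= boundary."""
    boundary = max(offline_times)
    offline_part = sum(1 for t in offline_times if t < boundary)
    realtime_part = sum(1 for t in realtime_times if t >= boundary)
    return offline_part, realtime_part


# Every row at 365, as in the original setup: offline contributes nothing.
print(hybrid_count([365] * 5, [365] * 4))  # (0, 4)
# Suggested split: offline covers 0-364, realtime covers 365-400.
print(hybrid_count(list(range(0, 365)), list(range(365, 401))))  # (364, 36)
```

Note that under this rule the offline rows sitting exactly at the offline maximum (364 in the second case) are not counted from the offline table; the realtime table is expected to cover that time value.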

nandumal...@gmail.com

Aug 26, 2018, 5:08:15 PM
to Pinot Users
Thank you for the detailed explanation. I modified the data and still no luck. The query result is still 0. :( 

Mayank Shrivastava

Aug 26, 2018, 6:07:25 PM
to nandumal...@gmail.com, Pinot Users
Could you please share the output of all the queries Kishore asked to run earlier in the email thread?



 


nandumal...@gmail.com

Aug 26, 2018, 11:17:41 PM
to Pinot Users

select count(*) from panelView_REALTIME


Result: {"aggregationResults":[{"function":"count_star","value":"8"}],"exceptions":[],"numServersQueried":1,"numServersResponded":1,"numDocsScanned":8,"numEntriesScannedInFilter":0,"numEntriesScannedPostFilter":0,"totalDocs":8,"timeUsedMs":8,"segmentStatistics":[],"traceInfo":{}}


select count(*) from panelView_OFFLINE


Result: {"aggregationResults":[{"function":"count_star","value":"4480984"}],"exceptions":[],"numServersQueried":1,"numServersResponded":1,"numDocsScanned":4480984,"numEntriesScannedInFilter":0,"numEntriesScannedPostFilter":0,"totalDocs":4480984,"timeUsedMs":4,"segmentStatistics":[],"traceInfo":{}}


select max(DaysSinceEpoch)  from panelView_OFFLINE


Result: {"aggregationResults":[{"function":"max_DaysSinceEpoch","value":"365.00000"}],"exceptions":[],"numServersQueried":1,"numServersResponded":1,"numDocsScanned":4480984,"numEntriesScannedInFilter":0,"numEntriesScannedPostFilter":0,"totalDocs":4480984,"timeUsedMs":15,"segmentStatistics":[],"traceInfo":{}}


select max(DaysSinceEpoch)  from panelView_REALTIME


Result: {"aggregationResults":[{"function":"max_DaysSinceEpoch","value":"395.00000"}],"exceptions":[],"numServersQueried":1,"numServersResponded":1,"numDocsScanned":8,"numEntriesScannedInFilter":0,"numEntriesScannedPostFilter":8,"totalDocs":8,"timeUsedMs":65,"segmentStatistics":[],"traceInfo":{}}


select min(DaysSinceEpoch)  from panelView_REALTIME


Result: {"aggregationResults":[{"function":"min_DaysSinceEpoch","value":"370.00000"}],"exceptions":[],"numServersQueried":1,"numServersResponded":1,"numDocsScanned":8,"numEntriesScannedInFilter":0,"numEntriesScannedPostFilter":8,"totalDocs":8,"timeUsedMs":4,"segmentStatistics":[],"traceInfo":{}}


select min(DaysSinceEpoch)  from panelView_OFFLINE


Result: {"aggregationResults":[{"function":"min_DaysSinceEpoch","value":"365.00000"}],"exceptions":[],"numServersQueried":1,"numServersResponded":1,"numDocsScanned":4480984,"numEntriesScannedInFilter":0,"numEntriesScannedPostFilter":0,"totalDocs":4480984,"timeUsedMs":5,"segmentStatistics":[],"traceInfo":{}}
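Plugging the reported min/max/count values into the split rule described earlier in the thread gives a quick sanity check. The sketch below is a simplified model (`expected_contributions` is a hypothetical helper) that works from min/max alone and returns None where those values are not enough to decide.

```python
def expected_contributions(offline_min, offline_max, realtime_min, realtime_max, realtime_count):
    """Estimate each side's share of a hybrid count(*) from min/max alone.
    Rule as described in the thread: boundary = max offline time;
    offline keeps time < boundary, realtime keeps time >= boundary."""
    boundary = offline_max
    # Offline: if even the smallest value sits at the boundary, no row qualifies.
    offline_part = 0 if offline_min >= boundary else None  # None = undetermined
    # Realtime: every row qualifies if the smallest value is at/after the boundary.
    if realtime_min >= boundary:
        realtime_part = realtime_count
    elif realtime_max < boundary:
        realtime_part = 0
    else:
        realtime_part = None
    return boundary, offline_part, realtime_part


# Values reported above for panelView_OFFLINE and panelView_REALTIME.
print(expected_contributions(offline_min=365, offline_max=365,
                             realtime_min=370, realtime_max=395, realtime_count=8))
# (365, 0, 8)
```

Under this model the hybrid count(*) would be expected to return 8 rather than 0, so the split rule on its own does not account for the empty result.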

Mayank Shrivastava

Aug 26, 2018, 11:27:35 PM
to nandumal...@gmail.com, Pinot Users
Thanks, so the min time in real-time is greater than that of offline. What is the time unit that you set for both the tables in the table configs?

 


nandumal...@gmail.com

Aug 26, 2018, 11:34:55 PM
to Pinot Users
Should the min in the realtime (online) table be less than in the offline table? I set the values based on the previous explanation.

Schema :

"timeFieldSpec": {
    "incomingGranularitySpec": {
      "timeType": "DAYS",
      "dataType": "INT",
      "name": "DaysSinceEpoch"
    }
  },


Offline config :

"segmentsConfig" : {
        "retentionTimeUnit":"DAYS",
        "retentionTimeValue":"700",
        "segmentPushFrequency":"daily",
        "segmentPushType":"APPEND",
        "replication" : "3",
        "schemaName" : "panelView",
        "timeColumnName" : "DaysSinceEpoch",
        "timeType" : "DAYS",
        "segmentAssignmentStrategy" : "BalanceNumSegmentAssignmentStrategy"
    }

Online Config:

"segmentsConfig" : {
        "retentionTimeUnit":"DAYS",
        "retentionTimeValue":"700",
        "segmentPushFrequency":"daily",
        "segmentPushType":"APPEND",
        "replication" : "1",
        "schemaName" : "panelView",
        "timeColumnName" : "DaysSinceEpoch",
        "timeType" : "DAYS",
        "segmentAssignmentStrategy" : "BalanceNumSegmentAssignmentStrategy"
    }

Thanks,
Nandini.
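The question about time units comes down to whether the time column name and unit agree across the schema's timeFieldSpec and both segmentsConfig blocks. A quick check over the values posted above (only the time-related fields are copied; `time_config_mismatches` is a hypothetical helper, not a Pinot tool) could look like this:

```python
# Time settings copied from the schema and both segmentsConfig blocks above.
SCHEMA_TIME_SPEC = {"name": "DaysSinceEpoch", "dataType": "INT", "timeType": "DAYS"}
OFFLINE_SEGMENTS = {"timeColumnName": "DaysSinceEpoch", "timeType": "DAYS"}
REALTIME_SEGMENTS = {"timeColumnName": "DaysSinceEpoch", "timeType": "DAYS"}


def time_config_mismatches(schema_spec, **segments_configs):
    """List every table whose time column name or unit differs from the schema."""
    problems = []
    for label, cfg in segments_configs.items():
        if cfg.get("timeColumnName") != schema_spec["name"]:
            problems.append(f"{label}: timeColumnName {cfg.get('timeColumnName')!r} != {schema_spec['name']!r}")
        if cfg.get("timeType") != schema_spec["timeType"]:
            problems.append(f"{label}: timeType {cfg.get('timeType')!r} != {schema_spec['timeType']!r}")
    return problems


print(time_config_mismatches(SCHEMA_TIME_SPEC, offline=OFFLINE_SEGMENTS, realtime=REALTIME_SEGMENTS))  # []
```

With the posted values the check returns an empty list, so a unit mismatch between the two tables does not appear to be the cause.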

Mayank Shrivastava

Aug 27, 2018, 12:53:48 AM
to nandumal...@gmail.com, Pinot Users
The time boundary seems correct, so I am not sure what is going on here. Another possibility could have been that, since your time values are bogus, retention kicks in and deletes the segments. But that seems unlikely, since you are still able to query the individual tables.

Do you see any messages in the broker/server logs that might give some clues? If not, maybe you could share your setup with us (Avro data, how you set up realtime ingestion, table configs, schema, etc.) and we will try to reproduce and debug it locally.

-mayank
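The retention concern can be made concrete with a little date arithmetic: DaysSinceEpoch = 365 corresponds to 1971-01-01, far older than the 700-day retention set in both segmentsConfig blocks. The sketch below is a simplified model (hypothetical helper names; Pinot's actual retention logic is not shown here) that treats a segment as expired once its newest time value is more than the retention period before "today".

```python
from datetime import date

EPOCH = date(1970, 1, 1)


def days_since_epoch(d: date) -> int:
    """Number of whole days between 1970-01-01 and d."""
    return (d - EPOCH).days


def outside_retention(segment_max_days: int, today: date, retention_days: int) -> bool:
    """Simplified model: expired once the newest value is older than the retention window."""
    return days_since_epoch(today) - segment_max_days > retention_days


today = date(2018, 8, 27)  # date of the reply above
print(days_since_epoch(today))              # 17770
print(outside_retention(365, today, 700))   # True: offline data at day 365
print(outside_retention(395, today, 700))   # True: newest realtime value
```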
