intermediatePersist time is 10 mins (the default). From what I understand, the realtime node keeps buffering data until the segment reaches its granularity boundary and a row with a later timestamp is received. The data then stays in memory until the window period elapses, after which the realtime node persists it to local disk and commits the offset to Kafka, acknowledging that the messages were received. If something goes wrong with the realtime node before it persists to local disk, it should re-consume the uncommitted data from Kafka.
In my case that didn't happen. I checked the Kafka topic, and all the data from the cluster is gone now: the retention period on the Kafka cluster expired, so the data was deleted from Kafka as well. How should I handle this case?
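As a stopgap, I am considering lengthening the brokers' log retention so unconsumed data outlives the Druid windowPeriod plus any realistic downtime. A sketch of the broker-side settings I have in mind (example values, to be checked against our Kafka version):

```
# broker server.properties -- keep log segments longer (example: 7 days)
log.retention.hours=168
# how often the broker checks for segments eligible for deletion
log.retention.check.interval.ms=300000
```

This only buys time for recovery; it does not by itself prevent the in-memory chunk from being lost.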
Can we make the realtime node flush everything it holds? Once our data stream ends, the realtime node still keeps the last chunk in memory, and that chunk could be lost. In my case the last 10 minutes of data should be persisted to disk and loaded onto historical nodes rather than being served only by the realtime node.
Here is my realtime spec file:
[
  {
    "dataSchema" : {
      "dataSource" : "ping",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "tsv",
          "columns" : ["some dimensions"],
          "delimiter" : "\t",
          "timestampSpec" : {
            "column" : "server_ts",
            "format" : "yyyy-MM-dd HH:mm:ss"
          },
          "dimensionsSpec" : {
            "dimensions" : ["some dimensions"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [{
        "type" : "count",
        "name" : "count"
      }],
      "aggregations" : [{
        "type" : "longSum",
        "name" : "numIngestedEvents",
        "fieldName" : "count"
      }],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "FIVE_MINUTE",
        "queryGranularity" : "NONE"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose" : {
        "type" : "kafka-0.8",
        "consumerProps" : {
          "zookeeper.connect" : "hostname:2181",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id" : "avping-full",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset" : "largest",
          "auto.commit.enable" : "false"
        },
        "feed" : "pingFull"
      },
      "plumber" : {
        "type" : "realtime"
      }
    },
    "tuningConfig" : {
      "type" : "realtime",
      "maxRowsInMemory" : 250000000,
      "intermediatePersistPeriod" : "PT10m",
      "windowPeriod" : "PT5m",
      "basePersistDirectory" : "/mnt/druid/realtime/basePersist",
      "rejectionPolicy" : {
        "type" : "messageTime"
      }
    }
  }
]
I used the messageTime rejection policy to demonstrate the realtime ingestion functionality to my team, and I am not sure whether it can cause problems. Other than that, everything is pretty standard. Please let me know, as this will be a big question from my team about the data loss.
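Specifically, since messageTime advances the rejection window based on event timestamps rather than the wall clock, I suspect a stopped or stalled stream could keep the last segment from ever being handed off. Would switching to the serverTime rejection policy help here? The only change I am contemplating in the tuningConfig is:

```
"rejectionPolicy" : {
  "type" : "serverTime"
}
```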