The case is actually simple: I am using Redis as a recovery queue.
I read data from Kafka, put each message into Redis (what I call the processing queue), and also set an expiring lock on it:
pipeline.set(Helper.getProcessingRedisKey(signal.topic, key), message);
pipeline.setex(Helper.getLockKey(key), 60, Integer.toString(60));
The message is only removed once it has been processed successfully by the downstream thread.
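On success, the downstream thread drops both the processing entry and its lock. A minimal in-memory sketch of that acknowledgement step, with HashMaps standing in for Redis (with real Redis this would be a pipelined DEL of the two keys; the "lock:" prefix is a hypothetical stand-in for whatever Helper.getLockKey produces):

```java
import java.util.Map;

public class Ack {
    // Remove a successfully processed message: delete its entry from the
    // processing "queue" and delete its lock key. The maps simulate Redis.
    static void ack(Map<String, String> processing,
                    Map<String, String> locks,
                    String processingKey) {
        // messageId is everything after the first ':' in the processing key,
        // e.g. "processing_Thursday_100:<uuid>" -> "<uuid>"
        String messageId = processingKey.substring(processingKey.indexOf(':') + 1);
        processing.remove(processingKey);
        locks.remove("lock:" + messageId);   // assumed lock-key naming
    }
}
```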
Then I have a daemon process monitoring the processing queue: if any item no longer has a lock on it, the downstream thread might have crashed, so I need to reprocess it.
Note that a day's data is divided into 10-minute blocks, and each block can contain at most 2 million messages. Since one day has 24 * 6 = 144 blocks, I use a different key prefix for each 10-minute block. The daemon wakes up periodically, say every hour, and checks all of the past 10-minute blocks. For each block:
scan the processing queue; if any item does not have a lock on it, resubmit it.
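That per-block check can be sketched as follows; HashMaps stand in for Redis (against real Redis this would be SCAN with a MATCH on the block's prefix, plus EXISTS on each lock key), and the "lock:" naming is an assumption for Helper.getLockKey:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RecoveryScan {
    // Sketch of the daemon's check for one 10-minute block: walk every
    // entry in the processing queue with this block's prefix and collect
    // the messages whose lock has expired (i.e. no matching lock key).
    static List<String> findOrphans(Map<String, String> processing,
                                    Map<String, String> locks,
                                    String blockPrefix) {
        List<String> toResubmit = new ArrayList<>();
        for (Map.Entry<String, String> e : processing.entrySet()) {
            String key = e.getKey();
            if (!key.startsWith(blockPrefix)) continue;      // other blocks
            String messageId = key.substring(key.indexOf(':') + 1);
            if (!locks.containsKey("lock:" + messageId)) {   // lock expired
                toResubmit.add(e.getValue());                // needs reprocessing
            }
        }
        return toResubmit;
    }
}
```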
So as you can see, in the worst case the processing queue for a single 10-minute block can contain as many as 2 million messages without a lock (if all downstream processes died), though normally it would be only 1,000 to 10,000. But what if the daemon itself dies and restarts at the end of the day? Then it will need to scan all 144 past blocks. If each block contains 1k messages, that is again 144k messages.
Performance is my main concern. A key looks like this: "processing_Thursday_100:981568d7-4008-4ce9-8606-a55cb3e3a927"
i.e. processing_<dayOfWeek>_<blockNo>:<messageId>
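For reference, that key scheme can be built like this; buildProcessingKey is a hypothetical helper (not the actual Helper.getProcessingRedisKey), assuming the block number is simply minutes-since-midnight divided by 10:

```java
import java.time.DayOfWeek;
import java.time.LocalDateTime;

public class KeyScheme {
    // Build "processing_<dayOfWeek>_<blockNo>:<messageId>", where blockNo is
    // the 10-minute block index within the day (0..143).
    static String buildProcessingKey(LocalDateTime ts, String messageId) {
        int blockNo = (ts.getHour() * 60 + ts.getMinute()) / 10;
        return "processing_" + dayName(ts.getDayOfWeek()) + "_" + blockNo
                + ":" + messageId;
    }

    // "THURSDAY" -> "Thursday", matching the example key in the post.
    static String dayName(DayOfWeek d) {
        String s = d.name().toLowerCase();
        return Character.toUpperCase(s.charAt(0)) + s.substring(1);
    }
}
```

With this scheme, block 100 covers 16:40-16:49, so a message at 16:40 on a Thursday gets the prefix processing_Thursday_100.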
Let me know what you think
Chen