checkTopicPartition code question

123 views
Skip to first unread message

Jean-Pascal Billaud

unread,
Dec 22, 2014, 2:44:02 PM12/22/14
to secor...@googlegroups.com
Hey,

I am looking at the uploading policy and I would like to have more details around this piece of code from which I see the logs every so often:

    private void checkTopicPartition(TopicPartition topicPartition) throws Exception {
          [...]
           } else if (newOffsetCount > lastSeenOffset) {  // && oldOffset < newOffset
                LOG.debug("last seen offset " + lastSeenOffset +
                          " is lower than committed offset count " + newOffsetCount +
                          ".  Deleting files in topic " + topicPartition.getTopic() +
                          " partition " + topicPartition.getPartition());
                // There was a rebalancing event and someone committed an offset beyond that of the
                // current message.  We need to delete the local file.
                mFileRegistry.deleteTopicPartition(topicPartition);
            } else {  // oldOffsetCount < newOffsetCount <= lastSeenOffset
                LOG.debug("previous committed offset count " + oldOffsetCount +
                          " is lower than committed offset " + newOffsetCount +
                          " is lower than or equal to last seen offset " + lastSeenOffset +
                          ".  Trimming files in topic " + topicPartition.getTopic() +
                          " partition " + topicPartition.getPartition());
                // There was a rebalancing event and someone committed an offset lower than that
                // of the current message.  We need to trim local files.
                trimFiles(topicPartition, newOffsetCount);
            }
        }
    }

Would it be possible for somebody to elaborate in which case any of these 'else' conditions might happen? My understanding (from the comment) is that if the kafka (controller?) is doing some rebalancing then we might have two consumers (from the same consumer group) that would carry data on disk from the same partition temporarily... even though a single one of them will be active from the partition standpoint hence a single of them is taking control of the partition offset and therefore is receiving data. In this case the inactive consumer needs to trim some of its data to prevent data to be exported twice to S3.

Is this what we are talking about here?

Thanks,

Pawel Garbacki

unread,
Dec 23, 2014, 12:56:24 PM12/23/14
to Jean-Pascal Billaud, secor...@googlegroups.com
Reply inline.

That's right. Rebalancing will happen each time there is a membership change in the consumer group. Since rebalancing may change the ownership of topic/partitions, it may be required for Secor to modify the local buffer of messages consumed to date to accommodate the new consumer allocations. This mechanism is indeed intended to prevent message duplication.
 
Thanks,

--
You received this message because you are subscribed to the Google Groups "secor-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to secor-users...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Jean-Pascal Billaud

unread,
Dec 23, 2014, 1:48:28 PM12/23/14
to secor...@googlegroups.com, j...@tellapart.com
In that case such corner case should happen fairly rarely... Obviously this mechanism could be very racy though I am assuming the whole thing is secured by ZookeeperConnector.lock(lockPath) in UploadFiles() at which point the final commit happens.

Thanks for your answer.

Pawel Garbacki

unread,
Dec 23, 2014, 2:00:12 PM12/23/14
to Jean-Pascal Billaud, secor...@googlegroups.com
Yes, offset reset should happen fairly rarely. I don't expect many race conditions here since this logic is invoked only when if the upload policy conditions are satisfied (i.e., the local buffer has reached a certain age or size). It would be rather uncommon for those conditions to flip at once on multiple consumers.

Praveen Kumar

unread,
Feb 6, 2015, 4:28:29 PM2/6/15
to secor...@googlegroups.com
Hi Pawel,

I understand we do this to protect against rebalancing, but I fail to understand a scenario when the trimming of files is required. Can you please throw some light here?

The way I see it, 

If there are 2 consumer threads (C1 & C2) , 1 topic with 2 partitions (P1 & P2), 

T0) In a stable system, lets say C1 -> P1 and C2 -> P2
T1) C1 dies/GC pauses, and there is a rebalance, results in C2 -> P1, P2 
      In this case, C2 does not have any state of P2 hence creates a new file all is good.
      C1 could hold a local offset "x" in memory in the mean time if the rebalance was due to GC pause and not the thread dying. If the thread was dead, it loses all local offset state.

T2) Case 1 - C1 Died at T1
      At T2 when C1 comes back into the system, again there is a rebalance and C1 -> P1, C2 -> P2
      now C1 starts consuming P1 from the ZK offset and nothing new needs to be done.
T2) Case 2- C1 GC Paused, C2 did not update ZK offset
     At T2 when the GC pause ended, and the consumer is recognized again and a rebalance kicks in, C1 -> P1, C2 -> P2
     Here again, C1 starts consuming P1 from the ZK offset, hence the condition of (ZK offset != localoffset)[MessageWriter.adjustOffset) will kick in delete the earlier file, and start writing a new file from the last ZK offset.  
     All is well.
T2) Case 3 - C1 GC Paused, C2 did update ZK offset
      Again in this case, the same (ZK offset != localoffset)[MessageWriter.adjustOffset) logic will kick in and delete the earlier file, and new consumption happens from the new ZK offset. (which is again the right thing to do). 

So it seems to me that all consumer rebalance cases are handled already by the MessageWriter.adjustOffset() Condition.

In the end to end test,  move_offset_back_test, I see the trim code kick-in, but that can be explained because there is no real-rebalance in the test, all this test tries to do is just randomly move the ZK offset back. Since there was no-real rebalance triggered, the consumer's local offset isn't updated(refetched from ZK by kafka consumer), and the consumer doesn't know that the ZK state was updated, it happily consumes messages until the upload logic is kicked in, where we detect the ZK value is different and do a trim. But does this apply to a situation.
In the test, when re-setting the ZK offset, if we try to restart secor, the trim doesn't happen. 

Am I missing something obvious.

Would really appreciate if you could explain the specific scenarios when trim happens.

Thanks a lot,
Praveen

Pawel Garbacki

unread,
Feb 6, 2015, 5:29:10 PM2/6/15
to Praveen Kumar, secor...@googlegroups.com
Sure thing. Here is a scenario with trimming:

T0) C1->P1, C1->P2 (there is only one consumer at this point). C1 consumes P2 offsets from 0 to 100 but it does not get a chance to commit them.
T1) C2 (a new consumer) joins the system. C1->P1, C2->P2. C2 starts consuming P2 from offset 0. It commits offsets 0-50 and then dies.
T3) C1->P1, C1->P2. Now C1 has to start consuming P2 from offset 50 while it stores locally offsets 0 to 100. Trimming kicks in.

--

Praveen M

unread,
Feb 6, 2015, 6:32:57 PM2/6/15
to Pawel Garbacki, secor...@googlegroups.com
Ok, i will go with your same example.

T0) C1->P1, C1->P2 (there is only one consumer at this point). C1 consumes P2 offsets from 0 to 100 but it does not get a chance to commit them.
T1) C2 (a new consumer) joins the system. C1->P1, C2->P2. C2 starts consuming P2 from offset 0. It commits offsets 0-50 and then dies.[ZK offset is now 50]
T3) C1->P1, C1->P2. Now C1 has to start consuming P2 from offset 50 [at this point, the local offset stored is 100, and the next offset of message is 50 - messageWriter.adjustOffset kicks in right? - this is called on every message] while it stores locally offsets 0 to 100. Trimming kicks in.

at T3, like i mentioned above if messageWriter.adjustOffset kicks in, it will delete the existing file for the Consumer. mFileRegistry.deleteTopicPartition().

Am i still missing something?
--
-Praveen

Pawel Garbacki

unread,
Feb 6, 2015, 7:24:14 PM2/6/15
to Praveen M, secor...@googlegroups.com
+secor-users

I somehow dropped the group alias.

On Fri, Feb 6, 2015 at 4:18 PM, Pawel Garbacki <paw...@gmail.com> wrote:
You assume that messageWriter.adjustsOffset runs for that topic/partition before the upload policy which I don't think is necessarily true.

Praveen M

unread,
Feb 6, 2015, 10:09:38 PM2/6/15
to Pawel Garbacki, secor...@googlegroups.com
Ah..I understand now, my bad, i know adjustOffset get kicked in on every message. I missed it that it could be for some other TopicPartition mapped to the same consumer thread.

Anyways, the reason i primarily asked about the whole process is, 

In the case of SequenceFiles we store the offsets with each message in the file (as sequence files has a key and a value). However, in the case of DelimitedTextFiles, (and potentially other files we might write - say, Parquet/Avro) there is no context of a key with each record. This means, we will have to store the offset within the record itself. In the case of a GzipDelimitedTextFile, it'll be hard (and also quite hacky). 

Trimming as I see it, is the only reason we need to store the offset on the file for every record. Hence, I was hoping to ignore writing the offset, and whenever we need to trim - just delete the file instead, this is what happens in the case of adjustOffset() being called for a message of the TopicPartition anyways. 

If you look at the existing code, trimming will not work properly for the DelimitedTextFileReaderWriter case, because in the case of partitioning your output files (by using timestamp/some field) messages offsets could be written out of order, and I don't store the offset of each record with the record. 

So, from your explanation. It seems like trimming is just an optimization, to (trim and) upload an already written local file. In the event of me deleting the file nevertheless instead of trimming. The consumer should be able to recreate the file from the last committed ZK offset, and still honor the only once message guarantee of secor. Is that right? 

I'm just thinking it's much easier for me to do that than try to maintain offsets in other filereaderwriters than sequence files. 

Do let me know.

Thanks,
Praveen
--
-Praveen

Pawel Garbacki

unread,
Feb 7, 2015, 2:09:47 AM2/7/15
to Praveen M, secor...@googlegroups.com
Actually, there is more to storing offsets with the message than trimming. Namely, they provide a way to verify the output. In fact, at Pinterest we have a nightly job that verifies the continuity of the offsets written to a day partition. On top of that, we obviously do offset verification in the end-to-end tests.

Interesting that you mention the issue with DelimitedTextFileReaderWriter. I missed it in the review. It looks to me like a design oversight as the concept of keys is flawed in the case of this file format.

Praveen M

unread,
Feb 9, 2015, 11:43:07 AM2/9/15
to Pawel Garbacki, secor...@googlegroups.com
I understand, It sure is valuable to have the offset. This got me worried as I realized I might have muddled with the correctness of the files written in the case of mixing file partitioning and delimited text files.

Do you have a suggestion of how we can fix this? I think this can extend to other new file reader/writers anyone might want to add to secor. We @uber are looking into a parquet reader/writer. While we can solve this for us locally through our message wrappers, a global solution will be ideal. 

Let me know.
--
-Praveen

Pawel Garbacki

unread,
Feb 10, 2015, 2:10:59 AM2/10/15
to Praveen M, secor...@googlegroups.com
We have several options:
(i) enforce that keys (offsets) are written properly for every file format. The annoying bit here is that for many of the formats this would require schema changes,
(ii) make writing keys optional and remove the trimming logic as you suggested before. I didn't think this scenario through and so I'm not sure if there any any corner cases here.

At Pinterest we currently use sequence files only so I would like to hear from people working with other formats how they feel about (i) vs (ii). Personally, given the types of logs we store with Secor, I wouldn't be able to sleep at night unless I had a way to verify the output. Maybe others prefer simpler schema over extra metadata. Honestly, option (ii) is more flexible so if can remove the trimming logic without compromising the correctness of the algorithm, we should probably do that. Then the more paranoid among us can still implement file formats storing offsets next to the message payloads. What do you think? Would you like to take the task of thinking through the corner cases and removing the trimming?

Praveen M

unread,
Feb 10, 2015, 1:19:41 PM2/10/15
to Pawel Garbacki, secor...@googlegroups.com
Hi Pawel,

Thanks for your input. To give a background of how we are going about this @uber.

Our old logs are all DelimitedText formatted(and pretty much quite unstructured data). I originally added the DelimitedTextReaderWriter to not break compliance. 

That said, we are moving into using Parquet as it's well suited for OLAP.
So, we're looking into adding a ParquetReaderWriter. 

While, looking into the ParquetReaderWriter, I realized trimming() would be a problem if we don't store offsets and will break the only once guarantee. That said, I really like storing the offsets with the message for the reasons you highlighted. All the data for us going fwd, will have schemas and we can easily put the offset in a record envelope. 

So, with that background here is what I plan to do.

1) Run the DelimitedTextWriter for the older topics without trimming (so that I can provide the only once guarantee).

2) Run ParquetWriter with trimming() as we will be storing offsets.

So, I'm leaning more towards ii) from your options. Remove the trimming logic and let the people decide for themselves if they want to store offsets/not. 

Yes. I can help look into the corner cases, as I already did a run down to understand trimming() recently. Feel free to open an issue on it. I'll take it.

--
-Praveen
Reply all
Reply to author
Forward
0 new messages