Why Commit is required after reading messages off a topic for a Consumer, Confluent .NET Client


pingpongo...@gmail.com

Mar 18, 2018, 8:00:50 PM3/18/18
to Confluent Platform
I am new to Kafka. I want to understand why Commit() is required (either auto-commit or manual commit) after reading messages off a topic (using either Poll() or Consume()).

Any link to documentation or tutorial will be very much appreciated.

Below is my modified code for .NET, using polling combined with manual CommitAsync(), based on the code at https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/AdvancedConsumer/Program.cs

public static void Run_PollWithManualCommit(string brokerList, List<string> topics)
{
    using (var consumer = new Consumer<Ignore, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
    {
        // Note: All event handlers are called on the main thread.

        consumer.OnMessage += (_, msg) =>
        {
            Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
            Console.WriteLine($"Committing offset");
            var committedOffsets = consumer.CommitAsync(msg).Result;
            Console.WriteLine($"Committed offset: [{string.Join(", ", committedOffsets.Offsets)}]");
        };

        consumer.OnPartitionEOF += (_, end) =>
            Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");

        // Raised on critical errors, e.g. connection failures or all brokers down.
        consumer.OnError += (_, error) =>
            Console.WriteLine($"Error: {error}");

        // Raised on deserialization errors or when a consumed message has an error != NoError.
        consumer.OnConsumeError += (_, msg) =>
            Console.WriteLine($"Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");

        // This is NOT called when auto-commit is disabled.
        consumer.OnOffsetsCommitted += (_, commit) =>
        {
            if (commit.Error)
            {
                Console.WriteLine($"Failed to commit offsets: {commit.Error}");
            }
            else
            {
                Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
            }
        };

        consumer.OnPartitionsAssigned += (_, partitions) =>
        {
            Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
            consumer.Assign(partitions);
        };

        consumer.OnPartitionsRevoked += (_, partitions) =>
        {
            Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
            consumer.Unassign();
        };

        //consumer.OnStatistics += (_, json)
        //    => Console.WriteLine($"Statistics: {json}");

        // The Subscribe() method controls which topics will be fetched by Poll().
        consumer.Subscribe(topics);

        Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");

        var cancelled = false;
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // prevent the process from terminating.
            cancelled = true;
        };

        Console.WriteLine("Ctrl-C to exit.");
        while (!cancelled)
        {
            consumer.Poll(TimeSpan.FromMilliseconds(100));
        }
    }
}

private static Dictionary<string, object> constructConfig(string brokerList, bool enableAutoCommit) =>
    new Dictionary<string, object>
    {
        { "group.id", "advanced-csharp-consumer" },
        { "enable.auto.commit", enableAutoCommit },
        { "auto.commit.interval.ms", 5000 },
        { "statistics.interval.ms", 60000 },
        { "bootstrap.servers", brokerList },
        { "default.topic.config", new Dictionary<string, object>()
            {
                { "auto.offset.reset", "smallest" }
            }
        }
    };


I have gone through the link below, but am still confused. 



Message has been deleted

Ewen Cheslack-Postava

Mar 19, 2018, 7:49:27 PM3/19/18
to Confluent Platform
If I understand your question correctly, it's just due to Kafka's general design (and not specific to the .NET client). Messages aren't erased from Kafka when you read them, so somewhere in the system we have to track where in the stream of messages the consumer is. And you want this tracked in a way that, if you kill your process and restart it, you pick up where you left off.

Kafka has a mechanism for storing this information (i.e. committed offsets). However, the broker will not update it just because you fetched data. The reason is that doing so would risk losing or not processing data: if the broker updated the offset as soon as you fetched the data, your process might crash or your machine might die before you actually processed that message. When the app came back up, it would start farther along than the last message actually processed. So the client indicates to the broker explicitly when it is done processing the data.
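The consume-then-commit pattern Ewen describes can be sketched with the Consume() variant of the same 0.11.x-era Confluent.Kafka API used elsewhere in this thread. This is a sketch only, not an authoritative implementation; the broker address, topic, group id, and the Process() helper are placeholders of mine:

```csharp
// Sketch: do the work first, then commit, so a crash in between only
// causes re-delivery (at-least-once), never silent data loss.
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

class CommitSketch
{
    static void Main()
    {
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" },   // placeholder
            { "group.id", "sketch-consumer" },           // placeholder
            { "enable.auto.commit", false }              // we commit explicitly below
        };

        using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
        {
            consumer.Subscribe(new List<string> { "my-topic" });  // placeholder topic

            while (true)
            {
                // Consume() returns true when a message arrived within the timeout.
                if (consumer.Consume(out Message<Ignore, string> msg, TimeSpan.FromMilliseconds(100)))
                {
                    Process(msg.Value);                // do the actual work first...
                    consumer.CommitAsync(msg).Wait();  // ...then tell the broker we are done.
                    // If the process dies between Process() and CommitAsync(), the
                    // group's committed offset is unchanged, so this message is
                    // re-delivered after restart.
                }
            }
        }
    }

    // Hypothetical stand-in for real message processing.
    static void Process(string value) => Console.WriteLine(value);
}
```

The ordering is the whole point: committing before processing would give at-most-once semantics instead, where a crash can skip a fetched-but-unprocessed message.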

-Ewen

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/dbad1328-c1ab-4760-aa6c-88fc34a64d59%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Message has been deleted

pingpongo...@gmail.com

Mar 20, 2018, 5:34:16 PM3/20/18
to Confluent Platform
Hi Ewen,

My reply yesterday was deleted. I can see your reply is not shown here either. 

pingpongo...@gmail.com

Mar 20, 2018, 5:38:08 PM3/20/18
to Confluent Platform
Hi Ewen,

After reading your comment posted early on the morning of 20 March, I still cannot find the answer. To reiterate my previous post, I have re-posted it here, because it was deleted for some reason by someone else.

Am I missing something here? I commented out the lines below (shown in red in the original post) and ran the consumer again, and it still remembers the CORRECT offset. That is, the correct offset is used with the Poll() method even though CommitAsync() is never called.

consumer.OnMessage += (_, msg) =>
{
    Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
    //Console.WriteLine($"Committing offset");
    //var committedOffsets = consumer.CommitAsync(msg).Result;
    //Console.WriteLine($"Committed offset: [{string.Join(", ", committedOffsets.Offsets)}]");
};

For example, here is the test:

Step 1: run the consumer to consume one message.
Step 2: kill it.
Step 3: restart it.
Step 4: it still remembers the correct offset after each restart, as shown below.

D:\myStudio2\Kafka\confluent-kafka-dotnet\examples\AdvancedConsumer>dotnet run PollWithManualCommit localhost:9092 Advanced
Subscribed to: [Advanced]
Ctrl-C to exit.
Assigned partitions: [Advanced [0]], member id: rdkafka-e4310a50-4621-4c6b-9a26-73fc984b7072
Topic: Advanced Partition: 0 Offset: 0 hello
Reached end of topic Advanced partition 0, next message will be at offset 1


Below is the result of running the producer, which shows offset 0:

D:\myStudio2\Kafka\confluent-kafka-dotnet\examples\AdvancedProducer>dotnet run localhost:9092 Advanced

-----------------------------------------------------------------------
Producer rdkafka#producer-1 producing on topic Advanced.
-----------------------------------------------------------------------
To create a kafka message with UTF-8 encoded key/value message:
> key value<Enter>
To create a kafka message with empty key and UTF-8 encoded value:
> value<enter>
Ctrl-C to quit.

> hello
Partition: 0, Offset: 0
>

Any idea?

Matt Howlett

Mar 20, 2018, 6:07:09 PM3/20/18
to confluent...@googlegroups.com

If enable.auto.commit is set to false and you are not manually committing offsets, then the consumer won't remember offsets between restarts.

If you post a complete minimal program demonstrating the problem as a GitHub issue, we can either point out the problem or look into it further. Note that there are a number of integration tests covering this, so I'm pretty sure there shouldn't be a problem; I couldn't see what you might be doing wrong from the code posted.
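One configuration detail worth spelling out here: when a group has no committed offset for a partition (which is the case if auto-commit is off and CommitAsync() is never called), the consumer falls back to auto.offset.reset on every restart. The sketch below illustrates this; it reuses the config shape from the code earlier in the thread, with the broker address as a placeholder:

```csharp
// Sketch: where a consumer starts when there is NO committed offset.
// With enable.auto.commit=false and no CommitAsync() calls, nothing is
// ever stored for this group, so every restart uses auto.offset.reset.
var config = new Dictionary<string, object>
{
    { "bootstrap.servers", "localhost:9092" },   // placeholder
    { "group.id", "advanced-csharp-consumer" },
    { "enable.auto.commit", false },
    { "default.topic.config", new Dictionary<string, object>
        {
            // "smallest" (a.k.a. "earliest"): with no committed offset,
            // re-read the partition from the beginning. In a topic holding
            // a single message at offset 0, starting "from the beginning"
            // looks identical to resuming from a committed offset of 0.
            { "auto.offset.reset", "smallest" }
        }
    }
};
```

This is why a one-message test topic cannot distinguish "resumed from a committed offset" from "reset to the start of the log"; a topic with several consumed messages would show the difference on restart.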


pingpongo...@gmail.com

Mar 20, 2018, 7:47:52 PM3/20/18
to Confluent Platform
Thanks Matt,

The program and the code I listed at the beginning do remember offsets between restarts, which contradicts what you said.

I have opened a question about it on GitHub.

Thanks.