Any link to documentation or tutorial will be very much appreciated.
public static void Run_PollWithManualCommit(string brokerList, List<string> topics)
{
using (var consumer = new Consumer<Ignore, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
{
// Note: All event handlers are called on the main thread.
consumer.OnMessage += (_, msg)
=>
{
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
Console.WriteLine($"Committing offset");
var committedOffsets = consumer.CommitAsync(msg).Result;
Console.WriteLine($"Committed offset: [{string.Join(", ", committedOffsets.Offsets)}]");
};
consumer.OnPartitionEOF += (_, end)
=> Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
// Raised on critical errors, e.g. connection failures or all brokers down.
consumer.OnError += (_, error)
=> Console.WriteLine($"Error: {error}");
// Raised on deserialization errors or when a consumed message has an error != NoError.
consumer.OnConsumeError += (_, msg)
=> Console.WriteLine($"Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");
//this is NOT called, when autocommit is disabled
consumer.OnOffsetsCommitted += (_, commit) =>
{
Console.WriteLine($"[{string.Join(", ", commit.Offsets)}]");
if (commit.Error)
{
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
}
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
};
consumer.OnPartitionsAssigned += (_, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
consumer.Assign(partitions);
};
consumer.OnPartitionsRevoked += (_, partitions) =>
{
Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
consumer.Unassign();
};
//consumer.OnStatistics += (_, json)
// => Console.WriteLine($"Statistics: {json}");
//The subscribe() method controls which topics will be fetched in poll.
consumer.Subscribe(topics);
Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");
var cancelled = false;
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
Console.WriteLine("Ctrl-C to exit.");
while (!cancelled)
{
consumer.Poll(TimeSpan.FromMilliseconds(100));
}
}
}
private static Dictionary<string, object> constructConfig(string brokerList, bool enableAutoCommit) =>
new Dictionary<string, object>
{
{ "
group.id", "advanced-csharp-consumer" },
{ "enable.auto.commit", enableAutoCommit },
{ "bootstrap.servers", brokerList },
{ "default.topic.config", new Dictionary<string, object>()
{
{ "auto.offset.reset", "smallest" }
}
}
};