In this article, I share my experience of undergoing a paradigm shift from batch-based to real-time stream-based processing over the course of building a real-time data enrichment pipeline that performs joins & lookups in real time.
Being horizontally scalable also helps, as new machines can simply be plugged into the cluster for direct scaling, without changing the code or manually changing the way data is stored in nearly all scenarios. You can check out my previous article where I wrote about scaling and parallelism in general.
Kafka is a tool developed at LinkedIn, designed to provide a persistent storage layer for all streaming needs. It works on the pub-sub model, where producers write messages to various Topics, and consumers read the messages by subscribing to those topics. Let's break down its official moniker, the Distributed Commit Log.
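To make the pub-sub model concrete, here is a toy in-memory sketch of a topic as an append-only log: producers append messages, and consumers read them back by position. All the names here are illustrative; this is not the Kafka API, just the shape of the idea.

```java
import java.util.ArrayList;
import java.util.List;

// A toy "topic": an append-only log that producers write to and
// consumers read from by position, mimicking Kafka's pub-sub model.
public class ToyTopic {
    private final List<String> log = new ArrayList<>();

    // Producer side: append a message and return its offset in the log.
    public int produce(String message) {
        log.add(message);
        return log.size() - 1;
    }

    // Consumer side: read the message stored at a given offset.
    public String consume(int offset) {
        return log.get(offset);
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        int first = topic.produce("user-signed-up");
        int second = topic.produce("user-cancelled");
        // Independent consumers can read the same log at their own pace.
        System.out.println(topic.consume(first));  // user-signed-up
        System.out.println(topic.consume(second)); // user-cancelled
    }
}
```

Because the log is persistent and append-only, many consumers can replay it independently, which is exactly what makes it a "commit log" rather than a transient queue.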
We just saw how time plays a vital part in the windowing operation, where we can group elements based on time. But the notion of time can differ depending on perspective. Most modern streaming applications support two notions of time.
Now suppose that, for some reason, the cancellation service which is responsible for stopping the billing service slows down. It could be due to network issues, or due to back pressure because a lot of users are cancelling their subscriptions a few moments before midnight, causing a massive load spike. The cancellation service is unable to process some of the messages in time, and by the time it has written the cancellation request to a Kafka topic, the time is 12:01, falling into the next day.
Though the billing service receives the cancellation request eventually, it got it a couple of minutes late, which technically is the next day compared to when it was originally generated. This causes the billing system to charge the user for one more day even though they cancelled the subscription before midnight.
This problem happened because the billing service was considering the processing time of the cancellation request: the time the message was written to the Kafka topic, or the time it was processed by the billing service. So even though the user cancelled the service at 11:55, the request was written to the topic at 12:01 due to lag or latency, leading the billing service to use that time and compute incorrect results. If it were to consider the event time, 11:55, it would use the time when the message was generated, i.e. when the user unsubscribed from the service, leading to correct results.
Now, to run the app according to event time, messages need to have the event-time information embedded either in the message header or in the body of the message itself, which can later be extracted by the Flink job responsible for billing. This could be as simple as a transaction timestamp.
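The billing scenario above can be sketched with plain java.time, no Flink required. The class and method names are made up for illustration; the point is simply that the billing day differs depending on which timestamp we derive it from.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Illustrative sketch (not the Flink API): the message carries its own
// event-time stamp, and the billing day depends on which notion of time
// the billing job uses.
public class BillingDay {
    // Derive the billing day from a timestamp (UTC for simplicity).
    static LocalDate dayOf(Instant timestamp) {
        return timestamp.atZone(ZoneOffset.UTC).toLocalDate();
    }

    public static void main(String[] args) {
        Instant cancelledAt = Instant.parse("2023-06-30T23:55:00Z"); // event time: 11:55 pm
        Instant processedAt = Instant.parse("2023-07-01T00:01:00Z"); // processing time: 12:01 am
        System.out.println(dayOf(cancelledAt));  // 2023-06-30 -> no extra charge
        System.out.println(dayOf(processedAt));  // 2023-07-01 -> one extra day billed
    }
}
```

Using the embedded event timestamp, the cancellation lands on the correct billing day even when the message itself arrives late.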
Borrowing qualities like high availability and failover handling from their batch-processing brethren, all of the streaming frameworks restart their Executors / TaskManagers in case of any outages. In any of these scenarios, a hardened & fault-tolerant system can easily shrug off node failures, or even outages of the cluster master. Most advanced streaming frameworks like Flink or Google Dataflow offer 3 types of guarantees when it comes to message processing after the system recovers from a failure.
At-least-once processing is when the system guarantees that messages will be processed once under normal execution, and possibly more than once when recovering from a failure. The point to note here is that the system might process some messages twice because it sends Kafka commits only after the data is processed. So if the system crashes after picking a message from Kafka but before processing and committing that record, it will pick that record up again when it restarts. This usually does not impact the business behaviour of the app; it's just that unnecessary computation is done for data which was already processed, and perhaps even persisted, but not committed.
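The process-then-commit loop behind at-least-once can be simulated in a few lines. This is a deliberately simplified model (the field and method names are invented for the sketch): the offset is committed only after the side effect, so a crash in between means the same record is processed again on restart.

```java
import java.util.List;

// Toy at-least-once consumer: commit happens AFTER processing, so a crash
// between the two causes the record to be reprocessed after a restart.
public class AtLeastOnce {
    int processedCount = 0;   // counts every processing side effect
    int committedOffset = 0;  // restart position after a crash

    void runOnce(List<String> records, boolean crashBeforeCommit) {
        for (int i = committedOffset; i < records.size(); i++) {
            processedCount++;              // side effect happens first
            if (crashBeforeCommit) return; // crash: the commit never happens
            committedOffset = i + 1;       // commit only after processing
        }
    }

    public static void main(String[] args) {
        AtLeastOnce consumer = new AtLeastOnce();
        List<String> records = List.of("m1", "m2");
        consumer.runOnce(records, true);   // processes m1, crashes before committing
        consumer.runOnce(records, false);  // restarts at offset 0: m1 is processed again
        System.out.println(consumer.processedCount); // 3 -> m1 was processed twice
    }
}
```

Two records, three processing events: the duplicate is the price of never losing a message.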
We just saw that under at-least-once semantics, messages might be processed twice, which can lead to consistency issues. We briefly mentioned that producers and consumers use Kafka as a storage layer. The producer relies on acknowledgements to guarantee that a message has been written to Kafka.
Consumers rely on offsets, which are maintained by Kafka, to track their current position in a message stream. This can be compared to a database cursor which points to the current row and yields the next row when advanced. Under default configurations, Kafka uses auto-commit: when a consumer fetches messages, it sends a commit request for those messages only when it requests the next message batch. The interpretation is that the consumer is done processing the current batch when it requests the next one, so Kafka internally moves the committed-offset pointer to mark those records as committed and hands the consumer the next batch of messages to process.
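Here is a cursor-style model of that auto-commit behaviour: fetching the next batch is what marks the previous batch as committed. The class and its fields are invented for illustration; the real mechanism lives inside the Kafka client.

```java
import java.util.List;

// Toy model of auto-commit: asking for the NEXT batch implicitly commits
// the PREVIOUS one, like a database cursor advancing past rows it has read.
public class AutoCommitCursor {
    private final List<String> log;
    private int position = 0;        // where the consumer will read next
    private int committedOffset = 0; // what the broker considers "done"

    AutoCommitCursor(List<String> log) { this.log = log; }

    List<String> pollNextBatch(int batchSize) {
        committedOffset = position;  // requesting more commits the prior batch
        int end = Math.min(position + batchSize, log.size());
        List<String> batch = log.subList(position, end);
        position = end;
        return batch;
    }

    int committedOffset() { return committedOffset; }

    public static void main(String[] args) {
        AutoCommitCursor cursor = new AutoCommitCursor(List.of("a", "b", "c", "d"));
        cursor.pollNextBatch(2);                      // fetch a,b; nothing committed yet
        System.out.println(cursor.committedOffset()); // 0
        cursor.pollNextBatch(2);                      // fetching c,d commits a,b
        System.out.println(cursor.committedOffset()); // 2
    }
}
```

The gap between `position` and `committedOffset` is exactly the window in which a crash leads to reprocessing.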
Idempotency means that replaying or repeating an action won't change the result: the outcome of an idempotent operation is the same whether it is executed once or multiple times. This comes in handy when a Kafka consumer running on a Flink node is resurrected after a failure.
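A minimal sketch of an idempotent sink, using the cancellation example from earlier (class and field names are hypothetical): the write is a keyed upsert, so replaying the same event after a restart cannot change the outcome.

```java
import java.util.HashMap;
import java.util.Map;

// Idempotent sink sketch: applying the same cancellation once or many
// times leaves the state identical, so replays after a failure are safe.
public class IdempotentSink {
    private final Map<String, String> subscriptionStatus = new HashMap<>();

    // An upsert keyed by user id: replaying it cannot change the outcome.
    void applyCancellation(String userId) {
        subscriptionStatus.put(userId, "CANCELLED");
    }

    String statusOf(String userId) {
        return subscriptionStatus.get(userId);
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.applyCancellation("u1");
        sink.applyCancellation("u1"); // duplicate delivery after a restart
        System.out.println(sink.statusOf("u1")); // CANCELLED
    }
}
```

Contrast this with a non-idempotent operation like "decrement balance by 1", where a replayed message would corrupt the state.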
Ordering concurrent operations in a distributed environment is extremely difficult. This is precisely why most multithreaded algorithms take ordering out of their requirements: it is unpredictable, and a lot of programming effort would have to be poured into designing, developing & maintaining such a system. So our first goal here would be to eliminate the need for ordering entirely.
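One common way to eliminate the ordering requirement is to build the computation out of commutative, associative operations, so that any arrival order of the messages yields the same result. A trivial sketch with a sum:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// A commutative, associative aggregation (a sum): the result is identical
// regardless of the order in which the messages arrive.
public class OrderFree {
    static long total(List<Long> amounts) {
        return amounts.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<Long> inOrder = List.of(10L, 20L, 30L);
        List<Long> shuffled = new ArrayList<>(inOrder);
        Collections.shuffle(shuffled); // simulate out-of-order delivery
        System.out.println(total(inOrder) == total(shuffled)); // true
    }
}
```

Sums, counts, maxima and set unions all have this property; once the pipeline is expressed in such terms, out-of-order delivery stops being a correctness problem.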
There is one more configuration which needs to be tackled: max.in.flight.requests.per.connection needs to be set to 1. Quoting from the actual Kafka documentation,
Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.
This means that when we have message retries enabled (which is nearly always), we might have a scenario where the producer sends 2 message batches one after the other, without waiting for an acknowledgement of batch one. If batch one fails but batch two succeeds, the producer will, after some time, retry batch one, which will now succeed, thus reversing the order of messages written on the topic (batch 2 followed by batch 1).
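The settings discussed above, shown as plain key/value pairs. These property keys (acks, retries, max.in.flight.requests.per.connection) are real Kafka producer config names, and the actual KafkaProducer constructor accepts a Properties object of this shape; the class name here is just for the sketch.

```java
import java.util.Properties;

// Producer configuration for ordered writes, as discussed above.
public class OrderedProducerConfig {
    static Properties build() {
        Properties props = new Properties();
        props.put("acks", "all");  // wait for the broker to acknowledge the write
        props.put("retries", "5"); // retry transient send failures
        // With retries enabled, cap in-flight batches at 1 so a retried
        // batch cannot overtake a later one and reorder the partition.
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

The trade-off is throughput: with only one batch in flight per connection, the producer waits for each acknowledgement before sending the next batch.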
In any distributed application, be it batch or streaming, most of the functionality is provided via code encapsulated in various MapFunctions, ProcessFunctions (Reduce, Join, Aggregations), Filters, etc. The common feature of all of these is that the code runs on a cluster in a separate JVM; these methods are given the input elements and in turn perform the computation on each message.
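A simplified stand-in for that pattern, using plain java.util.function instead of a framework: the pipeline ships small functions (a filter and a map here), and the runtime applies them to each element. Flink's real FilterFunction and MapFunction interfaces have essentially this shape, though everything in this sketch is illustrative.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Mini pipeline: user code is handed in as functions, and the "runtime"
// applies them to every incoming element, as streaming frameworks do.
public class MiniPipeline {
    static List<String> run(List<String> input,
                            Predicate<String> filter,
                            Function<String, String> map) {
        return input.stream().filter(filter).map(map).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> out = run(
                List.of("cancel:u1", "ping", "cancel:u2"),
                m -> m.startsWith("cancel:"),                          // FilterFunction analogue
                m -> m.substring("cancel:".length()).toUpperCase());   // MapFunction analogue
        System.out.println(out); // [U1, U2]
    }
}
```

In a real cluster, the framework serializes these functions, ships them to TaskManager JVMs, and invokes them once per message; the programming model, however, is exactly this.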
In spite of everything I just covered, what we have seen here is just scratching the surface. The world of streaming is constantly evolving, & I firmly believe there is much more to come. I will definitely be writing more about streaming in the future. Hopefully this article will help those who are undergoing the paradigm shift, like me.