Delta updates comprising partial changes, multiple consumers


Regunath Balasubramanian

Dec 9, 2013, 4:14:11 AM
to netfli...@googlegroups.com
Hello Zeno team,

Thanks for making Zeno OSS and for the quality of the documentation on the Wiki! I already use a few Netflix OSS projects, like Hystrix and Curator, and really like them.

I started looking at Zeno to build a change propagation system and have these questions:
  1. Snapshot vs Delta blob: From the Wiki, it appears we need to construct an instance of the complete object - both for a Snapshot and for a Delta. Let's say I have a Product object with a number of attributes. For a Snapshot, I can construct the entire object from the data store (say, an RDBMS). However, subsequent updates may touch only a subset of attributes - say, price. In order to create Delta blobs, will I need to create a complete Product object with just the price value changed?
  2. Granularity of Deltas: If my assumption about Delta blobs (as stated in point 1) is correct, would I then require versioning ability in my source data store for every update - even if it is a single attribute change? The versions will be needed if I intend to propagate every change independently rather than grouped into a single version change. For example, a price change would go out as one Delta entry and a stock change (inventory count) as another. Is it possible to propagate such granular updates using Zeno?
  3. Slow vs. Fast Consumers: Let's assume I have multiple consumers of change/data propagation for certain POJO types. Each consumer may consume at a different rate, so each will need to start/resume from the last successfully applied state. Does Zeno provide the ability for different consumers to checkpoint the last successfully applied state - say, the last version number seen by a particular client?
As an aside, any thoughts on how Zeno differs from the LinkedIn Databus library? The two appear to have overlapping capabilities.

Thanks
Regu

Drew Koszewnik

Dec 9, 2013, 4:22:37 PM
to netfli...@googlegroups.com
Hi Regu,

Thanks for your interest in NetflixOSS and Zeno!  Let me try to answer your questions in order:

1) I think your interpretation is correct: the idea is to add your entire set of object data to the FastBlobStateEngine on each cycle.  Zeno will automatically calculate the differences between the data states, and then you can serialize a delta which will contain the calculated differences.

This doesn't necessarily mean you have to read the entire data set from your source of truth and reconstruct everything for each cycle (although this is what we do).  Instead, for all or some of your data model, you can hold objects in memory on your "data origination server", then simply re-add them all to the FastBlobStateEngine on each cycle.

For your example, this would mean that you would update the price field for each Product, then add all of the Products to the state engine.  
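
Very roughly, one cycle on the data origination server might look like the sketch below. This is a simplified illustration only: the Product type, ProductSerializerFactory and loadAllProducts() are placeholders for your own code, and the method names are approximate, so please check them against the wiki/javadoc.

import java.io.DataOutputStream;
import java.io.FileOutputStream;

import com.netflix.zeno.fastblob.FastBlobStateEngine;
import com.netflix.zeno.fastblob.io.FastBlobWriter;

public class ProducerCycle {

    // SerializerFactory describing the Product data model (placeholder)
    private final FastBlobStateEngine stateEngine = new FastBlobStateEngine(new ProductSerializerFactory());

    public void runCycle(String version, String deltaFileName) throws Exception {
        // Re-add the *entire* data set each cycle; Zeno calculates the differences.
        for (Product product : loadAllProducts()) {
            stateEngine.add("Product", product);
        }
        stateEngine.setLatestVersion(version);        // tag the state we are about to write

        stateEngine.prepareForWrite();
        FastBlobWriter writer = new FastBlobWriter(stateEngine);
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(deltaFileName))) {
            writer.writeDelta(out);                   // only the calculated differences end up in this file
        }
        stateEngine.prepareForNextCycle();            // ready to accept the next cycle's objects
    }
}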

2) The "delta" files are set of changes to apply to your complete in-memory data set to bring it up to date.  Hopefully you won't have to worry about the granularity of changes to blob consumers, these details should be transparent.

You won't require any versioning in your source of truth.  However, if you choose to use a persistent file store for your "blob" data (as we do), you'll probably want to be able to version your blob files.  

Your clients (blob consumers) will always try to get as up-to-date as possible, so each will always want to start with its current data state's version and load deltas until no more are available.  The way we do this is to name the delta files in our persistent data store with the version to which they should be applied.  Then the client logic is to always look for a delta blob file to apply to its current FastBlobStateEngine version.
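
On the consumer side, that convention boils down to a loop like the sketch below. The "delta-<version>" file naming is just the convention described above (not something Zeno mandates), and the FastBlobReader method names are approximate.

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import com.netflix.zeno.fastblob.FastBlobStateEngine;
import com.netflix.zeno.fastblob.io.FastBlobReader;

public class ClientCatchUp {

    // Keep applying the delta named after our current version until no further delta exists.
    public void catchUp(FastBlobStateEngine stateEngine, File blobDir) throws IOException {
        FastBlobReader reader = new FastBlobReader(stateEngine);
        File delta = new File(blobDir, "delta-" + stateEngine.getLatestVersion());
        while (delta.exists()) {
            try (DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(delta)))) {
                reader.readDelta(in);   // advances the in-memory state (and its version)
            }
            delta = new File(blobDir, "delta-" + stateEngine.getLatestVersion());
        }
    }
}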

3) The client logic we use is described here: https://github.com/Netflix/zeno/wiki/Blob-consumer-lifecycle.

I think each data state might be considered a "checkpoint" for your clients.  Imagine the data states you are producing are chained together via deltas.  A fast client may be at the state just before the latest, but a slow client may be a few states behind.  The fast client can get up to date by consuming a single delta.  The slower client will get up to date by consuming multiple deltas.

The only nod to blob transmission logic that Zeno provides is a version tag.  Zeno's FastBlobStateEngine contains a field "latestVersion".  On the server, set this field prior to producing blob files.  On the client, when you load a blob file, this field will be set to whatever it was set to on the server when the blob file was produced.  

Thanks for the pointer about the LinkedIn Databus library; I'm actually unfamiliar with it.  I'll have to take a look at the details and get back to you.

Please let me know if I've answered your questions; if you would like more detail on anything here, I'll be happy to go deeper.

Thanks again,
Drew.

Regunath Balasubramanian

Dec 10, 2013, 2:20:23 AM
to Drew Koszewnik, netfli...@googlegroups.com
Hi Drew,

Thanks a lot for the extensive answers to my questions. I think I have a better picture of what Zeno does and how I may use it. Here are a few thoughts from my side based on your comments:

  •  The data store from which we want to capture change has a few million items. Reconstructing everything for each cycle would mean I can run it less often - maybe once a day. Given the need to push out changes in near-real time, I would need to take the in-memory route you mentioned. However, this would mean the code that runs the data pull cycle, i.e. the 'data origination server', would need to be embedded in the application server that provides a service interface to our data store. That way I can write changes more often and only for those items that have indeed changed. Arguably, I could instead query the data store only for changed records (say, based on something like a last_updated_time field). The push model, however, is better as it gets our systems in sync sooner.
Btw, have you measured the average delay in applying updates to consumers (within the same network, to rule out wire delays) in any system at Netflix with high-velocity data changes? What was the experience?

  • Your comment "Zeno will automatically calculate the differences between the data states, then you can serialize a delta which will contain the calculated differences" is a very interesting one and a nice capability in Zeno. I assume I can derive information on updates to individual fields through this - is there an API (maybe a listener that can be registered with the diff processor) that I can use to listen in on the data state change?
Thanks
Regu

P.S.: I have been experimenting a bit with LinkedIn Databus; here are a few proofs of concept on the key components: https://github.com/regunathb/aesop



Drew Koszewnik

Dec 10, 2013, 3:33:23 PM
to netfli...@googlegroups.com, Drew Koszewnik
Hi Regu,

How long does it take to pull in a few million items?  As long as you don't have a lot of random disk seeks or back and forth network calls, you should be able to stream a large amount of data to a single box pretty quickly.  If you stream everything in and add it to the FastBlobStateEngine, then immediately turn around and do it again, your "cycle time" will be bound by the length of time it takes to stream in all of the data.

Having said all of that, pushing the data updates to the data origination server as soon as they happen is a great idea.  I only briefly had a chance to investigate LinkedIn Databus.  It does seem like a compelling offering.  Is it a fair estimation to say that it boils down changes in a source of truth into a durable, ordered stream of events?  Would it make sense to use this to get change events to a "data propagation server"?

Once your Objects are all added to the FastBlobStateEngine, I would expect it to take well under a minute to stream out your "blob" artifacts.  Your delta files will likely be small, so transferring them to your clients should be quite fast, and once on the client, I would expect applying the delta to your data set to also take well under a minute.  For both of these I'd be very interested to hear your results if you get a chance to try it out - your mileage will vary greatly depending on your data set size and rate of change.

The "blob" data propagation is built for data sets which can support some data propagation latency, but almost zero data access latency.  It makes the assumption that data on the client will be held as POJO Objects, and tries to minimize the resource impact of updating this data on the client.

Another member of our team actually asked today for a way to listen for Object removals/additions, too.  There is not a great way currently, but there will be a listener interface for blob consumers soon; I'll update this thread when it becomes available.  Today, if you take all of the Objects in your data state and add them to Maps as described here:


You can derive the differences quickly by taking advantage of the fact that identical Objects will be the same instance between states, so from one state to the next you can compare Objects with ==.  During a data update, you should be able to run through the new and previous Maps for each of your types very quickly this way (a minimal sketch follows).
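
For illustration, a minimal version of that comparison could look like this; Product, the Integer key, and the handle*() callbacks are placeholders for your own types and logic.

import java.util.Map;

public class StateDiffer {

    // Walk the new and previous maps for one type. Because identical Objects are the same
    // instance across states, reference equality (==) is enough to detect changes.
    void diffStates(Map<Integer, Product> previous, Map<Integer, Product> current) {
        for (Map.Entry<Integer, Product> entry : current.entrySet()) {
            Product before = previous.get(entry.getKey());
            if (before == null) {
                handleAdded(entry.getValue());               // not present in the previous state
            } else if (before != entry.getValue()) {
                handleChanged(before, entry.getValue());     // different instance => this object changed
            }                                                // same instance => unchanged, nothing to do
        }
        for (Map.Entry<Integer, Product> entry : previous.entrySet()) {
            if (!current.containsKey(entry.getKey())) {
                handleRemoved(entry.getValue());             // present before, gone now
            }
        }
    }
}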

Thanks,
Drew.

P.S. Thanks for the link; I'm interested in hearing your thoughts on whether or not I understood LinkedIn's Databus offering correctly.  How easy or difficult would it be to create an adapter for a source of truth which is not backed by an RDBMS (e.g. Cassandra)?  Do all clients have to understand how the data is organized in the source of truth?

Regunath Balasubramanian

Dec 11, 2013, 5:58:23 AM
to Drew Koszewnik, netfli...@googlegroups.com
Hi Drew,

- "Is it a fair estimation to say that it boils down changes in a source of truth into a durable, ordered stream of events?  Would it make sense to use this to get change events to a "data propagation server"?"
<Regu> : AFAIK, your inference on the ordered stream of events is correct. Durability is available at two points in this flow - the journal/write-ahead-log of the source database and the Bootstrap snapshot data store. The Relay is the "data propagation server" for change events. Any consumer may register and consume change events from a Relay.

I need to try out the Maps approach for determining what changed from the snapshot and between cycles using Zeno - thanks for pointing this out. It's also good to hear that others have asked for a listener interface to the diff processor.

Your understanding of Databus is spot-on, AFAIK (the disclaimer is because I am still figuring it out too). The EventProducer interface is generic and expects you to create 'change events' whose type is defined by your own custom Avro schema. You may therefore, in theory, create an adapter for any source - for example, to read from the HBase WAL (or something similar in Cassandra) - so long as you can map multiple data mutations into one or more change events. Clients therefore need to understand/consume only change events and need not be concerned with how data is organized in the source system. IMO, creating the change events should be doable for append-only data stores and RDBMSes that capture the details of each data mutation in a journal of some kind.

Thanks & Regards
Regu

Drew Koszewnik

Dec 13, 2013, 2:03:40 PM
to netfli...@googlegroups.com, Drew Koszewnik

Hi Regu,

I have added the ability to listen to specific changes which happen during a delta.  There is a new class:

TypeDeserializationStateListener

Extend this class and implement the methods removedObject and addedObject.  Then, on your FastBlobStateEngine on the client, register the listener for a given type with:

stateEngine.setTypeDeserializationStateListener(String type, TypeDeserializationStateListener<T> listener)

After registering this listener, each object of this type which is removed will result in a call to removedObject with the removed Object as the parameter, and each object which is added will result in a call to addedObject (with the new object as the parameter).

This will allow you to take whatever action is necessary after an update in an incremental fashion, rather than recalculating the whole world.
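
As a quick illustration, wiring this up for a hypothetical "Product" type could look like the following; stateEngine is your client-side FastBlobStateEngine, Product is a placeholder for your own POJO, and the exact method signatures may differ slightly from this sketch.

TypeDeserializationStateListener<Product> productListener = new TypeDeserializationStateListener<Product>() {
    @Override
    public void addedObject(Product added) {
        // react to a Product that appeared (or was replaced) in this delta,
        // e.g. update downstream indexes or caches incrementally
    }

    @Override
    public void removedObject(Product removed) {
        // react to a Product that was removed in this delta
    }
};

stateEngine.setTypeDeserializationStateListener("Product", productListener);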

You can update your Zeno version to 1.5 to experiment with this (it is already available in Maven Central).  Please let me know if this helps you achieve your objective!

Thanks,
Drew.


Regunath Balasubramanian

Dec 16, 2013, 12:44:07 AM
to Drew Koszewnik, netfli...@googlegroups.com
Hi Drew,

Thanks for updating this thread with the new feature. Appreciate it!
I will take a look at this capability and get back to you if I have any questions.

Thanks again.

Regards
Regu

