One event type per topic? Or multiple event types in a topic?


James Cheng

Mar 12, 2015, 3:00:48 PM
to confluent...@googlegroups.com
Hi,

I haven't had a chance to try out the recent release, but I wanted to ask. In http://blog.confluent.io/2015/02/25/stream-data-platform-2/, you recommend that for pure event streams, a single topic should contain only one type of event.

We experimented at various times with mixing multiple events in a single topic and found this generally led to undue complexity. Instead give each event its own topic and consumers can always subscribe to multiple such topics to get a mixed feed when they want that.

By having a single schema for each topic you will have a much easier time mapping a topic to a Hive table in Hadoop, a database table in a relational DB or other structured stores.

Can you talk in more detail about why you recommend this? What was the complexity that you encountered? And does the same recommendation apply to database changes? That particular quote was inside the "Pure Event Streams" section, and I wasn't sure if it applied more broadly.

Related: Do the Schema Registry and Confluent serializers enforce this suggestion, or can they be used to read/write multiple event types in a single topic?

Thanks,
-James

Ewen Cheslack-Postava

Mar 12, 2015, 4:06:05 PM
to confluent...@googlegroups.com
On Thu, Mar 12, 2015 at 12:00 PM, James Cheng <jch...@tivo.com> wrote:
Hi,

I haven't had a chance to try out the recent release, but I wanted to ask. In http://blog.confluent.io/2015/02/25/stream-data-platform-2/, you recommend that for pure event streams, a single topic should contain only one type of event.

We experimented at various times with mixing multiple events in a single topic and found this generally led to undue complexity. Instead give each event its own topic and consumers can always subscribe to multiple such topics to get a mixed feed when they want that.

By having a single schema for each topic you will have a much easier time mapping a topic to a Hive table in Hadoop, a database table in a relational DB or other structured stores.

Can you talk in more detail about why you recommend this? What was the complexity that you encountered? And does the same recommendation apply to database changes? That particular quote was inside the "Pure Event Streams" section, and I wasn't sure if it applied more broadly.

You start losing a lot of the benefits of having the schema by having more than one. Maybe you start out only having two schemas for a topic. Your code has to handle both cases, which doesn't sound bad -- just one conditional. But the more you add, or the more you evolve the existing schemas (without enforcing compatibility since the topic supports multiple schemas), the worse this gets, until you end up having as much code dedicated to figuring out what format of data you're working with as you have code to actually process that data.
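
To make that concrete, the consumer-side branching tends to end up looking something like this (the event classes here are made up purely for illustration, not from any real topic):

  public class MixedTopicConsumerSketch {
    // Placeholder classes standing in for Avro-generated specific record types.
    static class PageView {}
    static class AddToCart {}

    static void handle(Object value) {
      // Every additional schema on the topic adds another branch here,
      // and evolving any one of them means revisiting this dispatch code.
      if (value instanceof PageView) {
        System.out.println("handling page view");
      } else if (value instanceof AddToCart) {
        System.out.println("handling add-to-cart");
      } else {
        System.out.println("unknown event type: " + value.getClass().getName());
      }
    }

    public static void main(String[] args) {
      handle(new PageView());
      handle(new AddToCart());
    }
  }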

As alluded to in the post, it also makes it difficult when you want to map the data into another system that may have a less flexible data representation, even when you only have 2 schemas. Flattening a schema for a table-oriented data store usually isn't a problem. But if you have 2 schemas, what do you do? What if there's overlap in the names of some fields -- do you prefix all column names with the schema name or just let them overlap? What value do you use for schema B's columns when you're translating a record with schema A? Null may not be an option since it could be a valid value for schema B records. Do you have to add an extra column to record the original record type or should this be inferred based on the contents of the other columns?

That said, if you are going to use more than one schema per topic, it's at least better to have them all recorded in a schema registry where, for example, a developer could get a listing of all of them when writing code that uses that topic to verify they handle all the schemas in their code.
 

Related: Do the Schema Registry and Confluent serializers enforce this suggestion, or can they be used to read/write multiple event types in a single topic?

The default schema registry settings require that schemas are backward compatible, but you can opt out. So you can have multiple schemas, but not two that are entirely different. See http://confluent.io/docs/current/schema-registry/docs/api.html#compatibility for more details on the compatibility settings. To opt out, you would set the compatibility level to NONE. This can be done globally for all topics (http://confluent.io/docs/current/schema-registry/docs/api.html#get--config) or per topic (http://confluent.io/docs/current/schema-registry/docs/api.html#put--config-%28string-%20subject%29).
 

Thanks,
-James




--
Thanks,
Ewen

Felix GV

Mar 12, 2015, 4:26:00 PM
to confluent...@googlegroups.com
You can also achieve this in a backward-compatible way with union types in Avro, but that would still introduce a certain amount of complexity in downstream consumers.
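
For example, the topic's value schema could be a single top-level Avro union (the record names here are hypothetical, just to sketch the idea):

  import org.apache.avro.Schema;

  public class UnionSchemaSketch {
    // A top-level union as the topic's value schema; both branches are made-up examples.
    private static final String UNION_JSON =
        "[{\"type\":\"record\",\"name\":\"PageView\","
        + "\"fields\":[{\"name\":\"url\",\"type\":\"string\"}]},"
        + "{\"type\":\"record\",\"name\":\"AddToCart\","
        + "\"fields\":[{\"name\":\"productId\",\"type\":\"long\"}]}]";

    public static void main(String[] args) {
      Schema union = new Schema.Parser().parse(UNION_JSON);
      // Every message matches exactly one branch; consumers still have to check
      // which branch they got, which is the complexity mentioned above.
      System.out.println(union.getType() + " with branches " + union.getTypes());
    }
  }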


For some use cases it's acceptable, but for others it may be more trouble than it's worth. You have to judge wisely...

That being said, I think it would be great to have a good standard first-class way of supporting control messages and other types of *ordered* mixed-schema streams (which makes the use of different topics impossible or at least unwieldy). Ideally, this would be done in a way where incompatible downstream consumers can just ignore (silently or not) the types of messages they do not care about. I'm afraid no standard solution for this exists at the moment, and it's not entirely clear whether this belongs in Kafka-proper or in the serialization layer on top.

--
 
Felix GV
Data Infrastructure Engineer
Distributed Data Systems
LinkedIn
 
f...@linkedin.com
linkedin.com/in/felixgv


Petr Novak

Feb 2, 2016, 12:11:12 PM
to Confluent Platform
The lack of well-formulated best practices around topic modelling is one of the main obstacles to adopting Kafka for us.

Another approach is to have one RAW topic that everybody publishes to with mixed schemas (or a couple of fat raw topics for classes of producers). Then pre-process this RAW topic into well-defined separate topics which are used by downstream clients; these topics can carry just the selection of data required to fulfil existing features. Pre-processed topics can be used for real-time analysis, while the RAW topic can eventually be dumped into some schema-agnostic storage for ad-hoc inspection (Elasticsearch should work for it).
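
A rough sketch of the pre-processing step I have in mind (illustrative only - the topic names, the "type" discriminator field, and the serializer setup are placeholders, not our actual configuration):

  import org.apache.avro.generic.GenericRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  import java.time.Duration;
  import java.util.Collections;
  import java.util.Properties;

  public class RawTopicSplitter {
    public static void main(String[] args) {
      Properties consumerProps = new Properties();
      consumerProps.put("bootstrap.servers", "localhost:9092");            // placeholder
      consumerProps.put("group.id", "raw-splitter");                       // placeholder
      consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      consumerProps.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
      consumerProps.put("schema.registry.url", "http://localhost:8081");   // placeholder

      Properties producerProps = new Properties();
      producerProps.put("bootstrap.servers", "localhost:9092");
      producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      producerProps.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
      producerProps.put("schema.registry.url", "http://localhost:8081");

      try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(consumerProps);
           KafkaProducer<String, Object> producer = new KafkaProducer<>(producerProps)) {
        consumer.subscribe(Collections.singletonList("raw-events"));       // the fat RAW topic
        while (true) {
          ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(500));
          for (ConsumerRecord<String, Object> record : records) {
            GenericRecord event = (GenericRecord) record.value();
            // Route each event to a per-type topic derived from its discriminator field.
            String target = "events-" + event.get("type");
            producer.send(new ProducerRecord<>(target, record.key(), event));
          }
        }
      }
    }
  }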

Having a topic per event type has some drawbacks:

1) The interface for upstream (producing) clients is more complex (complex is probably too harsh a term for this). They might have to properly configure tens or hundreds of topics. It is just easier to tell everybody there is one topic to publish to and we will sort it out.

2) Mixing topics is not deterministic. One stream is generated when processing "online" and a different one when processing later. It won't work for scenarios where order matters. Off the top of my head, e.g.:
 a) Searching for patterns across event types
 b) "Event-sourced" scenarios
 
As already mentioned, there is one advantage to separate topics - enforcing schema compatibility. In our case, our pipelines go down because producing clients change the data schema. That is a huge deal for us, hence we lean towards these guarantees. Maybe not one topic but a couple of fat topics per data source, with union schemas that can evolve, would work for us - then process them into different topics as needed.

The performance tax is a question. For sure it depends on the event type distribution. Mixing a stream from many topics can be more expensive than separating one into many, or vice versa.

Solving the ordering issue for topics per event type probably requires introducing a sequence number and then consuming and merging the topics based on it. The sequence number would have to encode the producer. It doesn't look like a simple task to implement.
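
The merge itself is the easy part - roughly a k-way merge by sequence number, as in the toy sketch below (this assumes a single global sequence for simplicity; per-producer sequences, stalled topics, and late-arriving data are where it actually gets hard):

  import java.util.ArrayDeque;
  import java.util.ArrayList;
  import java.util.Comparator;
  import java.util.Deque;
  import java.util.List;
  import java.util.PriorityQueue;

  public class SequenceMergeSketch {
    // A toy event: "seq" is the global sequence number assigned at produce time.
    record Event(long seq, String type) {}

    // Merge per-topic buffers into one stream ordered by sequence number.
    static List<Event> merge(List<Deque<Event>> buffers) {
      List<Event> out = new ArrayList<>();
      PriorityQueue<Deque<Event>> heads =
          new PriorityQueue<>(Comparator.comparingLong(b -> b.peek().seq()));
      for (Deque<Event> b : buffers) {
        if (!b.isEmpty()) heads.add(b);
      }
      while (!heads.isEmpty()) {
        Deque<Event> b = heads.poll();
        out.add(b.poll());
        if (!b.isEmpty()) heads.add(b);
      }
      return out;
    }

    public static void main(String[] args) {
      Deque<Event> starts  = new ArrayDeque<>(List.of(new Event(1, "START")));
      Deque<Event> updates = new ArrayDeque<>(List.of(new Event(2, "UPDATE"), new Event(3, "UPDATE")));
      Deque<Event> stops   = new ArrayDeque<>(List.of(new Event(4, "STOP")));
      // Prints the events back in START, UPDATE, UPDATE, STOP order.
      System.out.println(merge(List.of(starts, updates, stops)));
    }
  }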

The problem is that there are quite a few variables and it is hard to decide up-front, especially with limited experience with these patterns.

Many thanks for any comment,
Petr
 

Alex Loddengaard

Feb 3, 2016, 1:09:24 PM
to confluent...@googlegroups.com
Hi Petr,

You've highlighted many of the tradeoffs between having a fat raw topic and a topic for each event type. Here are a few others to consider:
  • Using a fat raw topic and processing that topic to break out each event type into its own topic will introduce an additional step and hence increase end-to-end latency
  • Using a fat raw topic means that all events will exist in Kafka twice (once in the fat raw topic, and once in each event type topic), which will take up more disk space
  • Even in the implementation without one fat raw topic, you could dump the raw data into persistent storage. Of course, you'll need to do this for each topic, which has the same overhead as producers producing to event-specific topics.
I don't follow your second drawback of a topic per event type. What do you mean by "mixing" topics? Do you mean concurrently consuming from more than one topic?

Alex




--
Alex Loddengaard | Solutions Architect | Confluent
Download Apache Kafka and Confluent Platform: www.confluent.io/download

Petr Novak

Feb 5, 2016, 7:56:43 AM
to Confluent Platform
Hi Alex,
thanks for valuable input.

Yes, I mean concurrently consuming from more than one topic and mixing them into one. But outside of cases where the topic is partitioned according to the subsequent analysis (by producer, by message id, etc.), it probably requires sorting the data first, e.g. for pattern analysis, anyway. What I mean is: having events START, UPDATE, STOP in different topics, mixing them won't keep the order. And when consumed later I would get a completely different order; if there is one START, many UPDATEs, and one STOP, the order can come out as STOP, START, many UPDATEs.

And I assume it might affect re-processing as well, because I believe the mixed stream will be different on each run.

It might not be the right use case for Kafka, but I was thinking about searching for patterns online before the data is stored into some storage.

Many thanks,
Petr

Petr Novak

Feb 16, 2016, 5:46:52 AM
to Confluent Platform
I have a report from the devs that when reading from a single topic the throughput is stable. When converted to 20 topics that are mixed back into one when consumed, it is significantly more varied, e.g. nothing is sent for 3 seconds, then everything accumulated arrives at once. Is there an explanation? It is not that big a problem, just that I was expecting smoother behaviour.

Many thanks,
Petr

cana...@gmail.com

Feb 16, 2016, 8:09:40 AM
to Confluent Platform
Possibly has to do with batching on the producer. When you combine messages into one stream, you're likely not seeing pauses due to low-volume data from one source. But when you split things apart, batching becomes more apparent. A low-volume stream might only fill a batch (or hit the batch timeout) every few seconds.

If it's a problem you can lower the batch size or batch timeout, but in our experience this can increase CPU utilization and increase network utilization due to less efficient compression (assuming you are using Snappy).
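
For reference, the producer knobs in question look something like this (values are illustrative, not recommendations; the broker address is a placeholder):

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerConfig;

  import java.util.Properties;

  public class BatchingConfigSketch {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer");
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer");
      // Smaller batches flush sooner but cost more CPU and compress less efficiently.
      props.put(ProducerConfig.BATCH_SIZE_CONFIG, 8192);   // default is 16384 bytes
      props.put(ProducerConfig.LINGER_MS_CONFIG, 5);       // the "batch timeout"; default is 0 ms
      props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

      try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        // ... produce as usual; low-volume topics now flush more often.
      }
    }
  }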

‎$0.02 CDN

Sent from my BlackBerry 10 smartphone on the TELUS network.

Petr Novak

Feb 24, 2016, 2:26:20 AM
to Confluent Platform
Thank you for the advice, it makes a lot of sense.

Regards,
Petr

 

Kishore Senji

Nov 22, 2016, 11:11:40 PM
to Confluent Platform
Hi Ewen,

Don't you think the following schema also has similar issues with figuring out what format of data to work with (the conditionals that you mentioned), if one comes up with the schema below just to follow the one-schema-per-topic suggestion while still needing to process all similar events (similar up to the Header)?

  enum Type {
    A, B
  }
  record Header {
    union {null, Type} type;
  }
  record A {
    union {null, string} a;
  }
  record B {
    union {null, long} b;
  }
  record Event {
    Header header;
    union {null, A} a;
    union {null, B} b;
  }

In the above Event, the Header will be common across events; some events will populate attribute a and other events will only populate attribute b. The consumer will still need conditional logic to process an attribute of type A and an attribute of type B differently.


Would you prefer the above single schema (n attributes, with only one set per event), or would you break them into individual schemas? Wouldn't individual schemas be better (even in the same topic, so that a single consumer can still process such events and make some analytical sense out of them)?

For example:

Event of Type A:
  enum Type {
    A, B
  }
  record Header {
    union {null, Type} type;
  }
  record Event {
    Header header;
    union {null, string} a;
  }

Event of Type B:
  enum Type {
    A, B
  }
  record Header {
    union {null, Type} type;
  }
  record Event {
    Header header;
    union {null, long} b;
  }


Schema compatibility can still be achieved even with multiple schematized payloads in a single topic. The way to do it is to register these schemas under different subjects in the Schema Registry and ask the producers to pre-register before producing to Kafka. The Schema Registry will make sure each schema evolves in a backward-compatible way. Only after registering with the Schema Registry can producers produce to Kafka. Producers don't use the Confluent serializers to serialize (so that there is no topic-to-subject tying); they write the global id they get from the Schema Registry (similar to the Confluent serializers). Consumers do two-phase parsing: in the first phase they deserialize up to the Header. The core elements of the Header will not change - the type specifically. Based on the Type in the Header, the consumer deserializes to a specific type (the full payload, and the specific type includes the Header too) if it is able to handle that Type. Schema evolution takes place independently across those multiple schemas via the Schema Registry.
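
Roughly like this (just a sketch - the record and field names are placeholders rather than our real schemas, and the Schema Registry global-id handling is omitted):

  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericData;
  import org.apache.avro.generic.GenericDatumReader;
  import org.apache.avro.generic.GenericDatumWriter;
  import org.apache.avro.generic.GenericRecord;
  import org.apache.avro.io.BinaryEncoder;
  import org.apache.avro.io.DecoderFactory;
  import org.apache.avro.io.EncoderFactory;

  import java.io.ByteArrayOutputStream;

  public class TwoPhaseDecodeSketch {
    // Full writer schema: a "type" discriminator standing in for the Header, plus a type-specific field.
    static final Schema FULL = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
        + "{\"name\":\"type\",\"type\":\"string\"},"
        + "{\"name\":\"a\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    // Phase-one reader schema: only the discriminator; Avro skips the writer's other fields.
    static final Schema HEADER_ONLY = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
        + "{\"name\":\"type\",\"type\":\"string\"}]}");

    public static void main(String[] args) throws Exception {
      // Encode a sample event with the full writer schema.
      GenericRecord event = new GenericData.Record(FULL);
      event.put("type", "A");
      event.put("a", "hello");
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
      new GenericDatumWriter<GenericRecord>(FULL).write(event, enc);
      enc.flush();
      byte[] bytes = out.toByteArray();

      // Phase 1: resolve against the header-only reader schema to learn the type.
      GenericRecord header = new GenericDatumReader<GenericRecord>(FULL, HEADER_ONLY)
          .read(null, DecoderFactory.get().binaryDecoder(bytes, null));
      String type = header.get("type").toString();

      // Phase 2: now that the type is known, decode again with the full, type-specific schema.
      GenericRecord full = new GenericDatumReader<GenericRecord>(FULL, FULL)
          .read(null, DecoderFactory.get().binaryDecoder(bytes, null));
      System.out.println(type + " -> " + full);
    }
  }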


The other argument - that consumers need to know all the types - is also not a big issue. A similar thing happens even with a single schema: if the consumer is not up to date and the producer is producing a new optional element, the consumer would ignore that new optional attribute. Similarly, if the consumer cannot handle a Type, it would just ignore the payload after the header. The consumer can keep track of such events and upgrade the consumer logic to handle those types too.

Wouldn't this (having multiple schemas) be simpler to reason about in this specific case (when we have to process together the different events which differ in their core data) than putting a bunch of attributes in one record and looking for the right attribute to process? As I see it, the conditional logic is still the same, and the one-schema-per-topic suggestion from Confluent is being taken literally and worked around with schemas like that. What do you think? Which one do you prefer?

Thanks,
Kishore.



Ewen Cheslack-Postava

Nov 23, 2016, 12:56:52 AM
to Confluent Platform
Kishore,

You can absolutely implement custom approaches like this. They generally require writing your own custom serialization layers because, as you point out, you need a custom header parsing scheme. This is painful to implement for pretty much all common serialization formats since they don't support the partial-parsing scheme you're referring to. (In some cases you can make this work, e.g. protobufs, although it definitely won't always have good performance since it may require parsing the entire message even if you're only using the header.)

Note that this still offers limited support for compatibility. If you implement a different mapping of topics/message types -> schema registry subjects, you can still use the schema registry to implement this -- the schema registry is *not* strictly tied to either Kafka or the Confluent Avro serializers! However, you still need to be able to *parse and process* all those messages. When we talk about compatibility in Kafka, we mean compatibility between both producers and consumers. Of course on the producer side you can always block incompatible messages from making it into Kafka, but what you *mean* by compatible matters too. Is there really a benefit to being able to produce messages of multiple formats to a single Kafka topic if downstream consumers are just going to ignore a subset of them when they see message types they don't understand? If that's the case, you could use the schema registry as it is today and just turn off the compatibility requirements since downstream consumers won't be able to assume they will be able to *read and process* those messages anyway.

Re: the headers and payload formats you're proposing, there's an ongoing KIP discussion around headers that is relevant. https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers has the original proposal, as well as (most importantly) a link to the discussion thread on the mailing list.

-Ewen



Kishore Senji

Nov 23, 2016, 2:00:29 AM
to Confluent Platform
Thank you Ewen for your views. Good to know that this is a feasible approach. 

We are only using Avro, which supports skipping the writer's elements if the reader doesn't have them. So the first-phase parsing is really fast, especially if the Header is the first element.
Our requirement is to store these events into a single topic after sessionization (the process where events are ordered by timestamp for a given user), and the consumer needs to consume all events. The Header is the common part across all events, and the attributes after the Header can differ based on the type of event.

The argument that I got is that if we don't use one schema per topic, we are forgoing static type checking. I.e., if there is only one schema per topic, we can generate the Java classes from the Avro schema and be sure (if we use the Confluent serializers) that we always get an instance of that class back on deserialization. If there are multiple schemas per topic, then the consumer has to first deserialize to the Header type and then deserialize to the more specific type (a sort of instanceof check) if the consumer can handle the event (i.e. not a new type introduced after the consumer went live); otherwise the consumer can only handle up to the Header. This is a valid argument, but it is also true for a single schema: if a new attribute is added and that attribute is actually set, the consumer with the old type will not see that value anyway, as its reader schema does not have it.

>Is there really a benefit to being able to produce messages of multiple formats to a single Kafka topic if downstream consumers are just going to ignore a subset of them when they see message types they don't understand? If that's the case, you could use the schema registry as it is today and just turn off the compatibility requirements since downstream consumers won't be able to assume they will be able to *read and process* those messages anyway.

No, there is no benefit actually. My point was that if a consumer is not able to handle a new Type (after the Header) that was added after the consumer started processing messages, then that consumer would ignore the new Type. This is similar to the single-schema case, where an old consumer would ignore a newly added optional element (say type c, continuing the example above) until the consumer code is upgraded. The only difference is that with one big schema, the consumer would know by looking at the schema what types it has to handle (it can see the attributes that form the schema), whereas with multiple schemas a consumer might inadvertently miss handling a Type, since this is not as clear as when the consumer explicitly subscribes to multiple topics each with one schema, per Confluent's suggestion in the blog. (This can be taken care of with proper documentation, or the consumer can be made aware that it has to deal with all the types in the Header's Type enum, which lists the current types.)

And we don't want to turn off the compatibility requirements, because that could let a producer push a new version of a schema which is not backward compatible.

The two approaches (one big schema vs. multiple schemas) are similar in terms of consumer processing (i.e. we don't compromise on schema evolution and type safety) when a single consumer needs to process all message types; but I think the manageability/understandability of the single schema might become a bottleneck, especially if there are more than a handful of Types (like A, B, ..., N) and each corresponding record is managed by a separate team in a big organization.

Thanks,
Kishore.
