[Feature Request] First-class support for headers in Kafka message format

Roger Hoover

Mar 6, 2015, 2:07:53 AM
to confluent...@googlegroups.com
Hi Confluent Team,

I really like what you guys are doing and if you'll allow me, I'd like to make a suggestion for improving the platform.

I feel that the current Kafka message format (magic byte + 32-bit schema id + message) is not flexible enough.  It assumes Avro, 32-bit schema ids, and leaves no room for any message-independent metadata.  There's a valuable tradition in message protocols of allowing arbitrary headers (e.g. HTTP, STOMP, AMQP, SOAP, MQTT, JMS, CoAP).  Since headers are common to all messages and independent of the message encoding and content, they allow you to implement common functionality at the framework level across all topics (routing, caching, security, tracing, and more).

Headers could allow the platform to support multiple encodings (like HTTP with Content-Type) and schema systems (Avro, Thrift, JSON Schema, Cap'n Proto, Simple Binary Encoding, FlatBuffers).  It doesn't mean that support for all these schemes has to be built in.  You can build an opinionated platform while still leaving room for other encodings and schema systems, current and future.  I think heterogeneity may be an unavoidable reality for a data integration platform.

Many frameworks that people will want to build on this platform will need metadata and if there isn't a common way to do it, they'll all have to roll their own (likely incompatible) solutions.  Tears will be shed and hair will be pulled out on glue code to work around this.

If you want to build a system like Google MillWheel on top of Kafka + Samza, you'll need to handle event time, low watermarks, and windowing at the framework level, decoupled from the message encoding and message content.

Or if you want lightweight exactly-once messaging, you could create a framework that relies on metadata to remove duplicates (http://ben.kirw.in/2014/11/28/kafka-patterns/#repartitioning-and-deduplication).
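As a rough illustration of metadata-driven deduplication (a minimal sketch, not necessarily the technique from the linked post), a consumer-side filter could remember recently seen message ids and drop repeats. The `msg-id` header name, the `Deduplicator` class, and the bounded-window eviction policy are all assumptions made for this example.

```python
from collections import OrderedDict


class Deduplicator:
    """Drops messages whose "msg-id" header was seen within a bounded window.

    Hypothetical helper: neither the class nor the header name is part of Kafka.
    """

    def __init__(self, max_ids=100_000):
        self._seen = OrderedDict()  # insertion-ordered set of recent ids
        self._max_ids = max_ids

    def accept(self, headers):
        """Return True if the message is new, False if it is a duplicate."""
        msg_id = headers["msg-id"]
        if msg_id in self._seen:
            return False
        self._seen[msg_id] = None
        if len(self._seen) > self._max_ids:
            self._seen.popitem(last=False)  # evict the oldest remembered id
        return True
```

Because the window is bounded, a duplicate that arrives after its id has been evicted would pass through, so this only approximates exactly-once delivery. The point is that the filter never has to look inside the message body.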

These are just a few examples.  Metadata is a love note to the future.  Can we please have a common metadata format for the Kafka ecosystem?

If you're with me so far, you might argue that headers belong in the Kafka message protocol.  I agree, but maybe that ship has already sailed.  If so, then it needs to be done at the stream data platform level.

By first-class support for headers, I mean that they would be supported throughout the toolchain, including the REST API, Java API, and command-line tools like kafka-avro-console-producer, just as curl lets me pass headers at the command line and all HTTP clients have a way of getting and setting headers.

A big decision point would be whether the metadata should be human-readable (like HTTP and STOMP).  The message format could be:

<version><headers><body>

Metadata could be text like HTTP + STOMP or binary for bandwidth concerns.
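To make the `<version><headers><body>` layout concrete, here is a minimal sketch of the binary variant. Everything about the encoding is an assumption chosen for illustration: a one-byte version, a 16-bit header count, and length-prefixed names and values. It is not an existing Kafka or Confluent format.

```python
import struct
from typing import Dict, Tuple

VERSION = 1  # hypothetical version byte for this sketch


def encode(headers: Dict[str, bytes], body: bytes) -> bytes:
    """Serialize as <version><headers><body> with length-prefixed fields."""
    out = bytearray([VERSION])
    out += struct.pack(">H", len(headers))  # header count, 16-bit big-endian
    for name, value in headers.items():
        raw_name = name.encode("utf-8")
        out += struct.pack(">H", len(raw_name)) + raw_name
        out += struct.pack(">I", len(value)) + value
    out += body  # the body is everything after the header section
    return bytes(out)


def decode(message: bytes) -> Tuple[Dict[str, bytes], bytes]:
    """Split a message produced by encode() back into (headers, body)."""
    if message[0] != VERSION:
        raise ValueError(f"unsupported version {message[0]}")
    (count,) = struct.unpack_from(">H", message, 1)
    pos = 3
    headers = {}
    for _ in range(count):
        (name_len,) = struct.unpack_from(">H", message, pos)
        pos += 2
        name = message[pos:pos + name_len].decode("utf-8")
        pos += name_len
        (value_len,) = struct.unpack_from(">I", message, pos)
        pos += 4
        headers[name] = message[pos:pos + value_len]
        pos += value_len
    return headers, message[pos:]
```

With this layout a message with no headers costs three bytes of overhead (version plus a zero count), and the body stays opaque to the envelope code, so it can be Avro, JSON, or anything else.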


CoAP supports 4 types of options: empty, opaque, uint, and string. 

AMQP header types are much richer (too complicated?)

= BIT / OCTET
/ short-uint / long-uint / long-long-uint
/ short-string / long-string
/ timestamp
/ field-table


I think we could do something very close to an existing standard and rip out any unnecessary fields.  It does leave a question of which fields should be required, if any.  I'd argue that at the message format level schema id should not be required.  It can be enforced one level up so that some people could have schema-less messages but still have metadata.

The one field that you might be able to argue for as required is event time.  Maybe I'm wrong but don't all messages have an event time???

Thanks for your consideration.

Cheers,

Roger

Felix GV

Mar 6, 2015, 2:49:50 PM
to confluent...@googlegroups.com
Hi Roger,

I think this is an interesting topic, with a few different opinions and tradeoffs...

First of all, it's possible to implement (almost) everything you said within the avro schema and payload. This is commonly done with things like header records nested inside every schema. The advantages I can think of with this approach are that:
  1. Serialization is simple, since it all works the same way (i.e.: magic byte, schema ID, payload). This may also make it easier for anyone to leverage the header-type data.
  2. Avro allows reading just the fields you're interested in, so people who don't care about your extra fields need not be impacted by them (in terms of performance at least).

Disadvantages are:

  1. It is a bit unforgiving for people who choose other serialization formats (i.e.: non-avro). This problem, like any other, is solvable.
  2. It conflates application needs with infrastructure needs. This has a few implications:
    1. App developers need to be somehow coerced into providing everything the infra needs (time, guids or other fields) even if they don't care about these things. Definitely doable, but is it ideal?
    2. Infra people need to trust that app developers are putting sane data in their infra-specific fields (i.e.: perhaps the time field must be UTC server-time, not some client browser's time or a different timezone). Again, doable, but perhaps not ideal.
    3. If the infra people need to add a new field for every topic, this requires evolving every single registered schema, which seems a bit unwieldy, considering that app developers do not necessarily care about the added field. This should happen infrequently, but it does seem to me like a symptom of conflated concerns.

As for supporting headers (whether it is done within a standardized avro nested record, or outside the main payload, within the "envelope"), I think that would be super useful for examples such as those you mentioned, but I think we need to consider where we want to stand on the compact-vs-flexible tradeoff axis. Arbitrary headers would be most flexible but would likely require header names to be specified in each message. This can compress well in batches of messages, but would be overhead in single-message passing (or small batches) use cases. On the other hand, a protocol could be defined where certain specific headers are specified as necessary (or optional) in certain topics, and then those could be encoded in a very compact way (essentially, that would be like having a schema for the envelope, just like there is a schema for the payload).

I think it's worth noting that all of these changes, if deemed necessary, could be adopted under a different magic byte, meaning that the current implementation could work side-by-side with another.
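The side-by-side idea could be sketched like this: a reader inspects the first byte and dispatches on it. Magic byte 0 is the existing format described earlier in the thread (magic byte + 32-bit schema id + payload; the id is assumed here to be big-endian). Magic byte 1, the 4-byte header-block length, and the "name: value" text lines are purely hypothetical choices for this example.

```python
import struct

MAGIC_SCHEMA_ID = 0x00     # existing format: magic byte + 32-bit schema id + payload
MAGIC_WITH_HEADERS = 0x01  # hypothetical format carrying a text header block


def parse_text_headers(block: bytes) -> dict:
    """Parse HTTP-style "name: value" lines into a dict with lowercase names."""
    headers = {}
    for line in block.decode("utf-8").splitlines():
        name, _, value = line.partition(":")
        headers[name.strip().lower()] = value.strip()
    return headers


def parse(message: bytes):
    """Return (headers, payload) for either format, chosen by the magic byte."""
    magic = message[0]
    if magic == MAGIC_SCHEMA_ID:
        (schema_id,) = struct.unpack_from(">i", message, 1)
        return {"schema-id": schema_id}, message[5:]
    if magic == MAGIC_WITH_HEADERS:
        (header_len,) = struct.unpack_from(">I", message, 1)
        headers = parse_text_headers(message[5:5 + header_len])
        return headers, message[5 + header_len:]
    raise ValueError(f"unknown magic byte {magic}")
```

Existing producers and consumers that only know magic byte 0 would keep working unchanged, while readers that understand both bytes can treat the schema id as just another header.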

-F

Roger Hoover

Mar 8, 2015, 5:50:54 PM
to confluent...@googlegroups.com
Felix,

Thanks for the very thoughtful response.  You highlighted the issues well.  The main sticking point for me is requiring all application schemas to include infrastructure fields.  This is awkward, unnecessarily burdensome and, as you said, a sign of conflated concerns.

For anyone concerned about arbitrary headers being too bulky, they could leave the headers empty and pay no penalty.  Essentially, this would default back to magic byte 0x0 behavior.

Roger

Roger Hoover

Mar 8, 2015, 6:15:50 PM
to confluent...@googlegroups.com
I'm not advocating this b/c Kafka already supports batch compression, but just as an FYI, the HTTP/2 spec defines a compression scheme for headers.

Jun Rao

Mar 9, 2015, 12:05:30 PM
to confluent...@googlegroups.com
Roger, Felix,

Thanks for the feedback. Yes, there are definitely fields that are infra related (e.g., for monitoring the pipeline). Some of the infra fields are likely needed in every use case (e.g., timestamp). Some other infra fields may be needed only for a specific use case. The former probably should be supported at the platform level to avoid everyone redoing the same work. The latter probably can be done through a customized serializer. One of the things that we have to think through is that some of the infra fields can be useful to applications as well. A good example is timestamp. So, we need to think through whether to include those just in the header or duplicate them in both the header and the encoded application data.

We do plan to support some infra related fields in the Confluent platform in the future.

Thanks,

Jun

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/176cbadc-7b12-4f8d-9d88-4295355d0544%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Roger Hoover

Mar 9, 2015, 12:12:23 PM
to confluent...@googlegroups.com
Thanks, Jun.

I was imagining that infrastructure fields could be made available to the application in a similar way that HTTP, AMQP, + other libraries expose headers to the application layer if it needs it.  Samza, for example, exposes a message envelope API to the application so that if the application needs access to the metadata, it can get it from the envelope.

Cheers,

Roger

Jun Rao

Mar 9, 2015, 2:36:35 PM
to confluent...@googlegroups.com
Roger,

Other than timestamp, what other infrastructure fields do you think are common?

Thanks,

Jun

Roger Hoover

Mar 9, 2015, 3:07:20 PM
to confluent...@googlegroups.com
Jun,

Here are some of the ones I was thinking about:

event_timestamp/timestamp: the datetime of the original event

created_timestamp: the datetime that the message was created (may be different from event time when there are multiple hops and transformations/enrichments)

tenant: the "owner" of the message in multi-tenant environment.  Even within an organization, there may be multiple tenants (ops, product, marketing teams).

msg-id:
   - Since REST API producers may use different HTTP connections when retrying failed requests, adding UUIDs to each message allows duplicates to be detected and removed downstream.
   - This field might also be used for global ordering (System Change Numbers) in database replication use cases.

For environments that have to support multiple types of encodings and schemas:

content-type: to indicate the data encoding (application/mtconnect+xml, application/octet-stream, application/soap+xml, avro+binary, application/x-protobuf, application/x-thrift, application/x-avro-binary, application/json)

msg-schema-id: use "bytes" type so schema ids can be of arbitrary length?
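As a sketch of how a content-type header might be used at the framework level, a consumer could pick a decoder from a registry without knowing anything else about the topic. The `DECODERS` registry, the `decode_body` function, and the choice to default to application/octet-stream are assumptions for this example; only JSON and raw bytes are wired up here.

```python
import json

# Hypothetical registry mapping content-type values to decoder functions.
DECODERS = {
    "application/json": lambda body: json.loads(body.decode("utf-8")),
    "application/octet-stream": lambda body: body,  # pass raw bytes through
}


def decode_body(headers, body):
    """Decode a message body according to its content-type header."""
    content_type = headers.get("content-type", "application/octet-stream")
    try:
        decoder = DECODERS[content_type]
    except KeyError:
        raise ValueError(f"no decoder registered for {content_type!r}") from None
    return decoder(body)
```

Support for Avro, Thrift, or protobuf would be added by registering more decoders, which is the sense in which the platform can stay opinionated while leaving room for other encodings.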

Cheers,

Roger


Roger Hoover

Mar 17, 2015, 4:32:18 PM
to confluent...@googlegroups.com

Here are a couple of other use cases where custom headers are needed and should not be tied to message content.

1) If you use flume-ng as a source and Kafka as a sink, you need a way to pass metadata along with messages (agent-host, environment [prd/qa], etc.). Flume-ng has a notion of headers, and they have to be mapped (in an interceptor) to a Kafka message wrapper that supports headers. If the Confluent Platform supported optional custom headers, then you could ship a generic Flume-ng interceptor that formats messages for Kafka in a way that is compatible with the Confluent Platform.

2) This change data capture project for MySQL uses Avro with schema id similar to Confluent Platform but needs additional metadata like "mutation type". Again, if we had a flexible message envelope that allowed for custom headers, a single format could accommodate this use case as well.
https://github.com/mardambey/mypipe#kafka-message-format


I created a Github issue in case it helps keep track of this thread:  https://github.com/confluentinc/schema-registry/issues/153


Cheers,


Roger

James Cheng

Jun 8, 2015, 5:29:46 PM
to confluent...@googlegroups.com

Resurrecting this old thread.


A number of people have pointed out that this can be implemented using the current avro schema, by either adding headers to the object itself, or implementing an "envelope" that contains the headers, and having the original object now be a payload within the envelope.


There is one instance where this does not work: when using delete tombstones in a log compacted topic. Since delete tombstones are created by writing a message with a null payload, there is no place to add headers of any sort. 


You could instead invent an envelope that represents a delete. For example, an envelope with headers but with a null payload could be used to represent a delete, and an application layer can interpret that as a delete. But Kafka will not interpret that as a delete tombstone, and so the delete envelope will stay around forever. If that topic has a large number of deletes, then the topic could get very large. It's possible to work around this with some sort of cleanup job, but there are some race conditions there.
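The difference can be shown with a deliberately simplified model of log compaction: keep the latest value per key, and drop keys whose latest value is null. Real compaction is more involved (for example, tombstones are retained for a while before being removed), and the `compact` function and the envelope's dict shape are hypothetical.

```python
def compact(log):
    """Simplified compaction: keep the latest value per key, then drop keys
    whose latest value is None (a true delete tombstone)."""
    latest = {}
    for key, value in log:
        latest[key] = value
    return {key: value for key, value in latest.items() if value is not None}


# A real tombstone: the message value itself is null, so the key disappears.
true_delete = [("user-1", b"profile"), ("user-1", None)]

# An application-level "delete envelope": headers plus a null payload.
# The message value is not null, so compaction keeps it.
envelope_delete = [
    ("user-1", b"profile"),
    ("user-1", {"headers": {"Cluster": "1"}, "payload": None}),
]

print(compact(true_delete))      # the key is gone
print(compact(envelope_delete))  # the delete envelope is still retained
```

In this model every deleted key leaves one envelope behind permanently, which is why a topic with many deletes would keep growing.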


This also seems related to https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!topic/confluent-platform/XQTjNJd-TrU. If there was a standard way to add headers, then it would also be possible to have a topic with mixed-schemas (which is required to guarantee ordering across mixed-schemas). Instead of "schema id in message + fixed schema type" for a topic, you could instead have "schema id in message + schema type in a header". Downstream consumers who are receiving that topic would be able to filter at the "type" header to only receive certain types of messages, or trigger off the "type" header to take different actions depending on message type.


I realize this might start to get complicated. Delete tombstones might then have to respect a header field, which doesn't sound very elegant.

Anyway, I wanted to see if you have any thoughts on the "headers on delete tombstones" issue.

Thanks,
-James Cheng

Jun Rao

Jun 11, 2015, 2:55:22 PM
to confluent...@googlegroups.com
James,

Could you explain a bit why a header is needed during a delete? Isn't it enough for deletion to just specify a key? It seems that customized header is only needed when you have inserts and updates.

Thanks,

Jun

Félix GV

Jun 11, 2015, 5:11:56 PM
to confluent...@googlegroups.com
It would be nice to have auditing visibility into all types of messages, including deletes.

If Kafka was extended to support pluggable compaction strategies, this could be achieved in a fairly clean way, no? These pluggable strategies should be stateless in order to maintain the same performance characteristics as regular compaction, but it seems like it could solve some interesting use cases...

-F

James Cheng

Jun 11, 2015, 6:02:01 PM
to confluent...@googlegroups.com
On Jun 11, 2015, at 11:55 AM, Jun Rao <j...@confluent.io> wrote:

James,

Could you explain a bit why a header is needed during a delete? Isn't it enough for deletion to just specify a key? It seems that customized header is only needed when you have inserts and updates.


Jun,

Here are some use-cases that I'm considering (considering, but not yet implemented).

I'm working on change data capture for MySQL->Kafka (similar to BottledWater). MySQL change events are of type insert/update/delete, and each has some common metadata associated with it that I (may) want to replicate to Kafka. For example, here is a MySQL write:

primary key "userId = 1"
--------------- message start -----------------
BinlogEventStartPosition: 8288
BinlogFilename: mariadb-bin.000008
BinlogTransactionStartPosition: 8173
BinlogTimestamp: 1231231213

{"name":"James","userId":1}
--------------- message end ------------------

Those headers could be useful for monitoring progress. They could be used for figuring out where to restart replication after a crash, instead of using a separate mechanism to checkpoint my source offsets. They could be used to implement exactly-once semantics, using the techniques described in http://ben.kirw.in/2014/11/28/kafka-patterns/.

If I were replicating deletes, I would want those headers for the exact same reasons. 

Another example, if I had a job that was merging two log compacted topics into one, and wanted to add info that said which source topic they came from:

topic cluster1.userInfo:
delete for userId = 1
topic cluster2.userInfo:
delete for userId = 20

Merging into a topic:

topic allClusters.userInfo:
delete for userId = 1 (header "Cluster" = 1)
delete for userId = 20 (header "Cluster" = 2)
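A minimal sketch of such a merge job, assuming records are (key, value) pairs, deletes are records with a None value, and the merged output carries a separate headers dict. The `merge_topics` function is hypothetical, and it simply concatenates the sources rather than interleaving them as a real job would.

```python
def merge_topics(sources):
    """Merge records from several clusters, tagging each with its origin.

    sources maps a cluster label to an iterable of (key, value) records.
    Yields (key, headers, value), where headers carries a "Cluster" entry.
    """
    for cluster, records in sources.items():
        for key, value in records:
            yield key, {"Cluster": cluster}, value


merged = list(merge_topics({
    "1": [("userId=1", None)],   # delete from cluster1.userInfo
    "2": [("userId=20", None)],  # delete from cluster2.userInfo
}))
```

The deletes keep a null value, so the origin information has to travel outside the value, which is exactly what a header would provide.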

Thanks,
-James


James Cheng

Jun 11, 2015, 6:17:26 PM
to confluent...@googlegroups.com
I admit, there are alternate ways of doing things that wouldn't require this. I could, for example, checkpoint my offsets and include this source information in the checkpoint. I would then do my monitoring and restart according to the information in the offsets, as normal kafka consumers do. It wouldn't be exactly the same, though, since my monitoring and restart granularity would be limited by how frequently I checkpoint.

But anyway, those are some use cases that would be enabled if we had first-class support for headers.

-James
 

Jun Rao

Jun 12, 2015, 12:49:31 AM
to confluent...@googlegroups.com
James,

Thanks for the info. The question is whether the delete message will eventually go away or not. If it does, then you can't rely on the metadata associated with it to always be present. So, such metadata can only be used as an optimization, but not as something required.

Jun 
