How to accomplish Kafka (json) => HDFS (avro)


Johan Rask

Apr 4, 2018, 6:06:17 AM4/4/18
to Confluent Platform
Hi,

We are currently successfully using kafka-connect to transfer data from kafka(json) => hdfs(json).

However, we would like to store the data in HDFS as Avro instead, so we need to transform our JSON into Avro before writing it to HDFS.
We assumed this would be pretty simple, but we have not been able to work out how to accomplish it.
Could anyone point us in the right direction? It seems like something that should be simple.


There is a discussion from last year regarding json => parquet, but it did not clarify things.

Here Gwen Shapira wrote the following, but that does not help us much ;-)
"JSON->Avro conversion requires a provided schema or something to do inference, which is more complicated."



This is our current configuration. 

connect-distributed.properties

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

connector config

  "config": {
    "partitioner.class" : " io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partitioner.field.name" : "meta.kafka.topic",
    "partition.duration.ms" : "3600000",
    "path.format" : "'year'=YYYY/'month'=MM/'day'=dd",
    "timestamp.extractor" : "RecordField",
    "timestamp.field" : "@timestamp",
    "locale" : "sv_SE",
    "timezone" : "UTC",
    "format.class": "io.confluent.connect.hdfs.json.JsonFormat",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "4",
    "topics": "internal.dv.accesslogs",
    "hdfs.url": "...",
    "name": "hdfs-sink",
    "flush.size": "500000"
  }

Regards /Johan Rask

Robin Moffatt

Apr 4, 2018, 6:42:44 AM4/4/18
to confluent...@googlegroups.com
This is easy with KSQL :) 

Here's a dummy topic, in JSON: 

$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic mysql_users
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}

In KSQL declare the source stream, specifying the schema: 

ksql> CREATE STREAM source (uid INT, name VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

Now create a derived stream, specifying the target serialisation (Avro) and the target topic (this is optional; without it, the topic will just take the name of the stream): 

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> CREATE STREAM target_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='mysql_users_avro') AS SELECT * FROM source;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

Check out the resulting Avro topic: 

$ kafka-avro-console-consumer \
                   --bootstrap-server localhost:9092 \
                   --property schema.registry.url=http://localhost:8081 \
                   --topic mysql_users_avro --from-beginning
{"UID":{"int":1},"NAME":{"string":"Cliff"}}
{"UID":{"int":2},"NAME":{"string":"Nick"}}

You can then pass the resulting topic through Kafka Connect to land it in HDFS in Avro format. Because KSQL is a continuous query, any new records arriving on the source JSON topic will be automagically converted to Avro on the derived topic. 
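
For reference, a sketch of what the sink config for that derived topic might look like (a hedged sketch, not a verified config: the connector name is made up, the schema registry URL is a placeholder, and hdfs.url is left elided as in Johan's original config):

{
  "name": "hdfs-sink-avro",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics": "mysql_users_avro",
    "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "hdfs.url": "...",
    "flush.size": "500000"
  }
}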



Johan Rask

Apr 4, 2018, 6:53:25 AM4/4/18
to Confluent Platform
Thanks a lot for your answer, this could definitely work for us.

BUT, I still think that this should work using kafka-connect only, so I would be happy to get input on that as well.

/Johan

Ewen Cheslack-Postava

Apr 5, 2018, 1:26:37 AM4/5/18
to Confluent Platform
The main problem with JSON -> Avro is that JSON is schemaless while Avro requires strict, structured schemas.

Connect doesn't currently support adding a schema out of the box, but I wrote some POC code to do schema inference: https://github.com/ewencp/kafka/commit/3abb54a8062fe727ddaabc4dd5a552dd0b465a03. This single message transform (http://kafka.apache.org/documentation/#connect_transforms) is enough to let you take schemaless JSON, inline, and pass it to a connector (e.g. HDFS with Avro output) that expects a schema to be present.

There are limitations -- it is inferring schemas per-record, for instance, so if you have JSON data whose structure changes (e.g. if you were performing an upgrade and the new version included a new field, but the older version did not), you'll see the schema changing back and forth. However, in some cases this might be ok.

That transform is still just in a branch because it is incomplete. I also intended to add a mode that instead of inferring a schema would *impose* a schema, selecting a set of fields, converting any data types necessary, etc. That variant could be useful in cases where you want a single schema but want to make any incoming data conform to it.

That approach would be Connect-only, but you need to make some tradeoffs -- the transformations aren't currently built-in (i.e. you need to compile and deploy them yourself) and there are constraints (use whatever schema is inferred, which could be arbitrary, or restrict to a single schema and force data to map to it). If those constraints work for you, this type of single message transform might be the right solution for you.
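
For anyone unfamiliar with the transform API, a minimal sketch of the shape such a transform takes (an illustration, not Ewen's POC code; inferSchemaAndValue is a hypothetical helper standing in for the actual inference logic):

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.transforms.Transformation;

public abstract class InferSchema<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Infer a Connect schema from the schemaless value (hypothetical helper),
        // then re-emit the record with that schema attached.
        SchemaAndValue inferred = inferSchemaAndValue(record.value());
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                inferred.schema(), inferred.value(),
                record.timestamp());
    }

    // Hypothetical: walk the Map/List structure of the deserialized JSON and
    // build a matching org.apache.kafka.connect.data.Schema plus converted value.
    protected abstract SchemaAndValue inferSchemaAndValue(Object value);

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}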

-Ewen



Johan Rask

Apr 5, 2018, 8:21:24 AM4/5/18
to Confluent Platform
Hi again Robin,

We finally got it working after running into a lot of problems.
I had to build master to get it up; I ran into some issues that were solved but not yet released.
We have @ characters in our JSON and also nested JSON, which we had to deal with.

BUT after some struggling it is up. Now I will have to see how this is properly deployed next to our kafka-connect cluster. Have some reading to do.

Regards /Johan


Johan Rask

Apr 5, 2018, 8:23:43 AM4/5/18
to Confluent Platform
Thanks Ewen, 

Now I understand the issues better. So is it recommended to put Avro on Kafka and then convert it to JSON IF needed?
Would that make more sense?

Regards /Johan

Johan Rask

Apr 5, 2018, 8:47:28 AM4/5/18
to Confluent Platform
Just to clarify... what I was talking about was KSQL ;-)

/Johan

Andrew Otto

Apr 5, 2018, 10:08:08 AM4/5/18
to confluent...@googlegroups.com
Wow Ewen, this looks awesome.  I only briefly looked over this code, but iiuc, this would be used by setting something like transforms=SetSchema and maybe a few other SetSchema-specific configurations.  Then, each JSON record would somehow be passed to inferSchema, which would then get us a good ol’ Connect SchemaAndValue object which can then be routed to any Connect sink.  Something like that?

I will def try this, although it might take me a couple of months to free up some time.  Thanks so much for this!


  


Robin Moffatt

Apr 5, 2018, 10:18:19 AM4/5/18
to confluent...@googlegroups.com
Hi Johan, 

What issues did you have with getting KSQL running? With Confluent Platform 4.1 (due soon) it will be much easier, as KSQL will ship with Confluent Platform itself. 
If you do encounter any problems, you can raise an issue on GitHub (https://github.com/confluentinc/ksql/issues/new) and/or join the Confluent Community Slack group (https://slackpass.io/confluentcommunity) and get help on the #ksql channel there too.

thanks, Robin. 





Johan Rask

Apr 6, 2018, 4:10:45 AM4/6/18
to Confluent Platform
Hi,

All the issues I ran into were already reported and fixed, but not released yet. Some were not fixed but had workarounds.
I had to do some minor modifications on our raw data to get it up and running.

We are using version 4.0 now. I will have to read up on how I can properly package and deploy a ksql "app".

At first I felt that it sucks to have another "moving part" just to do the json -> avro conversion, but at the same time ksql opens up
a new range of really cool possibilities once we have it in place :-)

Regards /Johan





Andrew Otto

May 18, 2018, 4:04:41 PM5/18/18
to confluent...@googlegroups.com
Ok, just tried to make this work.  I have never really used Connect before (in any customized way), but I can’t seem to get SetSchema to run as an SMT.

I’m getting 

java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 1 error(s):
Invalid value class org.apache.kafka.connect.transforms.SetSchema for configuration transforms.SetSchema.type: Error getting config definition from Transformation: null
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`


I don’t know what I’m doing here at all though!  My connector.properties are:

name=set-schema-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink
topics=webrequest_text
transforms=SetSchema
transforms.SetSchema.type=org.apache.kafka.connect.transforms.SetSchema


webrequest_text contains json messages.


connect-standalone.properties has

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false


Which I think is correct, but I’m not sure.

I think I’m close, but maybe I’m missing a transforms.SetSchema configuration?




Andrew Otto

May 23, 2018, 11:59:13 AM5/23/18
to confluent...@googlegroups.com
Ah ha!  Still figuring this out, but for reference, the reason I couldn’t get this to work was that I did not specify that I wanted to use the transform for the record value.  I needed to set:

transforms.SetSchema.type=org.apache.kafka.connect.transforms.SetSchema$Value
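
(This mirrors the convention of Kafka's built-in SMTs, which ship paired inner classes for keys and values; the class named in transforms.*.type decides which part of the record the transform applies to. For example, with the stock ExtractField transform -- the alias ExtractId and the field name id here are illustrative:

transforms=ExtractId
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractId.field=id
)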

Andrew Otto

May 24, 2018, 1:29:49 PM5/24/18
to confluent...@googlegroups.com
 SetSchema works, amazing!  Am trying it in some different contexts, and I’m getting some weird behavior with the kafka-connect-hdfs connector.  I’m using version 1.1.0.  I do

transforms=SetSchema,SetSchemaMetadata

and set the schema name and version (in order to hopefully use hive integration).

But, I cannot write Avro or Parquet to HDFS if I don’t pre-flatten the data.

For ParquetFormat, I get:
java.lang.IllegalArgumentException: Avro schema must be a record.

For AvroFormat, I get
org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault

If I use the Flatten SMT, these work fine.
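
For reference, a sketch of a typical Flatten configuration (the delimiter here is illustrative; it defaults to "."):

transforms=SetSchema,Flatten
transforms.Flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.Flatten.delimiter=_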

I suspect the problems are related, but I am not sure.  I am pretty stumped as to how io.confluent.connect.avro.ConnectDefault is even showing up.  I only see this assigned as the Avro schema name in AvroData.toConnectSchema.  However, I don’t think AvroData.toConnectSchema should ever be called, since we aren’t converting from Avro to Connect.  Your SetSchema SMT infers the Connect schema from the JSON record(s).  

I suspect the Avro issue (and possibly the Parquet one too?) is coming from the fact that Avro doesn’t like records defined with the same name, even if they are nested, and somehow AvroData is assigning the same DEFAULT_SCHEMA_FULL_NAME to every Struct field.  Again, I’m not sure how, since AvroData.toConnectSchema shouldn’t be executed in this case.


Is this something you’ve seen before?  I’d make this into a proper Github Issue, buuuut, SetSchema is just POC :)

-Andrew Otto

Ewen Cheslack-Postava

May 31, 2018, 1:46:51 PM5/31/18
to Confluent Platform
I haven't run into that issue, but DEFAULT_SCHEMA_NAME is also used as a fallback in AvroData.fromConnectSchema when the incoming schema doesn't define a name. So I think what is likely happening is that differences in the data result in slightly different schemas (a good example would be an optional field: on a per-record basis, if the field is null, we have no way to know what type it is supposed to be), but they are falling back to the same schema name (which Avro requires for records), and then the Avro library is choking because it sees different schemas with the same name.

Not sure there's a great solution to this -- this is kind of just the limitation of not having the schemas in the first place. One fix would be to update the SetSchema SMT to also support specifying the schema directly instead of inferring it (which I intended to support, but have not gotten around to adding). That would avoid the issue where you end up with different schemas. The only other alternative I can think of is to make the SetSchema SMT create a schema name automatically, e.g. based on a hash of the schema or something, such that the schema names wouldn't collide. The downside to doing that is that you obviously won't get useful names using that approach, they'll just be gibberish.
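
To make the hash-based naming idea concrete, a rough sketch (an illustration only, not code from the POC branch; the traversal covers only structs and would also need to handle arrays, maps, and optionality):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;

public final class SchemaNames {

    // Build a deterministic textual signature of the schema's structure.
    private static void signature(Schema schema, StringBuilder sb) {
        sb.append(schema.type().name());
        if (schema.type() == Schema.Type.STRUCT) {
            for (Field field : schema.fields()) {
                sb.append('|').append(field.name()).append(':');
                signature(field.schema(), sb);
            }
        }
    }

    // Hash the signature so that structurally different schemas get different
    // (if unreadable) record names, avoiding Avro's "Can't redefine" error.
    public static String nameFor(Schema schema) throws Exception {
        StringBuilder sb = new StringBuilder();
        signature(schema, sb);
        byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(sb.toString().getBytes(StandardCharsets.UTF_8));
        StringBuilder name = new StringBuilder("Inferred_");
        for (int i = 0; i < 8; i++) {
            name.append(String.format("%02x", digest[i] & 0xff));
        }
        return name.toString();
    }
}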

-Ewen


Andrew Otto

May 31, 2018, 1:58:07 PM5/31/18
to confluent...@googlegroups.com
I was able to get around some of the issue with:


Buut, not totally sure if that is the right thing to do.  But in general I agree.  I’m considering implementing a JSONSchema -> Connect Schema Converter or Transform (not sure which yet).  Possibly a JSONSchemaConverter that extends from JsonConverter?  TBD… :)

Thanks a lot for this Ewen, I’ve been able to understand a lot more of how I could use Connect internals to do what I need.


Ewen Cheslack-Postava

May 31, 2018, 2:52:57 PM5/31/18
to Confluent Platform
Regarding an extended version of JsonConverter, you might want to take a look at the discussion around https://cwiki.apache.org/confluence/display/KAFKA/KIP-301%3A+Schema+Inferencing+for+JsonConverter which was headed in a similar direction (and will encounter similar issues and challenges).

Also, aside from an extended version of Converters, I do think adding the schema SMT (with more functionality filled out) would be a good addition to Kafka; I just haven't had time to do the KIP and round out the functionality.

-Ewen


Andrew Otto

May 31, 2018, 4:03:35 PM5/31/18
to confluent...@googlegroups.com
Wow, thanks!


Philip Schmitt

Sep 15, 2018, 6:49:41 AM9/15/18
to Confluent Platform
Hi Ewen,

Inferring the schema from pure JSON is interesting, but might it be error-prone – for example, when inferring a schema from an array of objects?
I would rather pre-register a schema in the Schema Registry, and I’d want a connector to always output that schema, even if some nullable attributes are missing.
Then there wouldn’t be any edge cases where inferring the schema is problematic, and you would not need to define the schema in some connector configuration – the schema would only be in the Schema Registry.

I am wondering if it might be possible to fetch a specified schema from the Schema Registry and convert the Connect data or JSON data by writing a new AvroConverter or a transformation.


I’m not that familiar with the Connect and SchemaRegistry classes, but it looks like there may already be a lot of helpful methods in there.

* the SchemaRegistryClient can fetch a specified Avro schema
* it looks like AvroData.toConnectSchema() can transform the schema to a Connect schema structure – not sure if that works for any Avro schema or if there are some limitations?
* the JsonConverter has a method convertToConnect() that can take the Connect schema and a JsonNode and build a Connect object.
* the Connect schema and Connect value could then be converted to Avro by the default AvroConverter?

Do you think that could work?
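
As a rough sketch of the first two steps (an illustration; the registry URL and subject name are placeholders, and building the Connect value from the JSON is the part a custom converter would still have to own, since JsonConverter's conversion internals are not public API):

import io.confluent.connect.avro.AvroData;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.connect.data.Schema;

public class FetchRegisteredSchema {
    public static void main(String[] args) throws Exception {
        // Fetch the latest registered Avro schema for a subject (placeholder name).
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);
        SchemaMetadata latest = client.getLatestSchemaMetadata("my-topic-value");
        org.apache.avro.Schema avroSchema =
                new org.apache.avro.Schema.Parser().parse(latest.getSchema());

        // Convert the Avro schema into a Connect schema via AvroData.
        Schema connectSchema = new AvroData(100).toConnectSchema(avroSchema);
        System.out.println(connectSchema.fields());
    }
}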



For context, I have a case where we use the JDBC source connector to read a table where one column contains a JSON string. That is "{ attribute1: 'abc', attribute2: 123 }".
If we used the default Avro converter, the result would have a String attribute with the JSON.
But I want to "unwrap" that string and embed the object as part of the Avro schema.
So I would like to convert the data into JSON first: { column1: 'foo', column2: { attribute1: 'abc', attribute2: 123 } }
And then turn that JSON intro Avro so that it matches a specified schema from the Schema Registry.



Philip Schmitt

Oct 12, 2018, 2:03:21 AM10/12/18
to Confluent Platform
I got some feedback from Randall Hauch in the Confluent Slack Community on how one might go about converting my JSON data to Avro:

I have seen SMTs that "inline" JSON into a Connect `Struct` with a schema. In your example, you'd need a Connect `Schema` object for the `payload` field, and the SMT infers that schema from the JSON document structure. The trick is that inferencing, and whether the SMT can do that without consulting any other components (like SR). You may have business constraints that dictate what that schema is (or the variations of the schema that are used in practice), which makes this a bit easier.

 

A best practice for SMTs is that they don't talk to another service. This may be a gray area, however, since an SMT could talk to an external system (e.g., SR) only once initially or very infrequently (when it detects a new structure it hasn't seen). That might be acceptable. But you definitely do NOT want an SMT consulting an external service more frequently than that.

 

Of course, another approach is to use a Converter. Confluent's Avro Converter, for example, does talk to the SR when it comes across a schema it doesn't know about, and this may range from very infrequent (if the same schemas are used over and over) to more frequent (if schemas evolve frequently). This is also a valid option, and if I went with this option I'd create a custom Converter that instantiated and wrapped another Converter (e.g., Avro) for the actual (de)serialization logic, and my custom Converter could focus on the new logic. One benefit of this approach is that it wouldn't require extra configuration logic: the AvroConverter requires the SR URL, which your custom logic could use as well.

 

(Why not extend the `AvroConverter`? That is an option, but its methods and design are not public APIs, and those methods may change at any point, potentially breaking your custom implementation. By wrapping it you constrain yourself to using only the public API, which will be safe.)

 

Given the above, I'd probably start with the Converter approach, since that seems to make it simpler to use: really, just change the `value.converter=io.confluent.connect.avro.AvroConverter` property to refer to your new class.

 

Another reason is that you seem to imply that you're only using the AvroConverter, which makes your custom Converter a little easier since it can probably assume it's wrapping the AvroConverter. If really necessary, though, you could pass in the name of the wrapped Converter implementation class, but that gets a little more complicated.
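
A minimal sketch of that wrapping approach, using only the public Converter API (an illustration, not Randall's code; the custom logic is left as a comment):

import java.util.Map;

import io.confluent.connect.avro.AvroConverter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

public class WrappingAvroConverter implements Converter {

    private final AvroConverter delegate = new AvroConverter();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // The wrapped converter picks up schema.registry.url from the same configs.
        delegate.configure(configs, isKey);
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // Custom logic would go here (e.g. imposing a pre-registered schema on
        // schemaless input) before handing off to the real converter.
        return delegate.fromConnectData(topic, schema, value);
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        return delegate.toConnectData(topic, value);
    }
}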

Andrew Otto

Oct 15, 2018, 9:58:41 AM10/15/18
to confluent...@googlegroups.com
Oh hm, btw, a while ago I implemented a JSONSchema based ConnectConverter:

It is mostly a prototype, but works for our use case. :)




Gwen Shapira

Oct 15, 2018, 12:33:48 PM10/15/18
to Confluent Platform
This is really cool :)
I hope one day it will be in good shape to contribute to the connect hub :)




--
Gwen Shapira
Product Manager | Confluent
650.450.2760 @gwenshap
Follow us: Twitter | blog

Eric Pheatt

Jan 24, 2019, 2:56:07 PM1/24/19
to Confluent Platform
I have a similar issue: my legacy producer for a topic publishes CDC-style events without an inline schema, containing the "patch" (using https://github.com/flipkart-incubator/zjsonpatch) and the resultant "doc" after the patch was applied. I now need to consume these with the JDBC Sink, in addition to the current S3 Sink that sends the events as-is for a simple Hive ETL process I currently use:

{
"doc":{"Id":111,"Name":"test","new1":"stuff"},
"patch":[{"op":"add","path":"/new1","value":"stuff"}]
}

I had created a Morphlines-based POC for a Transform/Sink and ran into similar issues dynamically applying an external schema to JSON to get it into Avro. I used a custom command (https://github.com/epheatt/kafka-connect-morphlines/blob/master/src/main/java/com/github/epheatt/kafka/connect/morphlines/EnrichJsonBuilder.java#L227) to update the structure needed for sending Avro in the necessary format via the REST Proxy to an interim topic; I'd like to simplify that and make it more performant with an inline conversion.

If I understand this thread correctly, I should be able to just wrap the Kafka JsonConverter to rename doc to payload, drop the patch attribute, call the Schema Registry for the predefined subject, and inline the expected schema, producing the following structure:

{
"schema":{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"Id","type":["null","string"],"default":null},{"name":"Name","type":"string"},{"name":"new1","type":["null","string"],"default":null},{"name":"missing","type":["null","string"],"default":null}]},
"payload":{"Id":"111","Name":"test","new1":"stuff"}
}

When I wrap the calls to the super methods (to/fromConnectData), all I need to do is fill in the missing defaults and union structures, as I am doing in my Morphlines EnrichJson command, so that the payload is translated for the JsonConverter to work as expected with schemas.enable=true and can be used with the standard JDBC Sink that needs a schema -- is that correct?

{"Id":{"string":"111"},"Name":"test","new1":{"string":"stuff"},"missing":null}

I'm planning to eventually upgrade the producer to use the Schema Registry and emit Avro directly, but I need to use the JDBC Sink in the short term, before I will have a chance to do all of the necessary testing to rely directly on the Schema Registry in the upstream service.

Thanks,
Eric

Eric Pheatt

Jan 29, 2019, 6:18:17 PM1/29/19
to Confluent Platform
Hi Ewen,

I created a POC (https://github.com/epheatt/kafka-connect-jsonavroschema) for my specific near-term use case by extending the open-source JsonConverter and wiring in calls to the Schema Registry and some of the helper methods in the AvroConverter/AvroData classes, to take the latest Avro schema, convert it to a JSON schema via the Connect schema, and coerce the payload somewhat to more closely match the expected structure/format.  While this will likely meet my needs until I can make the switch to publishing directly with Avro and the Schema Registry, is there a better way to do this sort of schemaless conversion/transform/coercion from a consumer-only approach (assuming pre-registered schemas) with out-of-the-box Sinks like JDBC, prior to the SetSchema SMT or the Schema Inference KIP being generally available?

Thanks,
Eric