Using a file source connector to feed content to a Kafka topic - format question


Nir Sharony

Jun 7, 2016, 9:13:53 AM
to Confluent Platform
Hello,

I would like to append data to one or more source files, have a Kafka file source connector listen to such a file, and then feed the content into a specific Kafka topic.
My questions are:

File line format
What should be the format of the data in the file? I am using Avro for my data.
The format should be such that the connector can read it properly and push it into the Kafka topic with the right schema.
I am assuming that each line in the file should include both the data and the schema.

Assume that this is my schema + data:
{
  "value_schema": {
    "type": "record",
    "name": "Demo",
    "fields": [
      {
        "name": "id",
        "type": "int"
      },
      {
        "name": "name",
        "type": "string"
      }
    ]
  },
  "records": [
    {
      "value": {
        "request_id": 1234,
        "name": "john"
      }
    }
  ]
}

Should each line in the file look like this?
{"value_schema":{"type":"record","name":"Demo","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]},"records":[{"value":{"id":1234,"name":"john"}}]}

I have tried this, but the format does not seem to be valid: the connector does not appear to add anything to the topic (I read from the topic and see no new content).
Any ideas?

Multiple file connectors
Is it possible to have multiple file source connectors that read an entire folder, each processing a separate file and writing that file's contents to a Kafka topic?

Thank you,
Nir

Ewen Cheslack-Postava

Jun 7, 2016, 7:04:03 PM
to Confluent Platform
Are you talking about using FileStreamSourceConnector? Or implementing your own? The one included with Kafka is intentionally simple and does not do any special parsing of each line of the file. It just reads it and publishes it as a string.

If you want more complex, structured data, you'd need a connector that can parse each log line and generate the right schema/data. The structure you show is one option if you have that much control over your log file format, and it isn't too far from what JsonConverter already uses as an envelope when the schema is included. Although I would recommend trying to include a schema, it is worth pointing out that the schema is not *required*; your connector could publish JSON data directly without one.
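
For reference, with schemas enabled the JsonConverter envelope for your example would look roughly like this (sketched by hand against your schema, so the exact field set may differ slightly by version):

{
  "schema": {
    "type": "struct",
    "name": "Demo",
    "optional": false,
    "fields": [
      {"field": "id", "type": "int32", "optional": false},
      {"field": "name", "type": "string", "optional": false}
    ]
  },
  "payload": {
    "id": 1234,
    "name": "john"
  }
}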


Multiple file connectors
Is it possible to have multiple file source connectors that read an entire folder, each processing a separate file and writing that file's contents to a Kafka topic?

See above comment -- the file connector isn't really intended to be a full-fledged log collector. There's a syslog source listed on connectors.confluent.io; you might see if it fits your needs. If not, there have been previous discussions about how a complete log file connector might work. I'd search around a bit to see if anyone has made one public yet.

-Ewen
 


gerard...@dizzit.com

Jun 8, 2016, 2:43:50 AM
to Confluent Platform
You could take a look at Camel. As part of a PoC, I read local XML files, put them into a schema using Castor, and produced them to a Kafka cluster. Not sure if that is overkill for what you want to do. I assume you could also use Camel to read .json files and map them into an Avro-generated object.
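
A rough sketch of such a route in Camel's Java DSL (illustrative only -- the directory, topic name, and broker address are placeholders, and component URI options vary by Camel version):

import org.apache.camel.builder.RouteBuilder;

public class FileToKafkaRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // Poll a local directory and publish each file's contents to a Kafka topic.
        from("file:data/inbox?noop=true")
            .convertBodyTo(String.class)
            .to("kafka:demo-topic?brokers=localhost:9092");
    }
}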


Nir Sharony

Jun 8, 2016, 4:34:21 AM
to Confluent Platform
Thanks Ewen,

You mentioned JsonConverter.
Is this a connector that I should use on top of a file stream connector, or can it be used as a sink connector to read the content of an input file directly?
Can I feed it JSON that includes both the data and the schema? If so, can you show me what the format would be?
So far I have not been able to find proper documentation on JsonConverter.

Thanks,
Nir

Nir Sharony

Jun 8, 2016, 6:45:42 AM
to Confluent Platform
Hi Ewen,

You also mentioned "previous discussions about how a complete log file connector might work".
Any chance you might recall how to find them? So far my searches have come up empty-handed.

Thanks,
Nir


Ewen Cheslack-Postava

Jun 8, 2016, 12:43:48 PM
to Confluent Platform
I think https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!msg/kafka-clients/vhc_-GXuVLM/GUR-GNxXFAAJ might be one of the threads I'm remembering. Basically, you need to track multiple files, handle log rolling that renames files, etc. From there, the specific parsing of log lines is up to you.
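
As a rough illustration of the rotation problem, one approach (a sketch, not something the included file connector does) is to track a file's identity rather than its name, e.g. via NIO's fileKey:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

public class FileIdentity {
    // Returns an identity that survives renames (e.g. the inode on POSIX
    // systems), so a tailer can tell "same file, new name" (a log roll)
    // apart from "brand new file". May be null on filesystems without
    // stable file keys.
    public static Object fileKey(Path path) throws IOException {
        BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
        return attrs.fileKey();
    }
}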

-Ewen


Ewen Cheslack-Postava

Jun 8, 2016, 12:47:47 PM
to Confluent Platform
On Wed, Jun 8, 2016 at 1:34 AM, Nir Sharony <nsha...@gmail.com> wrote:
Thanks Ewen,

You mentioned JsonConverter.
Is this a connector that I should use on top of a file stream connector, or can it be used as a sink connector to read the content of an input file directly?
Can I feed it JSON that includes both the data and the schema? If so, can you show me what the format would be?
So far I have not been able to find proper documentation on JsonConverter.


Converters and connectors are different components. Converters deal with converting between the byte[] stored in Kafka and Kafka Connect's generic data API; this includes format-specific serialization (like JSON or Avro). Connectors are designed to work with the generic runtime data API, so they are decoupled from the details of serialization and can work with different serialization formats without any effort on the part of the connector developer.

In your example, a connector would usually parse whatever is in the file itself and return it using the data API (http://docs.confluent.io/3.0.0/connect/javadocs/org/apache/kafka/connect/data/package-summary.html). If each line happens to be JSON, you might use Jackson to parse it and then convert it to the data API. The Connect framework doesn't restrict the format of data in the file at all -- that's entirely up to the connector developer.
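
For example, here's a minimal hand-written sketch of that approach against the schema from earlier in this thread (parseLine, the file name, and the topic handling are all hypothetical, not part of any shipped connector):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Collections;
import java.util.Map;

public class JsonLineParser {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Fixed schema matching the example in this thread: an int id and a string name.
    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct().name("Demo")
            .field("id", Schema.INT32_SCHEMA)
            .field("name", Schema.STRING_SCHEMA)
            .build();

    // Parse one line of the file, e.g. {"id":1234,"name":"john"}, and wrap it
    // in a SourceRecord targeting the given topic. A real task would call this
    // from poll() and track offsets properly.
    public static SourceRecord parseLine(String line, String topic, long offset) throws Exception {
        JsonNode node = MAPPER.readTree(line);
        Struct value = new Struct(VALUE_SCHEMA)
                .put("id", node.get("id").asInt())
                .put("name", node.get("name").asText());

        Map<String, String> sourcePartition = Collections.singletonMap("file", "demo.txt");
        Map<String, Long> sourceOffset = Collections.singletonMap("position", offset);
        return new SourceRecord(sourcePartition, sourceOffset, topic, VALUE_SCHEMA, value);
    }
}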

-Ewen
 


Martin Tyler

Jun 10, 2016, 5:55:35 AM
to Confluent Platform
Hi. I'm new to this list and new to Kafka; I've just started a PoC.

I've run into the same 'issue' myself. My understanding, after digging around a bit, is that the file source connector just reads strings, newline-delimited. Simple as that. The slight confusion comes when you see the JsonConverter in the config and look at the contents of your Kafka topic: the JsonConverter is just wrapping the string from the file with a header/envelope and putting the string from the file into a 'payload' field.

In my case I had put JSON into my file, hoping it would get parsed, but it just becomes a stringified JSON string inside that single payload field. So you have to parse it yourself further down the pipeline (i.e. in a Kafka Streams processor).
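
For illustration, with the default JsonConverter (schemas enabled) a file line like {"id":1234,"name":"john"} lands on the topic roughly as:

{"schema":{"type":"string","optional":false},"payload":"{\"id\":1234,\"name\":\"john\"}"}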

It's a shame the file source connector can't handle formats within the file it is reading. It doesn't look too hard to write your own connector, but it would probably mean copying a bunch of boilerplate code that is more about reading from the file than parsing the format.

I might just create a separate Kafka Streams app that parses the wrapped JSON and republishes, which will leave my main Streams app cleaner.

The file connector is only really for my PoC, though; ultimately it will be change data capture from Oracle or PostgreSQL.


Ewen Cheslack-Postava

Jun 11, 2016, 7:31:19 PM
to Confluent Platform
The file connector is not supposed to extract arbitrarily complex data -- it really is intended only as a minimal demonstration of how to build a connector.

Also, keep in mind that parsing JSON may seem simple, but it is just one format that could be handled by a connector that reads text files. There are dozens of text file formats -- JSON, CSV (and a dozen variants of it alone), tab-delimited, YAML, etc.

I think it'd be great to see connectors that can handle different formats. JSON is well enough specified that it's probably straightforward to support, although you still need to figure out what to do in some edge cases, e.g. parse errors, even if you make simplifying assumptions such as guaranteeing only one JSON object per line of text. Other formats like CSV can be a complete mess to handle robustly, and should definitely have their own connector.

-Ewen


Martin Tyler

Jun 12, 2016, 4:12:13 AM
to Confluent Platform
In case this is useful to anyone, what I have done for now is change the connector properties to this:

# Pass each file line through as a plain string instead of wrapping it in a JSON envelope
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

This means the JSON messages in my file go straight into the Kafka topic as-is, rather than being wrapped and escaped. Then they only need a single parsing step in my processors.

The other thing with the simple file source connector (as far as I have worked out) is that it does not handle keys in any way. It could be useful to get a key from the file too, so the first step in my processing is to parse the JSON and extract a key to partition on, as sketched below.
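
A rough sketch of that first step using the Kafka Streams builder API (from later Kafka versions than this thread; the topic names "file-lines" and "keyed-events" are placeholders, and default string serdes are assumed):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class RekeyApp {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Read plain-string JSON lines, extract the "id" field as the message key,
    // and republish so downstream consumers see keyed, partitioned data.
    public static void buildTopology(StreamsBuilder builder) {
        KStream<String, String> lines = builder.stream("file-lines");
        lines.selectKey((ignoredKey, value) -> {
                try {
                    JsonNode node = MAPPER.readTree(value);
                    return node.get("id").asText();
                } catch (Exception e) {
                    return null; // leave unparsable records unkeyed
                }
            })
            .to("keyed-events");
    }
}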

Ewen Cheslack-Postava

Jun 13, 2016, 7:54:25 PM
to Confluent Platform
Wow, nice catch. I completely forgot we had the StringConverter. You shouldn't need the schemas.enable flags anymore as those are specific to JsonConverter.

-Ewen
