no HDFS output when converting JSON to AVRO

Zhu Wayne

Mar 31, 2014, 7:01:41 PM
to camu...@googlegroups.com
I am not sure why I am not getting any Avro output in HDFS. In KafkaToAvroDecoder.java, I convert a JSON string into an Avro record, and from the print statement I can see that the record is generated correctly.

Here is my decoder snippet:
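    @Override
    public CamusWrapper<Record> decode(byte[] message) {
        // Parse the raw Kafka payload as JSON; give up if no JSON object comes back.
        JsonObject jsonObj = getJsonObject(message);
        if (jsonObj == null) {
            return null;
        }

        // Populate a generic Avro record using the latest schema.
        Record r = new GenericData.Record(this.latestSchema);
        transformer.transform(jsonObj, r);

        // Debug output: the timestamp and record print as expected.
        long timestamp = getTimestamp(r);
        System.out.println("TS:" + timestamp + " R:" + r.toString());
        return new CamusWrapper<Record>(r, timestamp);
    }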
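
One thing I have not ruled out yet is the timestamp I pass to CamusWrapper. The comment on kafka.max.historical.days in my properties says that events with a timestamp older than that window are discarded, so if getTimestamp(r) returned epoch seconds instead of milliseconds, every record might look far too old. Below is a minimal sketch of that check. It assumes the cutoff is simply "now minus N days" in epoch milliseconds, which I have not confirmed against the Camus source; the class and method names are hypothetical and are not Camus APIs.

```java
import java.util.concurrent.TimeUnit;

public class TimestampWindowCheck {

    /**
     * Hypothetical helper (not part of Camus): returns true if timestampMs
     * falls before the assumed cutoff of nowMs minus maxHistoricalDays.
     */
    public static boolean isOlderThanWindow(long timestampMs, long nowMs, int maxHistoricalDays) {
        long cutoffMs = nowMs - TimeUnit.DAYS.toMillis(maxHistoricalDays);
        return timestampMs < cutoffMs;
    }

    public static void main(String[] args) {
        // An example "current time" in epoch milliseconds (fixed for repeatability).
        long nowMs = 1396292501000L;

        // The same instant expressed in epoch seconds, as a decoder might
        // return by mistake.
        long timestampInSeconds = 1396292501L;

        // Mirrors kafka.max.historical.days=3 from the properties.
        int maxHistoricalDays = 3;

        System.out.println("seconds value older than window: "
                + isOlderThanWindow(timestampInSeconds, nowMs, maxHistoricalDays));
        System.out.println("millis value older than window:  "
                + isOlderThanWindow(nowMs, nowMs, maxHistoricalDays));
    }
}
```

If the first check reports true for the values printed by my decoder, the unit of the timestamp would be the first thing I would look at.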


Here are my Camus properties:

# Needed Camus properties, more cleanup to come
# final top-level data output directory, sub-directory will be dynamically created for each topic pulled
etl.destination.path=/user/wzhu00/camus/camus-dest
# HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files
etl.execution.base.path=/user/wzhu00/camus/camus-exec
# where completed Camus job output directories are kept, usually a sub-dir in the base.path
etl.execution.history.path=/user/wzhu00/camus/camus-exec/history

# Kafka-0.8 handles all zookeeper calls
zookeeper.hosts=myZKHost
zookeeper.broker.topics=/brokers/topics
zookeeper.broker.nodes=/brokers/ids

# Concrete implementation of the Encoder class to use (used by Kafka Audit, and thus optional for now)
#camus.message.encoder.class=com.linkedin.batch.etl.kafka.coders.DummyKafkaMessageEncoder

# Concrete implementation of the Decoder class to use
#camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaToAvroDecoder

# Used by avro-based Decoders to use as their Schema Registry
kafka.message.coder.schema.registry.class=com.linkedin.camus.example.schemaregistry.DummySchemaRegistry

# Used by the committer to arrange .avro files into a partitioned scheme. This will be the default partitioner for all
# topics that do not have a partitioner specified
etl.partitioner.class=com.linkedin.camus.etl.kafka.coders.DefaultPartitioner

etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
#etl.output.record.delimiter=\n

# Partitioners can also be set on a per-topic basis
#etl.partitioner.class.<topic-name>=com.your.custom.CustomPartitioner

# all files in this dir will be added to the distributed cache and placed on the classpath for hadoop tasks
# hdfs.default.classpath.dir=

# max hadoop tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=30
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1

# if whitelist has values, only whitelisted topics are pulled.  nothing on the blacklist is pulled
kafka.blacklist.topics=
kafka.whitelist.topics=pricehfds

# Name of the client as seen by kafka
kafka.client.name=camus
# Fetch Request Parameters
kafka.fetch.buffer.size=104857600
kafka.fetch.request.correlationid=
kafka.fetch.request.max.wait=
kafka.fetch.request.min.bytes=
# Connection parameters.
kafka.brokers=myBrokers
kafka.timeout.value=6000


#Stops the mapper from getting inundated with Decoder exceptions for the same topic
#Default value is set to 10
max.decoder.exceptions.to.print=5

#Controls the submitting of counts to Kafka
#Default value set to true
post.tracking.counts.to.kafka=true

log4j.configuration=true

# everything below this point can be ignored for the time being, will provide more documentation down the road
##########################
etl.run.tracking.post=false
kafka.monitor.tier=
etl.counts.path=
kafka.monitor.time.granularity=10

etl.hourly=hourly
etl.daily=daily
etl.ignore.schema.errors=false

# configure output compression for deflate or snappy. Defaults to deflate
etl.output.codec=deflate
etl.deflate.level=6
#etl.output.codec=snappy

etl.default.timezone=America/Los_Angeles
etl.output.file.time.partition.mins=60
etl.keep.count.files=false
etl.execution.history.max.of.quota=.8

mapred.output.compress=false
mapred.map.max.attempts=1

kafka.client.buffer.size=20971520
kafka.client.so.timeout=60000

#zookeeper.session.timeout=
#zookeeper.connection.timeout=
