I was using Gobblin 0.8 to extract Avro messages from Kafka to HDFS, and it was working fine. After we upgraded to Gobblin 0.10, the job no longer extracts any messages, yet it reports no failures either.
Pull file:
kafka.brokers=xxxx:9092
kafka.deserializer.type=BYTE_ARRAY
job.name=test_gobblin10_kafka_avro_to_hdfs
job.group=test_gobblin10_kafka_avro_to_hdfs
job.description=Gobblin Job to pull avro raw events from Kafka to Hdfs
job.lock.enabled=false
job.jars=/path/to/jar/xxx.jar
topic.whitelist=avro_kafka_topic
bootstrap.with.offset=earliest
reset.on.offset.out.of.range=nearest
mr.job.max.mappers=10
metrics.enabled=true
metrics.reporting.file.enabled=true
metrics.log.dir=/path/to/log
source.kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
source.class=com.xxx.custom.source.KafkaAvroSource
extract.namespace=gobblin.extract.kafka
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=AVRO
writer.partitioner.class=com.custom.partitioning.EventPartitioner
simple.writer.delimiter=\n
data.publisher.type=gobblin.publisher.BaseDataPublisher
data.publisher.final.dir=/output/path
task.data.root.dir=/user/output/task
Custom Avro Extractor
-------------------------------
import com.google.common.base.Optional;
import gobblin.configuration.WorkUnitState;
import gobblin.source.extractor.extract.kafka.KafkaAvroExtractor;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaAvroExtractorCustom extends KafkaAvroExtractor {

  private static final Logger LOG = LoggerFactory.getLogger(KafkaAvroExtractorCustom.class);

  public KafkaAvroExtractorCustom(WorkUnitState state) {
    super(state);
  }

  // All schema lookups parse the same bundled xxxx.avsc from the system class path.
  // On any failure the error is only logged and null is returned.
  private static Schema loadSchema() {
    try {
      return new Schema.Parser().parse(ClassLoader.getSystemResourceAsStream("xxxx.avsc"));
    } catch (Exception e) {
      LOG.error("Error parsing xxxx avsc: " + e.getMessage(), e);
      return null;
    }
  }

  @Override
  protected Schema getRecordSchema(byte[] payload) {
    return loadSchema();
  }

  @Override
  protected Decoder getDecoder(byte[] payload) {
    // The whole Kafka payload is decoded as a single Avro binary datum.
    return DecoderFactory.get().binaryDecoder(payload, null);
  }

  @Override
  protected Schema getLatestSchemaByTopic(String topic) {
    return loadSchema();
  }

  @Override
  protected Optional<Schema> getExtractorSchema() {
    return Optional.fromNullable(loadSchema());
  }
}
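Because each override above swallows the parse exception and returns a null schema, the only trace of a missing or unreadable xxxx.avsc at that point is an ERROR log line. One way to rule that out is to load the schema text once and fail loudly if it cannot be found. Below is a minimal stdlib-only sketch using a hypothetical SchemaResourceCache helper (not part of Gobblin or Avro); the caller chooses the class loader, and the returned text could then be handed to new Schema.Parser().parse(String).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: reads a classpath resource (e.g. an .avsc file) once and caches its text. */
final class SchemaResourceCache {
    // Keyed by resource name only; assumes every caller uses the same class loader for a given name.
    private static final Map<String, String> CACHE = new ConcurrentHashMap<>();

    private SchemaResourceCache() {}

    /** Returns the resource text, throwing instead of returning null when it cannot be read. */
    static String load(ClassLoader loader, String resourceName) {
        // A failed load throws out of computeIfAbsent, so nothing is cached for that name.
        return CACHE.computeIfAbsent(resourceName, name -> read(loader, name));
    }

    private static String read(ClassLoader loader, String name) {
        try (InputStream in = loader.getResourceAsStream(name)) {
            if (in == null) {
                throw new IllegalStateException("Schema resource not found on classpath: " + name);
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[8192];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not read schema resource " + name, e);
        }
    }
}
```

In the extractor this might be called as SchemaResourceCache.load(KafkaAvroExtractorCustom.class.getClassLoader(), "xxxx.avsc"), so a missing file stops the task instead of producing a null schema.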
Custom Avro Source
-------------------------------
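import gobblin.configuration.WorkUnitState;
import gobblin.source.extractor.extract.kafka.KafkaSource;
import java.io.IOException;

public class KafkaAvroSource extends KafkaSource {
  @Override
  public KafkaAvroExtractorCustom getExtractor(WorkUnitState state) throws IOException {
    return new KafkaAvroExtractorCustom(state);  // use the custom Avro extractor for every work unit
  }
}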
Mapper log (excerpt):
2017-06-14 20:48:58,966 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Fetched 108 Records from xxxx_topic:83
2017-06-14 20:48:59,027 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Consume records xxxx_topic:83 next=30769958 max=30770295
2017-06-14 20:48:59,648 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Fetched 119 Records from xxxx_topic:83
2017-06-14 20:48:59,716 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Consume records xxxx_topic:83 next=30770077 max=30770295
2017-06-14 20:49:00,335 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Fetched 110 Records from xxxx_topic:83
2017-06-14 20:49:00,423 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Consume records xxxx_topic:83 next=30770187 max=30770295
2017-06-14 20:49:01,043 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Fetched 118 Records from xxxx_topic:83
2017-06-14 20:49:01,110 INFO [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor: Finished pulling partition xxxx_topic:83
2017-06-14 20:49:01,110 INFO [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor: Finished pulling partition xxxx_topic:83
2017-06-14 20:49:01,110 INFO [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor: Finished pulling topic xxxx_topic
2017-06-14 20:49:01,110 INFO [TaskExecutor-0] gobblin.runtime.Task: Extracted 0 data records
2017-06-14 20:49:01,111 INFO [TaskExecutor-0] gobblin.runtime.Task: Row quality checker finished with results:
2017-06-14 20:49:01,111 INFO [TaskExecutor-0] gobblin.runtime.Task: Task shutdown: Fork future reaped in 0 millis
2017-06-14 20:49:01,111 INFO [main] gobblin.runtime.GobblinMultiTaskAttempt-attempt_1497454490020_1073_m_000071_0: All assigned tasks of job test_gobblin10_kafka_avro_to_hdfs_1497473210357 have completed in container attempt_1497454490020_1073_m_000071_0
2017-06-14 20:49:01,111 INFO [main] gobblin.runtime.GobblinMultiTaskAttempt-attempt_1497454490020_1073_m_000071_0: Will commit tasks directly.
2017-06-14 20:49:01,158 INFO [Task-committing-pool-0] gobblin.publisher.TaskPublisher: All components finished successfully, checking quality tests
2017-06-14 20:49:01,159 INFO [Task-committing-pool-0] gobblin.publisher.TaskPublisher: All required test passed for this task passed.
2017-06-14 20:49:01,159 INFO [Task-committing-pool-0] gobblin.publisher.TaskPublisher: Cleanup for task publisher executed successfully.
2017-06-14 20:49:01,159 INFO [Task-committing-pool-0] gobblin.runtime.fork.Fork-0: Committing data for fork 0 of task test_gobblin10_kafka_avro_to_hdfs_1497473210357_53
2017-06-14 20:49:01,167 INFO [Task-committing-pool-0] gobblin.source.extractor.extract.kafka.KafkaExtractor: Actual high watermark for partition xxxx_topic:83=30770295, expected=30770295
2017-06-14 20:49:01,167 INFO [Task-committing-pool-0] gobblin.source.extractor.extract.kafka.KafkaExtractor: Avg time to pull a record for partition xxxx_topic:83 not recorded
2017-06-14 20:49:01,182 INFO [Task-committing-pool-0] gobblin.runtime.Task: publish.data.at.job.level is true. Will publish data at the job level.
2017-06-14 20:49:01,182 INFO [Task-committing-pool-0] gobblin.runtime.mapreduce.MRTaskStateTracker: Task task_test_gobblin10_kafka_avro_to_hdfs_149
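The numbers in this excerpt can be cross-checked: the consumer moved from next=30769958 to max=30770295 (337 offsets) while the task reported 0 extracted records. Below is a small stdlib-only sketch for pulling those figures out of a mapper log. PullSummary is a hypothetical class, and its regular expressions assume the exact message formats shown above.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical summary of one partition pull, parsed from Gobblin mapper log lines. */
final class PullSummary {
    // Patterns assume the Kafka09ConsumerClient and Task message formats in the excerpt.
    private static final Pattern FETCHED = Pattern.compile("Fetched (\\d+) Records");
    private static final Pattern CONSUME = Pattern.compile("next=(\\d+) max=(\\d+)");
    private static final Pattern EXTRACTED = Pattern.compile("Extracted (\\d+) data records");

    final long fetchedTotal; // sum of all "Fetched N Records" counts
    final long firstNext;    // first "next=" offset seen, or -1
    final long max;          // last "max=" offset seen, or -1
    final long extracted;    // N from "Extracted N data records", or -1

    private PullSummary(long fetchedTotal, long firstNext, long max, long extracted) {
        this.fetchedTotal = fetchedTotal;
        this.firstNext = firstNext;
        this.max = max;
        this.extracted = extracted;
    }

    static PullSummary parse(List<String> lines) {
        long fetched = 0, firstNext = -1, max = -1, extracted = -1;
        for (String line : lines) {
            Matcher f = FETCHED.matcher(line);
            if (f.find()) {
                fetched += Long.parseLong(f.group(1));
            }
            Matcher c = CONSUME.matcher(line);
            if (c.find()) {
                if (firstNext < 0) {
                    firstNext = Long.parseLong(c.group(1));
                }
                max = Long.parseLong(c.group(2));
            }
            Matcher e = EXTRACTED.matcher(line);
            if (e.find()) {
                extracted = Long.parseLong(e.group(1));
            }
        }
        return new PullSummary(fetched, firstNext, max, extracted);
    }

    /** Offsets between the first observed "next" and the end offset "max", or -1 if unknown. */
    long offsetsAdvanced() {
        return (firstNext < 0 || max < 0) ? -1 : max - firstNext;
    }
}
```

Fed the excerpt above, it reports 455 fetched records, 337 offsets advanced and 0 extracted.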
Any thoughts on why 0.10 extracts no records while 0.8 worked fine with the same custom source and extractor? The consumer does fetch records, yet the task reports "Extracted 0 data records" and commits successfully.
Note: I see that Gobblin 0.10 uses Avro 1.8.1, whereas 0.8 used Avro 1.7.7. The records in our Kafka topic were written with Avro 1.7.7.
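On the version question: getDecoder above hands each payload to DecoderFactory.get().binaryDecoder(...), which reads Avro's binary encoding as defined in the Avro specification. In that encoding, int and long values are written as zig-zag varints. Below is a stdlib-only sketch of that single rule, using a hypothetical AvroZigZag helper (not Avro library code). It can be handy when hex-dumping the first bytes of a payload whose schema starts with an int or long field.

```java
/** Hypothetical helper: decodes one Avro zig-zag varint (int or long) from a byte array. */
final class AvroZigZag {
    private AvroZigZag() {}

    /** Returns {value, bytesRead} for the varint starting at offset. */
    static long[] readLong(byte[] buf, int offset) {
        long raw = 0;
        int shift = 0;
        int pos = offset;
        while (true) {
            if (pos >= buf.length) {
                throw new IllegalArgumentException("Truncated varint at offset " + offset);
            }
            int b = buf[pos++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift; // low 7 bits carry data, least significant group first
            if ((b & 0x80) == 0) {
                break;                          // high bit clear: last byte of this varint
            }
            shift += 7;
            if (shift > 63) {
                throw new IllegalArgumentException("Varint too long for a long");
            }
        }
        long value = (raw >>> 1) ^ -(raw & 1);  // undo zig-zag: 0,1,2,3 -> 0,-1,1,-2
        return new long[] {value, pos - offset};
    }
}
```

The expected values follow the encoding examples in the specification: 0x01 decodes to -1, 0x7f to -64, and 0x80 0x01 to 64.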