Unable to extract AVRO data from Kafka into HDFS using Gobblin 0.10


Natesan Rajaraman

Jun 28, 2017, 4:18:59 PM
to gobblin-users
I was using Gobblin 0.8 to extract Avro messages from Kafka to HDFS and it was working fine. After upgrading to 0.10, the job extracts no messages at all, yet it reports no failures either.

Pull file:

kafka.brokers=xxxx:9092
kafka.deserializer.type=BYTE_ARRAY
job.name=test_gobblin10_kafka_avro_to_hdfs
job.group=test_gobblin10_kafka_avro_to_hdfs
job.description=Gobblin Job to pull avro raw events from Kafka to Hdfs
job.lock.enabled=false
job.jars=/path/to/jar/xxx.jar

topic.whitelist=avro_kafka_topic
bootstrap.with.offset=earliest
reset.on.offset.out.of.range=nearest

mr.job.max.mappers=10

metrics.enabled=true
metrics.reporting.file.enabled=true
metrics.log.dir=/path/to/log

source.kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
source.class=com.xxx.custom.source.KafkaAvroSource
extract.namespace=gobblin.extract.kafka

writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=AVRO
writer.partitioner.class=com.custom.partitioning.EventPartitioner
simple.writer.delimiter=\n

data.publisher.type=gobblin.publisher.BaseDataPublisher
data.publisher.final.dir=/output/path
task.data.root.dir=/user/output/task

Custom Avro Extractor
-------------------------------
public class KafkaAvroExtractorCustom extends KafkaAvroExtractor {

    public KafkaAvroExtractorCustom(WorkUnitState state) {
        super(state);
    }

    @Override
    protected Schema getRecordSchema(byte[] payload) {
        Schema schema = null;
        try {
            schema = new Schema.Parser().parse(ClassLoader.getSystemResourceAsStream("xxxx.avsc"));
        } catch (Exception e) {
            LOG.error("Error parsing xxxx avsc: " + e.getMessage());
        }
        return schema;
    }

    @Override
    protected Decoder getDecoder(byte[] payload) {
        return DecoderFactory.get().binaryDecoder(payload, null);
    }

    @Override
    protected Schema getLatestSchemaByTopic(String topic) {
        Schema schema = null;
        try {
            schema = new Schema.Parser().parse(ClassLoader.getSystemResourceAsStream("xxxx.avsc"));
        } catch (Exception e) {
            LOG.error("Error parsing xxxx avsc: " + e.getMessage());
        }
        return schema;
    }

    @Override
    protected Optional<Schema> getExtractorSchema() {
        Schema schema = null;
        try {
            schema = new Schema.Parser().parse(ClassLoader.getSystemResourceAsStream("xxxx.avsc"));
        } catch (Exception e) {
            LOG.error("Error parsing xxxx avsc: " + e.getMessage());
        }
        return Optional.fromNullable(schema);
    }
}
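
One thing I have not ruled out is the schema lookup itself: the code above swallows any failure, so if ClassLoader.getSystemResourceAsStream returned null inside the MR container (the jar ships via job.jars), the parse would throw, the catch would log and move on, and the extractor would be left with a null schema. A more defensive loader I am considering (just a sketch to replace the three copies above; xxxx.avsc stands in for the real schema file, and it assumes java.io.InputStream and java.io.IOException are imported):

private Schema loadSchema() {
    // Try the context classloader first: in an MR container the job jar
    // shipped via job.jars may not be visible to the system classloader.
    InputStream in = Thread.currentThread().getContextClassLoader()
            .getResourceAsStream("xxxx.avsc");
    if (in == null) {
        in = ClassLoader.getSystemResourceAsStream("xxxx.avsc");
    }
    if (in == null) {
        LOG.error("xxxx.avsc not found on the classpath");
        return null;
    }
    try {
        return new Schema.Parser().parse(in);
    } catch (Exception e) {
        LOG.error("Error parsing xxxx avsc: " + e.getMessage(), e);
        return null;
    } finally {
        try {
            in.close();
        } catch (IOException ignored) {
            // best-effort close
        }
    }
}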

Custom Avro Source
-------------------------------

public class KafkaAvroSource extends KafkaSource {

    @Override
    public KafkaAvroExtractorCustom getExtractor(WorkUnitState state) throws IOException {
        return new KafkaAvroExtractorCustom(state);
    }
}

Mapper LOG (sample):

2017-06-14 20:48:58,966 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Fetched 108 Records from xxxx_topic:83
2017-06-14 20:48:59,027 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Consume records xxxx_topic:83 next=30769958 max=30770295
2017-06-14 20:48:59,648 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Fetched 119 Records from xxxx_topic:83
2017-06-14 20:48:59,716 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Consume records xxxx_topic:83 next=30770077 max=30770295
2017-06-14 20:49:00,335 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Fetched 110 Records from xxxx_topic:83
2017-06-14 20:49:00,423 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Consume records xxxx_topic:83 next=30770187 max=30770295
2017-06-14 20:49:01,043 INFO [TaskExecutor-0] gobblin.kafka.client.Kafka09ConsumerClient: Fetched 118 Records from xxxx_topic:83
2017-06-14 20:49:01,110 INFO [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor: Finished pulling partition xxxx_topic:83
2017-06-14 20:49:01,110 INFO [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor: Finished pulling partition xxxx_topic:83
2017-06-14 20:49:01,110 INFO [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor: Finished pulling topic xxxx_topic
2017-06-14 20:49:01,110 INFO [TaskExecutor-0] gobblin.runtime.Task: Extracted 0 data records
2017-06-14 20:49:01,111 INFO [TaskExecutor-0] gobblin.runtime.Task: Row quality checker finished with results:
2017-06-14 20:49:01,111 INFO [TaskExecutor-0] gobblin.runtime.Task: Task shutdown: Fork future reaped in 0 millis
2017-06-14 20:49:01,111 INFO [main] gobblin.runtime.GobblinMultiTaskAttempt-attempt_1497454490020_1073_m_000071_0: All assigned tasks of job test_gobblin10_kafka_avro_to_hdfs_1497473210357 have completed in container attempt_1497454490020_1073_m_000071_0
2017-06-14 20:49:01,111 INFO [main] gobblin.runtime.GobblinMultiTaskAttempt-attempt_1497454490020_1073_m_000071_0: Will commit tasks directly.
2017-06-14 20:49:01,158 INFO [Task-committing-pool-0] gobblin.publisher.TaskPublisher: All components finished successfully, checking quality tests
2017-06-14 20:49:01,159 INFO [Task-committing-pool-0] gobblin.publisher.TaskPublisher: All required test passed for this task passed.
2017-06-14 20:49:01,159 INFO [Task-committing-pool-0] gobblin.publisher.TaskPublisher: Cleanup for task publisher executed successfully.
2017-06-14 20:49:01,159 INFO [Task-committing-pool-0] gobblin.runtime.fork.Fork-0: Committing data for fork 0 of task test_gobblin10_kafka_avro_to_hdfs_1497473210357_53
2017-06-14 20:49:01,167 INFO [Task-committing-pool-0] gobblin.source.extractor.extract.kafka.KafkaExtractor: Actual high watermark for partition xxxx_topic:83=30770295, expected=30770295
2017-06-14 20:49:01,167 INFO [Task-committing-pool-0] gobblin.source.extractor.extract.kafka.KafkaExtractor: Avg time to pull a record for partition xxxx_topic:83 not recorded
2017-06-14 20:49:01,182 INFO [Task-committing-pool-0] gobblin.runtime.Task: publish.data.at.job.level is true. Will publish data at the job level.
2017-06-14 20:49:01,182 INFO [Task-committing-pool-0] gobblin.runtime.mapreduce.MRTaskStateTracker: Task task_test_gobblin10_kafka_avro_to_hdfs_149

Any thoughts on why 0.10 is unable to extract any records, while 0.8 worked fine with the same custom source/extractor? The consumer clearly fetches records from each partition (see the log above), yet the task ends with "Extracted 0 data records".

Note: I see that 0.10 uses Avro 1.8.1, while 0.8 used 1.7.7, and our Kafka topic contains Avro records written with 1.7.7.
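
To test that version hypothesis, I put together a minimal standalone decode check (a sketch: xxxx.avsc is the same placeholder schema as above, and the payload file is assumed to be one raw message dumped from the topic). The idea is to run it once with Avro 1.7.7 on the classpath and once with 1.8.1 and compare:

Standalone decode check
-------------------------------

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class AvroDecodeCheck {
    public static void main(String[] args) throws Exception {
        // Same schema file the extractor uses.
        Schema schema;
        try (InputStream in = AvroDecodeCheck.class.getResourceAsStream("/xxxx.avsc")) {
            schema = new Schema.Parser().parse(in);
        }

        // One raw message payload dumped from the Kafka topic (path passed as arg).
        byte[] payload = Files.readAllBytes(Paths.get(args[0]));

        // Decode the same way the extractor does: binary decoder, no schema resolution.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        GenericRecord record = reader.read(null, decoder);
        System.out.println(record);
    }
}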
