Reading from Kafka only succeeds on the first run


sborny

Mar 22, 2017, 11:42:30 AM
to gobblin-users
Hello,

I'm running into some problems trying to store a Kafka topic in HDFS.
When I run the job for the first time, it reads data from the topic and stores it in HDFS; however, with only 5 messages in the topic, two separate files are created, one with 4 messages and the other with 1.
After pushing more messages into the topic and rerunning the job, nothing is stored.
I'm using MapReduce (MR) mode to submit the job.

My job file:

job.name=kafka2hdfs
job.group=GobblinKafka
job.description=Convert Kafka topic to hdfs
job.lock.enabled=false

kafka.brokers=172.28.0.4:9092

topic.whitelist=topic
mr.job.max.mappers=1
bootstrap.with.offset=earliest

source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka

writer.partitioner.class=com.tengu.gobblin.json.kafka.JsonKafkaWriterPartitioner
writer.partition.pattern=YYYY/MM/dd
writer.partition.timezone=UTC

writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
simple.writer.delimiter=\n
simple.writer.prepend.size=false

data.publisher.type=gobblin.publisher.BaseDataPublisher

fs.uri=hdfs://172.28.0.50:8020
writer.fs.uri=hdfs://172.28.0.50:8020
state.store.fs.uri=hdfs://172.28.0.50:8020

metrics.reporting.file.enabled=true
metrics.log.dir=/user/ubuntu/gobblin/metrics
metrics.reporting.file.suffix=txt

mr.job.root.dir=/tmp/gobblin/gobblin-kafka/working
state.store.dir=/user/ubuntu/gobblin/gobblin-kafka/state-store
task.data.root.dir=/user/ubuntu/gobblin/gobblin-kafka/task-data
data.publisher.final.dir=/user/ubuntu/gobblin/gobblin-kafka


Any help is greatly appreciated.

Thanks
sborny

Issac Buenrostro

Mar 22, 2017, 12:05:02 PM
to sborny, gobblin-users
Can you send the execution log?
Thanks
Issac

--
You received this message because you are subscribed to the Google Groups "gobblin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gobblin-users+unsubscribe@googlegroups.com.
To post to this group, send email to gobbli...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/gobblin-users/630cd125-4413-4d3e-94bd-4bdd070fa696%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

sborny

Mar 22, 2017, 12:18:43 PM
to gobblin-users, sande...@gmail.com
hey

I see in the log that there is a problem moving the output directory:

2017-03-22 16:06:31 UTC WARN  [ParallelRunner] gobblin.util.ParallelRunner$6  318 - Failed to move hdfs://172.28.0.50:8020/user/gobblin/work/task-output/clicks.test6/2017 to /user/ubuntu/gobblin/gobblin-kafka/clicks.test6/2017: dst already exists
org.apache.hadoop.fs.FileAlreadyExistsException: Failed to rename hdfs://172.28.0.50:8020/user/gobblin/work/task-output/clicks.test6/2017 to /user/ubuntu/gobblin/gobblin-kafka/clicks.test6/2017: dst already exists
    at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:219)
    at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:200)
    at gobblin.util.HadoopUtils.movePath(HadoopUtils.java:262)
    at gobblin.util.ParallelRunner$6.call(ParallelRunner.java:311)
    at gobblin.util.ParallelRunner$6.call(ParallelRunner.java:304)
    at gobblin.util.executors.MDCPropagatingCallable.call(MDCPropagatingCallable.java:38)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


I'm trying to store the data under /topic-name/yyyy/mm/dd, for which I wrote the following partitioner.
Every Kafka message is JSON with a 'timestamp' field that determines where the message is written in HDFS.

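package com.tengu.gobblin.json.kafka;

import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.JsonObject;

import gobblin.configuration.State;
import gobblin.writer.partitioner.TimeBasedWriterPartitioner;

/**
 * Partitions raw Kafka payloads (JSON as byte[]) by the date part of their "timestamp" field.
 */
public class JsonKafkaWriterPartitioner extends TimeBasedWriterPartitioner<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(JsonKafkaWriterPartitioner.class);
    private static final String TIMESTAMP_FIELD_ID = "timestamp";

    private final Gson gson = new Gson();

    public JsonKafkaWriterPartitioner(State state, int numBranches, int branchId) {
        super(state, numBranches, branchId);
    }

    @Override
    public long getRecordTimestamp(byte[] payload) {
        // UTF-8 is always available, so no UnsupportedEncodingException handling is needed.
        String payloadString = new String(payload, StandardCharsets.UTF_8);

        JsonObject jsonObject;
        try {
            jsonObject = gson.fromJson(payloadString, JsonObject.class);
        } catch (RuntimeException e) {
            log.error("Caught exception while parsing JSON string '" + payloadString + "'.", e);
            throw new RuntimeException(e);
        }

        // Fall back to the current time if the field is missing or has no 'T' separator.
        long ret = System.currentTimeMillis();
        if (jsonObject != null && jsonObject.has(TIMESTAMP_FIELD_ID)) {
            String timestamp = jsonObject.get(TIMESTAMP_FIELD_ID).getAsString();
            int index = timestamp.indexOf('T');
            if (index != -1) {
                try {
                    // Only the date part is parsed, so the record maps to midnight of that day.
                    ret = isoDateToMillis(timestamp.substring(0, index));
                } catch (ParseException ex) {
                    log.error("Unable to parse timestamp '" + timestamp + "'", ex);
                }
            }
        }
        return ret;
    }

    private static long isoDateToMillis(String isoDate) throws ParseException {
        // SimpleDateFormat is not thread-safe, so a new instance is created per call.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        // Parse in UTC to match writer.partition.timezone=UTC in the job file.
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format.parse(isoDate).getTime();
    }
}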
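The date handling inside getRecordTimestamp can be checked on its own, without Gobblin or Gson on the classpath. Below is a minimal sketch that uses java.time instead of SimpleDateFormat; it assumes the 'timestamp' field holds an ISO-8601 string such as 2017-03-22T16:06:31Z, and the class and method names (PartitionPathSketch, toPartitionMillis, partitionPath) are hypothetical. partitionPath only mimics the yyyy/MM/dd layout for illustration; in the actual job, the partitioner derives the directory from writer.partition.pattern.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Gobblin: mirrors the partitioner's date logic.
public class PartitionPathSketch {
    // Lowercase "yyyy": in java.time, uppercase "YYYY" means the week-based year.
    private static final DateTimeFormatter PATH_FORMAT =
            DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneOffset.UTC);

    /**
     * Returns UTC midnight of the date before 'T', or fallbackMillis if there is no 'T'.
     * Throws DateTimeParseException if the date part is not yyyy-MM-dd.
     */
    public static long toPartitionMillis(String timestamp, long fallbackMillis) {
        int index = timestamp.indexOf('T');
        if (index == -1) {
            return fallbackMillis;
        }
        LocalDate date = LocalDate.parse(timestamp.substring(0, index));
        return date.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    }

    /** Formats epoch millis as a yyyy/MM/dd directory suffix, in UTC. */
    public static String partitionPath(long millis) {
        return PATH_FORMAT.format(Instant.ofEpochMilli(millis));
    }

    public static void main(String[] args) {
        long millis = toPartitionMillis("2017-03-22T16:06:31Z", System.currentTimeMillis());
        System.out.println(millis);                // 1490140800000
        System.out.println(partitionPath(millis)); // 2017/03/22
    }
}
```

Fixing the zone to UTC on both the parse and the format side keeps a record from shifting into the neighbouring day's directory when the JVM runs in a different default time zone.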

On Wednesday, March 22, 2017 17:05:02 UTC+1, Issac Buenrostro wrote:

Issac Buenrostro

Mar 22, 2017, 12:24:51 PM
to sborny, gobblin-users
Try using gobblin.publisher.TimePartitionedDataPublisher as the publisher, i.e. set data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher in the job file.
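The warning above is consistent with the second run storing nothing: the publisher tries to rename the whole task-output/clicks.test6/2017 directory onto the final directory, and the rename is refused because /user/ubuntu/gobblin/gobblin-kafka/clicks.test6/2017 already exists from the first run. As far as I understand, TimePartitionedDataPublisher instead moves the individual files into the existing partition directories. The sketch below illustrates that difference on a local filesystem with java.nio.file; it is an assumption-based illustration, not Gobblin's implementation, and the names PartitionMerge and mergeInto are hypothetical.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration (local filesystem, not HDFS, not Gobblin code).
public class PartitionMerge {

    /**
     * Moves every regular file under src to the same relative path under dst,
     * creating missing partition directories. Files already in dst are kept;
     * Files.move throws FileAlreadyExistsException if a file name collides.
     */
    public static void mergeInto(Path src, Path dst) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(src)) {
            files = walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        for (Path file : files) {
            Path target = dst.resolve(src.relativize(file));
            Files.createDirectories(target.getParent());
            Files.move(file, target);
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("publish-demo");
        Path src = root.resolve("task-output/topic");
        Path dst = root.resolve("final/topic");
        // First run already published 2017/03/22; second run wrote a new file for the same day.
        Files.createDirectories(dst.resolve("2017/03/22"));
        Files.write(dst.resolve("2017/03/22/part-1.txt"), "old".getBytes());
        Files.createDirectories(src.resolve("2017/03/22"));
        Files.write(src.resolve("2017/03/22/part-2.txt"), "new".getBytes());

        // Moving the whole year directory fails because the target directory exists.
        try {
            Files.move(src.resolve("2017"), dst.resolve("2017"));
        } catch (FileAlreadyExistsException e) {
            System.out.println("directory move failed: " + e.getFile() + " already exists");
        }

        // Moving file by file merges the new output into the existing partition.
        mergeInto(src, dst);
        try (Stream<Path> listing = Files.list(dst.resolve("2017/03/22"))) {
            listing.map(p -> p.getFileName().toString()).sorted().forEach(System.out::println);
        }
    }
}
```

Running main prints the failed directory move, then lists part-1.txt and part-2.txt side by side in the merged partition.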
