Hey,
I see in the log that moving the task output directory to its final location fails because the destination directory already exists:
2017-03-22 16:06:31 UTC WARN [ParallelRunner] gobblin.util.ParallelRunner$6 318 - Failed to move hdfs://172.28.0.50:8020/user/gobblin/work/task-output/clicks.test6/2017 to /user/ubuntu/gobblin/gobblin-kafka/clicks.test6/2017: dst already exists
org.apache.hadoop.fs.FileAlreadyExistsException: Failed to rename hdfs://172.28.0.50:8020/user/gobblin/work/task-output/clicks.test6/2017 to /user/ubuntu/gobblin/gobblin-kafka/clicks.test6/2017: dst already exists
at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:219)
at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:200)
at gobblin.util.HadoopUtils.movePath(HadoopUtils.java:262)
at gobblin.util.ParallelRunner$6.call(ParallelRunner.java:311)
at gobblin.util.ParallelRunner$6.call(ParallelRunner.java:304)
at gobblin.util.executors.MDCPropagatingCallable.call(MDCPropagatingCallable.java:38)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I'm trying to store the records under /topic-name/yyyy/mm/dd, so I wrote the following partitioner.
Every Kafka message is JSON with a 'timestamp' field, which determines where in HDFS the record is written:
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import gobblin.configuration.State;
import gobblin.writer.partitioner.TimeBasedWriterPartitioner;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Partitions JSON Kafka records by the date in their 'timestamp' field.
public class JsonKafkaWriterPartitioner extends TimeBasedWriterPartitioner<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(JsonKafkaWriterPartitioner.class);
    private static final String TIMESTAMP_FIELD_ID = "timestamp";
    private final Gson gson = new Gson();

    public JsonKafkaWriterPartitioner(State state, int numBranches, int branchId) {
        super(state, numBranches, branchId);
    }

    @Override
    public long getRecordTimestamp(byte[] payload) {
        // StandardCharsets.UTF_8 is always available, so no UnsupportedEncodingException handling is needed.
        String payloadString = new String(payload, StandardCharsets.UTF_8);
        JsonObject jsonObject;
        try {
            jsonObject = gson.fromJson(payloadString, JsonObject.class);
        } catch (RuntimeException e) {
            log.error("Caught exception while parsing JSON string '" + payloadString + "'.", e);
            throw new RuntimeException(e);
        }
        // Fall back to the current time when the field is missing or cannot be parsed.
        long ret = System.currentTimeMillis();
        if (jsonObject != null && jsonObject.has(TIMESTAMP_FIELD_ID)) {
            String timestamp = jsonObject.get(TIMESTAMP_FIELD_ID).getAsString();
            // Keep only the date part of an ISO-8601 value such as 2017-03-22T16:06:31Z.
            int index = timestamp.indexOf('T');
            if (index != -1) {
                try {
                    ret = isoDateToMillis(timestamp.substring(0, index));
                } catch (ParseException ex) {
                    log.warn("Unparseable timestamp '" + timestamp + "', using current time.", ex);
                }
            }
        }
        return ret;
    }

    // Parses the date as midnight in the JVM default time zone. If that zone differs from
    // the one the partitioner formats paths with, records can land in the adjacent day's directory.
    private long isoDateToMillis(String isoDate) throws ParseException {
        return new SimpleDateFormat("yyyy-MM-dd").parse(isoDate).getTime();
    }
}
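To check which directory a given record should end up in, here is a minimal standalone sketch (no Gobblin or Gson dependency; the class and method names are hypothetical) that takes the date part of the 'timestamp' value and builds the /topic-name/yyyy/MM/dd path. It formats the date text directly with java.time.LocalDate, so no time zone is involved. Unlike the partitioner above, it uses a date-only value as-is instead of falling back to the current time. Inside Gobblin, the millis returned by getRecordTimestamp are formatted again by TimeBasedWriterPartitioner, which as far as I know applies its own configured time zone, so the real path only matches this sketch when the zones agree.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: maps a record's ISO-8601 'timestamp' value to the
// /topic-name/yyyy/MM/dd directory the partitioner is meant to produce.
public class PartitionPathSketch {
    private static final DateTimeFormatter DIR_FORMAT = DateTimeFormatter.ofPattern("yyyy/MM/dd");

    static String partitionPath(String topic, String isoTimestamp) {
        // Keep only the text before 'T'; a date-only value is used unchanged.
        int index = isoTimestamp.indexOf('T');
        String datePart = index != -1 ? isoTimestamp.substring(0, index) : isoTimestamp;
        // LocalDate.parse expects yyyy-MM-dd and carries no time zone.
        LocalDate date = LocalDate.parse(datePart);
        return "/" + topic + "/" + date.format(DIR_FORMAT);
    }

    public static void main(String[] args) {
        // Prints /clicks.test6/2017/03/22
        System.out.println(partitionPath("clicks.test6", "2017-03-22T16:06:31Z"));
    }
}
```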
On Wednesday, 22 March 2017 17:05:02 UTC+1, Issac Buenrostro wrote: