Uncommitted Offsets but Committed Data in Failed Camus Jobs

Shaun Sit

May 18, 2015, 1:02:30 AM
to camu...@googlegroups.com
Hi there,

We are encountering an issue with failed Camus jobs. We notice that when some of the mappers succeed and others fail, the successful mappers move their data directly to the etl.destination folder and use FileOutputCommitter to commit their offsets. When the entire job ultimately fails, the offsets of those successful mappers get cleaned up and are never committed to HDFS. This leads to a situation where the data is committed but the offsets are not.

Anyone seeing this issue?

Regards,
Shaun

Krzysztof Zarzycki

Jul 2, 2015, 3:31:45 AM
to camu...@googlegroups.com

I believe I'm encountering a very similar problem.
My Camus job finishes with an error saying it failed to move a file:

orary/attempt_1424354741837_0342_m_000000_0/data.frontapp_events_v20150223_1_bin.2.2.1435363200000-m-00000.kv.snappy to /user/camus/protobuf-events/topics/frontapp
_events_v20150223_1_bin/daily/2015/06/27/frontapp_events_v20150223_1_bin.2.2.897294.108600553.1435363200000.kv.snappy
        at com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputCommitter.commitFile(EtlMultiOutputCommitter.java:155)
        [...]


I can see that, indeed, the file already existed from a previous Camus job.
The next time I ran the job, it failed with the same error, so at least I can see that Camus did not commit the offsets: it tried to load from the same offsets again (earliest_offset is the same as in the previous job).

I cherry-picked and deleted the old files, but it was an awful job and I'll never know whether I made a mistake and deleted some of my data forever...

Does anyone have a better idea than deleting the old copied-but-not-committed data?

Thanks,
Krzysztof

Miao Liu

Jul 21, 2015, 6:10:06 AM
to camu...@googlegroups.com
See the commitFile method in camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputCommitter.java:
if (!FileSystem.get(job.getConfiguration()).rename(source, target)) 

If the target file already exists, rename returns false, and the committer logs an error and throws an IOException.
That is the reason why "Failed to move from xxxxx" happens!
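You can reproduce that rename behaviour in isolation with a rough, untested sketch like this (the class name and paths are made up for illustration):
---------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Rough sketch: FileSystem.rename does not overwrite; it returns false
// when the target already exists, which is what makes commitFile throw.
public class RenameDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path source = new Path("/tmp/rename-demo/source.kv.snappy"); // hypothetical
    Path target = new Path("/tmp/rename-demo/target.kv.snappy"); // hypothetical
    fs.mkdirs(source.getParent());
    fs.create(source).close();
    fs.create(target).close(); // simulate a leftover file from a failed job
    System.out.println("rename succeeded? " + fs.rename(source, target)); // expect: false
  }
}
---------------------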


My solution is to delete the target file, if it exists, before the rename.

The details are as follows: add three lines of code before the call to commitFile in the commitTask method.
---------------------
if (!fs.exists(dest.getParent())) {
  mkdirs(fs, dest.getParent());
}

+ // Remove the stale target left behind by the previous failed run,
+ // so the rename inside commitFile can succeed.
+ if (fs.exists(dest)) {
+   fs.delete(dest, false);
+ }

commitFile(context, f.getPath(), dest);
log.info("Moved file from: " + f.getPath() + " to: " + dest);
---------------------

Then compile and deploy the jar file.

Hope it is useful to you.

Krzysztof Zarzycki

Sep 16, 2015, 2:05:20 AM
to Camus - Kafka ETL for Hadoop
Thanks, Miao, for your idea.
But do you think it is really safe to delete the already-committed file? It's there for some reason. Don't we lose some data here?
Thanks,
Krzysztof

Zhao Weinan

Sep 24, 2015, 10:52:58 PM
to Camus - Kafka ETL for Hadoop
Hi Krzysztof,

I think it's safe to delete the DUPLICATED FILE, because the data should be exactly the same if the filename (${brokerId}.${partitionId}.${count}.${lastOffset}.${timestamp}) is the same; if not, then you are in bigger trouble...

Unfortunately, deleting the duplicated file is not enough, since the filenames may differ while the data ranges overlap. For instance, 1.1.10.100.1435363200000 has 10 messages from offset 90, and 1.1.20.110.1435363200000 has 20 messages from offset 90; if you keep both files, then you have Kafka messages 90-100 twice... So you need some simple math to detect this situation.
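A rough, untested sketch of that math (it assumes a file with count N and lastOffset L covers offsets [L - N + 1, L]; the exact off-by-one convention, and the topic prefix and extension that real filenames carry, should be checked against your Camus version):
---------------------
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Rough sketch: detect overlapping offset ranges among Camus output files of
// one broker/partition, parsed from the
// ${brokerId}.${partitionId}.${count}.${lastOffset}.${timestamp} filename.
public class OffsetOverlapCheck {

  // Assumption: a file with count N and lastOffset L covers [L - N + 1, L].
  static long startOffset(String name) {
    String[] parts = name.split("\\.");
    return Long.parseLong(parts[3]) - Long.parseLong(parts[2]) + 1;
  }

  static long lastOffset(String name) {
    return Long.parseLong(name.split("\\.")[3]);
  }

  public static void main(String[] args) {
    // The two example files from above.
    List<String> names = Arrays.asList(
        "1.1.10.100.1435363200000", "1.1.20.110.1435363200000");
    names.sort(Comparator.comparingLong(OffsetOverlapCheck::startOffset));
    long coveredUpTo = -1; // highest offset seen so far
    for (String name : names) {
      if (startOffset(name) <= coveredUpTo) {
        System.out.println("Overlap: " + name + " starts at " + startOffset(name)
            + " but earlier files already cover up to " + coveredUpTo);
      }
      coveredUpTo = Math.max(coveredUpTo, lastOffset(name));
    }
  }
}
---------------------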

But since Camus has stopped being developed, you guys should consider other Kafka-to-HDFS ETL tools such as Gobblin.

On Wednesday, September 16, 2015 at 2:05:20 PM UTC+8, Krzysztof Zarzycki wrote:

Krzysztof Zarzycki

Sep 29, 2015, 3:33:08 AM
to Zhao Weinan, Camus - Kafka ETL for Hadoop
Great, thank you, Zhao, for your answer. That explains a lot.

Do you know how production-ready Gobblin is for Kafka-HDFS ETL? And how easy is it to set up an everyday Gobblin job to dump JSON and Protobuf events with it? I know it has support for Avro, but unfortunately that doesn't help me.

Thanks!
Krzysiek



Zhao Weinan

Sep 30, 2015, 5:17:46 AM
to Camus - Kafka ETL for Hadoop, xcvi...@gmail.com
Hi Krzysztof,

I don't know whether Gobblin is ready enough for production; we haven't used it. But since it's the successor of Camus (both of them are open-sourced by LinkedIn) and it has one release while Camus has zero, I think it's worth keeping an eye on. At least it has an option to avoid the issue you've hit: COMMIT_ON_PARTIAL_SUCCESS / COMMIT_ON_FULL_SUCCESS.
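If I read the source right, selecting the policy would look something like the sketch below; I haven't run this, so treat both the property name and the value strings as my assumption and check the Gobblin docs:
---------------------
# Hypothetical Gobblin job property: only publish data when every task
# succeeds (COMMIT_ON_FULL_SUCCESS); "partial" would publish what it can.
job.commit.policy=full
---------------------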

At a glance, I think the code architecture is clearer than Camus's and pretty easy to extend; you just need to implement your own custom data writer for JSON or Protobuf, and maybe one already exists.

On Tuesday, September 29, 2015 at 3:33:08 PM UTC+8, Krzysztof Zarzycki wrote:

Azrael Seoeun Park

Oct 12, 2015, 9:11:37 PM
to Camus - Kafka ETL for Hadoop, xcvi...@gmail.com
I had the same problem and deleted the target file in a similar way to what Miao described.


--- EtlMultiOutputCommitter ---

protected void commitFile(JobContext job, Path source, Path target) throws IOException {
    log.info(String.format("Moving %s to %s", source, target));
    FileSystem fs = FileSystem.get(job.getConfiguration());
    if (!fs.rename(source, target)) {
      log.error(String.format("Failed to move from %s to %s", source, target));
      FileStatus targetFileStatus = fs.getFileStatus(target);
      if (targetFileStatus.isFile()) {
        // Log both lengths so we can judge whether the leftover target is
        // really a duplicate of the source before replacing it.
        FileStatus sourceFileStatus = fs.getFileStatus(source);
        log.warn("Failed to move : sourceFile (" + sourceFileStatus.getLen() + "), targetFile ("
            + targetFileStatus.getLen() + ") length equals = "
            + (sourceFileStatus.getLen() == targetFileStatus.getLen()));
        // Delete the stale target and retry the rename once. Note: this only
        // logs the outcome of the retry instead of throwing if it fails again.
        if (fs.delete(target, false)) {
          log.warn("Failed to move : retry move : " + fs.rename(source, target));
        }
      } else {
        throw new IOException(String.format("Failed to move from %s to %s", source, target));
      }
    }
  }
-----------------------------------
On Wednesday, September 30, 2015 at 6:17:46 PM UTC+9, Zhao Weinan wrote: