What is the expected commit/rollback behavior in Camus?

19 views
Skip to first unread message

Matt Reichert

unread,
Jul 8, 2016, 12:47:21 PM7/8/16
to Confluent Platform
Hi there,

I also posted this question in the Camus group:
We've been running Camus for about a year successfully to pull avro payloads from Kafka (ver 0.82) and store as .avro files in HDFS.  Recently, a new team within our company registered about 60 new schemas in our pre-production environment and started sending data through.  They made some mistakes in routing their data to the correct topics, which caused problems when Camus tried to deserialize from Kafka on these topics - the Camus job then failed due to exceeding the 'failed other' error threshold.  The resulting behavior in Camus after the failure was surprising, I wanted to check whether the behavior we observed is expected or whether we have some issue going on with our implementation (at the time we downloaded Camus, it was on its way out from LinkedIn, I think we had to do some work to create a distribution, etc).
I'd also be interested in your take on parameters we can set up to improve the commit/rollback behavior.
Thanks ~

------------

We noticed this behavior when the Camus job fails:
1.  All of the the mapper jobs in Camus succeed and so the map tasks allowed to commit, which then writes .avro files to their final HDFS locations
2.  The CamusJob throws an exception when it computes the % error rate (this is following the mapper commit), which causes the oozie job to fail
3.  The Kafka offsets aren't advanced (Im thinking that step must occur at some point in the code following the thrown exception and so isn't called)  

Our Camus job is set to run every 5 minutes, and because of the behavior above, we write duplicated data to HDFS every 5 minutes (the job runs, commits files to HDFS, fails, then doesn't advance Kafka offsets).
I recreated the problem on my local machine by submitting 10 records with the expected schema and 10 records with an unexpected schema to the same topic and ran our Camus job with only that topic whitelisted.  The result is that the 10 good records are written to HDFS, then 10 bad records are not, and the next time I run the job the offsets aren't advanced, so I pull in the same 10 records again.

Here's a snippet from the log showing the behavior of commit, job failure, etc from my test.
Let me know if more information would be helpful in order to provide advice, etc.

[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map

[Task] - Task:attempt_local866350146_0001_m_000000_0 is done. And is in the process of committing

[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map

[Task] - Task attempt_local866350146_0001_m_000000_0 is allowed to commit now

[EtlMultiOutputFormat] - work path: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0

[EtlMultiOutputFormat] - Destination base path: /user/camus/kafka/data

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.3.2.2.1467979200000.avro

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.7.8.8.1467979200000.avro

[EtlMultiOutputFormat] - work file: errors-m-00000

[FileOutputCommitter] - Saved output of task 'attempt_local866350146_0001_m_000000_0' to file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/task_local866350146_0001_m_000000

[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4; advertising.edmunds.admax:0:0; advertising.edmunds.admax:1:1

[Task] - Task 'attempt_local866350146_0001_m_000000_0' done.

[LocalJobRunner] - Finishing task: attempt_local866350146_0001_m_000000_0

[LocalJobRunner] - map task executor complete.

[Job] -  map 100% reduce 0%

[Job] - Job job_local866350146_0001 completed successfully

[Job] - Counters: 23

File System Counters

FILE: Number of bytes read=117251

FILE: Number of bytes written=350942

FILE: Number of read operations=0

FILE: Number of large read operations=0

FILE: Number of write operations=0

Map-Reduce Framework

Map input records=10

Map output records=15

Input split bytes=793

Spilled Records=0

Failed Shuffles=0

Merged Map outputs=0

GC time elapsed (ms)=13

Total committed heap usage (bytes)=251658240

com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG

DECODE_SUCCESSFUL=10

SKIPPED_OTHER=10

File Input Format Counters 

Bytes Read=0

File Output Format Counters 

Bytes Written=5907

total

data-read=840

decode-time(ms)=123

event-count=20

mapper-time(ms)=58

request-time(ms)=12114

skip-old=0

[CamusJob] - Group: File System Counters

[CamusJob] - FILE: Number of bytes read: 117251

[CamusJob] - FILE: Number of bytes written: 350942

[CamusJob] - FILE: Number of read operations: 0

[CamusJob] - FILE: Number of large read operations: 0

[CamusJob] - FILE: Number of write operations: 0

[CamusJob] - Group: Map-Reduce Framework

[CamusJob] - Map input records: 10

[CamusJob] - Map output records: 15

[CamusJob] - Input split bytes: 793

[CamusJob] - Spilled Records: 0

[CamusJob] - Failed Shuffles: 0

[CamusJob] - Merged Map outputs: 0

[CamusJob] - GC time elapsed (ms): 13

[CamusJob] - Total committed heap usage (bytes): 251658240

[CamusJob] - Group: com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG

[CamusJob] - DECODE_SUCCESSFUL: 10

[CamusJob] - SKIPPED_OTHER: 10

[CamusJob] - Group: File Input Format Counters 

[CamusJob] - Bytes Read: 0

[CamusJob] - Group: File Output Format Counters 

[CamusJob] - Bytes Written: 5907

[CamusJob] - Group: total

[CamusJob] - data-read: 840

[CamusJob] - decode-time(ms): 123

[CamusJob] - event-count: 20

[CamusJob] - mapper-time(ms): 58

[CamusJob] - request-time(ms): 12114

[CamusJob] - skip-old: 0

[CamusJob] - Group: File System Counters

[CamusJob] - FILE: Number of bytes read: 117251

[CamusJob] - FILE: Number of bytes written: 350942

[CamusJob] - FILE: Number of read operations: 0

[CamusJob] - FILE: Number of large read operations: 0

[CamusJob] - FILE: Number of write operations: 0

[CamusJob] - Group: Map-Reduce Framework

[CamusJob] - Map input records: 10

[CamusJob] - Map output records: 15

[CamusJob] - Input split bytes: 793

[CamusJob] - Spilled Records: 0

[CamusJob] - Failed Shuffles: 0

[CamusJob] - Merged Map outputs: 0

[CamusJob] - GC time elapsed (ms): 13

[CamusJob] - Total committed heap usage (bytes): 251658240

[CamusJob] - job failed: 50.0% messages skipped due to other, maximum allowed is 0.1%

Reply all
Reply to author
Forward
0 new messages