What is the expected commit/rollback behavior in Camus

22 views
Skip to first unread message

Matt Reichert

unread,
Jul 8, 2016, 11:18:31 AM7/8/16
to Camus - Kafka ETL for Hadoop
Hi there,

We've been running Camus for about a year successfully to pull avro payloads from Kafka (ver 0.82) and store as .avro files in HDFS.  Recently, a new team within our company registered about 60 new schemas in our pre-production environment and started sending data through.  They made some mistakes in routing their data to the correct topics, which caused problems when Camus tried to deserialize from Kafka on these topics - the Camus job then failed due to exceeding the 'failed other' error threshold.  The resulting behavior in Camus after the failure was surprising, I wanted to check whether the behavior we observed is expected or whether we have some issue going on with our implementation (at the time we downloaded Camus, it was on its way out from LinkedIn, I think we had to do some work to create a distribution, etc).
I'd also be interested in your take on parameters we can set up to improve the commit/rollback behavior.
Thanks ~

------------

We noticed this behavior when the Camus job fails:
1.  All of the the mapper jobs in Camus succeed and so the map tasks allowed to commit, which then writes .avro files to their final HDFS locations
2.  The CamusJob throws an exception when it computes the % error rate (this is following the mapper commit), which causes the oozie job to fail
3.  The Kafka offsets aren't advanced (Im thinking that step must occur at some point in the code following the thrown exception and so isn't called)  

Our Camus job is set to run every 5 minutes, and because of the behavior above, we write duplicated data to HDFS every 5 minutes (the job runs, commits files to HDFS, fails, then doesn't advance Kafka offsets).
I recreated the problem on my local machine by submitting 10 records with the expected schema and 10 records with an unexpected schema to the same topic and ran our Camus job with only that topic whitelisted.  The result is that the 10 good records are written to HDFS, then 10 bad records are not, and the next time I run the job the offsets aren't advanced, so I pull in the same 10 records again.

Here's a snippet from the log showing the behavior of commit, job failure, etc from my test.
Let me know if more information would be helpful in order to provide advice, etc.

[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map

[Task] - Task:attempt_local866350146_0001_m_000000_0 is done. And is in the process of committing

[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map

[Task] - Task attempt_local866350146_0001_m_000000_0 is allowed to commit now

[EtlMultiOutputFormat] - work path: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0

[EtlMultiOutputFormat] - Destination base path: /user/camus/kafka/data

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.3.2.2.1467979200000.avro

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.7.8.8.1467979200000.avro

[EtlMultiOutputFormat] - work file: errors-m-00000

[FileOutputCommitter] - Saved output of task 'attempt_local866350146_0001_m_000000_0' to file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/task_local866350146_0001_m_000000

[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4; advertising.edmunds.admax:0:0; advertising.edmunds.admax:1:1

[Task] - Task 'attempt_local866350146_0001_m_000000_0' done.

[LocalJobRunner] - Finishing task: attempt_local866350146_0001_m_000000_0

[LocalJobRunner] - map task executor complete.

[Job] -  map 100% reduce 0%

[Job] - Job job_local866350146_0001 completed successfully

[Job] - Counters: 23

File System Counters

FILE: Number of bytes read=117251

FILE: Number of bytes written=350942

FILE: Number of read operations=0

FILE: Number of large read operations=0

FILE: Number of write operations=0

Map-Reduce Framework

Map input records=10

Map output records=15

Input split bytes=793

Spilled Records=0

Failed Shuffles=0

Merged Map outputs=0

GC time elapsed (ms)=13

Total committed heap usage (bytes)=251658240

com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG

DECODE_SUCCESSFUL=10

SKIPPED_OTHER=10

File Input Format Counters 

Bytes Read=0

File Output Format Counters 

Bytes Written=5907

total

data-read=840

decode-time(ms)=123

event-count=20

mapper-time(ms)=58

request-time(ms)=12114

skip-old=0

[CamusJob] - Group: File System Counters

[CamusJob] - FILE: Number of bytes read: 117251

[CamusJob] - FILE: Number of bytes written: 350942

[CamusJob] - FILE: Number of read operations: 0

[CamusJob] - FILE: Number of large read operations: 0

[CamusJob] - FILE: Number of write operations: 0

[CamusJob] - Group: Map-Reduce Framework

[CamusJob] - Map input records: 10

[CamusJob] - Map output records: 15

[CamusJob] - Input split bytes: 793

[CamusJob] - Spilled Records: 0

[CamusJob] - Failed Shuffles: 0

[CamusJob] - Merged Map outputs: 0

[CamusJob] - GC time elapsed (ms): 13

[CamusJob] - Total committed heap usage (bytes): 251658240

[CamusJob] - Group: com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG

[CamusJob] - DECODE_SUCCESSFUL: 10

[CamusJob] - SKIPPED_OTHER: 10

[CamusJob] - Group: File Input Format Counters 

[CamusJob] - Bytes Read: 0

[CamusJob] - Group: File Output Format Counters 

[CamusJob] - Bytes Written: 5907

[CamusJob] - Group: total

[CamusJob] - data-read: 840

[CamusJob] - decode-time(ms): 123

[CamusJob] - event-count: 20

[CamusJob] - mapper-time(ms): 58

[CamusJob] - request-time(ms): 12114

[CamusJob] - skip-old: 0

[CamusJob] - Group: File System Counters

[CamusJob] - FILE: Number of bytes read: 117251

[CamusJob] - FILE: Number of bytes written: 350942

[CamusJob] - FILE: Number of read operations: 0

[CamusJob] - FILE: Number of large read operations: 0

[CamusJob] - FILE: Number of write operations: 0

[CamusJob] - Group: Map-Reduce Framework

[CamusJob] - Map input records: 10

[CamusJob] - Map output records: 15

[CamusJob] - Input split bytes: 793

[CamusJob] - Spilled Records: 0

[CamusJob] - Failed Shuffles: 0

[CamusJob] - Merged Map outputs: 0

[CamusJob] - GC time elapsed (ms): 13

[CamusJob] - Total committed heap usage (bytes): 251658240

[CamusJob] - job failed: 50.0% messages skipped due to other, maximum allowed is 0.1%



Matt Reichert

unread,
Jul 28, 2016, 11:29:07 AM7/28/16
to Camus - Kafka ETL for Hadoop
We're still facing this issue, I received one response on Stack Overflow indicating that others are experiencing the same with Camus in their production environments.  
We've handled in a temporary way by disabling Camus error handling (e.g. we allow 100% of messages to experience deserialization errors, we ignore schema errors, etc).
At the moment the system feels very fragile.
Let me know if there's any wisdom out there.  I'm not sure whether we have a problem in the Camus software or somewhere else - could be how we're using it, but we need to come up with a better method of error handling that allows users to make mistakes without bringing down our pipeline or creating duplicates, etc. 

Issac Buenrostro

unread,
Jul 28, 2016, 11:47:11 AM7/28/16
to Matt Reichert, Camus - Kafka ETL for Hadoop

Hi Matt,

This is definitely a bug, and the stopgap solution of disabling error handling seems the most appropriate in the short term.

Unfortunately the LinkedIn team no longer supports Camus, so we will not be able to address this bug. Instead, we encourage you to migrate to Gobblin:

http://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/

Let us know if you need any help with the migration.
Issac

--
You received this message because you are subscribed to the Google Groups "Camus - Kafka ETL for Hadoop" group.
To unsubscribe from this group and stop receiving emails from it, send an email to camus_etl+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Reply all
Reply to author
Forward
0 new messages