Using a custom data writer with Camus

381 views
Skip to first unread message

Ken Goodhope

unread,
Aug 1, 2013, 4:12:43 PM8/1/13
to camu...@googlegroups.com
Hello Everyone,

I tracked down the commit that added the ability to specify a custom data writer for camus.  We still need to add documentation on how to make use of this, but for those who want to start using it now you can look through the patch to see what classes you need to extend and what configuration params you need to set.

This commit was only added to the kafka .8 branch.  Internally, we are off of kafka .7 and soon this branch will become trunk.  If someone needs this functionality with kafka .7 we'll need to backport this patch.

Thanks to Sam Meder for adding this.  We should now be in a position to handle any kind of data by specifying a custom decoder and a custom record writer.

Ken

Felix GV

unread,
Aug 1, 2013, 4:20:15 PM8/1/13
to Ken Goodhope, camu...@googlegroups.com
Wow very cool :) !

I have an off-topic question regarding 0.8: is LinkedIn fully done with the migration? During the Kafka meet up after the Hadoop Summit you guys were saying you still had about 2/3 of your consumers reading from the (mirrored) 0.7 cluster. Also, are you guys planning a 0.8.1 release soon or are things looking super stable as is right now?

--
Felix


--
You received this message because you are subscribed to the Google Groups "Camus - Kafka ETL for Hadoop" group.
To unsubscribe from this group and stop receiving emails from it, send an email to camus_etl+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Ken Goodhope

unread,
Aug 1, 2013, 4:30:04 PM8/1/13
to Felix GV, camu...@googlegroups.com
I'm not sure if we still have .7 consumers or not, or if we are planning a 0.8.1 release.  The Kafka team could answer more definitively.  All I know for sure is all our Camus instances are now pulling from .8.

Felix GV

unread,
Aug 1, 2013, 4:32:38 PM8/1/13
to Ken Goodhope, camu...@googlegroups.com
Ok thanks for the info Ken :)

I'll read/ask on the Kafka list when the time comes for us to migrate to 0.8 :) ...

--
Felix

Jay Kreps

unread,
Aug 1, 2013, 11:26:36 PM8/1/13
to Felix GV, Ken Goodhope, camu...@googlegroups.com
Current state: consumers are 100% migrated, producers maybe 60%.

-Jay


On Thu, Aug 1, 2013 at 1:20 PM, Felix GV <fe...@mate1inc.com> wrote:

Andrew Otto

unread,
Aug 14, 2013, 12:45:40 PM8/14/13
to camu...@googlegroups.com
Thanks Ken!

I'm busy trying to figure out how to make Camus read directly from a text Kafka topic and write the raw text bytes to HDFS.

I think I've figured out the MessageDecoder bit, but am having trouble implementing a working RecordWriterProvider.  

When I try to run that as is, everything looks pretty good.  The job finishes succesfully, and I see info about bytes being written.  However, nothing is actually written to my etl.destination.path.  

I'm sure I'm just missing an obvious but really important step.  (This is actually my first time writing Hadoop Java stuff).

Below is what I have so far:



package org.wikimedia.analytics.kraken.etl;
 
import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
 
public class TextRecordWriterProvider implements RecordWriterProvider {
public final static String EXT = ".txt";
 
@Override
public String getFilenameExtension() {
return EXT;
}
 
@Override
public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
TaskAttemptContext context,
String fileName,
CamusWrapper data,
FileOutputCommitter committer) throws IOException, InterruptedException {
// Path path = committer.getWorkPath();
// path = new Path(path, EtlMultiOutputFormat.getUniqueFile(context, fileName, EXT));
// -- I think I need to somehow use this Path to tell the returned RecordWriter
// where to write, but I'm not sure how to do this. I've tried:
// outputFormat.setOutputPath(<Job>, path), but I don't know where to get Job from.
// All I've got is context, and I find a way to get the context's Job from that.
 
// This doesn't write anything to etl.destination.path. :/
FileOutputFormat outputFormat = new TextOutputFormat();
return outputFormat.getRecordWriter(context);
}
}



Andrew Otto

unread,
Aug 15, 2013, 10:08:22 AM8/15/13
to camu...@googlegroups.com
Ok, I think I got it!  Or, at least something that works.  I just needed to use FileSystem .create().


package org.wikimedia.analytics.kraken.etl;
 
import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 
public class TextRecordWriterProvider implements RecordWriterProvider {
public final static String EXT = ".txt";
 
@Override
public String getFilenameExtension() {
return EXT;
}
 
@Override
public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
TaskAttemptContext context,
String fileName,
CamusWrapper data,
FileOutputCommitter committer) throws IOException, InterruptedException {
 
 
Path path = new Path(
committer.getWorkPath(),
EtlMultiOutputFormat.getUniqueFile(context, fileName, EXT)
);
 
final FSDataOutputStream writer = path.getFileSystem(
context.getConfiguration()
).create(path);
 
return new RecordWriter<IEtlKey, CamusWrapper>() {
@Override
public void write(IEtlKey ignore, CamusWrapper data) throws IOException {
String record = ((String)data.getRecord() + "\n");
writer.write(record.getBytes());
}
 
@Override
public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
writer.close();
}
};
}
}
Reply all
Reply to author
Forward
0 new messages