--
You received this message because you are subscribed to the Google Groups "Camus - Kafka ETL for Hadoop" group.
To unsubscribe from this group and stop receiving emails from it, send an email to camus_etl+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
package org.wikimedia.analytics.kraken.etl;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * A Camus {@link RecordWriterProvider} that delegates record writing to a
 * Hadoop {@link TextOutputFormat}, producing plain-text output files.
 */
public class TextRecordWriterProvider implements RecordWriterProvider {

    /** File extension for all output files produced by this provider. */
    public final static String EXT = ".txt";

    @Override
    public String getFilenameExtension() {
        return EXT;
    }

    /**
     * Returns a text record writer for the given task attempt.
     *
     * @param context   the task attempt the writer belongs to
     * @param fileName  logical file name (currently unused — see note below)
     * @param data      sample record (unused; part of the provider contract)
     * @param committer supplies the work path the output should be created under
     * @throws IOException if the output format cannot create the writer
     */
    @Override
    public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
            TaskAttemptContext context,
            String fileName,
            CamusWrapper data,
            FileOutputCommitter committer) throws IOException, InterruptedException {
        // NOTE(review): known defect, acknowledged by the original author —
        // the TextOutputFormat created here uses the job's configured output
        // directory and ignores the committer's work path, so nothing is
        // written to etl.destination.path. The writer should instead be opened
        // against:
        //   new Path(committer.getWorkPath(),
        //            EtlMultiOutputFormat.getUniqueFile(context, fileName, EXT))
        // by creating an FSDataOutputStream directly on that Path.
        FileOutputFormat<IEtlKey, CamusWrapper> outputFormat =
                new TextOutputFormat<IEtlKey, CamusWrapper>();
        return outputFormat.getRecordWriter(context);
    }
}
package org.wikimedia.analytics.kraken.etl;import com.linkedin.camus.coders.CamusWrapper;import com.linkedin.camus.etl.IEtlKey;import com.linkedin.camus.etl.RecordWriterProvider;import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
public class TextRecordWriterProvider implements RecordWriterProvider {public final static String EXT = ".txt";@Overridepublic String getFilenameExtension() {return EXT;}@Overridepublic RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(TaskAttemptContext context,String fileName,CamusWrapper data,FileOutputCommitter committer) throws IOException, InterruptedException {
Path path = new Path(committer.getWorkPath(),EtlMultiOutputFormat.getUniqueFile(context, fileName, EXT));final FSDataOutputStream writer = path.getFileSystem(context.getConfiguration()).create(path);return new RecordWriter<IEtlKey, CamusWrapper>() {@Overridepublic void write(IEtlKey ignore, CamusWrapper data) throws IOException {String record = ((String)data.getRecord() + "\n");writer.write(record.getBytes());}@Overridepublic void close(TaskAttemptContext arg0) throws IOException, InterruptedException {writer.close();}};}}