/**
 * Runs a Cascading flow on Tez that co-groups an ORC source with an Avro source.
 * The result is written to {@code /projets/out} as {@code ';'}-delimited text.
 * The method then prints the row count that {@code RowFileCounter} reports for that directory.
 *
 * @param args {@code args[0]} is the value stored under
 *             {@code FlowRuntimeProps.GATHER_PARTITIONS}; it must be a positive integer
 * @throws IOException if the Hadoop {@code FileSystem} cannot be obtained or read
 * @throws IllegalArgumentException if {@code args[0]} is missing, not an integer, or less than 1
 */
public static void main(String ... args) throws IOException {
    // Validate the CLI argument before any flow is built. Otherwise a missing or
    // malformed value only surfaces after all taps and pipes are assembled.
    if (args == null || args.length < 1) {
        throw new IllegalArgumentException(
                "Missing argument: number of gather partitions (positive integer)");
    }
    final int gatherPartitions;
    try {
        gatherPartitions = Integer.parseInt(args[0].trim());
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException(
                "Gather partitions must be an integer, got: " + args[0], e);
    }
    if (gatherPartitions < 1) {
        throw new IllegalArgumentException(
                "Gather partitions must be greater than zero, got: " + gatherPartitions);
    }

    Configuration hadoopConfiguration = new Configuration();
    Properties properties = new Properties();
    // Project helper; presumably selects the YARN queue from the Hadoop configuration — TODO confirm.
    CascadingUtils.configureYarnQueue(properties, hadoopConfiguration);

    // Sources: one Avro file and one ORC directory, each bound to its own head pipe.
    OrcTapFactory orcTapFactory = new OrcTapFactory();
    Pipe pipeInAvro = new Pipe("inAvro");
    Pipe pipeInOrc = new Pipe("inOrc");
    String nomFichierAvro = "/projets/avroin/archive.avro";
    Tap inAvro = new Hfs(new AvroScheme(SCHEMA_JSON), nomFichierAvro);
    String inOrcPath = "/projets/orcin";
    Tap inOrc = orcTapFactory.createSource(FIELDS_OUT, inOrcPath);

    // Sink: FIELDS_OUT written as ';'-delimited text.
    String outDirectory = "/projets/out";
    Tap out = new Hfs(new TextDelimited(FIELDS_OUT, ";"), outDirectory);

    SubAssembly assembly = new CoGroupAssembly(pipeInOrc, pipeInAvro);
    FlowDef flowDef = FlowDef.flowDef()
            .setName("TestCoGroup")
            .setRunID("TestCoGroup")
            .addSource(pipeInAvro, inAvro)
            .addSource(pipeInOrc, inOrc)
            .addTailSink(assembly, out);

    // Must be set before the connector is created; the connector receives these properties.
    properties.setProperty(FlowRuntimeProps.GATHER_PARTITIONS, Integer.toString(gatherPartitions));
    new Hadoop2TezFlowConnector(properties).connect(flowDef).complete();

    // Count the rows produced in the output directory.
    FileSystem fs = FileSystem.get(hadoopConfiguration);
    RowFileCounter rowFileCounter = new RowFileCounter(outDirectory, fs);
    Map<String, RowFileCounter.EntityContent> countResult = rowFileCounter.count(false, null, false);
    long result = rowFileCounter.sumEntityContents(countResult);
    System.out.println("Nombre de lignes dans le fichier de sortie : " + result);
    exit(0);
}
/**
 * Sub-assembly that co-groups the ORC branch with the Avro branch on their
 * identifier fields. Only the {@code FIELDS_OUT} fields of the grouped stream are kept.
 */
private static class CoGroupAssembly extends SubAssembly {

    /**
     * Builds the co-group pipeline and declares its heads and tail.
     *
     * @param inOrc  head pipe of the ORC branch, grouped on its {@code "id"} field
     * @param inAvro head pipe of the Avro branch, grouped on its {@code FIELD_ID} field
     */
    public CoGroupAssembly(Pipe inOrc, Pipe inAvro) {
        // Declare both incoming pipes as the starting points of this assembly.
        setPrevious(inOrc, inAvro);

        // NOTE(review): the ORC key is a literal "id" while the Avro key uses FIELD_ID.
        // Confirm this asymmetry is intended.
        Fields orcJoinKey = new Fields("id");
        Fields avroJoinKey = new Fields(FIELD_ID);

        Pipe grouped = new CoGroup(inOrc, orcJoinKey, inAvro, avroJoinKey);
        Pipe projected = new Retain(grouped, FIELDS_OUT);

        setTails(projected);
    }
}