String inputPathMaster = args[0];
String inputPathDelta = args[1];
String inputPathHistMaster = args[2];
String outputPath = args[3];
String outputHistPath = args[4];
DateFormat df = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
Date dateobj = new Date();
Fields deltaFields = new Fields("id","name","dept","dept_id","state").applyTypes(long.class, String.class,String.class,String.class,String.class);
Fields masterFields = new Fields("id","name","dept","dept_id","state","time").applyTypes(long.class, String.class,String.class,String.class,String.class,Date.class);
Tap inTapMaster = new FileTap(new TextDelimited(masterFields), inputPathMaster);
Tap inTapHistMaster = new FileTap(new TextDelimited(masterFields), inputPathHistMaster);
Tap inTapDelta = new FileTap(new TextDelimited(deltaFields), inputPathDelta);
Pipe masterPipe = new Pipe("masterPipe");
Pipe deltaPipe = new Pipe("deltaPipe");
deltaPipe = new Each(deltaPipe, new Insert(new Fields("time"), df.format(dateobj)), Fields.ALL);
// Curr Master File
Pipe mergePipe = new Merge(masterPipe,deltaPipe);
Pipe groupPipe = new Pipe("groupPipe", mergePipe);
groupPipe = new GroupBy(groupPipe, new Fields("id"),new Fields("time"),true);
groupPipe = new Every(groupPipe, new First(), Fields.ALL);
// Hist Master File
Pipe histMasterPipe = new Pipe("histMasterPipe");
Pipe joinPipe = new Pipe ("joinPipe");
Fields common = new Fields( "id" );
Fields declared = new Fields("id1", "name1", "dept1", "dept_id1", "state1", "time1","id2", "name2", "dept2", "dept_id2", "state2", "time2");
joinPipe = new CoGroup(histMasterPipe, common, deltaPipe, common, declared, new InnerJoin() );
Tap outTap = new FileTap(new TextDelimited(false, "\t"), outputPath, SinkMode.REPLACE);
Tap outHistTap = new FileTap(new TextDelimited(false, "\t"), outputHistPath, SinkMode.REPLACE);
// Creates the flow definition by connecting the taps and copy pipe
FlowDef flowDef = FlowDef.flowDef()
.addSource(masterPipe, inTapMaster)
.addSource(deltaPipe, inTapDelta)
.addSource(histMasterPipe, inTapHistMaster)
.setName("MasterRefresh")
.addTailSink(joinPipe,outHistTap)
.addTailSink(groupPipe, outTap);
// Creates a local planner for executing the flow
Properties properties = AppProps.appProps()
.setName("part1")
.buildProperties();
LocalFlowConnector flowConnector = new LocalFlowConnector(properties);
// Creates the flow using the flow definition and flow connector
Flow flow = flowConnector.connect(flowDef);
// Runs the flow
flow.complete();
LOG.info("Part 1 - Main" + "Refresh complete");
2015-10-06 00:02:59,691 INFO [main] property.AppProps (AppProps.java:getAppID(162)) - using
app.id: 905F40E4D22C4EA6AF36ADA914243A3C
2015-10-06 00:02:59,722 INFO [main] management.CascadingServices (CascadingServices.java:loadProperties(165)) - loading properties: cascading/management/service.properties, from jar: file:/C:/Projects/Cascading/Cascading_Training/labs/JavaDeveloperTraining/common/driven.jar
Exception in thread "main" cascading.flow.planner.PlannerException: could not build flow from assembly: [type may not be null]
at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:577)
at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:108)
at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:40)
at cascading.flow.FlowConnector.connect(FlowConnector.java:459)
at devtraining.Main.main(Main.java:462)
Caused by: java.lang.IllegalArgumentException: type may not be null
at cascading.tuple.Fields.setType(Fields.java:1747)
at cascading.tuple.Fields.select(Fields.java:1095)
at cascading.pipe.Splice.createJoinFields(Splice.java:1025)
at cascading.pipe.Splice.outgoingScopeFor(Splice.java:997)
at cascading.flow.planner.ElementGraph.resolveFields(ElementGraph.java:628)
at cascading.flow.planner.ElementGraph.resolveFields(ElementGraph.java:610)
at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:95)