Unable to write to two sinks

jnkrish

Oct 6, 2015, 1:07:27 AM
to cascading-user
Hello,

I am trying to write two outputs (two sinks) from one flow, but I get the error below: could not build flow from assembly

Can someone please let me know what the issue with the flow could be? Thanks



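import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.Insert;
import cascading.operation.aggregator.First;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.InnerJoin;
import cascading.property.AppProps;
import cascading.scheme.local.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

// inside devtraining.Main.main(String[] args):
String inputPathMaster = args[0];
String inputPathDelta = args[1];
String inputPathHistMaster = args[2];
String outputPath = args[3];
String outputHistPath = args[4];

DateFormat df = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
Date dateobj = new Date();
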
Fields deltaFields = new Fields("id", "name", "dept", "dept_id", "state").applyTypes(long.class, String.class, String.class, String.class, String.class);
Fields masterFields = new Fields("id", "name", "dept", "dept_id", "state", "time").applyTypes(long.class, String.class, String.class, String.class, String.class, Date.class);

Tap inTapMaster = new FileTap(new TextDelimited(masterFields), inputPathMaster);
Tap inTapHistMaster = new FileTap(new TextDelimited(masterFields), inputPathHistMaster);
Tap inTapDelta = new FileTap(new TextDelimited(deltaFields), inputPathDelta);

Pipe masterPipe = new Pipe("masterPipe");
Pipe deltaPipe = new Pipe("deltaPipe");
deltaPipe = new Each(deltaPipe, new Insert(new Fields("time"), df.format(dateobj)), Fields.ALL);

// Current master file
Pipe mergePipe = new Merge(masterPipe, deltaPipe);
Pipe groupPipe = new Pipe("groupPipe", mergePipe);
groupPipe = new GroupBy(groupPipe, new Fields("id"), new Fields("time"), true);
groupPipe = new Every(groupPipe, new First(), Fields.ALL);

// Historical master file
Pipe histMasterPipe = new Pipe("histMasterPipe");
Pipe joinPipe = new Pipe("joinPipe");
Fields common = new Fields("id");
Fields declared = new Fields("id1", "name1", "dept1", "dept_id1", "state1", "time1", "id2", "name2", "dept2", "dept_id2", "state2", "time2");
joinPipe = new CoGroup(histMasterPipe, common, deltaPipe, common, declared, new InnerJoin());

Tap outTap = new FileTap(new TextDelimited(false, "\t"), outputPath, SinkMode.REPLACE);
Tap outHistTap = new FileTap(new TextDelimited(false, "\t"), outputHistPath, SinkMode.REPLACE);

// Creates the flow definition by binding the source taps to the head pipes
// and the two sink taps to the two tail pipes
FlowDef flowDef = FlowDef.flowDef()
        .addSource(masterPipe, inTapMaster)
        .addSource(deltaPipe, inTapDelta)
        .addSource(histMasterPipe, inTapHistMaster)
        .setName("MasterRefresh")
        .addTailSink(joinPipe, outHistTap)
        .addTailSink(groupPipe, outTap);

// Creates a local planner for executing the flow
Properties properties = AppProps.appProps()
        .setName("part1")
        .buildProperties();
LocalFlowConnector flowConnector = new LocalFlowConnector(properties);

// Creates the flow using the flow definition and flow connector
Flow flow = flowConnector.connect(flowDef);
// Runs the flow
flow.complete();
LOG.info("Part 1 - Main Refresh complete");
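As a plain-Java sketch of what the "current master" branch above (Merge, then GroupBy on "id" with "time" sorted in reverse, then First) is meant to produce: for each id, keep the most recent record from master and delta combined. This assumes every "time" value is a String in the "yyyyMMdd HH:mm:ss" pattern; the class, the Row type, and the sample rows are hypothetical, and the code models the intended result only, not how Cascading executes the assembly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical JDK-only model of Merge + GroupBy(id, time reversed) + First.
class LatestPerId {

    // Hypothetical stand-in for one tuple of masterFields; "time" is assumed to be a String.
    static final class Row {
        final long id;
        final String name, dept, deptId, state, time;

        Row(long id, String name, String dept, String deptId, String state, String time) {
            this.id = id;
            this.name = name;
            this.dept = dept;
            this.deptId = deptId;
            this.state = state;
            this.time = time;
        }

        @Override
        public String toString() {
            return id + "\t" + name + "\t" + dept + "\t" + deptId + "\t" + state + "\t" + time;
        }
    }

    // Keep, for each id, the row with the greatest "time" across both inputs.
    static List<Row> latestPerId(List<Row> master, List<Row> delta) {
        List<Row> merged = new ArrayList<>(master); // Merge: one stream holding both inputs
        merged.addAll(delta);
        Map<Long, Row> latest = new TreeMap<>();    // TreeMap only to make output order deterministic
        for (Row r : merged) {
            Row seen = latest.get(r.id);
            // "yyyyMMdd HH:mm:ss" is fixed-width, most significant unit first, so String
            // order equals chronological order; on a tie the row seen first is kept.
            if (seen == null || r.time.compareTo(seen.time) > 0) {
                latest.put(r.id, r);
            }
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Row> master = List.of(
            new Row(1, "Ann", "Sales", "10", "CA", "20151001 09:00:00"),
            new Row(2, "Bob", "IT", "20", "NY", "20151001 09:00:00"));
        List<Row> delta = List.of(
            new Row(2, "Bob", "HR", "30", "NY", "20151006 00:02:59"));
        for (Row r : latestPerId(master, delta)) {
            System.out.println(r);
        }
    }
}
```

The timestamp assumption carries the logic: with a fixed-width, most-significant-first pattern, "greatest string" and "latest instant" coincide, which is what a reverse sort on "time" followed by First relies on.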

Error:
2015-10-06 00:02:59,691 INFO  [main] property.AppProps (AppProps.java:getAppID(162)) - using app.id: 905F40E4D22C4EA6AF36ADA914243A3C
2015-10-06 00:02:59,722 INFO  [main] management.CascadingServices (CascadingServices.java:loadProperties(165)) - loading properties: cascading/management/service.properties, from jar: file:/C:/Projects/Cascading/Cascading_Training/labs/JavaDeveloperTraining/common/driven.jar
Exception in thread "main" cascading.flow.planner.PlannerException: could not build flow from assembly: [type may not be null]
        at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:577)
        at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:108)
        at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:40)
        at cascading.flow.FlowConnector.connect(FlowConnector.java:459)
        at devtraining.Main.main(Main.java:462)
Caused by: java.lang.IllegalArgumentException: type may not be null
        at cascading.tuple.Fields.setType(Fields.java:1747)
        at cascading.tuple.Fields.select(Fields.java:1095)
        at cascading.pipe.Splice.createJoinFields(Splice.java:1025)
        at cascading.pipe.Splice.outgoingScopeFor(Splice.java:997)
        at cascading.flow.planner.ElementGraph.resolveFields(ElementGraph.java:628)
        at cascading.flow.planner.ElementGraph.resolveFields(ElementGraph.java:610)
        at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:95)
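The trace points at the CoGroup branch: Splice.createJoinFields is where the joined output fields are built, and Fields.setType is handed a null type there. To make the intended output of that branch concrete, here is a plain-Java sketch of an inner join on "id" whose output rows carry the 12 declared fields, the 6 from histMasterPipe followed by the 6 from deltaPipe. The class name and sample rows are hypothetical, and every value is kept as a String for simplicity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical JDK-only model of CoGroup(histMasterPipe, id, deltaPipe, id, declared, InnerJoin).
class InnerJoinSketch {

    // Output column names copied from the "declared" Fields in the original post.
    static final List<String> DECLARED = List.of(
        "id1", "name1", "dept1", "dept_id1", "state1", "time1",
        "id2", "name2", "dept2", "dept_id2", "state2", "time2");

    // Inner join on column 0 ("id"): each output row is the hist row's columns
    // followed by the matching delta row's columns.
    static List<List<String>> innerJoin(List<List<String>> hist, List<List<String>> delta) {
        // Index the delta rows by id.
        Map<String, List<List<String>>> deltaById = new HashMap<>();
        for (List<String> d : delta) {
            deltaById.computeIfAbsent(d.get(0), k -> new ArrayList<>()).add(d);
        }
        List<List<String>> out = new ArrayList<>();
        for (List<String> h : hist) {
            // Hist rows without a matching id produce nothing (inner join).
            for (List<String> d : deltaById.getOrDefault(h.get(0), List.of())) {
                List<String> joined = new ArrayList<>(h);
                joined.addAll(d);
                // The 12 declared names assume 6 columns on each side.
                if (joined.size() != DECLARED.size()) {
                    throw new IllegalArgumentException(
                        "expected " + DECLARED.size() + " columns, got " + joined.size());
                }
                out.add(joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> hist = List.of(
            List.of("1", "Ann", "Sales", "10", "CA", "20151001 09:00:00"),
            List.of("3", "Cy", "Ops", "40", "TX", "20151002 10:00:00"));
        List<List<String>> delta = List.of(
            List.of("1", "Ann", "HR", "30", "CA", "20151006 00:02:59"));
        for (List<String> row : innerJoin(hist, delta)) {
            System.out.println(String.join("\t", row));
        }
    }
}
```

A hash index on the delta side is used here for brevity; Cascading's CoGroup instead groups both sides by the key, but an inner join keeps the same rows either way.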

Ken Krugler

Oct 6, 2015, 9:02:29 AM
to cascadi...@googlegroups.com
I haven't verified this, but I notice that you insert a new field called "time" into deltaPipe without providing a type for it, while all the other fields have types:

deltaPipe = new Each(deltaPipe, new Insert(new Fields("time"), df.format(dateobj)), Fields.ALL);

-- Ken
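One way to apply Ken's suggestion (a sketch; the thread does not say which type the working fix used) is to give "time" a type in the Insert, mirroring the applyTypes idiom already in the post: new Insert(new Fields("time").applyTypes(String.class), df.format(dateobj)). Which type fits depends on the value: df.format(dateobj) yields a String, whereas masterFields declares "time" as Date.class. The JDK-only check below confirms the stamped value is a String and shows it can be parsed back into a Date with the same pattern if a Date-typed field were preferred. The class and method names are hypothetical, and UTC is fixed only to make the output deterministic.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper showing what the Insert in deltaPipe actually stamps.
class DeltaTimeValue {

    static final String PATTERN = "yyyyMMdd HH:mm:ss"; // same pattern as df in the post

    // The value the Insert adds to each delta tuple: a String, not a Date.
    static String stamp(Date when, TimeZone tz) {
        SimpleDateFormat df = new SimpleDateFormat(PATTERN);
        df.setTimeZone(tz);
        return df.format(when);
    }

    // If a Date-typed "time" were wanted instead, the String must be parsed back.
    static Date parse(String stamp, TimeZone tz) throws ParseException {
        SimpleDateFormat df = new SimpleDateFormat(PATTERN);
        df.setTimeZone(tz);
        return df.parse(stamp);
    }

    public static void main(String[] args) throws ParseException {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        Object value = stamp(new Date(0L), utc);
        System.out.println(value.getClass().getSimpleName() + " " + value); // String 19700101 00:00:00
        System.out.println(parse((String) value, utc).getTime());           // 0
    }
}
```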


From: jnkrish
Sent: October 5, 2015 10:07:26pm PDT
To: cascading-user
Subject: Unable to write to two sinks



--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

jnkrish

Oct 6, 2015, 12:49:49 PM
to cascading-user
Ken,

 Thanks a lot :) It worked well.  