I am reading a CSV from /tmp/input.csv and loading it into source table 1, cascading_input. I can see data in the cascading_input Hive table, i.e., it was loaded from the CSV on HDFS.
I am then using a HiveFlow to query the same table and write the result into a new table, cascading_output. However, there is no data in the cascading_output Hive table.
The logs show no errors and report that the flow completed successfully. Any help would be really appreciated.
My code:
/**
 * Copies a delimited file on HDFS into a Hive table, then runs a Hive query
 * to populate a second table, chaining both steps in a single Cascade.
 *
 * <p>Flow 1 ("Flow number 1"): /tmp/input.csv -> cascading.cascading_input_1.
 * Flow 2 ("sinkFeedbackFlow"): INSERT ... SELECT into cascading.cascading_output_1.
 */
public class FileRead {
    @SuppressWarnings("rawtypes")
    public static void main(String[] args) throws Exception {
        // Pick up the cluster's Hadoop and Hive configuration explicitly so the
        // job talks to the real cluster rather than a local default config.
        final JobConf conf = new JobConf();
        conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
        conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
        conf.addResource(new Path("/etc/hive/conf/hive-site.xml"));
        conf.setUser("hadoop");

        List<Flow> queryFlows = new ArrayList<Flow>();

        Properties properties_act = new HfsProps()
            .setTemporaryDirectory("/tmp/cascading")
            .setLocalModeScheme("hdfs")
            .buildProperties(conf);
        Properties properties = AppProps.appProps().buildProperties(properties_act);
        /*
         * Additional properties (Hive metastore thrift URI, etc.) are set here;
         * removed for privacy concerns.
         */
        // BUG FIX: was testRead.class, which does not exist — the application jar
        // class must be the enclosing class so Cascading can locate the job jar.
        AppProps.setApplicationJarClass(properties, FileRead.class);

        // BUG FIX (root cause of the empty output table): a bare SELECT returns a
        // result set but writes nothing. HiveFlow executes the query verbatim, so
        // the query itself must INSERT into the sink table; the flow previously
        // "completed successfully" because the SELECT alone succeeded.
        String feedbackQuery =
            "INSERT OVERWRITE TABLE cascading.cascading_output_1 "
                + "select id,store_id from cascading.cascading_input_1";
        String[] feedbackQueryList = { feedbackQuery };

        // Source and sink table layouts (identical here: id bigint, store_id string).
        final String[] FEEDBACK_FIELDS = new String[] { "id", "store_id" };
        final String[] FEEDBACK_TYPES = new String[] { "bigint", "string" };
        final String[] SINK_FEEDBACK_FIELDS = new String[] { "id", "store_id" };
        final String[] SINK_FEEDBACK_TYPES = new String[] { "bigint", "string" };

        Hadoop2MR1FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties);

        // Flow 1: copy the comma-delimited HDFS file into the Hive source table.
        HiveTableDescriptor sourceTableDesc1 =
            new HiveTableDescriptor("cascading", "cascading_input_1", FEEDBACK_FIELDS, FEEDBACK_TYPES);
        HiveTap sourceTap1 = new HiveTap(sourceTableDesc1, sourceTableDesc1.toScheme(), UPDATE, false);
        Tap input = new Hfs(new TextDelimited(new Fields(FEEDBACK_FIELDS), ","), "/tmp/input.csv");
        Pipe copyPipe = new Pipe("Copy Pipe");
        queryFlows.add(flowConnector.connect("Flow number 1", input, sourceTap1, copyPipe));

        // Flow 2: run the INSERT ... SELECT through Hive into the sink table.
        HiveTableDescriptor sinkFeedbackDesc =
            new HiveTableDescriptor("cascading", "cascading_output_1", SINK_FEEDBACK_FIELDS, SINK_FEEDBACK_TYPES);
        HiveTap sinkFeedbackTap = new HiveTap(sinkFeedbackDesc, sinkFeedbackDesc.toScheme(), UPDATE, false);

        Map<String, String> props_hive_flow = new HashMap<String, String>();
        properties.putAll(props_hive_flow);
        HiveFlow flow_1 =
            new HiveFlow("sinkFeedbackFlow", feedbackQueryList,
                Arrays.<Tap> asList(sourceTap1), sinkFeedbackTap, props_hive_flow);
        queryFlows.add(flow_1);

        // The cascade topologically orders the flows: the HiveFlow reads the copy
        // flow's sink tap, so it runs after Flow 1 completes.
        CascadeConnector connector = new CascadeConnector(properties);
        Cascade cascade = connector.connect(queryFlows.toArray(new Flow[0]));
        cascade.complete();
        System.out.println("*** PROCESS COMPLETE *** ");
    }
}