Cascading-hive : Hive Flow runs smoothly without any error but I do not see the data in the sink table.

47 views
Skip to first unread message

Nischay G

unread,
Dec 7, 2016, 4:46:34 AM12/7/16
to cascading-user
I am reading a CSV from /tmp/input.csv and loading it into the source Hive table cascading.cascading_input_1; after the first flow runs I can see the data in that table.
I then use a HiveFlow to query the same table and write the result into a new table, cascading.cascading_output_1 — but that sink table stays empty.
The log shows no errors and reports that the flow completed successfully. Any help would be really appreciated.


My code :

public class FileRead {

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) throws Exception {
        final JobConf conf = new JobConf();

        conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
        conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
        conf.addResource(new Path("/etc/hive/conf/hive-site.xml"));

        conf.setUser("hadoop");

        List<Flow> queryFlows = new ArrayList<Flow>();
        Properties properties_act = new HfsProps().setTemporaryDirectory("/tmp/cascading").setLocalModeScheme("hdfs").buildProperties(conf);

        Properties properties = AppProps.appProps().buildProperties(properties_act);

       /*
        *
        * I have set my properties here. for thrift URI, etc. Removed them for privacy concerns.
        *
        * */
        AppProps.setApplicationJarClass(properties, testRead.class);

        // Source Queries

        String feedbackQuery = "select id,store_id from cascading.cascading_input_1";
        String[] feedbackQueryList = {feedbackQuery };

        // TABLE FIELDS stuff
        final String[] FEEDBACK_FIELDS = new String[] {"id", "store_id" };
        final String[] FEEDBACK_TYPES = new String[] {"bigint", "string" };

        final String[] SINK_FEEDBACK_FIELDS = new String[] {"id", "store_id" };
        final String[] SINK_FEEDBACK_TYPES = new String[] {"bigint", "string" };

        Hadoop2MR1FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties);

        HiveTableDescriptor sourceTableDesc1 = new HiveTableDescriptor("cascading", "cascading_input_1", FEEDBACK_FIELDS, FEEDBACK_TYPES);

        HiveTap sourceTap1 = new HiveTap(sourceTableDesc1, sourceTableDesc1.toScheme(), UPDATE, false);

        Tap input = new Hfs(new TextDelimited(new Fields(FEEDBACK_FIELDS), ","), "/tmp/input.csv");

        Pipe copyPipe = new Pipe("Copy Pipe");

        queryFlows.add(flowConnector.connect("Flow number 1", input, sourceTap1, copyPipe));
        // Sink Tables

        HiveTableDescriptor sinkFeedbackDesc =
                new HiveTableDescriptor("cascading", "cascading_output_1", SINK_FEEDBACK_FIELDS, SINK_FEEDBACK_TYPES);
        HiveTap sinkFeedbackTap = new HiveTap(sinkFeedbackDesc, sinkFeedbackDesc.toScheme(), UPDATE, false);

        // HIVE FLOWS

        Map<String, String> props_hive_flow = new HashMap<String, String>();
        properties.putAll(props_hive_flow);

        HiveFlow flow_1 =
                new HiveFlow("sinkFeedbackFlow", feedbackQueryList, Arrays.<Tap> asList(sourceTap1), sinkFeedbackTap, props_hive_flow);

        queryFlows.add(flow_1);

        CascadeConnector connector = new CascadeConnector(properties);
        Cascade cascade = connector.connect(queryFlows.toArray(new Flow[0]));
        cascade.complete();
        System.out.println("*** PROCESS COMPLETE *** ");
    }
}
Reply all
Reply to author
Forward
0 new messages