Stack trace?
> I have attached the code snippet.
You’re using SinkMode.UPDATE, so the typical problem isn’t there. I assume you tried KEEP as well, and had the same problem?
With the stack trace, Chris Wensel could probably help.
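In case it helps with grabbing the trace, here is a rough sketch of one way to capture the whole thing, including every "Caused by:" section, around the complete() calls. The names StackTraceDump, Step, fullTrace and runLogged are made up for illustration, and the lambda in main is only a stand-in for something like flow2.complete(); it uses nothing beyond the JDK.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

final class StackTraceDump {

    /** Hypothetical stand-in for a step such as flow2.complete() that may throw. */
    interface Step {
        void run() throws Exception;
    }

    /** Renders the throwable and its whole "Caused by:" chain as one string. */
    static String fullTrace(Throwable t) {
        StringWriter buffer = new StringWriter();
        try (PrintWriter out = new PrintWriter(buffer)) {
            t.printStackTrace(out);
        }
        return buffer.toString();
    }

    /** Runs the step; on failure prints the full trace to stderr and rethrows. */
    static void runLogged(String label, Step step) throws Exception {
        try {
            step.run();
        } catch (Exception e) {
            System.err.println(label + " failed:\n" + fullTrace(e));
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            // Simulated failure; in the real method this would wrap flow2.complete().
            runLogged("flow2", () -> {
                throw new IllegalStateException("sink failed", new RuntimeException("root cause"));
            });
        } catch (Exception e) {
            System.out.println("rethrown: " + e.getMessage());
        }
    }
}
```

Wrapping each complete() call separately would also show which of the two flows is the one that fails.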
— Ken
public static void insertIntoTable(RunEnvironment env, Properties properties, String targetTableDb,
        String targetTable, String srcTableDb, String srcTable) throws Exception {
    final String[] hiveFlowQuery = {"insert into " + targetTableDb + "." + targetTable
            + " select * from " + srcTableDb + "." + srcTable};
    // Arrays.toString prints the query text; concatenating the array itself prints only its identity.
    LOGGER.info("Insert query.." + Arrays.toString(hiveFlowQuery));

    final ETLTableSchema etlTableSchemaSink = ETLSchemaHelper.getInstance().getTableSchema(srcTableDb, srcTable);
    final Hadoop2MR1FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties);
    final HiveTap tapTempsource = TapFactory.getHiveTap(srcTableDb, srcTable);
    final Pipe pipe = new Pipe("pipe_partition");

    // have tried using both UPDATE & KEEP mode
    final HiveTap sinkTap = TapFactory.getHiveTapSink(targetTableDb, targetTable,
            etlTableSchemaSink.getFields(), etlTableSchemaSink.getTypes(), SinkMode.UPDATE);
    /*final HiveTap sinkTap = TapFactory.getHiveTapSink(targetTableDb, targetTable,
            etlTableSchemaSink.getFields(), etlTableSchemaSink.getTypes(), SinkMode.KEEP);*/

    // hiveflow is not declared in this snippet; presumably it is defined elsewhere.
    final HiveFlow flow2 =
            new HiveFlow(hiveflow, hiveFlowQuery, Arrays.<Tap>asList(tapTempsource), sinkTap);

    final FlowDef flowdef2 = FlowDef.flowDef();
    flowdef2.addSource(pipe, tapTempsource).addTailSink(pipe, sinkTap);
    final Flow<?> flow3 = flowConnector.connect(flowdef2);

    flow2.complete();
    flow3.complete();
}
--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378