package Projectgraph.engine.cascading.assembly;

import java.io.IOException;
import java.lang.reflect.Type;
import java.sql.SQLException;
import java.util.Date;

import cascading.flow.Flow;
import cascading.flow.tez.Hadoop2TezFlowConnector;
import cascading.jdbc.RedshiftScheme;
import cascading.jdbc.RedshiftTableDesc;
import cascading.jdbc.RedshiftTap;
import cascading.jdbc.db.DBInputFormat;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.type.DateType;

/**
 * Reads a comma-delimited input file from HDFS and writes it into the
 * Redshift table "targettable4" through a Cascading flow running on Tez.
 */
public class RedshiftTableUpdate {

    public static void main(String[] args) throws SQLException, ClassNotFoundException, IOException {
        // Redshift connection settings (Redshift speaks the PostgreSQL wire protocol).
        String jdbcurlRedshift = "jdbc:postgresql://52.7.203.21:5439";
        String userNameRedshift = "kishorb";
        String passwordRedshift = "password";
        String databaseRedshift = "dev";
        String driverName = "org.postgresql.Driver";

        // Delimited source file and target table name.
        String inputFile = "../Projectgraph.engine.command-line/testData/Input/redshiftInputWithTimeStamp";
        String TESTING_TABLE_NAME = "targettable4";

        // The input format class to use for the JDBC scheme.
        Class inputFormatClass = DBInputFormat.class;

        // Ensure the JDBC driver is on the classpath and registered.
        Class.forName(driverName);

        // Field "f2" is a DATE parsed with this pattern; "f3" is a TIMESTAMP.
        DateType dataType = new DateType("yyyy-MM-dd");

        // Source tap: comma-delimited text on HDFS with typed fields f1..f4.
        Tap source = new Hfs(
            new TextDelimited(
                new Fields("f1", "f2", "f3", "f4")
                    .applyTypes(String.class, dataType, Date.class, String.class),
                ","),
            inputFile);

        Fields fields = new Fields(
            new Comparable[] { "f1", "f2", "f3", "f4" },
            new Type[] { String.class, dataType, Date.class, String.class });

        Pipe parsePipe = new Pipe("insert");

        // Target table layout: column names and their Redshift DDL definitions.
        String[] columnNames = { "f1", "f2", "f3", "f4" };
        String[] columnDefs = { "VARCHAR(20) NOT NULL", "DATE NOT NULL",
            "TIMESTAMP NOT NULL", "VARCHAR(20) NOT NULL" };

        RedshiftTableDesc redshiftTableDesc =
            new RedshiftTableDesc(TESTING_TABLE_NAME, columnNames, columnDefs, null, null);
        RedshiftScheme redshiftScheme =
            new RedshiftScheme(inputFormatClass, fields, columnNames);

        // Sink tap: SinkMode.REPLACE drops and recreates the target table before writing.
        RedshiftTap redshiftTap = new RedshiftTap(
            jdbcurlRedshift + "/" + databaseRedshift,
            userNameRedshift, passwordRedshift,
            null, null,
            redshiftTableDesc, redshiftScheme,
            SinkMode.REPLACE, false, true);
        redshiftTap.setBatchSize(1); // flush after every record

        // Wire the source to the Redshift sink and run the flow on Tez.
        Flow parseFlow = new Hadoop2TezFlowConnector().connect(source, redshiftTap, parsePipe);
        parseFlow.complete();
    }
}