Cascading-JDBC - Is copying data from one MySQL server to another MySQL server supported?


Kishor Baindoor

May 24, 2016, 5:21:23 AM
to cascading-user
Hi,

I am trying to write sample code that reads data from one MySQL server and loads it into a table on another MySQL server.

The job completes successfully, but the target table is not loaded with any records. If I declare a primary key on the target table, it fails with the error "Duplicate value for primary key", even though there are no duplicates.

In the code below, if I use the same MySQL server for both source and target, it completes successfully. If I use two different servers for source and target, it doesn't work.

It would be really helpful if someone could help me solve this.



import java.io.IOException;
import java.lang.reflect.Type;
import java.sql.SQLException;

import cascading.flow.Flow;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.jdbc.JDBCScheme;
import cascading.jdbc.JDBCTap;
import cascading.jdbc.TableDesc;
import cascading.jdbc.db.DBInputFormat;
import cascading.jdbc.db.MySqlDBInputFormat;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tuple.Fields;

public class TableToTable {

  public static void main(String[] args) throws SQLException, ClassNotFoundException, IOException {

    // jdbcurl1 is the source server, jdbcurl2 is the target server.
    // Swap the commented lines to point both at the same server.
    // String jdbcurl1 = "jdbc:mysql://ip1:3306/cascading_jdbc?user=root&password=mysql";
    String jdbcurl1 = "jdbc:mysql://ip2:3306/cascading_jdbc?user=root&password=mysql";
    String jdbcurl2 = "jdbc:mysql://ip1:3306/cascading_jdbc?user=root&password=mysql";
    // String jdbcurl2 = "jdbc:mysql://ip2:3306/cascading_jdbc?user=root&password=mysql";

    String driverName = "com.mysql.jdbc.Driver";

    Class<? extends DBInputFormat> inputFormatClass = MySqlDBInputFormat.class;

    String TESTING_TABLE_NAME_SOURCE = "testingtable12";
    String TESTING_TABLE_NAME_TARGET = "testingtable13";

    // Class.forName( driverName );
    // Connection connection = DriverManager.getConnection( jdbcurl1 );
    // connection.setAutoCommit( true );

    Fields fields = new Fields( new Comparable[]{"num", "lwr", "upr"}, new Type[]{int.class, String.class, String.class} );
    Pipe parsePipe = new Pipe( "insert" );
    String[] columnNames = {"num", "lwr", "upr"};
    String[] columnDefs = {"INT NOT NULL", "VARCHAR(100) NOT NULL", "VARCHAR(100) NOT NULL"};
    // String[] primaryKeys = {"num", "lwr"};
    String[] primaryKeys = null;

    // Source tap: reads testingtable12 through jdbcurl1.
    TableDesc tableDescS = new TableDesc( TESTING_TABLE_NAME_SOURCE, columnNames, columnDefs, primaryKeys );
    JDBCScheme schemeS = new JDBCScheme( inputFormatClass, fields, columnNames );
    JDBCTap sourceTap = new JDBCTap( jdbcurl1, driverName, tableDescS, schemeS, SinkMode.REPLACE );
    sourceTap.setBatchSize( 1 );

    // Target tap: writes testingtable13 through jdbcurl2.
    TableDesc tableDescT = new TableDesc( TESTING_TABLE_NAME_TARGET, columnNames, columnDefs, primaryKeys );
    JDBCScheme schemeT = new JDBCScheme( inputFormatClass, fields, columnNames );
    JDBCTap targetTapT = new JDBCTap( jdbcurl2, driverName, tableDescT, schemeT, SinkMode.REPLACE );
    targetTapT.setBatchSize( 1 );

    // A straight source-to-sink copy is planned as a single map-only MR job.
    Flow<?> parseFlow = new Hadoop2MR1FlowConnector().connect( sourceTap, targetTapT, parsePipe );
    parseFlow.complete();
  }
}

Andre Kelpe

May 24, 2016, 6:06:38 AM
to cascading-user
Can you share the logs of what happens with 2 servers? I don't see
why it would not work, tbh.

- André




Kishor Baindoor

May 24, 2016, 7:18:17 AM
to cascading-user
Hi André,

Thanks for the reply. Please see the log below. There is no error message, but the target table is not loaded with any records. The same code loads the target table if source and target are on the same MySQL server.




Log:
16/05/24 16:35:14 WARN jdbc.JDBCTap: using sink mode: REPLACE, consider UPDATE to prevent DROP TABLE from being called during Flow or Cascade setup
16/05/24 16:35:14 WARN jdbc.JDBCTap: using sink mode: REPLACE, consider UPDATE to prevent DROP TABLE from being called during Flow or Cascade setup
16/05/24 16:35:15 INFO util.Util: resolving application jar from found main method on: packageName.TableToTable
16/05/24 16:35:15 INFO planner.HadoopPlanner: using application jar: null
16/05/24 16:35:15 INFO property.AppProps: using app.id: DB0C83B3DAF441908A3D402BBB8B144B
16/05/24 16:35:16 INFO flow.Flow: [insert] executed rule registry: MapReduceHadoopRuleRegistry, completed as: SUCCESS, in: 00:00.144
16/05/24 16:35:16 INFO flow.Flow: [insert] rule registry: MapReduceHadoopRuleRegistry, supports assembly with steps: 1, nodes: 1
16/05/24 16:35:16 INFO flow.Flow: [insert] rule registry: MapReduceHadoopRuleRegistry, result was selected using: 'default comparator: selects plan with fewest steps and fewest nodes'
16/05/24 16:35:16 INFO jdbc.JDBCScheme: receiving final sink fields 'num', 'lwr', 'upr' | int, String, String
16/05/24 16:35:16 INFO Configuration.deprecation: mapred.used.genericoptionsparser is deprecated. Instead, use mapreduce.client.genericoptionsparser.used
16/05/24 16:35:17 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
16/05/24 16:35:17 INFO Configuration.deprecation: mapred.map.tasks.speculative.execution is deprecated. Instead, use mapreduce.map.speculative
16/05/24 16:35:17 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
16/05/24 16:35:17 INFO jdbc.JDBCTap: creating connection: jdbc:mysql://ip2:3306/jdbc?user=root&password=root
16/05/24 16:35:18 INFO util.Version: Concurrent, Inc - Cascading 3.0.1
16/05/24 16:35:18 INFO flow.Flow: [insert] starting
16/05/24 16:35:18 INFO flow.Flow: [insert]  source: JDBCTap{connectionUrl='jdbc:mysql://ip1:3306/jdbc?user=root&password=root', driverClassName='com.mysql.jdbc.Driver', tableDesc=TableDesc{tableName='testingtable12', columnNames=[num, lwr, upr], columnDefs=[INT NOT NULL, VARCHAR(100) NOT NULL, VARCHAR(100) NOT NULL], primaryKeys=[num]}}
16/05/24 16:35:18 INFO flow.Flow: [insert]  sink: JDBCTap{connectionUrl='jdbc:mysql://ip2:3306/jdbc?user=root&password=root', driverClassName='com.mysql.jdbc.Driver', tableDesc=TableDesc{tableName='testingtable13', columnNames=[num, lwr, upr], columnDefs=[INT NOT NULL, VARCHAR(100) NOT NULL, VARCHAR(100) NOT NULL], primaryKeys=[num]}}
16/05/24 16:35:18 INFO flow.Flow: [insert]  parallel execution of steps is enabled: false
16/05/24 16:35:18 INFO flow.Flow: [insert]  executing total steps: 1
16/05/24 16:35:18 INFO flow.Flow: [insert]  allocating management threads: 1
16/05/24 16:35:18 INFO jdbc.JDBCTap: creating connection: jdbc:mysql://ip2:3306/jdbc?user=root&password=root
16/05/24 16:35:18 INFO jdbc.JDBCTap: creating connection: jdbc:mysql://ip2:3306/jdbc?user=root&password=root
16/05/24 16:35:18 INFO jdbc.JDBCTap: creating connection: jdbc:mysql://ip2:3306/jdbc?user=root&password=root
16/05/24 16:35:18 INFO jdbc.JDBCUtil: executing update: CREATE TABLE testingtable13 ( num INT NOT NULL, lwr VARCHAR(100) NOT NULL, upr VARCHAR(100) NOT NULL, PRIMARY KEY( num ) )
16/05/24 16:35:18 INFO jdbc.JDBCTap: creating connection: jdbc:mysql://ip2:3306/jdbc?user=root&password=root
16/05/24 16:35:18 INFO flow.Flow: [insert] starting step: (1/1) ...43-4bb3-bb14-3a9b4f304364
16/05/24 16:35:18 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/24 16:35:18 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/24 16:35:19 WARN mapreduce.JobSubmitter: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
16/05/24 16:35:19 INFO db.DBConfiguration: opening db connection: jdbc:mysql://ip1:3306/jdbc?user=root&password=root
16/05/24 16:35:19 INFO db.DBInputFormat: creating DB input split with start: 0, end: 2, chunks: 1
16/05/24 16:35:19 INFO mapreduce.JobSubmitter: number of splits:1
16/05/24 16:35:19 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local828310614_0001
16/05/24 16:35:20 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/05/24 16:35:20 INFO flow.Flow: [insert] submitted hadoop job: job_local828310614_0001
16/05/24 16:35:20 INFO flow.Flow: [insert] tracking url: http://localhost:8080/
16/05/24 16:35:20 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/05/24 16:35:20 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
16/05/24 16:35:20 WARN output.FileOutputCommitter: Output Path is null in setupJob()
16/05/24 16:35:20 INFO mapred.LocalJobRunner: Waiting for map tasks
16/05/24 16:35:20 INFO mapred.LocalJobRunner: Starting task: attempt_local828310614_0001_m_000000_0
16/05/24 16:35:20 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
16/05/24 16:35:20 INFO mapred.Task:  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@5463cd7b
16/05/24 16:35:20 INFO mapred.MapTask: Processing split: cascading.tap.hadoop.io.MultiInputSplit@1ae05b3c
16/05/24 16:35:20 INFO db.DBConfiguration: opening db connection: jdbc:mysql://ip1:3306/jdbc?user=root&password=root
16/05/24 16:35:20 INFO db.DBInputFormat: Executing select query
16/05/24 16:35:20 INFO db.DBInputFormat: SELECT num, lwr, upr FROM testingtable12 testingtable12
16/05/24 16:35:20 INFO mapred.MapTask: numReduceTasks: 0
16/05/24 16:35:20 INFO db.DBConfiguration: opening db connection: jdbc:mysql://ip1:3306/jdbc?user=root&password=root
16/05/24 16:35:20 INFO hadoop.FlowMapper: cascading version: 3.0.1
16/05/24 16:35:20 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
16/05/24 16:35:21 INFO hadoop.FlowMapper: flow node id: B967B6FE2E234E67B363DDCE738EC22A, ordinal: 0
16/05/24 16:35:21 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/05/24 16:35:21 INFO hadoop.FlowMapper: sourcing from: JDBCTap{connectionUrl='jdbc:mysql://ip1:3306/jdbc?user=root&password=root', driverClassName='com.mysql.jdbc.Driver', tableDesc=TableDesc{tableName='testingtable12', columnNames=[num, lwr, upr], columnDefs=[INT NOT NULL, VARCHAR(100) NOT NULL, VARCHAR(100) NOT NULL], primaryKeys=[num]}}
16/05/24 16:35:21 INFO hadoop.FlowMapper: sinking to: JDBCTap{connectionUrl='jdbc:mysql://ip2:3306/jdbc?user=root&password=root', driverClassName='com.mysql.jdbc.Driver', tableDesc=TableDesc{tableName='testingtable13', columnNames=[num, lwr, upr], columnDefs=[INT NOT NULL, VARCHAR(100) NOT NULL, VARCHAR(100) NOT NULL], primaryKeys=[num]}}
16/05/24 16:35:21 INFO hadoop.FlowMapper: flow node id: B967B6FE2E234E67B363DDCE738EC22A, mem on start (mb), free: 94, total: 116, max: 1365
16/05/24 16:35:21 INFO db.DBOutputFormat: executing batch [totstmts: 1][crntstmts: 1][batch: 1]
16/05/24 16:35:21 INFO db.DBOutputFormat: records:1
16/05/24 16:35:21 INFO db.DBOutputFormat: executing batch [totstmts: 2][crntstmts: 1][batch: 1]
16/05/24 16:35:21 INFO db.DBOutputFormat: records:1
16/05/24 16:35:21 INFO hadoop.FlowMapper: flow node id: B967B6FE2E234E67B363DDCE738EC22A, mem on close (mb), free: 92, total: 116, max: 1365
16/05/24 16:35:21 INFO mapred.LocalJobRunner: 
16/05/24 16:35:21 INFO mapred.Task: Task:attempt_local828310614_0001_m_000000_0 is done. And is in the process of committing
16/05/24 16:35:21 INFO mapred.LocalJobRunner: map
16/05/24 16:35:21 INFO mapred.Task: Task 'attempt_local828310614_0001_m_000000_0' done.
16/05/24 16:35:21 INFO mapred.LocalJobRunner: Finishing task: attempt_local828310614_0001_m_000000_0
16/05/24 16:35:21 INFO mapred.LocalJobRunner: map task executor complete.
16/05/24 16:35:21 WARN output.FileOutputCommitter: Output Path is null in commitJob()
16/05/24 16:35:25 INFO flow.Flow: [insert]  completed in: 00:06.982

Andre Kelpe

May 24, 2016, 7:26:06 AM
to cascading-user
Hmm, that does look like it works. Are you sure you checked the
correct server? What happens if you have an intermediate tap on HDFS?

db1 -> file -> db2

Does that work correctly?

- André

Kishor Baindoor

May 25, 2016, 10:17:47 AM
to cascading-user
Yes, the server details are correct, and db1 -> file -> db2 works fine.

If I use Hadoop2TezFlowConnector(), it works well: the data is loaded into the target table.

Currently I am using Hadoop2MR1FlowConnector(), as we are not using Tez for this project.
In this case, instead of loading the data into the target table on the target server (ip2), it loads the data into the source server (ip1). That is why I was seeing an empty target table on the target server.

It looks like a single JobConf is created for the single MR job, and it holds just one connection instead of two (as per the code in cascading.jdbc.db.DBConfiguration). If I add a GroupBy pipe between the source tap and the target tap, it works fine, because that creates two MR jobs and each MR job has its own JobConf.
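
To illustrate what I think is happening, here is a minimal sketch in plain Java that does not use Cascading at all. A HashMap stands in for the JobConf, the property name "example.jdbc.url" is made up for illustration, and the order in which the taps are configured is an assumption chosen to match what I observed (the source URL winning). It only models the suspected overwrite, not the real cascading-jdbc code.

```java
import java.util.HashMap;
import java.util.Map;

public class SharedConfSketch {
    // Hypothetical property name; a stand-in for the single URL entry in the job configuration.
    public static final String URL_KEY = "example.jdbc.url";

    // Stand-in for a tap writing its connection URL into the shared configuration.
    public static void configure(Map<String, String> conf, String url) {
        conf.put(URL_KEY, url);
    }

    // One map-only job means one configuration shared by the source and the sink.
    // Assumed order: sink first, then source, so the source URL overwrites the sink URL.
    public static Map<String, String> buildSingleJobConf(String sourceUrl, String sinkUrl) {
        Map<String, String> conf = new HashMap<>();
        configure(conf, sinkUrl);
        configure(conf, sourceUrl);
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = buildSingleJobConf(
            "jdbc:mysql://ip1:3306/db",   // source server
            "jdbc:mysql://ip2:3306/db");  // target server
        // Reader and writer both look up the same key, so they see the same server.
        System.out.println("reader connects to: " + conf.get(URL_KEY));
        System.out.println("writer connects to: " + conf.get(URL_KEY));
    }
}
```

With two MR jobs (for example after adding a GroupBy), each job would get its own map in this model, so the reader and the writer would no longer share the entry.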

I thought of raising an issue in the cascading-jdbc project, but I won't be able to add a test case using Hadoop2MR1FlowConnector() as cascading-hadoop2-mr1 jar is present.


Andre Kelpe

May 25, 2016, 10:53:28 AM
to cascading-user
Indeed, the configuration object is only storing one url at a time. We
should split that into 2, one for the InputFormat and one for the
OutputFormat and then it will work.

https://github.com/Cascading/cascading-jdbc/blob/3.0/cascading-jdbc-core/src/main/java/cascading/jdbc/db/DBConfiguration.java#L69-L75

The same is true for username and password.
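
Roughly, the split could look like the sketch below. This is plain Java with a HashMap standing in for the configuration object, and the two property names are hypothetical, not the ones the project would necessarily pick. It only shows the idea of separate entries for the input and output side.

```java
import java.util.HashMap;
import java.util.Map;

public class SplitConfSketch {
    // Hypothetical property names, one per direction.
    public static final String INPUT_URL_KEY = "example.jdbc.input.url";
    public static final String OUTPUT_URL_KEY = "example.jdbc.output.url";

    // The source side (InputFormat) stores and reads only the input entry.
    public static void configureSource(Map<String, String> conf, String url) {
        conf.put(INPUT_URL_KEY, url);
    }

    // The sink side (OutputFormat) stores and reads only the output entry.
    public static void configureSink(Map<String, String> conf, String url) {
        conf.put(OUTPUT_URL_KEY, url);
    }

    // A single shared configuration can now carry both URLs without a collision.
    public static Map<String, String> buildSingleJobConf(String sourceUrl, String sinkUrl) {
        Map<String, String> conf = new HashMap<>();
        configureSink(conf, sinkUrl);
        configureSource(conf, sourceUrl);
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = buildSingleJobConf(
            "jdbc:mysql://ip1:3306/db",   // source server
            "jdbc:mysql://ip2:3306/db");  // target server
        System.out.println("reader connects to: " + conf.get(INPUT_URL_KEY));
        System.out.println("writer connects to: " + conf.get(OUTPUT_URL_KEY));
    }
}
```

Username and password would need the same treatment, each with an input and an output entry.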

Feel free to give it a shot and send me a PR. Happy to review and
integrate. In any case, please file a GitHub issue for this.

- André