java.io.NotSerializableException When executing a flow with source as JDBCTap

Jitendra

Aug 6, 2010, 1:59:37 AM
to cascading-user
Hi,
I am quite new to Cascading. I am trying to create a simple flow with
a database table as the source and a TextLine file as the sink.
I am applying a Concatenate function to concatenate url_id and url_addr
and write the result to the file.

My code is as follows.

package root;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowProcess;
import cascading.jdbc.JDBCScheme;
import cascading.jdbc.JDBCTap;
import cascading.jdbc.TableDesc;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class CustomDbTap
{
    private String tableName = "url_tracker";
    private static final String[] _urlsSinkColumnNames = {"url_id",
        "url_addr", "url_status", "url_crawl_depth"};
    private static final String[] _urlsSinkColumnDefs = {"INT",
        "VARCHAR(255)", "VARCHAR(45)", "INTEGER"};
    private String[] primaryKeys = {"url_id"};
    private String driver = "com.mysql.jdbc.Driver";
    public static final Fields FIELDS = new Fields("URL_ID",
        "URL_ADDR", "URL_STATUS", "URL_CRAWL_DEPTH");

    public Tap createDbTap(String connectionUrl, String user, String password) {
        TableDesc tableDesc = new TableDesc(tableName,
            _urlsSinkColumnNames, _urlsSinkColumnDefs, primaryKeys);
        Tap urlsTap = new JDBCTap(connectionUrl, driver, tableDesc,
            new JDBCScheme(FIELDS, _urlsSinkColumnNames));
        return urlsTap;
    }

    // Non-static inner class: each instance keeps a hidden reference to the
    // enclosing CustomDbTap object, which is not Serializable.
    private class Concatenate extends BaseOperation implements Function {

        public Concatenate() {
            super(new Fields("line"));
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry args = functionCall.getArguments();
            // Look fields up by name; positions 1 and 2 would be URL_ADDR and URL_STATUS.
            int url_id = args.getInteger("URL_ID");
            String url_addr = args.getString("URL_ADDR");
            System.out.println(url_addr);
            Tuple tuple = new Tuple();
            tuple.add(url_id + " - " + url_addr);
            functionCall.getOutputCollector().add(tuple);
        }
    }

    public static void main(String[] args) {
        CustomDbTap obj = new CustomDbTap();
        Pipe pipe = new Pipe("url pipe");
        pipe = new Each(pipe, obj.new Concatenate(), Fields.RESULTS);
        Tap sourceTap = obj.createDbTap("jdbc:mysql://<connection url>:3306/test", "user", "pass");
        Tap sinkTap = new Hfs(new TextLine(new Fields("line")), "/tmp/bixo");
        Properties props = new Properties();

        FlowConnector.setApplicationJarClass(props, CustomDbTap.class);
        FlowConnector con = new FlowConnector(props);
        Flow flow = con.connect(sourceTap, sinkTap, pipe);
        flow.complete();
    }
}

When I run it, I get the following exception (only the first few lines
are shown):

Exception in thread "main" cascading.flow.FlowException: unhandled exception
    at cascading.flow.Flow.complete(Flow.java:699)
    at root.CustomDbTap.main(CustomDbTap.java:70)
Caused by: java.io.NotSerializableException: root.CustomDbTap
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1156)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)

Can anyone point out where I am going wrong?
Any help would be appreciated.

Thanks
Jitendra

Jitendra

Aug 6, 2010, 2:53:12 AM
to cascading-user
Sorry, I found the problem.
I created a separate public class Concatenate and used "new
Concatenate()" in place of "obj.new Concatenate()" in the line
pipe = new Each(pipe, obj.new Concatenate(),FIELDS.RESULTS);

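This worked. As far as I can tell, the non-static inner class kept a
hidden reference to the enclosing CustomDbTap object, so Cascading tried
to serialize CustomDbTap along with the function and failed, because
CustomDbTap is not Serializable.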
Thanks
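The same failure can be reproduced in plain Java, without any Cascading classes. The ObjectOutputStream frames in the stack trace show that the pipe assembly is written with standard Java serialization, so any operation that is a non-static inner class drags its enclosing object along. The class names below (SerializationDemo, InnerOperation, NestedOperation) are hypothetical stand-ins for CustomDbTap and the two versions of Concatenate; this is a minimal sketch, not Cascading code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for CustomDbTap: the enclosing class is NOT Serializable.
public class SerializationDemo {

    private String tableName = "url_tracker";

    // Like the original Concatenate: a non-static inner class. It reads
    // tableName, so it must hold a hidden reference to the enclosing instance.
    class InnerOperation implements Serializable {
        private static final long serialVersionUID = 1L;

        String table() {
            return tableName;
        }
    }

    // Like the fix: a static nested (or top-level) class has no enclosing
    // instance, so only its own fields are serialized.
    static class NestedOperation implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Serializes obj to an in-memory stream. Returns null on success, or the
    // NotSerializableException message (the offending class name) on failure.
    static String serializationFailure(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SerializationDemo outer = new SerializationDemo();
        // Fails: the hidden reference points at a non-Serializable SerializationDemo.
        System.out.println("inner:  " + serializationFailure(outer.new InnerOperation()));
        // Succeeds: nothing outside the nested class is reachable.
        System.out.println("nested: " + serializationFailure(new NestedOperation()));
    }
}
```

Declaring Concatenate as `private static class Concatenate` inside CustomDbTap should work just as well as moving it to its own file, since a static nested class has no reference to an enclosing instance.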