Jitendra
Aug 6, 2010, 1:59:37 AM
to cascading-user
Hi,
I am quite new to Cascading. I am trying to create a simple flow with a
database table as the source and a TextLine file as the sink.
I am applying a Concatenate function to concatenate url_id and url_addr
and write the result to the file.
My code is as follows:
package root;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowProcess;
import cascading.jdbc.JDBCScheme;
import cascading.jdbc.JDBCTap;
import cascading.jdbc.TableDesc;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class CustomDbTap
{
    private String tableName = "url_tracker";
    private static final String[] _urlsSinkColumnNames = {"url_id", "url_addr", "url_status", "url_crawl_depth"};
    private static final String[] _urlsSinkColumnDefs = {"INT", "VARCHAR(255)", "VARCHAR(45)", "INTEGER"};
    private String[] primaryKeys = {"url_id"};
    private String driver = "com.mysql.jdbc.Driver";

    public static final Fields FIELDS = new Fields("URL_ID", "URL_ADDR", "URL_STATUS", "URL_CRAWL_DEPTH");

    // Builds a JDBC tap over the url_tracker table.
    // Note: user and password are not currently passed to the tap.
    public Tap createDbTap(String connectionUrl, String user, String password)
    {
        TableDesc tableDesc = new TableDesc(tableName, _urlsSinkColumnNames, _urlsSinkColumnDefs, primaryKeys);
        Tap urlsTap = new JDBCTap(connectionUrl, driver, tableDesc, new JDBCScheme(FIELDS, _urlsSinkColumnNames));
        return urlsTap;
    }

    // Emits a single "line" field of the form "<url_id> - <url_addr>".
    private class Concatenate extends BaseOperation implements Function
    {
        public Concatenate()
        {
            super(new Fields("line"));
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall)
        {
            TupleEntry args = functionCall.getArguments();
            // Tuple positions are 0-based: URL_ID is 0, URL_ADDR is 1.
            int url_id = args.getInteger(0);
            String url_addr = args.getString(1);
            System.out.println(url_addr);
            Tuple tuple = new Tuple();
            tuple.add(url_id + " - " + url_addr);
            functionCall.getOutputCollector().add(tuple);
        }
    }

    public static void main(String[] args)
    {
        CustomDbTap obj = new CustomDbTap();
        Pipe pipe = new Pipe("url pipe");
        pipe = new Each(pipe, obj.new Concatenate(), Fields.RESULTS);
        Tap sourceTap = obj.createDbTap("jdbc:mysql://<connection url>:3306/test", "user", "pass");
        Tap sinkTap = new Hfs(new TextLine(new Fields("line")), "/tmp/bixo");
        Properties props = new Properties();
        FlowConnector.setApplicationJarClass(props, CustomDbTap.class);
        FlowConnector con = new FlowConnector(props);
        Flow flow = con.connect(sourceTap, sinkTap, pipe);
        flow.complete();
    }
}
When I run it, I get the following exception (only the first few lines
are shown):
Exception in thread "main" cascading.flow.FlowException: unhandled exception
    at cascading.flow.Flow.complete(Flow.java:699)
    at root.CustomDbTap.main(CustomDbTap.java:70)
Caused by: java.io.NotSerializableException: root.CustomDbTap
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1156)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
Can anyone point out where I am going wrong?
It would be a great help.
Thanks
Jitendra
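A likely cause, sketched for anyone who finds this thread with the same trace: Concatenate is declared as `private class`, i.e. a non-static inner class, so each instance created with `obj.new Concatenate()` holds a hidden reference to the CustomDbTap object. The java.io.ObjectOutputStream frames in the trace indicate that the pipe assembly is Java-serialized when the flow runs, and that hidden reference drags in CustomDbTap, which does not implement Serializable. The self-contained sketch below reproduces the effect with plain java.io classes only; Outer, Inner, Nested and trySerialize are hypothetical stand-ins, not Cascading API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class InnerClassSerializationDemo {

    // Hypothetical stand-in for CustomDbTap: deliberately NOT Serializable.
    static class Outer {
        private String label = "outer";

        // Non-static inner class (like `private class Concatenate`):
        // every instance keeps a hidden reference to the Outer that created it.
        class Inner implements Serializable {
            private static final long serialVersionUID = 1L;

            String describe() {
                return "inner of " + label; // reads state from the enclosing instance
            }
        }

        // Static nested class: holds no reference to any Outer instance.
        static class Nested implements Serializable {
            private static final long serialVersionUID = 1L;
        }
    }

    // Java-serializes obj into memory. Returns null on success, or the
    // class name reported by NotSerializableException on failure.
    static String trySerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Outer outer = new Outer();
        System.out.println("inner:  " + trySerialize(outer.new Inner()));
        System.out.println("nested: " + trySerialize(new Outer.Nested()));
    }
}
```

Serializing the Inner instance should fail with the enclosing class name (InnerClassSerializationDemo$Outer), analogous to root.CustomDbTap in the trace above, while the static Nested instance serializes without error. Under this assumption, the matching change to the posted code would be to declare `private static class Concatenate extends BaseOperation implements Function` and create it with `new Concatenate()` instead of `obj.new Concatenate()`, since a static nested class carries no reference to CustomDbTap.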