[extrods] r253 committed - replication service

6 views
Skip to first unread message

codesite...@google.com

unread,
Nov 2, 2011, 2:19:51 AM11/2/11
to ext...@googlegroups.com
Revision: 253
Author: bing...@yahoo.com
Date: Tue Nov 1 23:18:38 2011
Log: replication service
http://code.google.com/p/extrods/source/detail?r=253

Added:
/trunk/CrossZoneRepl
/trunk/CrossZoneRepl/AvuMetaData.cpp
/trunk/CrossZoneRepl/AvuMetaData.h
/trunk/CrossZoneRepl/CrossZoneRepl.cpp
/trunk/CrossZoneRepl/Makefile
/trunk/CrossZoneRepl/crosszonerepl.cfg
/trunk/CrossZoneRepl/readme.txt
/trunk/CrossZoneRepl/run_CrossZoneRepl.sh
/trunk/CrossZoneRepl/utils.cpp
/trunk/CrossZoneRepl/utils.h

=======================================
--- /dev/null
+++ /trunk/CrossZoneRepl/AvuMetaData.cpp Tue Nov 1 23:18:38 2011
@@ -0,0 +1,126 @@
#include "AvuMetaData.h"
#include <algorithm>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
using namespace std;
+
+using namespace std;
+
+#define MAX_SQL 300
+#define BIG_STR 200
+
+int query_user_metadata(rcComm_t *conn, string& fileName,
vector<AuvMetaData>& userMetas) {
+ genQueryInp_t genQueryInp;
+ genQueryOut_t *genQueryOut;
+ int i1a[10];
+ int i1b[10];
+ int i2a[10];
+ char *condVal[10];
+ char v1[BIG_STR];
+ char v2[BIG_STR];
+
+ int t = fileName.rfind('/');
+ string parent_coll = fileName.substr(0, t);
+ string data_name = fileName.substr(t+1);
+
+ // cout << "parent_coll=" << parent_coll << endl;
+ // cout << "data_name=" << data_name << endl;
+
+ i1a[0]=COL_META_DATA_ATTR_NAME;
+ i1b[0]=0;
+ i1a[1]=COL_META_DATA_ATTR_VALUE;
+ i1b[1]=0;
+ i1a[2]=COL_META_DATA_ATTR_UNITS;
+ i1b[2]=0;
+
+ memset (&genQueryInp, 0, sizeof (genQueryInp_t));
+
+ genQueryInp.selectInp.inx = i1a;
+ genQueryInp.selectInp.value = i1b;
+ genQueryInp.selectInp.len = 3;
+
+ i2a[0]=COL_COLL_NAME;
+ sprintf(v1,"='%s'", parent_coll.c_str());
+ condVal[0]=v1;
+
+ i2a[1]=COL_DATA_NAME;
+ sprintf(v2,"='%s'", data_name.c_str());
+ condVal[1]=v2;
+
+ genQueryInp.sqlCondInp.inx = i2a;
+ genQueryInp.sqlCondInp.value = condVal;
+ genQueryInp.sqlCondInp.len=2;
+
+ genQueryInp.condInput.len=0;
+ genQueryInp.continueInx=0;
+
+ genQueryInp.maxRows= MAX_SQL_ROWS;
+ genQueryInp.continueInx=0;
+
+ bool stop_query = false;
+ while(!stop_query) {
+
+ t = rcGenQuery(conn, &genQueryInp, &genQueryOut);
+ if(t == CAT_NO_ROWS_FOUND) {
+ return 0;
+ }
+
+ if(t < 0) {
+ return t;
+ }
+
+ // get the AVUs
+ for(int i=0;i<genQueryOut->rowCnt;i++) {
+ AuvMetaData md;
+ md.att = genQueryOut->sqlResult[0].value +
i*genQueryOut->sqlResult[0].len;
+ md.val = genQueryOut->sqlResult[1].value +
i*genQueryOut->sqlResult[1].len;
+ md.unit = genQueryOut->sqlResult[2].value +
i*genQueryOut->sqlResult[2].len;
+ userMetas.push_back(md);
+ }
+
+ if(genQueryOut->continueInx > 0) {
+ genQueryInp.continueInx=genQueryOut->continueInx;
+ }
+ else {
+ freeGenQueryOut(&genQueryOut);
+ stop_query = true;
+ }
+ }
+
+ return 0;
+}
+
+static bool insideAuvArray(const AuvMetaData& auvMeta, const
vector<AuvMetaData>& auvMetaArray) {
+ for(int i=0; i< auvMetaArray.size(); i++) {
+ if((auvMeta.att == auvMetaArray[i].att) && (auvMeta.val ==
auvMetaArray[i].val)
+ && (auvMeta.unit == auvMetaArray[i].unit)) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+void auv_array_rm_dups(vector<AuvMetaData>& newAuvArray, const
vector<AuvMetaData>& oldAuvArray) {
+ if(oldAuvArray.size() <= 0)
+ return;
+
+ if(newAuvArray.size() <= 0)
+ return;
+
+ bool done = false;
+ int i = 0;
+ while(!done) {
+ if(insideAuvArray(newAuvArray[i], oldAuvArray)) {
+ newAuvArray.erase(newAuvArray.begin() + i);
+ }
+ else {
+ i = i + 1;
+ }
+ if(i >= newAuvArray.size()) {
+ done = true;
+ }
+ }
+}
=======================================
--- /dev/null
+++ /trunk/CrossZoneRepl/AvuMetaData.h Tue Nov 1 23:18:38 2011
@@ -0,0 +1,24 @@
+#ifndef _AvuMetaData_h_
+#define _AvuMetaData_h_
+
+#include "rodsClient.h"
+#include "parseCommandLine.h"
+#include "rodsPath.h"
+#include "cpUtil.h"
+#include <string>
+#include <vector>
+
+using namespace std;
+
// One user-defined iRODS metadata triple. All members are public and start
// out as empty strings.
struct AuvMetaData {
    std::string att;   // attribute name
    std::string val;   // attribute value
    std::string unit;  // unit; may be empty
};
+
+int query_user_metadata(rcComm_t *conn, string& fileName,
vector<AuvMetaData>& userMetas);
+void auv_array_rm_dups(vector<AuvMetaData>& newAuvArray, const
vector<AuvMetaData>& oldAuvArray);
+
+#endif
+
=======================================
--- /dev/null
+++ /trunk/CrossZoneRepl/CrossZoneRepl.cpp Tue Nov 1 23:18:38 2011
@@ -0,0 +1,699 @@
+/* The program is used to make replication across iRODS zones from UAL to
OCUL.
+ * Author: Bing Zhu
+ * Date Created: May 17, 2010
+ * Version 1.1
+ */
+
+#include <iostream>
+#include <sstream>
+#include <string>
+#include <algorithm>
+#include <vector>
+#include <fstream>
+
+#include "rodsClient.h"
+#include "parseCommandLine.h"
+#include "rodsPath.h"
+#include "cpUtil.h"
+#include "AvuMetaData.h"
+#include "utils.h"
+
// Descriptor all log output goes to; main() sets it to 2 (stderr).
int log_fd;
// Set by "Debug=yes" in the config file; enables debug_msg() output.
bool Debug = false;

// Run statistics, reported by program_exit_msg().
int totalNumFilesReplicated = 0;
int totalNumFilesSkiped = 0;
int totalNumFilesFailedToReplicate = 0;
int totalNumFilesAddedNewMetadata = 0;

using namespace std;

// Connection settings and top-level collection for one iRODS zone, filled in
// from the config file by get_config().
class RodsEnvClass
{
public:
    // The string members default to empty, but a plain int does not. Without
    // this, a config file lacking the *RodsPort= key left irodsPort
    // indeterminate when it was handed to rcConnect.
    RodsEnvClass() : irodsPort(0) {}

    string irodsHost;
    int irodsPort;
    string irodsUserName;
    string irodsUserPasswd;
    string irodsDefResource;
    string irodsZone;
    string topCollection;
};
+
+
+//=============================================================================
+
+#define BUFSIZE 20000
+
+static rcComm_t* connect_irods_server(RodsEnvClass & env);
+static void log_error_code(int errCode);
+static void print_config_info(RodsEnvClass & sEnv, RodsEnvClass & dEnvs);
+static int get_config(string& fname, RodsEnvClass & sEnv, RodsEnvClass &
dEnvs);
+static void close_irods_obj(rcComm_t *conn, int fd);
+static int open_irods_obj(rcComm_t *conn, string & obj_path, int
open_flag);
+static int read_irods_obj(rcComm_t *conn, int fd, char *buff, int len);
+static int create_irods_obj(rcComm_t *conn, string & obj_path, string &
resc);
+static int write_irods_obj(rcComm_t *conn, int fd, char *buff, int len);
+static void write_log(string& str);
+static void create_irods_dir(rcComm_t *conn, string & dir_name);
+static void debug_msg(string& msg);
+static void info_msg(string& msg);
+static int query_source_collection(rcComm_t *conn, RodsEnvClass& srcEnv,
string& topColl, bool recursive, vector<string>& src_files);
+static void print_coll_info(vector<string> & files);
+static void construct_dest_files(RodsEnvClass& srcEnv, vector<string>&
srcFiles, RodsEnvClass& destEnv, vector<string>& destFiles);
+static int replicate_single_file(rcComm_t *srcConn, string& srcFile,
rcComm_t *destConn, string& destFile, string& destResc);
+static void PrintUsage(void);
+static void write_log_and_exit(string& msg);
+static void program_start_msg();
+static void program_exit_msg();
+
+// ---------------------------------------
+// main function
+// ---------------------------------------
+int main(int argc, char **argv) {
+
+ RodsEnvClass srcEnv;
+ RodsEnvClass destEnv;
+
+ log_fd = 2; // stderr
+
+ program_start_msg();
+
+ vector<string> srcFilesToRepl;
+
+ rErrMsg_t errMsg;
+ rcComm_t *src_conn, *dest_conn;
+ string config_fname;
+
+ string msg;
+
+ int i, t;
+
+ string src_node_obj_path;
+
+ config_fname = "";
+ for(i=0;i <argc; i++) {
+ string ts = argv[i];
+ if(ts.find("-configfile=") != string::npos) {
+ config_fname = ts.substr(12);
+ }
+ }
+
+ if(config_fname.length() <= 0) {
+ PrintUsage();
+ msg = "failed to get config file name.\n";
+ program_exit_msg();
+ exit(1);
+ }
+
+ // read the content in config file
+ if(get_config(config_fname, srcEnv, destEnv) < 0) {
+ // error message is already logged.
+ program_exit_msg();
+ exit(1);
+ }
+
+ // tetsing the config info
+ if(Debug)
+ print_config_info(srcEnv, destEnv);
+
+ //////////////////////////////////////////////////////////////
+ // Start to make replicas
+ //////////////////////////////////////////////////////////////
+
+ // open connection to source iRODS server
+ msg = "connecting src irods server ...\n";
+ debug_msg(msg);
+ src_conn = connect_irods_server(srcEnv);
+ if(src_conn == NULL)
+ {
+ msg = string("ERROR: failed to connect to the source iRODS
server, ") + srcEnv.irodsHost + string(".\n");
+ write_log_and_exit(msg);
+ }
+ msg = "connection to source iRODS server established\n";
+ debug_msg(msg);
+
+ // open connection to destination iRODS server
+ msg = "connecting dest irods server ...\n";
+ debug_msg(msg);
+ dest_conn = connect_irods_server(destEnv);
+ if(dest_conn == NULL)
+ {
+ rcDisconnect(src_conn);
+ msg = string("ERROR: failed to connect to the destination iRODS
server, ") + destEnv.irodsHost + string(".\n");
+ write_log_and_exit(msg);
+ }
+ msg = "connection to destination iRODS server established\n";
+ debug_msg(msg);
+
+ // query the source collection
+ bool recusive = true;
+ if(query_source_collection(src_conn, srcEnv, srcEnv.topCollection,
recusive, srcFilesToRepl) < 0) {
+
+ rcDisconnect(src_conn);
+ rcDisconnect(dest_conn);
+
+ msg = string("ERROR: failed to query the source collection, ") +
srcEnv.topCollection + string(".\n");
+ write_log_and_exit(msg);
+ }
+
+ vector<string> destFilenamesWithPath;
+ construct_dest_files(srcEnv, srcFilesToRepl, destEnv,
destFilenamesWithPath);
+
+ /* check info. For Debug only
+ cerr << "src top: " << srcEnv.topCollection << endl;
+ print_coll_info(srcFilesToRepl);
+ cerr << "desc top: " << destEnv.topCollection << endl;
+ print_coll_info(destFilenamesWithPath);
+ */
+
+ for(i=0; i<srcFilesToRepl.size(); i++) {
+ t = replicate_single_file(src_conn, srcFilesToRepl[i], dest_conn,
destFilenamesWithPath[i], destEnv.irodsDefResource);
+ if(t < 0) {
+ log_error_code(t);
+ }
+ }
+
+ // disconnect from source iRODS
+ msg = "disconnecting from the src irods...\n";
+ debug_msg(msg);
+ rcDisconnect(src_conn);
+ rcDisconnect(dest_conn);
+
+ program_exit_msg();
+ return 0;
+}
+
+static int add_user_meta(rcComm_t *srcConn, string& srcFile, rcComm_t
*destConn, string& destFile, bool addMetaOnly) {
+
+ string msg;
+ size_t t;
+
+ vector<AuvMetaData> srcUserMetas;
+ t = query_user_metadata(srcConn, srcFile, srcUserMetas);
+ if(t < 0) {
+ return t;
+ }
+
+ vector<AuvMetaData> destUserMetas;
+ t = query_user_metadata(destConn, destFile, destUserMetas);
+ if(t < 0) {
+ return t;
+ }
+
+ auv_array_rm_dups(srcUserMetas, destUserMetas);
+
+ if(srcUserMetas.size() <= 0) {
+ return 0;
+ }
+
+ stringstream ss;
+ ss << "INFO: insering " << srcUserMetas.size() << " rows of metadata
into dest file," << destFile << ".\n";
+ msg = ss.str();
+ debug_msg(msg);
+
+ // adding medata into dest file.
+ for(int i=0; i<srcUserMetas.size(); i++) {
+ modAVUMetadataInp_t modAVUMetadataInp;
+ memset(&modAVUMetadataInp, 0, sizeof(modAVUMetadataInp_t));
+
+ char arg0[4];
+ char arg1[4];
+ char arg2[2048]; // full obj name
+ char arg3[1024]; // att name
+ char arg4[2048]; // att value
+ char arg5[1024]; // att unit
+
+ strcpy(arg0, "add");
+ strcpy(arg1, "-d");
+ strcpy(arg2, destFile.c_str());
+ strcpy(arg3, srcUserMetas[i].att.c_str());
+ strcpy(arg4, srcUserMetas[i].val.c_str());
+ if(srcUserMetas[i].unit.length() > 0)
+ strcpy(arg5, srcUserMetas[i].unit.c_str());
+ else
+ arg5[0] = '\0';
+
+ modAVUMetadataInp.arg0 = arg0;
+ modAVUMetadataInp.arg1 = arg1;
+ modAVUMetadataInp.arg2 = arg2;
+ modAVUMetadataInp.arg3 = arg3;
+ modAVUMetadataInp.arg4 = arg4;
+ modAVUMetadataInp.arg5 = arg5;
+ modAVUMetadataInp.arg6 = "";
+ modAVUMetadataInp.arg7 = "";
+ modAVUMetadataInp.arg8 = "";
+ modAVUMetadataInp.arg9 = "";
+ t = rcModAVUMetadata(destConn, &modAVUMetadataInp);
+ if(t < 0)
+ return t;
+ }
+
+ if(addMetaOnly) {
+ ++totalNumFilesAddedNewMetadata;
+ }
+
+ return 0;
+}
+
+static int replicate_single_file(rcComm_t *srcConn, string& srcFile,
rcComm_t *destConn, string& destFile, string& destResc) {
+
+ string msg = string("making replica for srcFile=") + srcFile +
string("destFile=") + destFile;
+ debug_msg(msg);
+
+ int src_fd, dest_fd;
+ char readbuff[BUFSIZE+1];
+ size_t t;
+
+ // check if the destFile already exist. If file exists, we don't need
to re-do file replication.
+ dest_fd = open_irods_obj(destConn, destFile, O_RDONLY);
+ if(dest_fd >= 0) {
+ close_irods_obj(destConn, dest_fd);
+ msg = string("INFO: the dest file already exists: ") + destFile +
string(".\n");
+ ++totalNumFilesSkiped;
+ debug_msg(msg);
+ t = add_user_meta(srcConn, srcFile, destConn, destFile, true);
+ return t;
+ }
+
+ // replicate the file to destination
+ src_fd = open_irods_obj(srcConn, srcFile, O_RDONLY);
+ if(src_fd < 0) {
+ msg = string("ERROR: failed to open input file for read in source
irods: ") + srcFile + string(".\n");
+ write_log(msg);
+ ++totalNumFilesFailedToReplicate;
+ return src_fd;
+ }
+
+ t = destFile.rfind('/');
+ if(t == string::npos) {
+ msg = string("ERROR: failed to extract parent collection from dest
file: ") + destFile + string(".\n");
+ write_log(msg);
+ ++totalNumFilesFailedToReplicate;
+ return 0;
+ }
+ string parent_coll = destFile.substr(0, t);
+ create_irods_dir(destConn, parent_coll);
+
+ // create the obj
+ dest_fd = create_irods_obj(destConn, destFile, destResc);
+ if(dest_fd < 0) {
+ msg = string("ERROR: failed to create file, ") + destFile +
string(".\n");
+ write_log(msg);
+ close_irods_obj(srcConn, src_fd);
+ ++totalNumFilesFailedToReplicate;
+ return dest_fd;
+ }
+
+ // read data loop
+ int bytesRead;
+ while((bytesRead = read_irods_obj(srcConn, src_fd, readbuff, BUFSIZE))
> 0) {
+ write_irods_obj(destConn, dest_fd, readbuff, bytesRead);
+ }
+
+ // close the objs and disconnect from dest node.
+ close_irods_obj(srcConn, src_fd);
+ close_irods_obj(destConn, dest_fd);
+
+ t = add_user_meta(srcConn, srcFile, destConn, destFile, false);
+
+ ++totalNumFilesReplicated;
+
+ return t;
+}
+
+static void create_irods_dir(rcComm_t *conn, string & dir_name)
+{
+ string msg;
+
+ msg = string("create_irods_dir(): parent dir=") + dir_name +
string(".\n");
+ debug_msg(msg);
+
+ collInp_t collCreateInp;
+ memset (&collCreateInp, 0, sizeof (collCreateInp));
+ addKeyVal(&collCreateInp.condInput, RECURSIVE_OPR__KW, "");
+ rstrcpy(collCreateInp.collName, (char *)dir_name.c_str(), MAX_NAME_LEN);
+ (void)rcCollCreate(conn, &collCreateInp);
+}
+
+static int write_irods_obj(rcComm_t *conn, int fd, char *buff, int len)
+{
+ int bytesWritten;
+
+ openedDataObjInp_t dataObjWriteInp;
+ bytesBuf_t dataObjWriteInpBBuf;
+
+ dataObjWriteInpBBuf.buf = buff;
+ dataObjWriteInpBBuf.len = len;
+ dataObjWriteInp.l1descInx = fd;
+
+ dataObjWriteInp.len = len;
+
+ bytesWritten = rcDataObjWrite (conn, &dataObjWriteInp,
&dataObjWriteInpBBuf);
+
+ return bytesWritten;
+}
+
+static int create_irods_obj(rcComm_t *conn, string & obj_path, string &
resc)
+{
+ dataObjInp_t dataObjCreateInp;
+
+ memset (&dataObjCreateInp, 0, sizeof (dataObjCreateInp));
+
+ strcpy(dataObjCreateInp.objPath, (char *)obj_path.c_str());
+ addKeyVal(&dataObjCreateInp.condInput, RESC_NAME_KW, (char
*)resc.c_str());
+ addKeyVal(&dataObjCreateInp.condInput, DATA_TYPE_KW, "generic");
+ addKeyVal(&dataObjCreateInp.condInput, RECURSIVE_OPR__KW, "");
+ dataObjCreateInp.createMode = 0750;
+ dataObjCreateInp.openFlags = O_WRONLY;
+ dataObjCreateInp.dataSize = -1;
+
+ int fd = rcDataObjCreate(conn, &dataObjCreateInp);
+
+ return fd;
+}
+
+static int read_irods_obj(rcComm_t *conn, int fd, char *buff, int len)
+{
+ openedDataObjInp_t DataObjReadInp;
+
+ bytesBuf_t DataObjReadOutBBuf;
+ DataObjReadOutBBuf.buf = buff;
+ DataObjReadOutBBuf.len = len;
+
+ DataObjReadInp.l1descInx = fd;
+ DataObjReadInp.len = len;
+ int n = rcDataObjRead(conn, &DataObjReadInp, &DataObjReadOutBBuf);
+
+ if(n <= 0)
+ return n;
+
+ return n;
+}
+
+static int open_irods_obj(rcComm_t *conn, string & obj_path, int open_flag)
+{
+ int fd = -1;
+ dataObjInp_t objOpenInp;
+ memset(&objOpenInp, 0, sizeof(objOpenInp));
+ objOpenInp.openFlags = open_flag; //O_RDONLY;
+ strcpy(objOpenInp.objPath, (char *)obj_path.c_str());
+ fd = rcDataObjOpen(conn, &objOpenInp);
+ return fd;
+}
+
+static void close_irods_obj(rcComm_t *conn, int fd)
+{
+ openedDataObjInp_t dataObjCloseInp;
+ memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
+ dataObjCloseInp.l1descInx = fd;
+ int t = rcDataObjClose (conn, &dataObjCloseInp);
+ if(t < 0) // just flag the error.
+ {
+ log_error_code(t);
+ }
+}
+
+static void write_log(string& str) {
+ write(log_fd, (char *)str.c_str(), str.length());
+}
+
+static void log_error_code(int errCode)
+{
+ char *errName = NULL;
+ char *errSubName = NULL;
+ char buff[2048];
+
+ errName = rodsErrorName(errCode, &errSubName);
+ sprintf(buff, "%s: %s", errName, errSubName);
+ write(log_fd, buff, strlen(buff));
+}
+
+static rcComm_t* connect_irods_server(RodsEnvClass & env)
+{
+ rcComm_t *src_conn = NULL;
+ rErrMsg_t errMsg;
+ int t;
+
+ src_conn = rcConnect ((char *)env.irodsHost.c_str(), env.irodsPort,
(char *)env.irodsUserName.c_str(),
+ (char *)env.irodsZone.c_str(), NO_RECONN,
&errMsg);
+ if(src_conn != NULL)
+ {
+ t = clientLoginWithPassword(src_conn, (char
*)env.irodsUserPasswd.c_str());
+ if(t < 0)
+ {
+ if(src_conn != NULL)
+ freeRcComm(src_conn);
+
+ log_error_code(t);
+ return NULL;
+ }
+ }
+
+ return src_conn;
+}
+
+static void print_env(RodsEnvClass & env)
+{
+ cout << " host:" << env.irodsHost << endl;
+ cout << " port:" << env.irodsPort << endl;
+ cout << " username:" << env.irodsUserName << endl;
+ cout << " password:" << env.irodsUserPasswd << endl;
+ cout << " resc:" << env.irodsDefResource << endl;
+ cout << " zone:" << env.irodsZone << endl;
+}
+
+static void print_config_info(RodsEnvClass & sEnv, RodsEnvClass & dEnvs)
+{
+ cout << "Debug=" << Debug << endl;;
+ cout << "=======the source irods===========\n";
+ print_env(sEnv);
+
+ cout << "=======the dest irods=============\n";
+ print_env(dEnvs);
+}
+
+static int get_config(string& fname, RodsEnvClass& srcEnv, RodsEnvClass&
destEnv) {
+ string line;
+ ifstream fin(fname.c_str());
+ if(!fin.is_open())
+ {
+ cerr << "ERROR: failed to open config file for read : " << fname <<
endl;
+ return -1;
+ }
+
+ string s;
+ while(fin.good()) {
+ getline(fin, line);
+
+ line = stringtrim(line);
+
+ if(line[0] == '#')
+ continue;
+
+ if(line.length() <= 0)
+ continue;
+
+ s = "Debug=";
+ if(line.find(s) != string::npos) {
+ string val = line.substr(s.length());
+ val = stringtrim(val);
+ if(val.compare("yes") == 0)
+ Debug = true;
+ }
+
+ s = "srcRodsHost=";
+ if(line.find(s) != string::npos) {
+ srcEnv.irodsHost = line.substr(s.length());
+ }
+
+ s = "srcRodsPort=";
+ if(line.find(s) != string::npos) {
+ string ts = line.substr(s.length());
+ srcEnv.irodsPort = atoi(ts.c_str());
+ }
+
+ s = "srcRodsDefResource=";
+ if(line.find(s) != string::npos) {
+ srcEnv.irodsDefResource = line.substr(s.length());
+ }
+
+ s = "srcRodsUserName=";
+ if(line.find(s) != string::npos) {
+ srcEnv.irodsUserName = line.substr(s.length());
+ }
+
+ s = "srcRodsUserPasswd=";
+ if(line.find(s) != string::npos) {
+ srcEnv.irodsUserPasswd = line.substr(s.length());
+ }
+
+ s = "srcRodsZone=";
+ if(line.find(s) != string::npos) {
+ srcEnv.irodsZone = line.substr(s.length());
+ }
+
+ s = "srcRodsSourceCollection=";
+ if(line.find(s) != string::npos) {
+ srcEnv.topCollection = line.substr(s.length());
+ }
+
+ s = "destRodsHost=";
+ if(line.find(s) != string::npos) {
+ destEnv.irodsHost = line.substr(s.length());
+ }
+
+ s = "destRodsPort=";
+ if(line.find(s) != string::npos) {
+ string ts = line.substr(s.length());
+ destEnv.irodsPort = atoi(ts.c_str());
+ }
+
+ s = "destRodsDefResource=";
+ if(line.find(s) != string::npos) {
+ destEnv.irodsDefResource = line.substr(s.length());
+ }
+
+ s = "destRodsUserName=";
+ if(line.find(s) != string::npos) {
+ destEnv.irodsUserName = line.substr(s.length());
+ }
+
+ s = "destRodsUserPasswd=";
+ if(line.find(s) != string::npos) {
+ destEnv.irodsUserPasswd = line.substr(s.length());
+ }
+
+ s = "destRodsZone=";
+ if(line.find(s) != string::npos) {
+ destEnv.irodsZone = line.substr(s.length());
+ }
+
+ s = "destRodsDestinationCollection=";
+ if(line.find(s) != string::npos) {
+ destEnv.topCollection = line.substr(s.length());
+ }
+ }
+ fin.close();
+}
+
+static void debug_msg(string& msg)
+{
+ if(Debug)
+ {
+ string dmsg = string("DEBUG: ") + msg;
+ write(log_fd, (char *)dmsg.c_str(), dmsg.length());
+ }
+}
+
+static void info_msg(string& msg)
+{
+ string infomsg = string("INFO: ") + msg;
+ debug_msg(infomsg);
+}
+
+static int query_source_collection(rcComm_t *conn,
+ RodsEnvClass& srcEnv, string& topColl, bool recursive,
+ vector<string>& src_files) {
+
+ char query_str[2048];
+ if(recursive) {
+ sprintf(query_str, "select COLL_NAME, DATA_NAME where COLL_NAME
like '%s%%'", topColl.c_str());
+ }
+ else {
+ sprintf(query_str, "select COLL_NAME, DATA_NAME where COLL_NAME
= '%s'", topColl.c_str());
+ }
+
+ genQueryInp_t genQueryInp;
+ memset (&genQueryInp, 0, sizeof (genQueryInp_t));
+ int t = fillGenQueryInpFromStrCond(query_str, &genQueryInp);
+ if(t < 0) {
+ return t;
+ }
+
+ genQueryOut_t *genQueryOut = NULL;
+
+ genQueryInp.maxRows= MAX_SQL_ROWS;
+ genQueryInp.continueInx=0;
+ t = rcGenQuery (conn, &genQueryInp, &genQueryOut);
+ if(t < 0) {
+ if(t == CAT_NO_ROWS_FOUND) // no file is found
+ return 0;
+
+ return t;
+ }
+
+ sqlResult_t *collNameStruct, *dataNameStruct;
+ string collName, dataName, irodsFilenameWithPath;
+ bool loop_stop = false;
+ while ((t == 0) && (!loop_stop)) {
+ for(int i=0;i<genQueryOut->rowCnt; i++) {
+ collNameStruct = getSqlResultByInx (genQueryOut, COL_COLL_NAME);
+ dataNameStruct = getSqlResultByInx (genQueryOut, COL_DATA_NAME);
+
+ collName = &collNameStruct->value[collNameStruct->len*i];
+ dataName = &dataNameStruct->value[dataNameStruct->len*i];
+
+ irodsFilenameWithPath = collName + "/" + dataName;
+
+ src_files.push_back(irodsFilenameWithPath);
+ }
+
+ if(genQueryOut->continueInx == 0) {
+ loop_stop = true;
+ }
+ else {
+ genQueryInp.continueInx=genQueryOut->continueInx;
+ t = rcGenQuery (conn, &genQueryInp, &genQueryOut);
+ }
+ }
+
+ freeGenQueryOut(&genQueryOut);
+ return 0;
+}
+
// Debug helper: prints each path in files on its own line to stderr, or a
// notice when the list is empty.
static void print_coll_info(vector<string> & files) {
    if (files.empty())
        cerr << "no files found in the source collection.\n";

    for (vector<string>::const_iterator it = files.begin(); it != files.end(); ++it)
        cerr << *it << endl;
}
+
+static void construct_dest_files(RodsEnvClass& srcEnv, vector<string>&
srcFiles, RodsEnvClass& destEnv, vector<string>& destFiles) {
+ int t = srcEnv.topCollection.length();
+ for(int i=0; i< srcFiles.size(); i++) {
+ string filename = destEnv.topCollection + "/" +
srcFiles[i].substr(t+1);
+ destFiles.push_back(filename);
+ }
+}
+
+static void program_start_msg() {
+ stringstream ss;
+ ss << "The program started at: " << getcurtime() << endl;
+ string msg = ss.str();
+ write_log(msg);
+}
+
+static void program_exit_msg() {
+ stringstream ss;
+ ss << "Files replicated: " << totalNumFilesReplicated << endl;
+ ss << "Files added new user-defined metadata: " <<
totalNumFilesAddedNewMetadata << endl;
+ ss << "Files skipped (already exist in replcation server): " <<
totalNumFilesSkiped << endl;
+ ss << "Files failed to be replicated: " <<
totalNumFilesFailedToReplicate << endl;
+ ss << "The program exit at: " << getcurtime() << endl << endl;
+ string msg = ss.str();
+ write_log(msg);
+}
+
// Prints the command-line synopsis to stderr.
static void PrintUsage(void) {
    static const char usage[] = "Usage: CrossZoneRepl -configfile=[configfilename]\n";
    cerr << usage;
}
+
+static void write_log_and_exit(string& msg) {
+ write_log(msg);
+ program_exit_msg();
+ exit(0);
+}
=======================================
--- /dev/null
+++ /trunk/CrossZoneRepl/Makefile Tue Nov 1 23:18:38 2011
@@ -0,0 +1,100 @@
# Build configuration for CrossZoneRepl. IRODS_SRC_HOME must point at the
# iRODS source tree (see readme.txt); buildDir overrides it, e.g.
#   make buildDir=/path/to/iRODS
IRODS_SRC_HOME = ../../../irods/iRODS25

CPP = c++

ifndef buildDir
buildDir = $(IRODS_SRC_HOME)
endif

# iRODS build settings (platform, include paths, client libraries).
include $(buildDir)/config/config.mk
include $(buildDir)/config/platform.mk
include $(buildDir)/config/directories.mk
include $(buildDir)/config/common.mk

SRC_DIR = .
OBJ_DIR = .

CFLAGS = -g $(CFLAGS_OPTIONS) $(LIB_INCLUDES) $(SVR_INCLUDES) $(MODULE_CFLAGS)
LDFLAGS += $(CL_LDADD) $(LIBRARY) $(MODULE_LDFLAGS)

LOCAL_OBJS = $(OBJ_DIR)/CrossZoneRepl.o $(OBJ_DIR)/AvuMetaData.o $(OBJ_DIR)/utils.o
LOCAL_TARGET = ./CrossZoneRepl

# --- platform-specific settings ---
ifeq ($(OS_platform), solaris_platform)
LDADD += -lnsl -lsocket
SO_FLAG = -G
DYLIB =
endif

ifeq ($(OS_platform),sunos_platform)
SO_FLAG = -G
endif

ifeq ($(OS_platform),aix_platform)
SO_FLAG = -G
endif

ifeq ($(OS_platform), osx_platform)
CC = gcc
SO_FLAG = -fno-common -dynamiclib
endif

# SGI doesn't like -lm in the middle of the CC line; OS X is skipped as well.
ifneq ($(OS_platform), osx_platform)
ifneq ($(OS_platform), sgi_platform)
LDADD += -lm
endif
endif

ifdef GSI_AUTH
# This compared against "OS_platform_aix", which does not match the
# "aix_platform" spelling used above, so AIX fell into the "-z muldefs" branch.
ifeq ($(OS_platform), aix_platform)
LDADD += $(LIB_GSI_AUTH) $(KRB_LIBS)
else
ifeq ($(OS_platform), c90_platform)
LDADD += $(LIB_GSI_AUTH) $(KRB_LIBS)
else
LDADD += $(LIB_GSI_AUTH) $(KRB_LIBS) -z muldefs
endif
endif
endif

LDADD += -lpthread
+
+
.PHONY: all clean

all: $(LOCAL_TARGET)

$(OBJ_DIR)/%.o: $(SRC_DIR)/%.c
	$(CC) -c $(CFLAGS) -o $@ $<

$(OBJ_DIR)/%.o: $(SRC_DIR)/%.cpp
	$(CPP) -c $(CFLAGS) -o $@ $<

# Link the executable.
#  - SO_FLAG (-G / -dynamiclib) is a shared-library flag and would turn the
#    program into a library, so it is not used here.
#  - LDADD (libm, pthread, platform and GSI libraries) is assembled in the
#    configuration section but was never linked. It is appended after the
#    objects.
# NOTE(review): if the iRODS config already folds LDADD into CL_LDADD, this
# merely repeats those libraries, which is harmless.
$(LOCAL_TARGET): $(LOCAL_OBJS)
	@echo "linking `basename $@`..."
	$(CPP) -o $@ $(LOCAL_OBJS) $(LDFLAGS) $(LDADD)

clean:
	rm -f $(LOCAL_TARGET) $(LOCAL_OBJS)
=======================================
--- /dev/null
+++ /trunk/CrossZoneRepl/crosszonerepl.cfg Tue Nov 1 23:18:38 2011
@@ -0,0 +1,23 @@
+# comment line starts with '#'
+#
+# debug value is 'yes' or 'no'
+Debug=no
+#
+#---iRODS top-level source collection
+srcRodsHost=fromHost
+srcRodsPort=1247
+srcRodsDefResource=fromResc
+srcRodsUserName=fromZoneUser
+srcRodsUserPasswd=password
+srcRodsZone=fromZone
+srcRodsSourceCollection=fromCollName_in_fromZone
+#
+#
+#---iRODS top-level destination collection
+destRodsHost=toHost
+destRodsPort=1247
+destRodsDefResource=toResc
+destRodsUserName=toZoneUser
+destRodsUserPasswd=password
+destRodsZone=toZone
+destRodsDestinationCollection=toCollName_in_toZone
=======================================
--- /dev/null
+++ /trunk/CrossZoneRepl/readme.txt Tue Nov 1 23:18:38 2011
@@ -0,0 +1,25 @@
+The program is used to replicate datasets, including files and
user-defined metadata, from one zone to
+another designated zone.
+
+1. Build the software
+ (1) Modify the "IRODS_SRC_HOME" in Makefile so that it points to the
iRODS software directory.
+ (2) run 'make'
+ (3) The executable, CrossZoneRepl, should be created.
+
+2. Edit the configuration file for the cross-zone file replication service, 'crosszonerepl.cfg'.
+ (1) Add the iRODS source information: iRODS server, user account, and the top source collection to replicate.
+ (2) Add the iRODS destination information: iRODS server, user account, and the top iRODS collection to replicate into.
+
+ Note: A sample 'crosszonerepl.cfg' is provided in the package.
+
+3. Manually test the program
+ Example: $ CrossZoneRepl -configfile=/data/bzhu/irods_dev/crosszonerepl.cfg
+
+4. Deployment of the cross-zone file replication service
+
+ Method #1: The program can be deployed as a UNIX cron job that scans
a designated source iRODS collection and
+ its child collections periodically to replicate new files into the
specified destination collection in
+ another iRODS zone.
+
+ Method #2: The program can also be deployed inside iRODS as a micro-service, invoked by an external program that runs
+ periodically to do cross-zone file replication.
=======================================
--- /dev/null
+++ /trunk/CrossZoneRepl/run_CrossZoneRepl.sh Tue Nov 1 23:18:38 2011
@@ -0,0 +1,9 @@
#!/bin/bash
# Cron wrapper for CrossZoneRepl: runs one replication pass from the
# installation directory and appends all output to $LOG.

LOG=/var/log/irods_repl.log
INSTALL_DIR=/opt/irods/CrossZoneRepl

# The config file is given as a relative path, so the program must run from
# its install directory. If that directory is missing, log it and fail
# instead of running from the wrong place. pushd/popd were unnecessary: the
# directory change ends with this script.
cd "$INSTALL_DIR" || { echo "run_CrossZoneRepl.sh: cannot cd to $INSTALL_DIR" >> "$LOG"; exit 1; }

./CrossZoneRepl -configfile=crosszonerepl.cfg >> "$LOG" 2>&1

# Report the program's own exit status rather than always succeeding.
exit $?
=======================================
--- /dev/null
+++ /trunk/CrossZoneRepl/utils.cpp Tue Nov 1 23:18:38 2011
@@ -0,0 +1,29 @@
+/* The program is used to make replication across iRODS zones from UAL to
OCUL.
+ * Author: Bing Zhu
+ * Date Created: May 17, 2010
+ * Version 1.1
+ */
+
+#include "utils.h"
+
+using namespace std;
+
// Returns the current local time in asctime() form, without the trailing
// newline (e.g. "Tue Nov  1 23:18:38 2011").
string getcurtime() {
    time_t now;
    time(&now);
    string stamp(asctime(localtime(&now)));
    if (!stamp.empty() && stamp[stamp.size() - 1] == '\n')
        stamp.erase(stamp.size() - 1);
    return stamp;
}
+
// Returns a copy of str with leading and trailing whitespace removed.
// Whitespace here means spaces, tabs, CR and LF.
// str itself is left unchanged. The original erased the trailing part of the
// caller's string in place, and removed only spaces, so tab-indented or CRLF
// config lines kept stray characters. The parameter stays a non-const
// reference to match the declaration in utils.h.
string stringtrim(string& str) {
    static const char whitespace[] = " \t\r\n";
    string::size_type first = str.find_first_not_of(whitespace);
    if (first == string::npos)
        return string();
    string::size_type last = str.find_last_not_of(whitespace);
    return str.substr(first, last - first + 1);
}
=======================================
--- /dev/null
+++ /trunk/CrossZoneRepl/utils.h Tue Nov 1 23:18:38 2011
@@ -0,0 +1,21 @@
+/* The program is used to make replication across iRODS zones from UAL to
OCUL.
+ * Author: Bing Zhu
+ * Date Created: May 17, 2010
+ * Version 1.1
+ */
+
+#ifndef _UTILS_H__
+#define _UTILS_H__
+
+#include <iostream>
+#include <sstream>
+#include <string>
+#include <algorithm>
+#include <vector>
+
+using namespace std;
+
+string getcurtime();
+string stringtrim(string& str);
+
+#endif

Reply all
Reply to author
Forward
0 new messages