[hpc4finance] r2 committed - check in server source

1 view
Skip to first unread message

hpc4f...@googlecode.com

unread,
Jan 17, 2010, 2:53:48 AM1/17/10
to hpc4fin...@googlegroups.com
Revision: 2
Author: warunapww
Date: Sat Jan 16 23:52:53 2010
Log: check in server source
http://code.google.com/p/hpc4finance/source/detail?r=2

Added:
/trunk/hpc4server
/trunk/hpc4server/HPC4FServer.cpp
/trunk/hpc4server/HPC4FServer.h
/trunk/hpc4server/Makefile
/trunk/hpc4server/RequestHandler.cpp
/trunk/hpc4server/RequestHandler.h
/trunk/hpc4server/TestHPC4FServer.cpp
/trunk/hpc4server/defs.h
/trunk/hpc4server/tpool.cpp
/trunk/hpc4server/tpool.h

=======================================
--- /dev/null
+++ /trunk/hpc4server/HPC4FServer.cpp Sat Jan 16 23:52:53 2010
@@ -0,0 +1,366 @@
+/*
+ Copyright (c) 2008 by contributors:
+
+ * Damitha Premadasa
+ * Nilendra Weerasinghe
+ * Thilina Dampahala
+ * Waruna Ranasinghe - (http://warunapw.blogspot.com)
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+/*
+// //
+// // PROJECT : HPC4Finance
+// // MODULE : HPC4FServer which runs in Master Node
+// // FILE : HPC4FServer.cpp
+// // AUTHOR : Waruna Ranasinghe (waru...@gmail.com)
+// // DESC : The implementation of the HPC4FServer class is done here.
Each
+// request is processed using a new thread.
+// // TODO : Failover and Load Balancing
+// // HISTORY : Date of Creation: 5-Oct-2008
+// Modified: 13-Nov-2008 :
Implemented 'WriteErrorLog()', 'WriteAccessLog()'
+// Modified: 14-Jan-2009 : Handle tickets -> get a ticket
request and return a unique ticket id.
+// Modified: 19-Jan-2009 : Initialize pz_ErrorMSG and
pmap_Tickets, free pmap_Tickets in distructor
+// : Bug fix: chek whether the original ticket
holder made the request
+// : Modified WriteAccessLog to print a customized
message
+// //
+*/
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <stdlib.h>
+#include "HPC4FServer.h"
+#include "RequestHandler.h"
+
+//**************************************************************************************************
+HPC4FServer::HPC4FServer()
+{
+ i_Port = 8045;
+ pz_HostAddr = ANY_ADDR;
+ f_ErrorLog = stdout;
+ f_AccessLog = stdout;
+ printf("Port: %d\n", i_Port);
+ bzero(z_ClientAddr, IP_ADDR_LEN);
+ WriteAccessLog();
+ pmap_Tickets = new map<int, char*>();
+ tpool = new Tpool(HPC4FSERVER_MIN_THREADS, HPC4FSERVER_MAX_QUEUE, 0);
+ pz_ErrorMSG = NULL;
+}
+
+//**************************************************************************************************
+HPC4FServer::HPC4FServer( char *pzHost, int iPort )
+{
+ i_Port = iPort;
+ if (pzHost == ANY_ADDR)
+ pz_HostAddr = ANY_ADDR;
+ else
+ pz_HostAddr = strdup(pzHost);
+ f_ErrorLog = stdout;
+ f_AccessLog = stdout;
+ bzero(z_ClientAddr, IP_ADDR_LEN);
+ WriteAccessLog();
+ pmap_Tickets = new map<int, char*>();
+ tpool = new Tpool(HPC4FSERVER_MIN_THREADS, HPC4FSERVER_MAX_QUEUE, 0);
+// printf("Port: %d \tHost: %s\n", this->i_Port, this->pz_HostAddr);
+ pz_ErrorMSG = NULL;
+}
+
+//**************************************************************************************************
+HPC4FServer::HPC4FServer( char *pzHost, int iPort, FILE *fAccessLog, FILE
*fErrorLog)
+{
+ i_Port = iPort;
+ if (pzHost == ANY_ADDR)
+ pz_HostAddr = ANY_ADDR;
+ else
+ pz_HostAddr = strdup(pzHost);
+ if (fAccessLog)
+ f_AccessLog = fAccessLog;
+ else
+ f_AccessLog = stdout;
+ if (fErrorLog)
+ f_ErrorLog = fErrorLog;
+ else
+ f_ErrorLog = stdout;
+ bzero(z_ClientAddr, IP_ADDR_LEN);
+ WriteAccessLog();
+ pmap_Tickets = new map<int, char*>();
+ tpool = new Tpool(HPC4FSERVER_MIN_THREADS, HPC4FSERVER_MAX_QUEUE, 0);
+// printf("Port: %d \tHost: %s\n", this->i_Port, this->pz_HostAddr);
+ pz_ErrorMSG = NULL;
+}
+
+//**************************************************************************************************
+HPC4FServer::~HPC4FServer()
+{
+ if (pz_HostAddr)
+ delete pz_HostAddr;
+ tpool->Destroy(1);
+ delete tpool;
+ map<int, char*>::iterator it;
+ for (it = pmap_Tickets->begin(); it != pmap_Tickets->end(); it++)
+ {
+ delete [](*it).second;
+ }
+ delete pmap_Tickets;
+ if(pz_ErrorMSG != NULL)
+ delete pz_ErrorMSG;
+}
+
+//**************************************************************************************************
+#ifdef _WIN32
+bool HPC4FServer::WinSockDLLVersionOK()
+{
+ WORD wVersionRequested;
+ WSADATA wsaData;
+ int iErr;
+
+ wVersionRequested = MAKEWORD( 2, 2 );
+
+ iErr = WSAStartup( wVersionRequested, &wsaData );
+
+ /* Found a usable winsock dll? */
+ if( iErr != 0 )
+ return false;
+
+ /* Confirm that the WinSock DLL supports 2.2.
+ * Note that if the DLL supports versions greater
+ * than 2.2 in addition to 2.2, it will still return
+ * 2.2 in wVersion since that is the version we
+ * requested
+ * */
+
+ if( LOBYTE( wsaData.wVersion ) != 2 || HIBYTE( wsaData.wVersion ) != 2 )
+ {
+
+ /* Tell the user that we could not find a usable WinSock DLL.*/
+ WSACleanup( );
+ return false;
+ }
+
+ /* The WinSock DLL is acceptable. Proceed. */
+ return true;
+}
+#endif
+
+//**************************************************************************************************
+bool HPC4FServer::CreateHPC4FServer()
+{
+ struct sockaddr_in ssAddr;
+
+#ifdef _WIN32
+ if(!WinSockDLLVersionOK())
+ return false;
+#endif
+
+ i_ServerSock = socket(AF_INET, SOCK_STREAM, 0);
+ if (i_ServerSock < 0)
+ {
+ //calling WSAGetLastError to get the error details in windows
+ WriteErrorLog("", strerror(errno));
+ printf("Error: %s\n", strerror(errno));
+ return false;
+ }
+
+#ifdef SO_REUSEADDR
+ int opt = 1;
+ setsockopt(i_ServerSock, SOL_SOCKET, SO_REUSEADDR,
(char*)&opt,sizeof(int));
+#endif
+
+ bzero(&ssAddr, sizeof(ssAddr));
+ ssAddr.sin_family = AF_INET;
+
+ if (pz_HostAddr == NULL)
+ {
+ ssAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ }
+ else
+ {
+ ssAddr.sin_addr.s_addr = inet_addr(pz_HostAddr);
+ }
+
+ ssAddr.sin_port = htons((u_short)i_Port);
+
+ if (bind(i_ServerSock,(struct sockaddr *)&ssAddr,sizeof(ssAddr)) <0)
+ {
+ WriteErrorLog("", strerror(errno));
+ close(i_ServerSock);
+ return false;
+ }
+
+ if(listen(i_ServerSock, MAX_QUEUE_LEN) < 0)
+ {
+ WriteErrorLog("", strerror(errno));
+ close(i_ServerSock);
+ return false;
+ }
+
+ return true;
+}
+
+//**************************************************************************************************
+bool HPC4FServer::SetErrorLog( FILE *fp )
+{
+ f_ErrorLog = fp;
+ return true;
+}
+
+//**************************************************************************************************
+bool HPC4FServer::SetAccessLog( FILE *fp )
+{
+ f_AccessLog = fp;
+ return true;
+}
+
+//**************************************************************************************************
+enum eSelectStatus HPC4FServer::GetConnection( struct timeval *stTimeout )
+{
+ int iResult;
+ int iClientSock;
+ int issAddrLen;
+ char *pzIP;
+ struct sockaddr_in ssAddr;
+ fd_set fdSet;
+ pthread_t thread;
+
+ FD_ZERO(&fdSet);
+ FD_SET((unsigned)i_ServerSock, &fdSet);
+ iResult = 0;
+
+ while(iResult == 0)
+ {
+ iResult = select(i_ServerSock + 1, &fdSet, 0, 0, stTimeout);
+ if (iResult < 0)
+ return ERR;
+ if (stTimeout != 0 && iResult == 0)
+ return TIMEOUT;
+ if (iResult > 0)
+ break;
+ }
+
+ bzero(&ssAddr, sizeof(ssAddr));
+ issAddrLen = sizeof(ssAddr);
+
+ iClientSock = accept(i_ServerSock,(struct sockaddr *)&ssAddr,
(socklen_t*)&issAddrLen);
+ pzIP = inet_ntoa(ssAddr.sin_addr);
+ strcpy(z_ClientAddr, pzIP);
+
+// WriteAccessLog();
+
+ /* The request is handled by a seperate thread. The main thread will NOT
wait
+ * untill the thread exit. It will continue to listen to new requests
+ * */
+ RequestHandler *pReqHan = new RequestHandler(iClientSock, pzIP, i_Port,
i_ServerSock, f_ErrorLog, f_AccessLog);
+ pReqHan->ReadRequest();
+ if (pReqHan->IsTicket())
+ {
+ char pzReferenceID[33];
+ int iTicket = tpool->GetNextReferenceID();
+ sprintf(pzReferenceID, "%d", iTicket);
+ if (pReqHan->WriteSockData(pzReferenceID, strlen(pzReferenceID)) > 0)
+ {
+ (*pmap_Tickets)[iTicket] = strdup(z_ClientAddr);
+ }
+ else
+ {
+ printf("Error: %s\n", strerror(errno));
+ WriteErrorLog("", strerror(errno));
+ }
+ return TIMEOUT;
+ }
+ else if (!pReqHan->IsAbortRequest())
+ {
+ int iTicket = pReqHan->GetTicket();
+ if ((*pmap_Tickets)[iTicket] != NULL)
+ {
+ if((strcmp((*pmap_Tickets)[iTicket], z_ClientAddr)) != 0)
+ {
+ pReqHan->WriteSockData("error,Access Denied!", strlen("error,Access
Denied!"));
+ pz_ErrorMSG = strdup("Access Violation");
+ printf("Caution: %s\n", pz_ErrorMSG);
+ WriteAccessLog();
+ return TIMEOUT;
+ }
+ else
+ {
+ pmap_Tickets->erase(iTicket);
+ }
+ }
+ else
+ {
+ pReqHan->WriteSockData("error,Access Denied!", strlen("error,Access
Denied!"));
+ pz_ErrorMSG = strdup("Access Violation");
+ printf("Caution: %s\n", pz_ErrorMSG);
+ WriteAccessLog();
+ return TIMEOUT;
+ }
+ }
+
+ ////////////////////////////////////////////////////////////// Check the
thread function- it shud b removed
+ if (tpool->AddWork((void*)0, (void*)pReqHan) == -1)
+ {
+ WriteErrorLog("", "TPOOL ERROR\n"); // CHK whether errno set when there
is a thread error
+ return ERR;//EXIT_FAILURE;
+ }
+
+ return TIMEOUT;//PENDING_REQUEST;
+}
+
+void HPC4FServer::WriteErrorLog( char *pzLevel, char *pzMSG )
+{
+ char dateBuf[30];
+ struct tm *timePtr;
+ time_t clock;
+
+
+ if (f_ErrorLog == NULL)
+ return;
+ clock = time(NULL);
+ timePtr = localtime(&clock);
+ strftime(dateBuf, 30, "%a %b %d %T %Y", timePtr);
+ fprintf(f_ErrorLog, "%s:%s [%s] %s\n", __FILE__, __LINE__, dateBuf,
pzMSG);
+ fflush(f_ErrorLog);
+}
+
+//**************************************************************************************************
+void HPC4FServer::WriteAccessLog()
+{
+ char dateBuf[30];
+ struct tm *timePtr;
+ time_t clock;
+ int responseCode;
+
+
+ if (f_AccessLog == NULL)
+ return;
+ clock = time(NULL);
+ timePtr = localtime(&clock);
+ strftime(dateBuf, 30, "%d/%b/%Y:%T %Z", timePtr);
+ if (*z_ClientAddr != 0)
+ {
+ if (pz_ErrorMSG)
+ fprintf(f_AccessLog, "%s - - [%s] -- %s \n", z_ClientAddr, dateBuf,
pz_ErrorMSG);
+ else
+ fprintf(f_AccessLog, "%s - - [%s] \n", z_ClientAddr, dateBuf);
+
+ fflush(f_AccessLog);
+ }
+ else
+ {
+ fprintf(f_AccessLog, "HPC4FServer is Started, Host: %s Port: %d - - [%s]
\n", pz_HostAddr, i_Port, dateBuf);
+ fflush(f_AccessLog);
+ }
+}
+
+//
**************************************************************************************************
+
=======================================
--- /dev/null
+++ /trunk/hpc4server/HPC4FServer.h Sat Jan 16 23:52:53 2010
@@ -0,0 +1,103 @@
+/*
+ Copyright (c) 2008 by contributors:
+
+ * Damitha Premadasa
+ * Nilendra Weerasinghe
+ * Thilina Dampahala
+ * Waruna Ranasinghe - (http://warunapw.blogspot.com)
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+/*
+// //
+// // PROJECT : HPC4Finance
+// // MODULE : HPC4FServer which runs in Master Node
+// // FILE : HPC4FServer.h
+// // AUTHOR : Waruna Ranasinghe (waru...@gmail.com)
+// // DESC : The header file defines the HPC4FServer class, its
properties
+// and methods. This is satisfactorily tested with UNIX
+// environment while partially implemented for WINDOWS.
+// // TODO : Failover and Load Balancing
+// // HISTORY : Date of Creation: 5-Oct-2008
+// Modified: 24-Nov-2008: tpool.h
+// Modified: 19-Jan-2009: #include <map>, pz_ErrorMSG,
pmap_Tickets
+// //
+*/
+
+#ifndef _HPC4FSERVER_H_
+#define _HPC4FSERVER_H_
+
+#include "defs.h"
+
+#ifdef _WIN32
+#include <winsock2.h>
+#include <io.h>
+#else
+#include <unistd.h>
+#include <sys/file.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include "tpool.h"
+#endif
+
+#include <map>
+
+using namespace std;
+
+enum eSelectStatus
+{
+ ERR = -1,
+ TIMEOUT = 0,
+ PENDING_REQUEST = 1
+};
+
+class HPC4FServer
+{
+private:
+ int i_ServerSock;
+ int i_Port;
+ char z_ClientAddr[IP_ADDR_LEN];
+ char *pz_HostAddr;
+ FILE *f_ErrorLog;
+ FILE *f_AccessLog;
+ char *pz_ErrorMSG;
+
+ map<int, char*> *pmap_Tickets;
+
+ Tpool *tpool;
+
+ void WriteErrorLog(char *pzLevel, char *pzMSG);
+ void WriteAccessLog();
+#ifdef _WIN32
+ bool WinSockDLLVersionOK();
+#endif
+public:
+ HPC4FServer();
+ HPC4FServer(char *pzHost, int iPort);
+ HPC4FServer( char *pzHost, int iPort, FILE *fAccessLog, FILE *fErrorLog);
+ virtual ~HPC4FServer();
+
+ bool CreateHPC4FServer();
+ enum eSelectStatus GetConnection(struct timeval *stTimeout);
+
+ bool SetErrorLog(FILE *fp); // this should call before
call 'GetConnection'
+ bool SetAccessLog(FILE *fp);// this should call before
call 'GetConnection'
+};
+
+#endif // _HPC4FSERVER_H_
+
=======================================
--- /dev/null
+++ /trunk/hpc4server/Makefile Sat Jan 16 23:52:53 2010
@@ -0,0 +1,35 @@
+all: HPC4FServer
+
+HPC4FServer: TestHPC4FServer.o HPC4FServer.o RequestHandler.o tpool.o
+ g++ -g -lpthread TestHPC4FServer.o HPC4FServer.o RequestHandler.o tpool.o
-o HPC4FServer
+
+TestHPC4FServer.o: TestHPC4FServer.cpp HPC4FServer.h
+ g++ -g -c TestHPC4FServer.cpp
+
+HPC4FServer.o: HPC4FServer.cpp HPC4FServer.h RequestHandler.h defs.h
tpool.h
+ g++ -g -c HPC4FServer.cpp
+
+RequestHandler.o: RequestHandler.cpp RequestHandler.h defs.h
+ g++ -g -c RequestHandler.cpp
+
+tpool.o: tpool.h tpool.cpp RequestHandler.h
+ g++ -g -c tpool.cpp -o tpool.o
+clean:
+ rm -rf *o HPC4FServer HPC4FServerS
+
+HPC4FServerS: TestHPC4FServerS.o HPC4FServerS.o RequestHandlerS.o tpoolS.o
+ g++ -O3 -lpthread TestHPC4FServerS.o HPC4FServerS.o RequestHandlerS.o
tpoolS.o -o HPC4FServerS
+
+TestHPC4FServerS.o: TestHPC4FServer.cpp HPC4FServer.h
+ g++ -O3 -c TestHPC4FServer.cpp -o TestHPC4FServerS.o
+
+HPC4FServerS.o: HPC4FServer.cpp HPC4FServer.h RequestHandler.h defs.h
tpool.h
+ g++ -O3 -c HPC4FServer.cpp -o HPC4FServerS.o
+
+RequestHandlerS.o: RequestHandler.cpp RequestHandler.h defs.h
+ g++ -O3 -c RequestHandler.cpp -o RequestHandlerS.o
+
+tpoolS.o: tpool.h tpool.cpp RequestHandler.h
+ g++ -O3 -c tpool.cpp -o tpoolS.o
+cleanS:
+ rm -rf *o HPC4FServerS
=======================================
--- /dev/null
+++ /trunk/hpc4server/RequestHandler.cpp Sat Jan 16 23:52:53 2010
@@ -0,0 +1,477 @@
+/*
+ Copyright (c) 2008 by contributors:
+
+ * Damitha Premadasa
+ * Nilendra Weerasinghe
+ * Thilina Dampahala
+ * Waruna Ranasinghe - (http://warunapw.blogspot.com)
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+/*
+// //
+// // PROJECT : HPC4Finance
+// // MODULE : HPC4FServer which runs in Master Node
+// // FILE : RequestHandler.cpp
+// // AUTHOR : Waruna Ranasinghe (waru...@gmail.com)
+// // DESC : The implementation of the HPC4FServer class is done
here. Each
+// request is processed using a new thread.
+// // TODO : Failover
+// // HISTORY : Date of Creation: 5-Oct-2008
+// Modified: 13-Nov-2008 :
implemented 'WriteAccesslog()', 'WriteErrorLog()'
+// 17-Nov-2008 : added 'StrTokenizer'
+// 07-Jan-2009 : changed 'mpiexec' to 'mpdrun' to have
an alias to the job
+// 14-Jan-2009 : GetTicket, IsTicket, IsAbortRequest is
added
+// 15-Jan-2009 : GetMachineFileName aded for load
balancing
+// //
+*/
+
+#include <stdio.h>
+#include <memory.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string>
+#include "RequestHandler.h"
+#include "defs.h"
+
+using namespace std;
+
+//**************************************************************************************************
+RequestHandler::RequestHandler(int iClientSock, char *pzClientIPAddr, int
iPort, int iServerSock, FILE *fErrorLog, FILE *fAccessLog)
+{
+ i_ClientSock = iClientSock;
+ i_Port = iPort;
+ i_ServerSock = iServerSock;
+ i_ReadBufRemain = 0;
+ b_FullyReadSockData = false;
+ pz_ReadBuf = NULL;
+ pz_Request = NULL;
+ bzero(z_ReadBuf, MAX_READ_BUF_LEN + 1);
+ if (pzClientIPAddr)
+ strncpy(z_ClientAddr, pzClientIPAddr, IP_ADDR_LEN);
+ else
+ *z_ClientAddr = 0;
+ if (fErrorLog)
+ {
+ f_ErrorLog = fErrorLog;
+ }
+ else
+ f_ErrorLog = NULL;
+ if (fAccessLog)
+ {
+ f_AccessLog = fAccessLog;
+ }
+ else
+ f_AccessLog = NULL;
+}
+
+//**************************************************************************************************
+RequestHandler::~RequestHandler()
+{
+ if(pz_ReadBuf)
+ delete[] pz_ReadBuf;
+ if(pz_Request)
+ free(pz_Request);
+}
+
+//**************************************************************************************************
+void RequestHandler::EndRequest()
+{
+// WriteAccessLog();
+ shutdown(i_ClientSock,2);
+ close(i_ClientSock);
+}
+
+//**************************************************************************************************
+bool RequestHandler::ReadRequest()
+{
+ int iRead = 0;
+ string sRequest;
+
+ while (!b_FullyReadSockData)
+ {
+ bzero(z_ReadBuf, MAX_READ_BUF_LEN + 1);
+ iRead = ReadSockData();
+ if (iRead < 0)
+ {
+ WriteErrorLog("", strerror(errno));
+ printf("Error: %s\n", strerror(errno));
+ }
+ if (iRead < MAX_READ_BUF_LEN)
+ b_FullyReadSockData = true;
+ if (iRead > 0)
+ {
+ sRequest.append(z_ReadBuf, iRead);
+ //write in the error log
+ }
+ // iRead should be handled for errors
+ }
+ b_FullyReadSockData = false;
+
+ if (sRequest.length() <= 0)
+ {
+ WriteErrorLog("", "Invalid Reguest");
+ WriteSockData(INVALID_REQUEST, strlen(INVALID_REQUEST));
+ return false;
+ }
+
+ pz_Request = strdup(sRequest.c_str());
+ printf("Request Read: %s\n", sRequest.c_str());
+ WriteAccessLog();
+ return true;
+}
+
+
+//**************************************************************************************************
+bool RequestHandler::IsTicket()
+{
+ if (strcmp(pz_Request, "ticket") == 0)
+ return true;
+ return false;
+}
+
+//**************************************************************************************************
+bool RequestHandler::IsAbortRequest()
+{
+ if (strncmp(pz_Request, "mpdkilljob", 10) == 0)
+ return true;
+ return false;
+}
+
+//**************************************************************************************************
+int RequestHandler::GetTicket()
+{
+ int iTicket;
+ char *pzTemp = strdup(pz_Request);
+ char *pzPos = strstr(pzTemp, "-a");
+ iTicket = atoi(pzPos + 3);
+ delete []pzTemp;
+ return iTicket;
+}
+
+//**************************************************************************************************
+int RequestHandler::ReadSockData()
+{
+#ifdef _WIN32
+ return( recv(i_ClientSock, z_ReadBuf, MAX_READ_BUF_LEN, 0)); // in error
SOCKET_ERROR
+#else
+ return( read(i_ClientSock, z_ReadBuf, MAX_READ_BUF_LEN));
+#endif
+}
+
+//**************************************************************************************************
+int RequestHandler::WriteSockData(char *pzBuf, int iLen)
+{
+#ifdef _WIN32
+ return( send(i_ClientSock, pzBuf, iLen, 0));
+#else
+ return( write(i_ClientSock, pzBuf, iLen));
+#endif
+}
+
+//**************************************************************************************************
+bool RequestHandler::ProcessRequest()
+{
+ bool bAbort = false;
+ if (IsAbortRequest())
+ {
+ bAbort = true;
+ }
+ int iNoParams;
+ char **pStrings = NULL;
+ int fdStdoutPipe[2];
+ int iChildPID, iExecvp = 100;
+ string sData;
+ int t = 0;
+ char szBuffer[BUF_SIZE + 1];
+
+ if (!bAbort)
+ {
+
+// setbuf(stdout, (char *) 0);
+
+ if (pipe(fdStdoutPipe) < 0)
+ {
+ WriteErrorLog("", strerror(errno));
+ printf("Error: %s\n", strerror(errno));
+ return false;
+ }
+
+ int iSize = 0;
+ char *pzCurrStr;
+ int iNoStr = 2;
+ pStrings = (char **)realloc(pStrings, sizeof(char*) * iNoStr);
+// pStrings[0] = strdup("mpiexec");
+ pStrings[0] = strdup("mpdrun");
+ pStrings[1] = strdup("-n");
+
+ bool bNxtAlias = false;
+ bool bAliasFound = false;
+ bool bMachinefileAdded = false;
+ char *pzMachineFile;
+ int iNodes;
+ pzCurrStr = strtok(pz_Request," ");
+ if (pzCurrStr != NULL)
+ iNodes = atoi(pzCurrStr);
+ while (pzCurrStr != NULL)
+ {
+ if (pStrings == NULL)
+ {
+ printf("Error: %s\n", strerror(errno));
+ exit(0);
+ }
+ if (bNxtAlias)
+ {
+ pzMachineFile = GetMachineFileName(pzCurrStr, iNodes);
+ bNxtAlias = false;
+ bAliasFound = true;
+ }
+ else if ((strcmp(pzCurrStr, "-a") == 0) && !bAliasFound)
+ bNxtAlias = true;
+ else if (bAliasFound && !bMachinefileAdded)
+ {
+ iNoStr += 2;
+ pStrings = (char **)realloc(pStrings, sizeof(char*) * iNoStr);
+ pStrings[iNoStr - 2] = strdup("-hf");
+ pStrings[iNoStr - 1] = strdup(pzMachineFile);
+ bMachinefileAdded = true;
+ }
+
+ iNoStr++;
+ pStrings = (char **)realloc(pStrings, sizeof(char*) * iNoStr);
+ pStrings[iNoStr - 1] = strdup(pzCurrStr);
+ pzCurrStr = strtok (NULL, " ");
+ }
+ pStrings = (char **)realloc(pStrings, sizeof(char*) * (iNoStr + 1));
+ pStrings[iNoStr] = (char *)0;
+ pzReferenceID = strdup(pStrings[4]);
+ iNoParams = iNoStr;
+ }
+ else
+ {
+ pStrings = new char*[4];
+ char *pzCurrStr;
+ int x = 0;
+ pzCurrStr = strtok(pz_Request," ");
+ while (pzCurrStr != NULL)
+ {
+ pStrings[x] = strdup(pzCurrStr);
+ pzCurrStr = strtok (NULL, " ");
+ x++;
+ }
+ pStrings[3] = (char *)0;
+ iNoParams = 3;
+ }
+
+ char **args = pStrings;
+
+ int nOutRead = 1;
+ if ((iChildPID = fork()) < 0)
+ {
+ WriteErrorLog("", strerror(errno));
+ printf("Error: %s\n", strerror(errno));
+ return false;
+ }
+ else if (iChildPID == 0) //In the Child
+ {
+ if (!bAbort)
+ {
+ close(fdStdoutPipe[READ_HANDLE]);
+ if(dup2(fdStdoutPipe[WRITE_HANDLE], fileno(stdout)) == -1)
+ {
+ WriteErrorLog("", strerror(errno));
+ printf("In the Child :PID : %d : %s\n", iChildPID, strerror(errno));
+ return false;
+ }
+ close(fdStdoutPipe[WRITE_HANDLE]);
+ }
+
+ iExecvp = execvp(*args, args);
+ if (iExecvp == -1)
+ {
+ WriteErrorLog("", strerror(errno));
+ printf("In the Child :Error : %s\n", strerror(errno));
+ }
+// printf("After execute beep: iExecvp = %d\n", iExecvp); // this will
not be printed.
+ }
+ else //in the main process
+ {
+ for (int i = 0; i < (t + iNoParams); i++)
+ {
+ free(args[i]);
+ }
+ free(args);
+
+ if (!bAbort)
+ {
+ if (close(fdStdoutPipe[1]) == 0)
+ {
+ int loops = 0;
+ while (nOutRead > 0)
+ {
+ bzero(szBuffer, BUF_SIZE + 1);
+ nOutRead = read(fdStdoutPipe[READ_HANDLE], szBuffer, BUF_SIZE);
+ if (nOutRead < 0)
+ {
+ WriteErrorLog("", strerror(errno));
+ printf("Error: %s\n", strerror(errno));
+ }
+ loops++;
+ if(nOutRead > 0)
+ sData.append(szBuffer, strlen(szBuffer));
+ }
+ }
+ }
+ }
+ if (!bAbort)
+ {
+ if (sData.empty())
+ sData.append("error,job killed");
+ printf("Data: \n%s\n", sData.c_str());
+
+ if (WriteSockData((char *)sData.c_str(), strlen(sData.c_str())) > 0)
+ {
+ return true;
+ }
+ else
+ {
+ // printf("Error: %s\n", strerror(errno));
+ WriteErrorLog("", strerror(errno));
+ return false;
+ }
+ }
+ return true;
+}
+
+//**************************************************************************************************
+void RequestHandler::WriteErrorLog( char *pzLevel, char *pzMSG )
+{
+ char dateBuf[30];
+ struct tm *timePtr;
+ time_t clock;
+
+ WriteErrorLog("", strerror(errno));
+ if (f_ErrorLog == NULL)
+ return;
+ clock = time(NULL);
+ timePtr = localtime(&clock);
+ strftime(dateBuf, 30, "%a %b %d %T %Y", timePtr);
+ if (*z_ClientAddr != 0)
+ {
+ fprintf(f_ErrorLog, "%s:%s [%s] [client %s] %s\n",__FILE__, __LINE__,
dateBuf, z_ClientAddr, pzMSG);
+ fflush(f_AccessLog);
+ }
+ else
+ {
+ fprintf(f_ErrorLog, "%s:%s [%s] %s\n",__FILE__, __LINE__, dateBuf,
pzMSG);
+ fflush(f_AccessLog);
+ }
+}
+
+//**************************************************************************************************
+void RequestHandler::WriteAccessLog()
+{
+ char dateBuf[30];
+ struct tm *timePtr;
+ time_t clock;
+
+ if (f_AccessLog == NULL)
+ return;
+ clock = time(NULL);
+ timePtr = localtime(&clock);
+ strftime(dateBuf, 30, "%d/%b/%Y:%T %Z", timePtr);
+ fprintf(f_AccessLog, "%s:%d - - [%s] - %s\n", z_ClientAddr, i_ClientSock,
dateBuf, pz_Request);
+ fflush(f_AccessLog);
+}
+
+//
**************************************************************************************************
+char **RequestHandler::StrTokenizer(char *pzStr, int &iNoStr)
+{
+ int iSize = 0;
+ char *pzCurrStr;
+ char **pStrings = NULL;
+ iNoStr = 0;
+
+ pzCurrStr = strtok(pzStr," ");
+ while (pzCurrStr != NULL)
+ {
+ iNoStr++;
+ if(iNoStr > iSize)
+ {
+ pStrings = (char **)realloc(pStrings, sizeof(char*) * iNoStr);
+ if (pStrings == NULL)
+ {
+ printf("Error: %s\n", strerror(errno));
+ exit(0);
+ }
+
+ }
+ pStrings[iNoStr - 1] = strdup(pzCurrStr);
+ pzCurrStr = strtok (NULL, " ");
+ }
+
+ return pStrings;
+}
+
+//
**************************************************************************************************
+char *RequestHandler::GetMachineFileName(char *pzAlias, int iNodes)
+{
+ pthread_mutex_lock(&mutex_FileLock);
+
+ string sFile;
+ char zLine[20];
+ FILE *fpHosts = fopen("/home/cig4/mpd.hosts", "r+");
+ char *pzResult = NULL;
+ int n = 0;
+ int iPos = 0;
+
+ string sHFName("/home/cig4/mpich2-install/bin/hf");
+ string sHF;
+ sHFName += pzAlias;
+ FILE *fpHF = fopen(sHFName.c_str(), "w");
+
+ pzResult = fgets(zLine, 20, fpHosts);
+ while (pzResult != NULL)
+ {
+ if (n < iNodes)
+ {
+ sFile.append(zLine);
+ sHF.append(zLine, strlen(zLine) - 3);
+ sHF.append("\n");
+ if (n == iNodes - 1)
+ {
+ fputs(sHF.c_str(), fpHF);
+ fclose(fpHF);
+ }
+ }
+ else
+ {
+ sFile.insert(iPos, zLine);
+ iPos += strlen(zLine);
+ }
+ n++;
+ pzResult = fgets(zLine, 20, fpHosts);
+ }
+ if (n < iNodes)
+ {
+ fputs(sHF.c_str(), fpHF);
+ fclose(fpHF);
+ }
+ fseek(fpHosts, 0, SEEK_SET);
+ fputs(sFile.c_str(), fpHosts);
+ fclose(fpHosts);
+ pthread_mutex_unlock(&mutex_FileLock);
+ return strdup(sHFName.c_str());
+}
+
+//
**************************************************************************************************
=======================================
--- /dev/null
+++ /trunk/hpc4server/RequestHandler.h Sat Jan 16 23:52:53 2010
@@ -0,0 +1,89 @@
+/*
+ Copyright (c) 2008 by contributors:
+
+ * Damitha Premadasa
+ * Nilendra Weerasinghe
+ * Thilina Dampahala
+ * Waruna Ranasinghe - (http://warunapw.blogspot.com)
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+/*
+// //
+// // PROJECT : HPC4Finance
+// // MODULE : HPC4FServer which runs in Master Node
+// // FILE : RequestHandler.h
+// // AUTHOR : Waruna Ranasinghe (waru...@gmail.com)
+// // DESC : The header file defines the RequestHandler class, its
properties
+// and methods. This is satisfactorily tested with UNIX
+// environment while partially implemented for WINDOWS.
+// // TODO :
+// // HISTORY : Date of Creation: 5-Oct-2008
+// Modified: 16-Nov-2008 : 'strtokenizer' function added.
+// Modified: 14-Jan-2009 : GetTicket, IsTicket,
IsAbortRequest is added
+// Modified: 15-Jan-2009 : GetMachineFileName aded for load
balancing
+// //
+*/
+
+#ifndef _REQUESTHANDLER_H_
+#define _REQUESTHANDLER_H_
+
+#include <netdb.h>
+#include <pthread.h>
+#include "defs.h"
+
+class RequestHandler
+{
+private:
+ int i_ServerSock;
+ int i_Port;
+ int i_ClientSock;
+ int i_ReadBufRemain;
+ bool b_FullyReadSockData;
+ char *pz_Request;
+ char *pz_ReadBuf;
+ char z_ClientAddr[IP_ADDR_LEN];
+ char z_ReadBuf[MAX_READ_BUF_LEN + 1];
+
+ FILE *f_ErrorLog;
+ FILE *f_AccessLog;
+
+ int ReadSockData(); /* read socket data to z_ReadBuf, Here if the return
value is equal to the
+ * MAX_READ_BUF_LEN, then theres more to read at socket.
+ * So b_FullyReadSockData should be false */
+// int ReadChar(char *pcChar); // if needs to tokanize, read char by char
from the z_ReadBuf
+ void WriteErrorLog(char *pzLevel, char *pzMSG);
+ void WriteAccessLog();
+
+ char **StrTokenizer(char *pzStr, int &iNoStr);
+public:
+ RequestHandler(int iClientSock, char *pzClientIPAddr, int iPort, int
iServerSock, FILE *fErrorLog, FILE *fAccessLog);
+ virtual ~RequestHandler();
+
+ bool ProcessRequest();
+ void EndRequest();
+ bool ReadRequest();
+
+ bool IsTicket();
+ bool IsAbortRequest();
+ char *GetMachineFileName(char *pzAlias, int iNodes);
+ int GetTicket();
+
+ int WriteSockData(char *pzBuf, int iLen); // write data to Socket
+
+ char *pzReferenceID;
+ pthread_mutex_t mutex_FileLock;
+};
+
+#endif //_REQUESTHANDLER_H_
+
=======================================
--- /dev/null
+++ /trunk/hpc4server/TestHPC4FServer.cpp Sat Jan 16 23:52:53 2010
@@ -0,0 +1,140 @@
+/*
+ Copyright (c) 2008 by contributors:
+
+ * Damitha Premadasa
+ * Nilendra Weerasinghe
+ * Thilina Dampahala
+ * Waruna Ranasinghe - (http://warunapw.blogspot.com)
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+/*
+// //
+// // PROJECT : HPC4Finance
+// // MODULE : HPC4FServer which runs in Master Node
+// // FILE : TestHPC4FServer.cpp
+// // AUTHOR : Waruna Ranasinghe (waru...@gmail.com)
+// // DESC : The implementation of the HPC4FServer is done here.
+// // TODO :
+// // HISTORY : Date of Creation: 5-Oct-2008
+// Modified: 19-Jan-2009 : added signal handling
+// //
+*/
+
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <signal.h>
+#include "HPC4FServer.h"
+
+FILE *fpA;
+FILE *fpE;
+
+void finalize(int iParam)
+{
+ printf("\nHPC4FServer Shutting Down Sequence is started....\nClosing
files....\n");
+ fclose(fpA);
+ fclose(fpE);
+ printf("BYE!\n");
+ fflush(stdout);
+ exit(0);
+}
+
+int main(int argc, char **argv)
+{
+ signal(SIGINT, &finalize);
+
+ int iPort = 8080;
+ char *pzHost = NULL;
+ int iOption;
+ int iErrFlag = 0;
+ int c;
+
+ /* It is possible to run the server without specifying the Host or Port
+ * So that the default Host = localhost, Port = 8080.
+ * It is also possible to specify the Host and the port when starts
+ * the server by using options "-h <host_ip> -p <port>"
+ * */
+
+ while ( (iOption = getopt(argc, argv, "h:p:")) != -1 )
+ {
+ switch (iOption)
+ {
+ case 'h':
+ pzHost = optarg;
+ break;
+ case 'p':
+ iPort = atoi(optarg);
+ break;
+ default:
+ iErrFlag++;
+ }
+ }
+
+ if (iErrFlag)
+ {
+ printf("Usage: %s [-h <host ip>] [-p <port>]\n", argv[0]);
+ exit (1);
+ }
+
+ struct timeval stTimeout;
+ enum eSelectStatus eResult;
+
+ /* Initiate the server with Host and the listening port */
+ fpA = fopen("access.log", "a");
+ fpE = fopen("error.log", "a");
+ if (fpA == NULL)
+ {
+ printf("ERROR FPA: %s\n", strerror(errno));
+ }
+ if (fpE == NULL)
+ {
+ printf("ERROR FPE: %s\n", strerror(errno));
+ }
+ HPC4FServer server(pzHost, iPort, fpA , fpE);
+// server.SetAccessLog(fp);
+ if (!server.CreateHPC4FServer())
+ {
+ printf("Error Creating the HPC4FServer!\n");
+ return -1;
+ }
+
+ printf("Welcome to HPC4FServer! Listening on port %d\n", iPort);
+
+// server.SetAccessLog(stdout);
+// server.SetErrorLog(stdout);
+
+ while(true)
+ {
+ /* Timeout value has to be set over and over again,
+ * since in Linux systems timeout value may be modified
+ * during the "select" call
+ * */
+
+ stTimeout.tv_sec = 0;
+ stTimeout.tv_usec = 50;
+ eResult = server.GetConnection(&stTimeout);
+ if (eResult == TIMEOUT)
+ {
+ continue;
+ }
+ if (eResult < ERR)
+ {
+ printf("Error ... \n");
+ continue;
+ }
+ }
+
+ return 0;
+}
+
=======================================
--- /dev/null
+++ /trunk/hpc4server/defs.h Sat Jan 16 23:52:53 2010
@@ -0,0 +1,67 @@
+/*
+ Copyright (c) 2008 by contributors:
+
+ * Damitha Premadasa
+ * Nilendra Weerasinghe
+ * Thilina Dampahala
+ * Waruna Ranasinghe - (http://warunapw.blogspot.com)
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/*
+// //
+// // PROJECT : HPC4Finance
+// // MODULE : HPC4FServer which runs in Master Node
+// // FILE : defs.h
+// // AUTHOR : Waruna Ranasinghe (waru...@gmail.com)
+// // DESC : The header file defines the constants and other required
definitions
+// for achieve interoperability amoung different operating systems
+// // TODO :
+// // HISTORY : Date of Creation: 5-Oct-2008
+// Modified: 6-Nov-2008 : added #ifdef _REQUESTHANDLER_H_
+// //
+*/
+
+#ifndef _DEFS_H_
+#define _DEFS_H_
+
+#ifdef _WIN32
+#define snprintf _snprintf
+#define index strchr
+#define rindex strrchr
+#define SIGPIPE SIGABRT
+#endif //_WIN32
+#define bzero(a, b) memset((a),0,(b))
+
+#define MAX_QUEUE_LEN 128
+#define IP_ADDR_LEN 17
+#define MAX_READ_BUF_LEN 4096
+#define MAX_TIMEOUT 5
+#define HPC4F_MAX_BUF 8192
+#define INVALID_REQUEST "-1"
+#define ANY_ADDR NULL
+
+#ifdef _REQUESTHANDLER_H_
+#define P_NOWAIT _P_NOWAIT
+#define READ_HANDLE 0
+#define WRITE_HANDLE 1
+#define BUF_SIZE 512
+#endif //_REQUESTHANDLER_H_
+
+//#ifdef _HPC4FSERVER_H_
+#define HPC4FSERVER_MAX_THREADS 100
+#define HPC4FSERVER_MIN_THREADS 50
+#define HPC4FSERVER_MAX_QUEUE 10
+//#endif //_HPC4FSERVER_H_
+#endif //_DEFS_H_
=======================================
--- /dev/null
+++ /trunk/hpc4server/tpool.cpp Sat Jan 16 23:52:53 2010
@@ -0,0 +1,240 @@
+/*
+ Copyright (c) 2008 by contributors:
+
+ * Damitha Premadasa
+ * Nilendra Weerasinghe
+ * Thilina Dampahala
+ * Waruna Ranasinghe - (http://warunapw.blogspot.com)
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+/*
+// //
+// // PROJECT : HPC4Finance
+// // MODULE : HPC4FServer which runs in Master Node
+// // FILE : tpool.cpp
+// // AUTHOR : Waruna Ranasinghe (waru...@gmail.com)
+// // DESC : This file implements the Tpool class which is used to
create thread pools and
+// the tpoll_thread function which is run by the threads.
+// // TODO :
+// // HISTORY : Date of Creation: 20-Nov-2008
+// Modified: 14-Jan-2009 : added GetNextReferenceID
+// //
+*/
+
+#include "tpool.h"
+#include <stdlib.h>
+#include <string.h>
+#include "RequestHandler.h"
+
+//**********************************************************************************************************
+void *tpool_thread(void *tpool_)
+{
+ Tpool *tpool = (Tpool *)tpool_;
+ tstRequest *my_workp;
+ for (;;)
+ {
+ pthread_mutex_lock(&(tpool->mutex_QueueLock));
+ while ((tpool->i_CurQueueSize == 0) && (!tpool->b_Shutdown))
+ {
+ pthread_cond_wait(&(tpool->cond_QueueNotEmpty),
&(tpool->mutex_QueueLock));
+ }
+ if (tpool->b_Shutdown)
+ {
+ pthread_mutex_unlock(&(tpool->mutex_QueueLock));
+ pthread_exit(NULL);
+ }
+
+ my_workp = tpool->pst_ReqQueueHead;
+ tpool->i_CurQueueSize--;
+
+ if (tpool->i_CurQueueSize == 0)
+ tpool->pst_ReqQueueHead = tpool->pst_ReqQueueTail = NULL;
+ else
+ tpool->pst_ReqQueueHead = my_workp->next;
+ if ((!tpool->b_DoNotBlock) && (tpool->i_CurQueueSize==
(tpool->i_MaxQueueSize - 1)))
+ pthread_cond_broadcast(&(tpool->cond_QueueNotFull));
+ if (tpool->i_CurQueueSize == 0)
+ pthread_cond_signal(&(tpool->cond_QueueEmpty));
+ pthread_mutex_unlock(&(tpool->mutex_QueueLock));
+ RequestHandler *pRequestHandler = (RequestHandler *)my_workp->arg;
+ if (!pRequestHandler->ProcessRequest())
+ printf("Error While processing request\n");
+ pRequestHandler->EndRequest();
+ printf("Request Ended\n\n");
+ //(*(my_workp->routine))(my_workp->arg);
+ if (!pRequestHandler->IsAbortRequest())
+ {
+ pthread_mutex_lock(&(tpool->mutex_QueueLock));
+ tpool->piReferences[atoi(pRequestHandler->pzReferenceID)] = -1;
+ pthread_mutex_unlock(&(tpool->mutex_QueueLock));
+ }
+ free(my_workp);
+ }
+}
+
+//**********************************************************************************************************
+Tpool::Tpool(int iNoThreads, int iMaxQueueSize, bool bDoNotBlock)
+{
+ int iReturn;
+ /* initialize the fields */
+ i_NoThreads = iNoThreads;
+ i_MaxQueueSize = iMaxQueueSize;
+ b_DoNotBlock = bDoNotBlock;
+ if ((p_threads = (pthread_t *)malloc(sizeof(pthread_t)*iNoThreads)) ==
NULL)
+ perror("malloc"), exit(-1);
+ i_CurQueueSize = 0;
+ pst_ReqQueueHead = NULL;
+ pst_ReqQueueTail = NULL;
+ b_QueueClosed = 0;
+ b_Shutdown = 0;
+ if ((iReturn = pthread_mutex_init(&(mutex_QueueLock), NULL)) != 0)
+ fprintf(stderr,"pthread_mutex_init %s",strerror(iReturn)),
exit(-1);
+ if ((iReturn = pthread_cond_init(&(cond_QueueNotEmpty), NULL)) != 0)
+ fprintf(stderr,"pthread_cond_init %s",strerror(iReturn)),
exit(-1);
+ if ((iReturn = pthread_cond_init(&(cond_QueueNotFull), NULL)) != 0)
+ fprintf(stderr,"pthread_cond_init %s",strerror(iReturn)),
exit(-1);
+ if ((iReturn = pthread_cond_init(&(cond_QueueEmpty), NULL)) != 0)
+ fprintf(stderr,"pthread_cond_init %s",strerror(iReturn)),
exit(-1);
+
+ piReferences = new int[iNoThreads];
+ /* create threads */
+ for (int i = 0; i != iNoThreads; i++)
+ {
+ if ((iReturn = pthread_create( &(p_threads[i]), NULL, tpool_thread,
(void *)this)) != 0)
+ fprintf(stderr,"pthread_create %d",iReturn), exit(-1);
+ piReferences[i] = -1;
+ }
+ printf("\n"), fflush(stdout);
+}
+
+//**********************************************************************************************************
+int Tpool::AddWork(void *routine, void *arg)
+{
+ tstRequest *workp;
+ pthread_mutex_lock(&mutex_QueueLock);
+ if ((i_CurQueueSize == i_MaxQueueSize) && b_DoNotBlock)
+ {
+ pthread_mutex_unlock(&mutex_QueueLock);
+ return -1;
+ }
+ while ((i_CurQueueSize == i_MaxQueueSize) && (!(b_Shutdown ||
b_QueueClosed)))
+ {
+ pthread_cond_wait(&cond_QueueNotFull, &mutex_QueueLock);
+ }
+ if (b_Shutdown || b_QueueClosed)
+ {
+ pthread_mutex_unlock(&mutex_QueueLock);
+ return -1;
+ }
+ /* allocate work structure */
+ workp = (tstRequest *)malloc(sizeof(tstRequest));
+ workp->routine = (void (*)(void*))routine;
+ workp->arg = arg;
+ workp->next = NULL;
+ if (i_CurQueueSize == 0)
+ {
+ pst_ReqQueueTail = pst_ReqQueueHead = workp;
+ pthread_cond_broadcast(&cond_QueueNotEmpty);
+ }
+ else
+ {
+ (pst_ReqQueueTail)->next = workp;
+ pst_ReqQueueTail = workp;
+ }
+ i_CurQueueSize++;
+ pthread_mutex_unlock(&mutex_QueueLock);
+ return 1;
+}
+
+//**********************************************************************************************************
+int Tpool::Destroy(int iFinish)
+{
+ int iReturn;
+ tstRequest *cur_nodep;
+ if ((iReturn = pthread_mutex_lock(&(mutex_QueueLock))) != 0)
+ fprintf(stderr,"pthread_mutex_lock %d",iReturn), exit(-1);
+ /* Is a shutdown already in progress? */
+ if (b_QueueClosed || b_Shutdown)
+ {
+ if ((iReturn = pthread_mutex_unlock(&(mutex_QueueLock))) != 0)
+ fprintf(stderr,"pthread_mutex_unlock %d",iReturn), exit(-1);
+ return 0;
+ }
+
+ b_QueueClosed = 1;
+ /* If the iFinish flag is set, wait for workers to drain queue */
+ if (iFinish == 1)
+ {
+ while (i_CurQueueSize != 0)
+ {
+ if ((iReturn = pthread_cond_wait(&(cond_QueueEmpty),
&(mutex_QueueLock))) != 0)
+ fprintf(stderr,"pthread_cond_wait %d",iReturn), exit(-1);
+ }
+ }
+ b_Shutdown = 1;
+
+ if ((iReturn = pthread_mutex_unlock(&(mutex_QueueLock))) != 0)
+ fprintf(stderr,"pthread_mutex_unlock %d",iReturn), exit(-1);
+ /* Wake up any workers so they recheck shutdown flag */
+ if ((iReturn = pthread_cond_broadcast(&(cond_QueueNotEmpty))) != 0)
+ fprintf(stderr,"pthread_cond_broadcast %d",iReturn), exit(-1);
+ if ((iReturn = pthread_cond_broadcast(&(cond_QueueNotFull))) != 0)
+ fprintf(stderr,"pthread_cond_broadcast %d",iReturn), exit(-1);
+
+ /* Wait for workers to exit */
+ for(int i = 0; i < i_NoThreads; i++)
+ {
+ if ((iReturn = pthread_join(p_threads[i],NULL)) != 0)
+ fprintf(stderr,"pthread_join %d",iReturn), exit(-1);
+ }
+
+ /* Now free pool structures */
+ free(p_threads);
+ while(pst_ReqQueueHead != NULL)
+ {
+ cur_nodep = pst_ReqQueueHead->next;
+ pst_ReqQueueHead = pst_ReqQueueHead->next;
+ free(cur_nodep);
+ }
+
+ delete []piReferences;
+ return 0;
+}
+
+//**********************************************************************************************************
+Tpool::~Tpool()
+{
+
+}
+
+//**********************************************************************************************************
+int Tpool::GetNextReferenceID()
+{
+ pthread_mutex_lock(&mutex_QueueLock);
+ int i = 0;
+ while (piReferences[i] != -1 && i < i_NoThreads )
+ {
+ i++;
+ if (i == i_NoThreads)
+ i = 0;
+ }
+
+ // if i == i_NoThreads send error msg
+ piReferences[i] = i;
+ pthread_mutex_unlock(&mutex_QueueLock);
+
+ return i;
+}
+
+//**********************************************************************************************************
=======================================
--- /dev/null
+++ /trunk/hpc4server/tpool.h Sat Jan 16 23:52:53 2010
@@ -0,0 +1,75 @@
+/*
+ Copyright (c) 2008 by contributors:
+
+ * Damitha Premadasa
+ * Nilendra Weerasinghe
+ * Thilina Dampahala
+ * Waruna Ranasinghe - (http://warunapw.blogspot.com)
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+/*
+// //
+// // PROJECT : HPC4Finance
+// // MODULE : HPC4FServer which runs in Master Node
+// // FILE : tpool.h
+// // AUTHOR : Waruna Ranasinghe (waru...@gmail.com)
+// // DESC : This header file defines the Tpool class which is used to
create thread pools
+// // TODO :
+// // HISTORY : Date of Creation: 20-Nov-2008
+// Modified: 14-Jan-2009 : added GetNextReferenceID
+// //
+*/
+
+#ifndef _TPOOL_H_
+#define _TPOOL_H_
+
+#include <stdio.h>
+#include <pthread.h>
+
+typedef struct stRequest
+{
+ void (*routine)(void *);
+ void *arg;
+ struct stRequest *next;
+} tstRequest;
+
+class Tpool
+{
+public:
+ int *piReferences;
+ unsigned int i_NoThreads;
+ bool b_QueueClosed;
+ unsigned int i_MaxQueueSize;
+ bool b_DoNotBlock;
+ bool b_Shutdown;
+ /* pool state */
+ pthread_mutex_t mutex_QueueLock;
+ pthread_cond_t cond_QueueNotEmpty;
+ pthread_cond_t cond_QueueNotFull;
+ pthread_cond_t cond_QueueEmpty;
+ pthread_t *p_threads;
+ int i_CurQueueSize;
+ tstRequest *pst_ReqQueueHead;
+ tstRequest *pst_ReqQueueTail;
+
+ Tpool(int iNoThreads, int iMaxQueueSize, bool bDoNotBlock);
+ ~Tpool();
+ int AddWork(void *routine, void *arg);
+ int Destroy(int iFinish);
+ int GetNextReferenceID();
+};
+
+//void *tpool_thread(void *tpool_);
+
+#endif //_TPOOL_H_

Reply all
Reply to author
Forward
0 new messages