Microtiger
unread,Jul 28, 2009, 1:49:28 AM7/28/09Sign in to reply to author
Sign in to forward
You do not have permission to delete messages in this group
Either email addresses are anonymous for this group or you need the view member email addresses permission to view the original message
to China Linux Fans
各位好,我使用select和多线程设计一个并发服务器,实现对客户端请求的并发处理,这样设想:每当select到一个socket有数据可以读写的
时候就创建一个线程进行数据处理,代码如下:
/
*-------------------------------------------------------------------------------------------------------
* server.c
*
* a tcp connection on server side.
* use select to accept multi-client connections.
*
* date:2009.07.08 over
*
*--------------------------------------------------------------------------------------------------------
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <sys/select.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <errno.h>
#include <unistd.h>
#if 1 /*07.24 add */
#include <pthread.h> //for pthread
#include <errno.h> //for EINTR
#include <netdb.h> //for struct servent
#include <stdarg.h> //for va_list
#include <netinet/in.h>
#include <sys/resource.h>
#endif
#include "mysocket.h"
#define TRANSFER_PORT 4009
#define MAX_LISTEN 128
#if 1 /*07.24 add*/
#define QLEN 32 /*max connection queue length*/
#define BUFSIZE 4096
#define INTERVAL 5 /*secs*/
#endif
struct
{
pthread_mutex_t st_mutex; /*互斥*/
unsigned int st_concount; /*通信中的连接数*/
unsigned int st_contotal; /*已经完成通信的连接数*/
unsigned long st_contime; /*总连接时间*/
unsigned long st_bytecount; /*每个连接所传输的平均字节数*/
} stats;
extern int errno;
int init_transfer();
void fini_transfer();
int recv_data(int sock, char *buf, int len);
int sig_handler(void);
void sig_handler_func(int sig);
int select_conns();
int transfer_sock;
int err_exit(const char *format, ...); //07.24 add
//错误处理
void EOP(const char* error_code)
{
printf("sender error: %s;\n",error_code);
exit(1);
}
/*
*print error and exit
* 07.24 add
*/
int err_exit(const char *format, ...)
{
va_list args;
va_start(args, format);
vfprintf(stderr, format, args);
va_end(args);
exit(1);
}
void fini_transfer()
{
int ret;
ret = shutdown(transfer_sock, SHUT_RDWR);
ret = close(transfer_sock);
if(ret < 0)
perror("\nfini_transfer: ");
else
printf("\n\n %s: success shutdown transfer socket!\n
\n", __FUNCTION__);
}
//得到指定文件的长度
int get_file_length(char *filename)
{
int fsize = 0,temp;
char buf[512];
FILE * file=fopen(filename,"r");
if (file==NULL)
EOP("no such file.");
while( (temp=fread(buf,1,512,file))!=0 )
{
if(temp==-1)
EOP("fread file error.");
fsize+=temp;
}
fclose(file);
return fsize;
}
int recv_data(int sock, char *buf, int len)
{
int data = 0;
memset(buf, 0, len);
data = recv(sock, buf, len, 0);
if(data < 0)
{
perror("recv error: ");
return -1;
}
else if (data == 0)
{
printf("client shutdown the socket %d … \n", sock);
return 0;
}
printf("recv %d data.\n", data);
return data;
}
void sig_handler_func(int sig)
{
fini_transfer();
exit(0);
}
int sig_handler(void)
{
int ret = 0;
struct sigaction sact;
ret = sigemptyset(&sact.sa_mask);
if(ret < 0)
return ret;
sact.sa_flags = 0;
sact.sa_handler = sig_handler_func;
ret = sigaction(SIGQUIT, &sact, NULL);
if(ret < 0)
return ret;
sact.sa_flags = 0;
sact.sa_handler = sig_handler_func;
ret = sigaction(SIGINT, &sact, NULL);
if(ret < 0)
return ret;
sact.sa_flags = 0;
sact.sa_handler = sig_handler_func;
ret = sigaction(SIGKILL, &sact, NULL);
if(ret < 0)
return ret;
return 0;
}
/*
*接收数据的函数
*参数sockfd是已经建立的socket连接的描述符
*/
static int rcv_data(int sockfd)
{
size_t e = 0;
FILE *fd = NULL;
unsigned int count = 0;
char buf4receive[MAXLINE + 1] = {0};
#if 0
printf("input the filename you want to saved: "); //07.23 add
scanf("%s",file_name); // 07.23
add
int flen = get_file_length ( file_name ); //07.23 add
#endif
if (sockfd < 0) /*如果描述小于零则直接退出*/
{
printf("ERROR:bad socket!\n");
return (-1);
}
fd = fopen("myfiles","w"); /*创建接收文件,命名为myfiles*/
if (fd == NULL) /*若创建文件失败则退出*/
{
printf("ERROR:create file failed!\n");
return (-1);
}
while (1)
{
count = readn(sockfd, buf4receive, MAXLINE); /*从socket连接读取数据*/
if (count > 0) /*如果count大于零则表示读到了数据*/
{
// decode_data(count,buf4receive,buf4receive); /*对数据进行解密*/
e = fwrite(buf4receive, sizeof(char), count,fd); /*将数据写到文件*/
/*e是实际写到文件中的字节数,如果实际写的和要求的不同,则表示写文件出错*/
if (e != count)
{
printf("ERROR:write data to file failed!\n");
fclose(fd);
return (-1);
}
}
//MT
#if 0
if (count > 0)
{
printf("readed the data\n");
}
#endif
else if (count == 0) /*如果count等于零表示数据接收完*/
{
printf("OK:file transmittion completted!\n");
fclose(fd);
return (0);
}
else if (count == -1) /*如果count等于负一则表示读数据出错*/
{
printf("ERROR:read data failed!\n");
fclose(fd);
return (-2);
}
}
}
/*
*prestats --- print server statistical data
*输出连接的相关统计信息
*/
void prstats(void)
{
time_t now;
while(1)
{
(void) sleep(INTERVAL); // sleep INTERVAL sec
(void) pthread_mutex_lock(&stats.st_mutex);
now = time(0);
(void) printf("--- %s", ctime(&now));
(void) printf("%-32s: %u\n", "current
connections",stats.st_concount);
(void) printf("%-32s: %u\n", "completed connections",
stats.st_contotal);
if (stats.st_contotal)
{
(void) printf("%-32s: %.2f (secs)\n", "average complete connection
time",
(float) stats.st_contime / (float)
stats.st_contotal);
(void) printf("%-32s: %.2f (secs)\n", "average byte count",
(float) stats.st_bytecount / (float)
(stats.st_contotal + stats.st_concount));
}
(void) printf("%-32s: %lu\n\n", "total byte count",
stats.st_bytecount);
(void) pthread_mutex_unlock(&stats.st_mutex);
}
}
int main()
{
int i;
int ssock;
int ret = 0;
//MT char buf[4096] = { 0 };
int len;
int client_fd[MAX_LISTEN];
int max_fd = 0;
int sock = -1;
//MT pid_t childpid;
//MT pid_t pid;
// char file_name[128],ip_addr[20]; //07.23 add
struct sockaddr_in addr;
fd_set client_conns; /* 声明fd_set 集合来保存要检测的 socket句柄 */
struct timeval tv; /* 声明一个时间变量来保存时间 */
struct sockaddr_in client_addr;
tv.tv_sec = 20;
tv.tv_usec = 0; /* 设置select等待的最大时间为1.5秒*/
pthread_t th; //07.24 add
pthread_attr_t ta; //07.24 add
//MT init_transfer();
sock = socket(PF_INET, SOCK_STREAM, 0);
if(sock == -1)
{
perror("failed to create socket.\n");
return -1;
}
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr("0.0.0.0");
addr.sin_port = htons(TRANSFER_PORT);
if(bind(sock, (struct sockaddr *) &addr, sizeof(struct
sockaddr_in)) == -1)
{
perror("Failed to bind socket.\n");
return -1;
}
if(listen(sock, MAX_LISTEN) == -1)
{
printf("Failed to bind socket. errno: %d\n", errno);
return -1;
}
transfer_sock = sock;
printf(" %s: init transfer socket %d success\n",
__FUNCTION__, sock);
sig_handler();
for(i = 0; i < MAX_LISTEN; i ++)
client_fd[i] = -1; /*初始化为-1*/
FD_ZERO(&client_conns); /* 用select函数之前先把集合清零 */
max_fd = transfer_sock;
(void)pthread_attr_init(&ta); //暂时不用
(void)pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);
(void)pthread_mutex_init(&stats.st_mutex, 0);
#if 0
if (pthread_create(&th, &ta, (void* (*)(void *))prstats, 0) < 0) //
监控程序 07.24
err_exit("pthread_create(prstats):%s\n", strerror(errno));
#endif
while(1)
{
/*
* NOTE:这里必须把所有正在listen的socket重新加入select队列,
* 如果不把该socket重新加入,则select无法监听这个socket的状态变化,也就无法accept新的连接。
*/
FD_ZERO(&client_conns); //每次循环都要清空集合,以便
检测描述符变化
// FD_CLR(transfer_sock, &client_conns); /* 在文件描述符集合中删除一个文件描述符*/
FD_SET(transfer_sock, &client_conns); /* 把要检测的句柄socket加入到集合里 */
#if 0
ret = select(max_fd + 1, &client_conns, NULL, NULL, &tv); /* 检测上面设置到集
合client_conns里的句柄是否有可读信息 */
if (ret < 0)
{
perror("select(): ");
exit(-1);
}
else if (ret == 0)
{
printf(" select timeout …\n"); /* 说明在设定的时间值1.5秒的时间内,socket的状态没有发生变化
*/
continue; /*再次轮询*/
}
else
{
/*
*说明等待时间还未到1.5秒,socket的状态发生了变化
*ret这个返回值记录了发生状态变化的句柄的数目,
*由于只监视了socket这一个句柄,所以这里一定ret=1,
*如果同时有多个句柄发生变化返回的就是句柄的总和
*/
printf("ret=%d\n", ret);
}
printf("select return something … \n");
#endif
switch(select(max_fd + 1, &client_conns, NULL, NULL, &tv)) //select使
用
{
case -1:
perror("select(): ");
break; //select错误,退出程序
case 0:
printf(" select timeout …\n");
break; //再次轮询
default:
if(FD_ISSET(transfer_sock, &client_conns)) //测试sock是否可读,即是否网络上有数
据
{
len = sizeof(struct sockaddr_in);
memset(&client_addr, 0, len);
ssock = accept(transfer_sock, (struct sockaddr *)
&client_addr, &len );
if (ssock < 0)
{
if (errno == EINTR)
continue;
err_exit("accept:%s\n", strerror(errno));
}
}
}
#if 0
/* 如果有新的客户连接*/
if(FD_ISSET(transfer_sock, &client_conns)) /* 先判断一下socket这外被
监视的句柄是否真的变成可读 */
{
len = sizeof(struct sockaddr_in);
memset(&client_addr, 0, len);
ssock = accept(transfer_sock, (struct sockaddr *)
&client_addr, &len );
if (ssock < 0)
{
if (errno == EINTR)
continue;
err_exit("accept:%s\n", strerror(errno));
}
#endif //07.28
// pthread_create(&th, &ta, (void * (*) (void *))rcv_data, (void *)
client_fd[i]);
/*
* 把新连接加入select队列中
*/
for (i = 0; i < MAX_LISTEN; i ++)
{
if(client_fd[i] != -1)
continue;
client_fd[i] = ssock;
// printf("got a connection from %s.\n", inet_ntoa(client_fd
[i].addr.sin_addr));
break;
}
FD_SET(ssock , &client_conns); //把ssock加入到
client_conns
if(max_fd < ssock)
max_fd = ssock;
printf("add client socket connection %d to select\n", ssock);
continue;
// }
for(i = 0; i < MAX_LISTEN; i ++)
{
if(client_fd[i] == -1)
continue;
if(!FD_ISSET(client_fd[i], &client_conns)) //检察一下于client_conns关联的
文件句柄client_fd[i]是否有数据可读
continue;
printf("OK:I received a connection!\n");
if (pthread_create(&th, &ta, (void * (*) (void *))rcv_data,
(void *)client_fd[i]) < 0) //07.28 modify
// ret = pthread_create(&th, &ta, (void * (*) (void *))rcv_data,
(void *)client_fd[i]);
err_exit("pthread_create: %s\n", strerror(errno)); //07.24 add
//07.24 ret = rcv_data(client_fd[i]); /*接收数据函数*/
#if 0
if(ret <= 0)
{
close(client_fd[i]);
FD_CLR(client_fd[i], &client_conns);
printf("del socket %d from select pipe.\n", client_fd[i]);
client_fd[i] = -1;
}
#endif //07.28
}
max_fd = transfer_sock;
for(i = 0; i < MAX_LISTEN; i ++)
{
if(max_fd < client_fd[i])
{
max_fd = client_fd[i];
}
}
printf("this cycle of select loop over, max_fd = %d\n", max_fd);
printf("Note: the file transported successful, waiting for
transporting another one!\n");
}
fini_transfer();
return 0;
}
/*
------------------------------------------------------------------------------------------------------------
*
*File Name: mysocket.c
*Description: defination of the basic functions that were declared in
mysocket.h
*Date:2009/06/18
*Author:
*modify:2009/07/08
*
*-------------------------------------------------------------------------------------------------------------
*/
#include "mysocket.h"
ssize_t readn(int fd, void *buf, size_t length)
{
size_t nleft = length;
char *ptr = buf;
ssize_t nread;
while (nleft > 0)
{
if ((nread = read(fd, ptr, nleft)) < 0)
{
if (errno == EINTR) /*中断错误,继续写*/
nread = 0;
else
return (-1); /*其他错误,只要退出*/
}else if (nread == 0)
{
break; /*end of file*/
}
nleft -= nread;
ptr += nread; /*从剩余的地方继续写*/
}
return (length - nleft);
}
ssize_t writen(int fd, const void *buf, size_t length)
{
size_t nleft = length;
const char *ptr = buf;
ssize_t nwritten;
while ( nleft > 0)
{
if ((nwritten = write(fd, ptr, nleft)) <= 0)
{
if (nwritten < 0 && errno == EINTR)
{
nwritten = 0; /*invoke write() again.*/
}else
{
return (-1); /*error!*/
}
}
nleft -= nwritten;
ptr += nwritten;
}
return (length);
}
static int read_cnt;
static char *read_ptr = NULL;
static char read_buf[MAXLINE];
static ssize_t n_read(int fd, char *ptr)
{
if (read_cnt <= 0)
{
again:
if ((read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0)
{
if (errno == EINTR)
{
goto again;
}
return (-1);
}else if (read_cnt == 0)
{
return (0);
}
read_ptr = read_buf;
}
read_cnt--;
*ptr = *read_ptr++;
return (1);
}
ssize_t read_line(int fd, void *vptr, size_t maxlen)
{
ssize_t n,rc;
char c,*ptr;
ptr = vptr;
for (n = 1; n < maxlen; n++)
{
if ((rc = n_read(fd,&c)) == 1)
{
*ptr++ = c;
if (c == '\n') break; /*new line is stored.*/
}else if (rc == 0)
{
*ptr = 0;
return (n - 1); /*end of file,n - 1 bytes were read.*/
}else
{
return (-1); /*error occured.*/
}
}
*ptr = 0;
return (n);
}
#Makefile
CC = gcc
CFLAGS = -Wall -O2 -g -I$(@D)
AR = ar
EXEC = server
OBJS = server.o mysocket.o
LIBPTHREAD= -lpthread
all:$(EXEC)
$(EXEC): $(OBJS)
$(CC) $(LDFLAGS) -o $@ $(OBJS) $(LIBPTHREAD)
clean:
rm -f *.o server $(OBJ)
//////////////////////////////////////////////////////////////
//client 端程序
//////////////////////////////////////////////////////////////
/*
-----------------------------------------------------------------------------------------------------------
* File Name: client.c
* Description:
* Date:2009/06/18
* usage : ./client IP_ADDR
*-------------------------------------------------------------------------------------------------------------
*/
#include "mysocket.h"
#include "tcp_connect.h"
//MT#include "des.h"
/*loacl variables*/
//MTstatic des_context ctx; /*des 加密算法函数使用到的上下文*/
//MTstatic char des_key[8] = {0x20, 0x05, 0xAF, 0x45, 0xA3, 0xF1,
0x80, 0x78}; /*加密密码*/
/*loacl functions*/
//MTstatic int encrypt_data(const int count, char *input, char
*output); /*加密数据函数*/
static int send_data(int sockfd); /*传送数据函数*/
int main(int argc, char **argv)
{
/*
*127.0.0.1为本机IP,即如果server和client运行于同一台机器的话则用该IP
*如果其分别运行于不同机器,则此IP必须设置为server所在机器IP
*/
char *host = "192.168.1.1";
int sockfd;
if ((argc!=2) || (argc > 2))
{
printf("Usage:%ss[hostname or IP address]\n", argv[0]);
exit(1);
}
if (argc == 2)
{
host = argv[1];
}
//MT des_set_key(&ctx, des_key); /*设置DES解密密码*/
sockfd = tcp_connect(host, "4009"); /*使用TCP协议连接服务器的4009端口*/
if (sockfd < 0)
{
printf("sockfd is %d\n",sockfd);
printf("\tERROR:Make sure that the server have been in runing.\n");
exit(1);
}
send_data(sockfd); /*如果连接成功则开始传输数据*/
close(sockfd);
exit(0);
}
#if 0
/
*---------------------------------------------------------------------------------------------------------------------
* encrypt_data函数每次只能对8个字节进行加密,
* 如果数据count不能被8整除,则剩余的数据不加密
*---------------------------------------------------------------------------------------------------------------------
*/
static int
encrypt_data(const int count, char *input, char *output)
{
int n;
char *iptr = input;
char *optr = output;
if(!count || input == NULL || output == NULL)
return (-1);
for(n = count / 8; n > 0; n--)
{
des_encrypt(&ctx, iptr, optr);
iptr += 8;
optr += 8;
}
#if 0 //How to deal with last bytes less than 8? I have no idea
about it for now. Do nothing may be a good idea.
m = n % 8;
bzero(optr + m,8 - m);
des_encrypt(&ctx, iptr, optr);
#endif
return (0);
}
#endif
/*
*数据传输
*/
static int send_data(int sockfd)
{
FILE *fd = NULL;
unsigned int count = 0;
char buf4send[MAXLINE + 1] = {0};
fd = fopen("./test.avi", "r"); /*打开当前目录下文件*/
if (fd == NULL)
{
printf("ERROR:Open test file failed.\n");
close(sockfd);
exit(1);
}
while ((count = fread(buf4send, sizeof(char), MAXLINE,fd))) /*从文件中读取数据
*/
{
//MT encrypt_data(count, buf4send, buf4send); /*对从文件中读取的数据进行加密*/
writen(sockfd, buf4send, count); /*传输给服务器*/
}
fclose(fd);
return (0);
}
现在程序不能正常工作,我对select和多线程编程理解的都很肤浅,第一次编写这样的程序,希望maillist的各位前辈能多多指点下,万分感
谢!