iocp 多次投递 保证序列吗?

36 views
Skip to first unread message

papamms

unread,
Dec 17, 2006, 11:13:47 AM12/17/06
to 高性能网络编程邮件列表
我改了一下 微软的例子代码。 我现在做的事情是
读一个2M 或者更大的文件到ram.
当有连接的时候,就把所有的数据都发给那个连接。
因为文件太大了。所以要多次投递

我的代码如下

// Module Name: iocmplt.cpp
//
// Description:
//
// This sample illustrates how to develop a simple echo server Winsock
// application using the completion port I/O model. This
// sample is implemented as a console-style application and simply prints
// messages when connections are established and removed from the server.
// The application listens for TCP connections on port 5150 and accepts them
// as they arrive. When this application receives data from a client, it
// simply echoes (this is why we call it an echo server) the data back in
// its original form until the client closes the connection.
//
// Compile:
//
// cl -o iocmplt iocmplt.cpp ws2_32.lib
//
// Command Line Options:
//
// iocmplt.exe
//
// Note: There are no command line options for this sample.

#include <winsock2.h>
#include <windows.h>
#include <stdio.h>
#include <stdlib.h>


#define PORT 5150
#define DATA_BUFSIZE 8192

// Per-I/O context: one instance is allocated for every overlapped operation
// posted on a socket and handed back by the completion port when it finishes.
typedef struct
{
    OVERLAPPED Overlapped; // overlapped I/O block; must stay the first member because
                           // the worker recovers this struct from the LPOVERLAPPED
                           // returned by GetQueuedCompletionStatus
    WSABUF DataBuf;        // buffer descriptor passed to WSASend
    CHAR *Buffer;          // start of the actual data for this operation
    DWORD BytesSEND;       // number of bytes sent
    DWORD BytesRECV;       // number of bytes received
} PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;

// Per-connection bookkeeping shared by every I/O posted on one client socket.
// All members are public.
struct usera
{
    SOCKET Socket;          // the accepted client socket
    unsigned int sendtimes; // byte counter: raised in main(), lowered by the worker
                            // threads as sends complete
};


// Per-handle context: a pointer to this struct is passed as the completion key
// when a socket is associated with the completion port, and comes back from
// GetQueuedCompletionStatus with every completion on that socket.
typedef struct
{
SOCKET Socket; // socket handle for the connection
usera *pusers; // per-connection bookkeeping (socket + byte counter)

} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;

// Worker thread entry point; takes the completion port handle as its argument.
DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID);

// In-memory copy of the file served to clients; allocated and filled in main().
char *Pdata = NULL;
// Critical section behind lock()/unlock(); initialized at the start of main().
CRITICAL_SECTION cs ;

// Enters the global critical section `cs`; pair every call with unlock().
void lock()
{
EnterCriticalSection (&cs);
}

// Leaves the global critical section `cs` previously entered with lock().
void unlock()
{
LeaveCriticalSection (&cs);
}
void main(void)
{
SOCKADDR_IN InternetAddr;
SOCKET Listen;
SOCKET Accept;
HANDLE CompletionPort;
SYSTEM_INFO SystemInfo;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIoData;
unsigned int i;
DWORD RecvBytes;
DWORD Flags;
DWORD ThreadID;
WSADATA wsaData;
DWORD Ret;
FILE * datafile;

unsigned int filelen = 0;
InitializeCriticalSection (&cs) ;


datafile = fopen("1.txt", "rb");
if (!datafile) return ;
fseek(datafile, 0, 2);
filelen = ftell(datafile);

Pdata = new char [filelen +1];
Pdata[filelen] = 0;
fseek(datafile, 0, 0);
filelen = 0;


while ( fread(Pdata+filelen, 1, 1, datafile) == 1)
{
filelen+=1;
}

fclose(datafile);
Pdata[filelen] = 0;
if ((Ret = WSAStartup(0x0202, &wsaData)) != 0)
{
printf("WSAStartup failed with error %d\n", Ret);
return;
}

// Setup an I/O completion port.
if ((CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
NULL, 0, 0)) == NULL)
{
printf( "CreateIoCompletionPort failed with error: %d\n",
GetLastError());
return;
}

// Determine how many processors are on the system.

GetSystemInfo(&SystemInfo);

// Create worker threads based on the number of processors available
on the
// system. Create two worker threads for each processor.


for(i = 0; i < SystemInfo.dwNumberOfProcessors * 2; i++)
{
HANDLE ThreadHandle;

// Create a server worker thread and pass the completion port to the
thread.

if ((ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread,
CompletionPort,
0, &ThreadID)) == NULL)
{
printf("CreateThread() failed with error %d\n", GetLastError());
return;
}

// Close the thread handle
CloseHandle(ThreadHandle);
}

// Create a listening socket
&frac12;¨&Aacute;&cent;&Otilde;ì&Igrave;&yacute;socket

if ((Listen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0,
WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET)
{
printf("WSASocket() failed with error %d\n", WSAGetLastError());
return;
}

InternetAddr.sin_family = AF_INET;
InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
InternetAddr.sin_port = htons(PORT);

// °&Ntilde;&para;&Euml;&iquest;&Uacute;bind
&Eacute;&Iuml;&Egrave;&yen;

if (bind(Listen, (PSOCKADDR) &InternetAddr, sizeof(InternetAddr)) ==
SOCKET_ERROR)
{
printf("bind() failed with error %d\n", WSAGetLastError());
return;
}

// Prepare socket for listening

if (listen(Listen, 5) == SOCKET_ERROR)
{
printf("listen() failed with error %d\n", WSAGetLastError());
return;
}

// Accept connections and assign to the completion port.

while(TRUE) //
&Oacute;&Agrave;&Ocirc;&para;&iquest;&ordf;&Ecirc;&frac14;&Ntilde;&shy;&raquo;·
{
if ((Accept = WSAAccept(Listen, NULL, NULL, NULL, 0)) ==
SOCKET_ERROR)
{
printf("WSAAccept() failed with error %d\n", WSAGetLastError());
return;
}
// accept &Ograve;&raquo;&cedil;&ouml;client

usera *puser = new usera;
puser->sendtimes = 0;
puser->Socket = Accept;

unsigned int send_times = 0;

while ( send_times < filelen) {

if ((PerHandleData = (LPPER_HANDLE_DATA) GlobalAlloc(GPTR,
sizeof(PER_HANDLE_DATA))) == NULL)
{
printf("GlobalAlloc() failed with error %d\n", GetLastError());
return;
}

if (send_times == 0) {

if (CreateIoCompletionPort((HANDLE) Accept, CompletionPort, (DWORD)
PerHandleData,
0) == NULL)
{
printf("CreateIoCompletionPort failed with error %d\n",
GetLastError());
return;
}

}

PerHandleData->pusers = puser;


// Create per I/O socket information structure to associate with the

// WSARecv call below.

lock();

if ((PerIoData = (LPPER_IO_OPERATION_DATA) GlobalAlloc(GPTR,
sizeof(PER_IO_OPERATION_DATA))) == NULL)
{
printf("GlobalAlloc() failed with error %d\n", GetLastError());
return;
}

ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));//
&frac12;&laquo;io&para;&Euml;&iquest;&Uacute;
&Ouml;&Atilde;&Icirc;&ordf;&iquest;&Otilde;
PerIoData->BytesSEND = 0;// send
×&Ouml;&frac12;&Uacute;&Icirc;&ordf;0
PerIoData->BytesRECV = 0;//
&Ecirc;&Otilde;&micro;&frac12;×&Ouml;&frac12;&Uacute;


if ((filelen -send_times)>= 1000) {

PerIoData->Buffer= Pdata + send_times;
PerIoData->DataBuf.len = 1000;//
Buffer&micro;&Auml;&sup3;¤&para;&Egrave;
send_times += 1000;

}
else
{

PerIoData->Buffer= Pdata + send_times;
PerIoData->DataBuf.len = filelen -send_times;//
Buffer&micro;&Auml;&sup3;¤&para;&Egrave;
send_times = filelen;

}

printf(" sum send_times is %d \n", send_times);

PerHandleData->pusers->sendtimes += PerIoData->DataBuf.len;

PerIoData->DataBuf.buf = PerIoData->Buffer;//
&Ecirc;&Otilde;&micro;&frac12;&micro;&Auml;Buffer;

Flags = 0;
//
//
unlock();

if (WSASend(Accept, &(PerIoData->DataBuf), 1, &PerIoData->BytesSEND,
0,
&(PerIoData->Overlapped), NULL) == SOCKET_ERROR)

{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSARecv() failed with error %d\n", WSAGetLastError());
return;
}
}

}
send_times = 0;
}
}

DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)//
·&thorn;&Icirc;&ntilde;&sup1;¤×÷&Iuml;&szlig;&sup3;&Igrave;
{
HANDLE CompletionPort = (HANDLE) CompletionPortID;//
&acute;&Oacute;&sup2;&Icirc;&Ecirc;&yacute;&raquo;&ntilde;&Egrave;&iexcl;&Iacute;ê&sup3;&Eacute;&para;&Euml;&iquest;&Uacute;&para;&Ocirc;&Iuml;óhandle
DWORD BytesTransferred;
LPOVERLAPPED Overlapped;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIoData;
DWORD SendBytes, RecvBytes;
DWORD Flags;

while(TRUE)// &Euml;&Agrave;&Ntilde;&shy;&raquo;·
{

if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,//
&raquo;&ntilde;&Egrave;&iexcl;&para;&Oacute;&Aacute;&ETH;&Ouml;&ETH;iocp&Ouml;&ETH;&micro;&Auml;×&acute;&Igrave;&not;
(LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) ==
0)
{
printf("GetQueuedCompletionStatus failed with error %d\n",
GetLastError());
return 0;
}


// First check to see if an error has occured on the socket and if so
// then close the socket and cleanup the SOCKET_INFORMATION structure
// associated with the socket.

if (BytesTransferred == 0)//
&micro;±·&cent;&Euml;&Iacute;&frac12;&Oacute;&Ecirc;&Otilde;&Ecirc;&yacute;&frac34;&Yacute;&Icirc;&ordf;0
{
printf("Closing socket %d\n", PerHandleData->Socket);

if (closesocket(PerHandleData->Socket) == SOCKET_ERROR)
{
printf("closesocket() failed with error %d\n", WSAGetLastError());
return 0;
}

GlobalFree(PerHandleData);//
&Ccedil;&aring;&sup3;&yacute;&micro;&yen;&Iuml;ò&Ecirc;&yacute;&frac34;&Yacute;
GlobalFree(PerIoData);//
&Ccedil;&aring;&sup3;&yacute;&Ecirc;&yacute;&frac34;&Yacute;
continue;
}

if ( BytesTransferred == PerIoData->BytesSEND) {
lock();
PerHandleData->pusers->sendtimes -=BytesTransferred;
unlock();
printf("BytesTransferred == PerIoData->BytesSEND %d need send\n",
PerHandleData->pusers->sendtimes);
//GlobalFree(PerHandleData);//
&Ccedil;&aring;&sup3;&yacute;&micro;&yen;&Iuml;ò&Ecirc;&yacute;&frac34;&Yacute;
GlobalFree(PerIoData);//
&Ccedil;&aring;&sup3;&yacute;&Ecirc;&yacute;&frac34;&Yacute;
if (PerHandleData->pusers->sendtimes ==0)
{
closesocket(PerHandleData->pusers->Socket);
//GlobalFree(PerHandleData);

GlobalFree(PerHandleData);
}

continue;
}

if ( BytesTransferred < PerIoData->BytesSEND) { //
·&cent;&Euml;&Iacute;&micro;&Auml;&Ecirc;&yacute;&frac34;&Yacute;&ETH;&iexcl;&Oacute;&Uacute;&Ograve;&ordf;&Euml;&Iacute;&micro;&Auml;&Ecirc;&yacute;&frac34;&Yacute;
lock();
PerHandleData->pusers->sendtimes -=BytesTransferred;
unlock();
printf(" %d need send\n", PerHandleData->pusers->sendtimes);
printf("send not full buff\n");
ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
//PerIoData->BytesRECV = ;
PerIoData->BytesSEND = PerIoData->BytesSEND - BytesTransferred;

PerIoData->DataBuf.len= PerIoData->BytesSEND;//
Buffer&micro;&Auml;&sup3;¤&para;&Egrave;
PerIoData->Buffer= PerIoData->Buffer + BytesTransferred;
PerIoData->DataBuf.buf= PerIoData->Buffer;//
&Ecirc;&Otilde;&micro;&frac12;&micro;&Auml;Buffer;

if (WSASend(PerHandleData->Socket, &(PerIoData->DataBuf), 1,
&(PerIoData->BytesSEND), 0,
&(PerIoData->Overlapped), NULL) == SOCKET_ERROR)
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSASend() failed with error %d\n", WSAGetLastError());
return 0;
}
}
}
}


}

sunway

unread,
Dec 17, 2006, 8:30:13 PM12/17/06
to 高性能网络编程邮件列表
大家遇到过吗,在带宽不够的情况下,完成端口没有完成投递给他的-工作就返回了完成包

这个老帖子里有分析的,以后发这类帖子建议先看看老帖子

sunway

unread,
Dec 17, 2006, 8:31:24 PM12/17/06
to 高性能网络编程邮件列表
最好不要贴大量的代码,大多数人不会乐意看大量的代码,然后帮你找BUG.

papamms

unread,
Dec 17, 2006, 8:47:32 PM12/17/06
to 高性能网络编程邮件列表
看过老帖子的。 但是老帖子没有代码 看不出原因。

这个代码 在我比较老的机器上运行 有时候正常
有时候错误。

在我的新机器上 基本就是不对。

我说一下 做法。

主线程 拿到accept 过来的socket 以后,就循环使用WSASend
把数据都发出去,然后计数。

在主线程里面 使用wsasend
是否是正确的,我不清楚。或者要在工作线程里面send
才对。

工作线程,
就是把每次投递都完成,然后把计数器减小。
当计数器为0 就断开。

我贴代码,只是想让大家跑跑看
看代码在不同机器上是否一样。

按照我的想法。 服务器端会显示出
有多少字节需要发送 ,最后数值变为0, 断开连接。

但实际的情况不是这样的。
有时候能出现我预想的这个情况,大多数情况不行,显示还有内容没有发送
成功,但我的客户端显示已经拿到了足够的内容。(
我没有检测拿到的数据是否乱序,但字符总量是正确的)

papamms

unread,
Dec 17, 2006, 8:52:51 PM12/17/06
to 高性能网络编程邮件列表
// Per-I/O context as quoted from the Microsoft sample: unlike the modified
// version posted earlier in this thread, the data buffer is embedded in the struct.
typedef struct
{
OVERLAPPED Overlapped; // overlapped I/O block for the operation
WSABUF DataBuf; // buffer descriptor passed to the Winsock send/receive call
CHAR Buffer[DATA_BUFSIZE]; // inline data buffer of DATA_BUFSIZE bytes
DWORD BytesSEND; // number of bytes sent
DWORD BytesRECV; // number of bytes received
} PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;


// Per-handle context as quoted from the Microsoft sample: just the socket.
typedef struct
{
SOCKET Socket; // socket handle for the connection
} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;


这2个结构是 m$ 例子里面的。
当一个连接建立起来的时候 需要建立一个
PerHandleData

PER_IO_OPERATION_DATA 结构

还需要 CreateIoCompletionPort((HANDLE) Accept, CompletionPort,
(DWORD) PerHandleData, 0)
建立一个完成相关完成端口。但是好像不能多次建立这个过程,也就是一个socket
只能建立一次。
是否是这样?

papamms

unread,
Dec 17, 2006, 10:16:30 PM12/17/06
to 高性能网络编程邮件列表
解决了 逻辑问题了。

发件人: papamms - 查看个人资料
日期: 2006年12月18日(星期一) 上午9时52分
电子邮件: "papamms" <ppmsn2...@gmail.com>
尚未评分评级:
显示选项
回复 | 答复作者 | 转发 | 打印 | 显示个别帖子 |
显示原始邮件 | 删除 | 报告滥用行为 |
查找此作者的帖子


} PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;


// Per-handle context (quoted again from the previous message): just the socket.
typedef struct
{
SOCKET Socket; // socket handle for the connection


} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;

只能建立一次。
是否是这样?

这个还要确认一下。

christanxw

unread,
Dec 18, 2006, 8:13:24 PM12/18/06
to 高性能网络编程邮件列表
IOCP是不会保证投递序列按投递顺序返回完成包的。《windows网络编程》说得很清楚。如果你先投递1个10K的操作,再投递一个20K的操作,你调用GetQueuedCompletionStatus的时候可能先获取20K那个操作的完成包。
Reply all
Reply to author
Forward
0 new messages