Advertisement

IOCP AcceptEx is Not Updating IOCP Queue.

Started by May 14, 2015 11:17 PM
4 comments, last by Avilius 9 years, 5 months ago
I'm trying to modify a game server to move from asynchronous sockets to a more scalable IOCP solution. The thing is, information on IOCP in conjunction with Winsock is pretty hard to find online, and the examples I find are often conflicting or untested. Microsoft's documentation is surprisingly inconsistent/buggy and scarce, and based on what I've found, it seems to be something of a black art. Before butchering the server, I decided to implement a small chat program to familiarize myself with the new model. Unfortunately, I cannot continue until I can get the server to accept connections. If someone could link me to a quality source of documentation/tutorials I would be very grateful.

Anyways, the problem I am having is that AcceptEx is not adding a message to the IOCP queue when a client tries to connect to it. When I attempt to connect to the server via the telnet client packaged with Windows, I still do not receive any messages. I made sure that the initial calls to AcceptEx are successful, and although it does return FALSE, WSAGetLastError() yields the value of WSA_IO_PENDING, which, to my understanding, is a good value.

I am not exactly certain whether this is due to a bug in the API, or me just misusing the API (likely the latter), but I cannot find any more information on the matter.

In advance, brace yourselves. This example has been hacked together in hopes of at least getting something on screen. I cut out the seemingly irrelevant parts of the code.

I'm running Windows 7. I hope I have provided enough information.
// Original listing as first posted (line breaks restored). The replies below
// quote and diagnose these exact statements, so the code itself is unchanged;
// only comments have been added.

// Asks the Winsock provider behind `listen` for two extension entry points via
// WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) and stores them in accept_ex and
// get_accept_ex_sockaddrs (both declared outside this listing). Failures are
// only caught by assert.
void LoadFunctions(SOCKET listen)
{
	GUID accept_ex_guid               = WSAID_ACCEPTEX;
	GUID get_accept_ex_sockaddrs_guid = WSAID_GETACCEPTEXSOCKADDRS;
	int result;
	DWORD bytes;
	result = WSAIoctl(listen,
	                  SIO_GET_EXTENSION_FUNCTION_POINTER,
	                  &accept_ex_guid,
	                  sizeof(GUID),
	                  &accept_ex, sizeof(LPFN_ACCEPTEX),
	                  &bytes,
	                  nullptr,
	                  nullptr);
	assert(result != SOCKET_ERROR);
	result = WSAIoctl(listen,
	                  SIO_GET_EXTENSION_FUNCTION_POINTER,
	                  &get_accept_ex_sockaddrs_guid,
	                  sizeof(GUID),
	                  &get_accept_ex_sockaddrs, sizeof(LPFN_GETACCEPTEXSOCKADDRS),
	                  &bytes,
	                  nullptr,
	                  nullptr);
	assert(result != SOCKET_ERROR);
}

// Sockets with an accept request in flight. Written by main and by
// HandleAccept without any locking.
std::deque<SOCKET> clients;

// Starts the workers, creates the listening socket and completion port, then
// loops forever posting accept requests until 10 are recorded in `clients`.
int main(int argc, char** argv)
{
	SOCKET listen_socket;
	std::vector<std::thread> threads;
	std::vector<HANDLE> ports;
	
	// NOTE(review): the workers are started here, before iocp_handle is
	// assigned further down, so they can wait on a handle that is not yet valid.
	for(size_t i = 0; i < NUM_THREADS; i++)
		threads.push_back(std::thread(WorkerThread, nullptr));
	Network::Initialize();
	listen_socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	assert(listen_socket != INVALID_SOCKET);
	CreateListener(listen_socket);
	LoadFunctions(listen_socket);
	iocp_handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, NUM_THREADS);
	assert(iocp_handle);
	while(true)
	{
		BOOL result = FALSE;
		// NOTE(review): listen() is the argument of assert, so the call itself
		// is compiled out when NDEBUG is defined.
		assert(listen(listen_socket, 256) == 0);
		while(clients.size() < 10)
		{
			// One heap-allocated, zero-filled context per accept request.
			context_info_t* info = new context_info_t;
			SOCKET client_socket = INVALID_SOCKET;
			int wsa_error;
			memset(info, 0, sizeof(context_info_t));
			info->buffer.size = ARRAYSIZE(info->buffer.buffer);
			client_socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
			assert(client_socket != INVALID_SOCKET);
			info->state = SOCKET_DISCONNECTED;
			info->client_socket = client_socket;
			info->listen_socket = listen_socket;
			
			// The socket is recorded only when PostAccept reports success and
			// the last Winsock error is WSA_IO_PENDING.
			if((result = PostAccept(info, iocp_handle)))
			{
				wsa_error = WSAGetLastError();
				if(wsa_error == WSA_IO_PENDING)
					clients.push_back(info->client_socket);
			}
		}
	}
	// The loop above has no break, so the shutdown sequence below is never reached.
	for(size_t i = 0; i < NUM_THREADS; i++)
		threads[i].join();
	CloseHandle(iocp_handle);
	::shutdown(listen_socket, SD_BOTH);
	::closesocket(listen_socket);
	Network::Shutdown();
	return 0;
}

// Issues one overlapped accept through the accept_ex pointer loaded by
// LoadFunctions. Returns true when the call succeeded or is pending, false
// (after printing the error code) otherwise. iocp_handle is not used here.
static bool PostAccept(context_info_t* info, HANDLE iocp_handle)
{
	BOOL  result;
	DWORD nb;
	int   wsa_error;
	// The fourth argument is the buffer size minus the two address slots.
	// NOTE(review): as a reply below points out, a non-zero receive length
	// makes the accept wait for the peer's first data before it completes.
	result = accept_ex(info->listen_socket,
	                   info->client_socket,
	                   &info->buffer.buffer,
	                   info->buffer.size - ((sizeof(sockaddr_in) + 16) * 2),
	                   sizeof(sockaddr_in) + 16,
	                   sizeof(sockaddr_in) + 16, 
	                   &nb,
	                   &info->overlapped);
	wsa_error = WSAGetLastError();
	if((!result) && (wsa_error != WSA_IO_PENDING))
	{
		std::cout << "Error: " << WSAGetLastError() << std::endl;
		return false;
	}
	return true;
}

// Associates the accepted socket with the completion port (keyed by a new
// IoEvent), prints the peer address and removes the socket from `clients`.
// Its only call site in this listing is commented out in WorkerThread.
static HANDLE HandleAccept(context_info_t* info, HANDLE iocp_handle)
{
	int result;
	sockaddr_in* local;
	sockaddr_in* remote;
	int locallen;
	int remotelen;
	HANDLE handle;
	IoEvent* io_event = new IoEvent;
	io_event->type = EVENT_ACCEPT;
	io_event->info = info;
	
	// NOTE(review): `result` is tested here without ever having been assigned
	// in this function.
	if(result == SOCKET_ERROR)
	{
		std::cout << "Network Failure - setsockopt returned SOCKET_ERROR: " << WSAGetLastError() << "\n";
		abort();
	}
	handle = CreateIoCompletionPort(reinterpret_cast<HANDLE>(info->client_socket), iocp_handle, reinterpret_cast<LONG_PTR>(io_event), NUM_THREADS);
	get_accept_ex_sockaddrs(info->buffer.buffer,
	                        info->buffer.size,
	                        sizeof(sockaddr_in) + 16,
	                        sizeof(sockaddr_in) + 16,
	                        reinterpret_cast<sockaddr**>(&local),
	                        &locallen,
	                        reinterpret_cast<sockaddr**>(&remote),
	                        &remotelen);
	std::cout << "Connecting to client at " << inet_ntoa(remote->sin_addr) << std::endl;
	// NOTE(review): std::find returns a temporary; binding it to `auto&` is
	// not standard C++. `clients` is also touched here without a lock.
	auto& it = std::find(clients.begin(), clients.end(), info->client_socket);
	if(it != clients.end())
		clients.erase(it);
	return handle;
}

// Worker loop: blocks on the completion port, interprets the completion key as
// an IoEvent*, handles EVENT_ACCEPT and then deletes the event. Exits when the
// key equals (DWORD)-1.
static DWORD WINAPI WorkerThread(void* parameter)
{
	DWORD       size       =  0;
	ULONG_PTR   key        =  0;
	OVERLAPPED* overlapped = nullptr;
	IoEvent*    io_event   = nullptr;
	for(;;)
	{
		BOOL status;
		int  result;
		overlapped = nullptr;
		status     = GetQueuedCompletionStatus(iocp_handle, &size, &key, &overlapped, INFINITE);
		io_event = reinterpret_cast<IoEvent*>(key);
		//assert(overlapped != nullptr);
		if(key == (DWORD)-1)
			break;
		
		// A failed dequeue is currently ignored.
		if(!status)
		{
		}
		if(io_event)
		{
			switch(io_event->type)
			{
				case EVENT_ACCEPT:
					std::cout << "EVENT_ACCEPT\n";
					io_event->info->state = SOCKET_ACCEPTED;
					// Passes the listening socket as the option value for
					// SO_UPDATE_ACCEPT_CONTEXT on the accepted socket.
					result = setsockopt(io_event->info->client_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, reinterpret_cast<char*>(&io_event->info->listen_socket), sizeof(SOCKET));
					assert(result != SOCKET_ERROR);
					//HandleAccept(io_event->info, iocp_handle);
				break;
			}
		}
		delete io_event;
	}
	return 0;
}
First:

assert(listen(listen_socket, 256) == 0);
In non-debug builds, this code will not execute. You should never put code with wide effects inside an assert()!

Second:


	Network::Initialize();
	CreateListener(listen_socket);
	LoadFunctions(listen_socket);
What does Network::Initialize() do?
What does CreateListener() do?

Why do you ask the socket implementation for functions, instead of using AcceptEx() from mswsock.lib?
That is a rather poorly conceived extension that adds a fraction of one thousandth of one percent of performance to a typical application; it's only there for benchmark purposes on 20 year old systems.

static DWORD WINAPI WorkerThread(void* parameter)


Your threads will likely start and run into errors before you create the I/O completion port.
That's likely the actual problem in the above code snippets, but all of the other things are open questions, too.

auto& it = std::find(clients.begin(), clients.end(), info->client_socket);


"clients" is not synchronized between threads.

get_accept_ex_sockaddrs(info->buffer.buffer,
				            info->buffer.size,
				            sizeof(sockaddr_in) + 16,
				            sizeof(sockaddr_in) + 16,
				            reinterpret_cast<sockaddr**>(&local),
				            &locallen,
				            reinterpret_cast<sockaddr**>(&remote),
				            &remotelen);
AcceptEx() already returns the addresses into the original buffer; there is no need for this call.

Quoth MSDN on AcceptEx():

the addresses are written to the latter part of the buffer



I find the Windows I/O Completion Port and OVERLAPPED I/O system to be somewhat byzantine in design, but actually well documented and works as advertized.

If you want a simpler, still very efficient, wrapper for sockets, that works on both Windows and other platforms, I would recommend boost::asio.

https://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
https://msdn.microsoft.com/en-us/library/windows/desktop/aa363862%28v=vs.85%29.aspx
https://msdn.microsoft.com/en-us/library/windows/desktop/aa364986(v=vs.85).aspx
enum Bool { True, False, FileNotFound };
Advertisement

...Hm... it seems like the code in the OP is broken now...


In non-debug builds, this code will not execute. You should never put code with wide effects inside an assert()!

This code is more or less throwaway code; I don't really plan on using it in my main project at all. It's just a means of learning the API, and I will probably end up deleting it later on. I assure you none of the code in my main project looks like that!

What does Network::Initialize() do?
What does CreateListener() do?


// The three functions' definitions are
Network::Initialize();
CreateListener(listen_socket);
LoadFunctions(listen_socket);

// these:
namespace Network
{
void Initialize()
{
	WSADATA wsa_data;
	int result = WSAStartup(MAKEWORD(2, 2), &wsa_data);
	assert(result == 0);
}
}

// Binds `listener` to INADDR_ANY on `port` (converted to network byte order).
// A bind failure is only caught by the assert.
void CreateListener(SOCKET listener)
{
	sockaddr_in local_address = {};
	local_address.sin_family      = AF_INET;
	local_address.sin_port        = htons(port);
	local_address.sin_addr.s_addr = INADDR_ANY;

	const int bind_status = bind(listener, reinterpret_cast<sockaddr*>(&local_address), sizeof(local_address));
	assert(bind_status != SOCKET_ERROR);
}

// respectively. The LoadFunctions function simply just loaded AcceptEx and GetAcceptExSockaddrs.

Why do you ask the socket implementation for functions, instead of using AcceptEx() from mswsock.lib?
That is a rather poorly conceived extension that adds a fraction of one thousandth of one percent of performance to a typical application; it's only there for benchmark purposes on 20 year old systems.

I don't know. I read somewhere that instead of using mswsock.lib you should load the functions manually, because doing so is faster. I was a bit skeptical, however, I figured that whoever wrote that was more knowledgeable regarding that topic. Thanks though, I'm pretty happy I could remove that from my code.

Your threads will likely start and run into errors before you create the I/O completion port.
That's likely the actual problem in the above code snippets, but all of the other things are open questions, too.

I tried moving the thread initialization to after the completion port was created, but doing so did not seem to make any difference.

AcceptEx() already returns the addresses into the original buffer; there is no need for this call.

Again, I read somewhere that this is what I had to do. Thanks again for correcting that.

I tried moving the thread initialization to after the completion port was created, but doing so did not seem to make any difference.


That's somewhat surprising. It's a clear bug the way it's posted above.
Do you understand why I suggest this is a problem?

Also:

"clients" is not synchronized between threads.


Do you understand why this can be a problem?

My main concern is that, when the code you post is full of problems like this, it's very hard to tell whether the problem is "you are experienced in Windows system development, and the problem is that IOCP on sockets is legitimately hard to do," or whether the problem is "you're trying to learn three different new things at the same time, and thing two keeps tripping up thing three, making thing one not work."
enum Bool { True, False, FileNotFound };

Your code's quite hard to read and understand and ignoring all the issues that others have raised, I think your actual problem is here...

result = accept_ex(info->listen_socket,
info->client_socket,
&info->buffer.buffer,
info->buffer.size - ((sizeof(sockaddr_in) + 16) * 2),
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
&nb,
&info->overlapped);
You're passing a non-zero size for the buffer, which means you're asking for the accept not to return until data is sent on the connection by the connecting peer.
I'd suggest you take a look at this article and the working code that it contains: http://www.codeproject.com/Articles/2374/A-reusable-high-performance-socket-server-class

Thank you for all of your responses. I have been away for several days, and although I could read what you have written, I could not respond due to a lack of access to the code at home.

Here is my new code:


#include <algorithm>
#include <deque>
#include <functional>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "LoginServer.hpp"

// Worker threads servicing the completion port: two per hardware thread.
// std::thread::hardware_concurrency() is allowed to return 0 when the count is
// unknown; treat that as one so that at least two workers are started.
static const size_t    NUM_THREADS     = (std::thread::hardware_concurrency() > 0 ? std::thread::hardware_concurrency() : 1) * 2;
// Number of accept requests the main loop tries to keep recorded as pending.
static const size_t    NUM_PRE_ACCEPTS = 10;
// TCP port the listener binds to (host byte order; htons is applied at bind time).
static const uint16_t  PORT            = 25347;
// Completion key posted to the port to tell a worker thread to exit.
static const ULONG_PTR QUIT_KEY        = static_cast<ULONG_PTR>(-1);

// Completion port shared by main and the worker threads: assigned in main by
// CreateIoCompletionPort and waited on in WorkerThread.
HANDLE iocp_handle;
// Sockets recorded by PostAccept and erased by WorkerThread once their accept
// has been handled; main reads its size through GetNumAccepts.
std::deque<SOCKET> accept_sockets;
// Locked around the accesses to accept_sockets.
std::mutex accept_mutex;

// Both are defined after main; declared here so main can call PostAccept and
// start threads on WorkerThread.
static HANDLE PostAccept(context_info_t* info, HANDLE iocp_handle);
static DWORD WINAPI WorkerThread(void* parameter);

std::string GetLastErrorStr()
{
	DWORD error = GetLastError();
	if (error)
	{
		LPVOID lpMsgBuf;
		DWORD bufLen = FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | 
		                             FORMAT_MESSAGE_FROM_SYSTEM |
		                             FORMAT_MESSAGE_IGNORE_INSERTS,
		                             NULL,
		                             error,
		                             MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
		                             (LPSTR) &lpMsgBuf,
		                             0, NULL);
		if (bufLen)
		{
			LPCSTR lpMsgStr = (LPCSTR)lpMsgBuf;
			std::string result(lpMsgStr, lpMsgStr+bufLen);
      
			LocalFree(lpMsgBuf);

			return result;
		}
	}
	return std::string();
}

// Binds `listener` to INADDR_ANY on PORT (converted to network byte order).
// A bind failure is only caught by the assert.
void CreateListener(SOCKET listener)
{
	sockaddr_in local_address = {};
	local_address.sin_family      = AF_INET;
	local_address.sin_port        = htons(PORT);
	local_address.sin_addr.s_addr = INADDR_ANY;

	const int bind_status = bind(listener, reinterpret_cast<sockaddr*>(&local_address), sizeof(local_address));
	assert(bind_status != SOCKET_ERROR);
}

// Returns the number of entries in `accept_sockets`, read while holding
// accept_mutex.
size_t GetNumAccepts(std::deque<SOCKET>& accept_sockets)
{
	std::lock_guard<std::mutex> guard(accept_mutex);
	return accept_sockets.size();
}

int main(int argc, char** argv)
{
	SOCKET listen_socket;
	std::vector<std::thread> threads;
	std::vector<HANDLE> ports;

	Network::Initialize();

	listen_socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	assert(listen_socket != INVALID_SOCKET);
	CreateListener(listen_socket);

	iocp_handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, NUM_THREADS);
	assert(iocp_handle);

	for(size_t i = 0; i < NUM_THREADS; i++)
		threads.push_back(std::thread(WorkerThread, nullptr));

	while(true)
	{
		int result = FALSE;
		std::string command;

		result = listen(listen_socket, 256);
		assert(result == 0);

		while(GetNumAccepts(accept_sockets) < NUM_PRE_ACCEPTS)
		{
			context_info_t* info = new context_info_t;
			SOCKET          client_socket = INVALID_SOCKET;
			int             wsa_error;
			memset(info, 0, sizeof(context_info_t));

			info->buffer.size = ARRAYSIZE(info->buffer.buffer);

			client_socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

			assert(client_socket != INVALID_SOCKET);

			info->state = SOCKET_DISCONNECTED;
			info->client_socket = client_socket;
			info->listen_socket = listen_socket;
			
			PostAccept(info, iocp_handle);
		}

		// This command blocks, so the server will not issue any more
		// accept requests until something is entered... it is not
		// very important to fix right now but definitely later.
		if(std::getline(std::cin, command))
		{
			if(command == "quit")
				goto end;
		}
	}

end:
	PostQueuedCompletionStatus(iocp_handle, 0, QUIT_KEY, nullptr);

	for(size_t i = 0; i < NUM_THREADS; i++)
		threads[i].join();
	CloseHandle(iocp_handle);
	::shutdown(listen_socket, SD_BOTH);
	::closesocket(listen_socket);
	Network::Shutdown();

	return 0;
}

static HANDLE PostAccept(context_info_t* info, HANDLE iocp_handle)
{
	WSABUF buffer;
	DWORD  nb;
	int    wsa_error;
	BOOL   result;
	HANDLE handle = nullptr;
	
	buffer.buf = reinterpret_cast<char*>(info->buffer.buffer);
	buffer.len = info->buffer.size;

	result = AcceptEx(info->listen_socket,
                      info->client_socket,
                      info->buffer.buffer,
                      0,
                      sizeof(sockaddr_in) + 16,
                      sizeof(sockaddr_in) + 16, 
                      &nb,
                      &info->overlapped);

	wsa_error = WSAGetLastError();

	if((!result) && (wsa_error != WSA_IO_PENDING))
	{
		std::cout << "AcceptEx Error: " << WSAGetLastError() << std::endl;
	}

	else
	{

		IoEvent* io_event = new IoEvent;

		io_event->type = EVENT_ACCEPT;
		io_event->info = info;

		handle = CreateIoCompletionPort(reinterpret_cast<HANDLE>(info->client_socket), iocp_handle, reinterpret_cast<LONG_PTR>(io_event), NUM_THREADS);
	}

	accept_mutex.lock();
	accept_sockets.push_back(info->client_socket);
	accept_mutex.unlock();

	return handle;
}

static DWORD WINAPI WorkerThread(void* parameter)
{
	DWORD       size       =  0;
	ULONG_PTR   key        =  0;
	OVERLAPPED* overlapped = nullptr;
	IoEvent*    io_event   = nullptr;


	for(;;)
	{
		BOOL            status;
		int             result;
		int             wsa_error;
		WSABUF          buffer;
		context_info_t* info;

		overlapped = nullptr;
		status     = GetQueuedCompletionStatus(iocp_handle, &size, &key, &overlapped, INFINITE);

		if(key == QUIT_KEY)
			break;

		io_event   = reinterpret_cast<IoEvent*>(key);
		if(info)
			info = io_event->info;
		
		if(!status)
		{
		}

		if(io_event)
		{
			switch(io_event->type)
			{
				case EVENT_ACCEPT:
					std::cout << "EVENT_ACCEPT\n";
					io_event->info->state = SOCKET_ACCEPTED;

					result = setsockopt(io_event->info->client_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, reinterpret_cast<char*>(&io_event->info->listen_socket), sizeof(SOCKET));
					assert(result != SOCKET_ERROR);


					accept_mutex.lock();
					auto& it = std::find(accept_sockets.begin(), accept_sockets.end(), info->client_socket);
					if(it != accept_sockets.end())
						accept_sockets.erase(it);
					accept_mutex.unlock();
				break;
			}
		}

		delete io_event;
	}
	return 0;
}

hplus0603, I have addressed the issues you have pointed out, and cleaned the code up a bit.

Len Holgate, I read your article, however the last two non-trivial lines of the example are:


m_iocp.PostStatus((ULONG_PTR)m_listeningSocket, bytesReceived, 
                           pBuffer->GetAsOverlapped());

Am I supposed to manually use PostQueuedCompletionStatus?

This topic is closed to new replies.

Advertisement