Advertisement

What UDP networking libraries do you use server-side in 2021?

Started by March 02, 2021 07:49 AM
40 comments, last by taby 3 years, 7 months ago

hplus0603 said:

10,000 threads is a bad idea. In the best of worlds, you have one thread per core. One thread per connection is a fairly common idea but is generally thought of as a mistake by those who try it :-)

Do you recommend that the thread id be cached, so that packets of any given sender are always handled by the same thread?

taby said:

No worries! To be fair though, I used to be a WAN analyst. ?

@taby ok fair enough - keep going at it, then we are 2 on the journey together ?

Advertisement

Yeah, man. I just wanted to show you how small a server code can be. ?

Here is the code that uses multithreading – one thread per CPU core based on std::thread::hardware_concurrency(). It does no load balancing other than the round robin packet distribution.

#include <winsock2.h>
#include <Ws2tcpip.h>
#include <windows.h>
#pragma comment(lib, "ws2_32")

#include <iostream>
#include <string>
#include <vector>
#include <map>
#include <list>
#include <sstream>
#include <chrono>
#include <thread>
#include <mutex>
#include <atomic>
using namespace std;


// Global UDP socket handle shared between main() and cleanup();
// stays INVALID_SOCKET whenever no socket is open.
SOCKET udp_socket = INVALID_SOCKET;
// Operating mode selected from the command line (see init_options()).
enum program_mode { send_mode, receive_mode };


void print_usage(void)
{
	cout << "  USAGE:" << endl;
	cout << "   Receive mode:" << endl;
	cout << "    udpspeed PORT_NUMBER" << endl;
	cout << endl;
	cout << "   Send mode:" << endl;
	cout << "    udpspeed TARGET_HOST PORT_NUMBER" << endl;
	cout << endl;
	cout << "   ie:" << endl;
	cout << "    Receive mode: udpspeed 1920" << endl;
	cout << "    Send mode:    udpspeed www 342" << endl;
	cout << "    Send mode:    udpspeed 127.0.0.1 950" << endl;
	cout << endl;
}

// Validates that port_string consists solely of decimal digits and parses
// it into port_number. Accepts only values in the range 1-65535; prints a
// diagnostic to stdout and returns false for anything else (including
// empty input).
bool verify_port(const std::string& port_string, unsigned long int& port_number)
{
	// An empty string would sail through the digit loop below
	// vacuously, so reject it explicitly.
	if (port_string.empty())
	{
		std::cout << "  Invalid port: " << port_string << std::endl;
		std::cout << "  Ports are specified by numerals only." << std::endl;
		return false;
	}

	for (size_t i = 0; i < port_string.length(); i++)
	{
		// Cast to unsigned char: passing a negative plain char to
		// isdigit() is undefined behaviour.
		if (!std::isdigit(static_cast<unsigned char>(port_string[i])))
		{
			std::cout << "  Invalid port: " << port_string << std::endl;
			std::cout << "  Ports are specified by numerals only." << std::endl;
			return false;
		}
	}

	std::istringstream iss(port_string);
	iss >> port_number;

	// The length check catches strings so long that the unsigned parse
	// could wrap back into the accepted numeric range.
	if (port_string.length() > 5 || port_number > 65535 || port_number == 0)
	{
		std::cout << "  Invalid port: " << port_string << std::endl;
		std::cout << "  Port must be in the range of 1-65535" << std::endl;
		return false;
	}

	return true;
}

bool init_winsock(void)
{
	WSADATA wsa_data;
	WORD ver_requested = MAKEWORD(2, 2);

	if (WSAStartup(ver_requested, &wsa_data))
	{
		cout << "Could not initialize Winsock 2.2.";
		return false;
	}

	if (LOBYTE(wsa_data.wVersion) != 2 || HIBYTE(wsa_data.wVersion) != 2)
	{
		cout << "Required version of Winsock (2.2) not available.";
		return false;
	}

	return true;
}

// Initializes Winsock, then derives the program mode and port (and, in
// send mode, the target host) from the command line. Returns false on
// Winsock failure, bad argument count (usage is printed), or bad port.
bool init_options(const int& argc, char** argv, enum program_mode& mode, string& target_host_string, long unsigned int& port_number)
{
	if (false == init_winsock())
		return false;

	string port_string;

	switch (argc)
	{
	case 2:
		// One argument: receive mode, argument is the port.
		mode = receive_mode;
		port_string = argv[1];
		break;

	case 3:
		// Two arguments: send mode, host then port.
		mode = send_mode;
		target_host_string = argv[1];
		port_string = argv[2];
		break;

	default:
		print_usage();
		return false;
	}

	return verify_port(port_string, port_number);
}

// Releases the global UDP socket (when one is open) and shuts Winsock down.
void cleanup(void)
{
	// Only a live handle may be passed to closesocket().
	if (udp_socket != INVALID_SOCKET)
		closesocket(udp_socket);

	WSACleanup();
}


// A single received UDP datagram plus metadata recorded at receive time.
class packet
{
public:

	// Raw payload bytes, resized to the number of bytes actually received.
	vector<char> packet_buf;
	// Timestamp taken right after recvfrom() returned this datagram.
	std::chrono::high_resolution_clock::time_point time_stamp;
	// Sender's IPv4 address in dotted-decimal form, e.g. "127.0.0.1".
	string ip_addr;
};


// Worker thread entry point: drains packets queued in vc (guarded by m),
// accumulates a running byte total, and appends one human-readable report
// line per packet to vs (also guarded by m). Runs until stop becomes true;
// sets thread_done on exit so the owner can observe completion.
void thread_func(atomic_bool& stop, atomic_bool& thread_done, const atomic<size_t> &thread_id, vector<packet>& vc, mutex& m, vector<string>& vs)
{
	thread_done = false;

	long long unsigned int total_bytes_received = 0;

	while (!stop)
	{
		vector<packet> batch;

		// Swap the shared queue into a local buffer so the (relatively
		// slow) string formatting below runs outside the critical section.
		{
			lock_guard<mutex> lock(m);
			batch.swap(vc);
		}

		if (batch.empty())
		{
			// No work queued -- yield instead of spinning at 100% CPU
			// on the mutex.
			this_thread::yield();
			continue;
		}

		vector<string> reports;
		reports.reserve(batch.size());

		for (size_t i = 0; i < batch.size(); i++)
		{
			// Do stuff with packet buffers here
			total_bytes_received += batch[i].packet_buf.size();

			ostringstream oss;
			oss << "thread id: " << thread_id << ", bytes received: " << batch[i].packet_buf.size() << ", IP address: " << batch[i].ip_addr << endl;

			reports.push_back(oss.str());
		}

		// Publish the finished report lines in one short critical section.
		lock_guard<mutex> lock(m);

		for (size_t i = 0; i < reports.size(); i++)
			vs.push_back(reports[i]);
	}

	thread_done = true;
}


// Owns one worker thread plus the shared state used to hand packets to it
// and to read formatted report lines back. The worker starts immediately
// on construction and is stopped and joined on destruction.
class recv_stats
{
public:

	thread t;
	atomic_bool stop = false;        // set true to ask the worker to exit
	atomic_bool thread_done = false; // set true by the worker on exit
	mutex m;                         // guards packets and reports
	atomic<size_t> thread_id;        // identifier echoed in report lines

	vector<packet> packets;  // producer -> worker queue
	vector<string> reports;  // worker -> consumer report lines

	recv_stats(void)
	{
		t = thread(thread_func, ref(stop), ref(thread_done), ref(thread_id), ref(packets), ref(m), ref(reports));
	}

	~recv_stats(void)
	{
		stop = true;

		// join() already blocks until thread_func returns (which happens
		// strictly after it sets thread_done), so the former busy-wait on
		// thread_done was pure CPU waste and has been removed.
		if (t.joinable())
			t.join();
	}
};


// Entry point. Parses arguments, then runs in one of two modes:
//  - send mode:    floods fixed-size UDP datagrams at the target host/port.
//  - receive mode: binds a UDP socket and round-robins received packets
//    across one worker thread per CPU core, printing per-packet reports.
// Returns 0 on clean exit, or a distinct nonzero code per failure point.
int main(int argc, char** argv)
{
	cout << endl << "udpspeed_4 1.0 - UDP speed tester" << endl << "Copyright 2021, Shawn Halayka" << endl << endl;

	program_mode mode = receive_mode;

	string target_host_string = "";
	long unsigned int port_number = 0;

	// Send payload size; small enough to fit one Ethernet frame.
	const long unsigned int tx_buf_size = 1450;
	vector<char> tx_buf(tx_buf_size, 0);

	const long unsigned int rx_buf_size = 8196;
	vector<char> rx_buf(rx_buf_size, 0);

	if (!init_options(argc, argv, mode, target_host_string, port_number))
	{
		cleanup();
		return 1;
	}

	if (send_mode == mode)
	{
		cout << "  Sending on port " << port_number << " - CTRL+C to exit." << endl;

		struct addrinfo hints;

		// Start as nullptr: the pointer must only be freed if
		// getaddrinfo() actually populated it (the old code called
		// freeaddrinfo() on an uninitialized pointer on failure -- UB).
		struct addrinfo* result = nullptr;

		memset(&hints, 0, sizeof(struct addrinfo));
		hints.ai_family = AF_INET;
		hints.ai_socktype = SOCK_DGRAM;
		hints.ai_flags = 0;
		hints.ai_protocol = IPPROTO_UDP;

		ostringstream oss;
		oss << port_number;

		if (0 != getaddrinfo(target_host_string.c_str(), oss.str().c_str(), &hints, &result))
		{
			// No freeaddrinfo() here: on failure, result was never allocated.
			cout << "  getaddrinfo error." << endl;
			cleanup();
			return 2;
		}

		if (INVALID_SOCKET == (udp_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)))
		{
			cout << "  Could not allocate a new socket." << endl;
			freeaddrinfo(result);
			cleanup();
			return 3;
		}

		// Flood datagrams as fast as the stack accepts them, until
		// interrupted or an error occurs.
		while (1)
		{
			if (SOCKET_ERROR == (sendto(udp_socket, &tx_buf[0], tx_buf_size, 0, result->ai_addr, sizeof(struct sockaddr))))
			{
				cout << "  Socket sendto error." << endl;
				freeaddrinfo(result);
				cleanup();
				return 4;
			}
		}

		freeaddrinfo(result);
	}
	else if (receive_mode == mode)
	{
		// hardware_concurrency() may legitimately return 0 ("unknown");
		// fall back to a single worker so the modulo arithmetic below
		// never divides by zero.
		size_t thread_count = std::thread::hardware_concurrency();

		if (0 == thread_count)
			thread_count = 1;

		cout << "  Thread count: " << thread_count << endl;
		cout << "  Receiving on UDP port " << port_number << " - CTRL+C to exit." << endl;

		struct sockaddr_in my_addr;
		struct sockaddr_in their_addr;
		int addr_len = 0;

		// Zero both address structs before use (covers sin_zero too).
		memset(&my_addr, 0, sizeof(struct sockaddr_in));
		memset(&their_addr, 0, sizeof(struct sockaddr_in));

		my_addr.sin_family = AF_INET;
		my_addr.sin_port = htons(static_cast<unsigned short int>(port_number));
		my_addr.sin_addr.s_addr = INADDR_ANY;
		addr_len = sizeof(struct sockaddr);

		if (INVALID_SOCKET == (udp_socket = socket(AF_INET, SOCK_DGRAM, 0)))
		{
			cout << "  Could not allocate a new socket." << endl;
			cleanup();
			return 5;
		}

		if (SOCKET_ERROR == bind(udp_socket, reinterpret_cast<struct sockaddr*>(&my_addr), sizeof(struct sockaddr)))
		{
			cout << "  Could not bind socket to port " << port_number << "." << endl;
			cleanup();
			return 6;
		}

		// One worker (and one queue) per core; workers start immediately.
		vector<recv_stats> senders(thread_count);

		for (size_t i = 0; i < senders.size(); i++)
			senders[i].thread_id = i;

		// Round-robin cursor used to pick the next worker queue.
		size_t index = 0;

		while (1)
		{
			// Wait at most a tenth of a second for readability, so the
			// report-draining loop below still runs during idle periods.
			timeval timeout;
			timeout.tv_sec = 0;
			timeout.tv_usec = 100000;

			fd_set fds;
			FD_ZERO(&fds);
			FD_SET(udp_socket, &fds);

			// The first parameter is ignored by Winsock's select().
			int select_ret = select(0, &fds, 0, 0, &timeout);

			if (SOCKET_ERROR == select_ret)
			{
				cout << "  Socket select error." << endl;
				cleanup();
				return 7;
			}
			else if (0 < select_ret)
			{
				int temp_bytes_received = 0;

				if (SOCKET_ERROR == (temp_bytes_received = recvfrom(udp_socket, &rx_buf[0], rx_buf_size, 0, reinterpret_cast<struct sockaddr*>(&their_addr), &addr_len)))
				{
					cout << "  Socket recvfrom error." << endl;
					cleanup();
					return 8;
				}

				// Format the sender's IPv4 address as dotted decimal.
				ostringstream oss;
				oss << static_cast<int>(their_addr.sin_addr.S_un.S_un_b.s_b1) << ".";
				oss << static_cast<int>(their_addr.sin_addr.S_un.S_un_b.s_b2) << ".";
				oss << static_cast<int>(their_addr.sin_addr.S_un.S_un_b.s_b3) << ".";
				oss << static_cast<int>(their_addr.sin_addr.S_un.S_un_b.s_b4);

				packet p;
				p.packet_buf = rx_buf;
				p.packet_buf.resize(temp_bytes_received);
				p.ip_addr = oss.str();
				p.time_stamp = std::chrono::high_resolution_clock::now();

				// Hand the packet to the next worker, round-robin.
				recv_stats& target = senders[index % thread_count];

				{
					lock_guard<mutex> lock(target.m);
					target.packets.push_back(p);
				}

				index++;
			}

			// Drain and print any report lines the workers have produced.
			for (vector<recv_stats>::iterator i = senders.begin(); i != senders.end(); i++)
			{
				lock_guard<mutex> lock(i->m);

				for (size_t j = 0; j < i->reports.size(); j++)
					cout << i->reports[j] << endl;

				i->reports.clear();
			}
		}
	}

	cleanup();

	return 0;
}

joopyM said:

@hplus0603

When you have 5,000 players, each wanting to send a message to each of the 5,000 other players, each player will receive 5,000 messages, which ends up being 25,000,000 messages. There's no way around that, other than designing your game to not use “broadcast to everyone” very much. Interest management and sharded game design are the two main tools to use to avoid that.

Yes, sharding is something I am looking into in this architecture as well, since I don't know and can't predict the maximum number of players on a single instance.

In reality, the full broadcast from 1 player to 4,999 players probably won't happen, but it could be that all the player groups associated with other players are doing it, so in theory there is full activity on all players. I would also have ping functionality that would need to go to or from the client to keep the route alive and to spot disconnected players, so there will be continuous traffic.

But if I'm able to make an engine that works with 1,000 players and my target is 5,000, then I would be OK with splitting the 'streamers' out onto more instances by using some kind of central mechanism - e.g. Redis - that knows which server a player is located on, and then doing inter-server communication and then from that server to the player (this of course requires that the servers form a pool on their own network, for fast inter-server comms).

But I am trying to understand this from a heavy-traffic perspective, so that at least I don't create a bottleneck in my code when I start coding.

One question though :

When I'm sending data to players - let's say this worst-case scenario where I have a queue of packets built up for all 5,000 players - then I can do (pseudo):

#1

for (i=0;i<5000;i++)
  if ( playerConnection.packetstosend > 0)
      send (playerConnection[i],,,,) 

which processes sending data to clients 1, 2, 3, 4, up to 5,000 - meaning client #5000 will always be the unlucky one, getting its data sent last in that loop, as it will take time to get through the loop from client 1 to client 5000.

Alternative could be :

#2 
ThreadClient1()
{
  if (GotData( client1 ) & itsTimeToSend )
    {
        sendto (playerConnection[1],,,,)
    }
}

And then have 5,000 threads (one sending thread per client) - one thread per player - that will process autonomously.

My worry here is the overhead of threads, which I assume stops scaling at some point - or maybe this isn't a problem on Linux? (I grew up learning not to overuse threads and to keep them to a minimum, based on processor count if possible.)

A third alternative could be to pass the sends on to a pool of worker threads - a bit more dynamic than the example here, but to keep the pseudocode simple we could use 5 worker threads:

#3:

WorkerThread0001_1000()
{
for (i=0;i<1000;i++)
  if ( playerConnection.packetstosend > 0)
      send (playerConnection[i],,,,) 
}
WorkerThread1001_2000()
{
for (i=1001;i<2000;i++)
  if ( playerConnection.packetstosend > 0)
      send (playerConnection[i],,,,) 
}
WorkerThread2001_3000()
{
for (i=2001;i<3000;i++)
  if ( playerConnection.packetstosend > 0)
      send (playerConnection[i],,,,) 
}
WorkerThread3001_4000()
{
for (i=3001;i<4000;i++)
  if ( playerConnection.packetstosend > 0)
      send (playerConnection[i],,,,) 
}
WorkerThread4001_5000()
{
for (i=4001;i<5000;i++)
  if ( playerConnection.packetstosend > 0)
      send (playerConnection[i],,,,) 
}

Then we split up the handling of the send in lots of 1000.

Question here is :

a)

would #1 be faster than #3? Is there some sort of bottleneck in the network stack anyway when sending UDP data to clients - or would I be dividing the time spent handling all players by 5?

b)

back in the day, threads were to be used with caution, so I'm nervous about doing a model like #2 and don't feel it's scalable. BUT if this is perfectly OK (on Linux) and threads are lightweight, then of course it makes for nice and easy code - I'm simply not sure whether it's the right way to do it, and whether I have the freedom to just spawn threads like candy?

Thanks a lot - you and the others here are really helping me understand the mechanisms involved, both opening some doors I hadn't thought of and closing some that I had. I hope you don't get tired of me, as I will keep asking questions; I will also try to make a diagram soon of the idea I have for a sharded setup - I just need to get the concepts right here for the actual transport part.

@hplus0603 @swiftcoder do you guys got any opinions on the above ?

taby said:

Yeah, man. I just wanted to show you how small a server code can be. ?

Yes, but I never asked how small server code can be - I'm asking how it's done right in production.

Advertisement

taby said:
Do you recommend that the thread id be cached, so that packets of any given sender are always handled by the same thread?

The thing you typically want to care about here is which thread owns which data. If you process incoming messages on random threads, then they are going to have to acquire and release locks to any game data they need to modify (which can be a major performance drag). If you hash incoming messages consistently to the same threads, then those threads can have much more predictable access patterns to the underlying data, allowing you to minimise locking.

Tristam MacDonald. Ex-BigTech Software Engineer. Future farmer. [https://trist.am]

joopyM said:

taby said:

Yeah, man. I just wanted to show you how small a server code can be. ?

Yes, but I never asked how small server code can be - I'm asking how it's done right in production.

This is why organizations need development and test environments to complement the production environment, so that they'll know whether or not any particular code works under heavy loads. Small code is reliable code. If you haven't yet found something on google or github, then you're probably going to have to do it yourself. That said, if you want something done right, you do it yourself.

swiftcoder said:

taby said:
Do you recommend that the thread id be cached, so that packets of any given sender are always handled by the same thread?

The thing you typically want to care about here is which thread owns which data. If you process incoming messages on random threads, then they are going to have to acquire and release locks to any game data they need to modify (which can be a major performance drag). If you hash incoming messages consistently to the same threads, then those threads can have much more predictable access patterns to the underlying data, allowing you to minimise locking.

Cool. Will try that

On that note, if you're on C++, the “strand” concept from boost::asio can be helpful.

It uses the boost service executor (reactor) to queue work that needs to be done for some particular object, using an object (strand) specific queue. That way, the first thread that gets to an object, acquires the strand, and does work, and when more work comes in for that same object, it's chained to the strand, and gets executed when that initial thread is done with the first work.

That way, you can have as many threads as you have cores, and let the strands take care of distributing the work to ready threads.

enum Bool { True, False, FileNotFound };

This topic is closed to new replies.

Advertisement