What UDP networking libraries do you use server-side in 2021?

Started by
40 comments, last by taby 3 years, 2 months ago

joopyM said:
what is serverside-mixing ?

You mix (combine) all the audio streams the player should be able to hear, on the server side, and then you just send the single mixed stream down to the client. Potentially means more processing work on the server, but uses a lot less bandwidth.

Tristam MacDonald. Ex-BigTech Software Engineer. Future farmer. [https://trist.am]

Advertisement

Say I want to code my own UDP server code. What's the best way to do that multithreaded? Perform the recvfrom in the main thread, then pass the data along to the threads as need be?

taby said:

Say I want to code my own UDP server code. What's the best way to do that multithreaded? Perform the recvfrom in the main thread, then pass the data along to the threads as need be?

Depends on the platform - on linux the “best” (for some definition thereof) approach is to open a new socket for each thread, using SO_REUSEPORT so that they all bind to the same address and port. The kernel will then perform sticky load balancing across threads for you.

Tristam MacDonald. Ex-BigTech Software Engineer. Future farmer. [https://trist.am]

Excellent. Thank you, sir.

Here is the multithreaded version of the UDP receiver. One thread to use the socket, and one thread per incoming IP address.

#include <winsock2.h>
#include <Ws2tcpip.h>
#include <windows.h>
#pragma comment(lib, "ws2_32")

#include <atomic>
#include <chrono>
#include <iostream>
#include <map>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
using namespace std;


// The one UDP socket used by the program. It stays INVALID_SOCKET until main() creates it;
// cleanup() closes it if it is open.
SOCKET udp_socket = INVALID_SOCKET;
// Operating mode selected from the command line by init_options().
enum program_mode { send_mode, receive_mode };


// Writes the command-line help text to standard output, one line at a time.
void print_usage(void)
{
	// An empty entry produces a blank separator line.
	static const char* const usage_lines[] =
	{
		"  USAGE:",
		"   Receive mode:",
		"    udpspeed PORT_NUMBER",
		"",
		"   Send mode:",
		"    udpspeed TARGET_HOST PORT_NUMBER",
		"",
		"   ie:",
		"    Receive mode: udpspeed 1920",
		"    Send mode:    udpspeed www 342",
		"    Send mode:    udpspeed 127.0.0.1 950",
		""
	};

	for (const char* line : usage_lines)
		std::cout << line << std::endl;
}

// Checks that port_string is a decimal port number and converts it.
//
// port_string: text to validate; it must consist of the characters '0'-'9' only.
// port_number: receives the parsed value. It is left untouched when a non-digit
//              character is found, and is overwritten otherwise.
//
// Returns true only for values in the range 1-65535. On failure an explanatory
// message is written to standard output and false is returned.
bool verify_port(const std::string& port_string, unsigned long int& port_number)
{
	// Compare against the digit range directly: handing a negative char (any byte
	// above 0x7F where char is signed) to isdigit() is undefined behaviour.
	for (const char c : port_string)
	{
		if (c < '0' || c > '9')
		{
			std::cout << "  Invalid port: " << port_string << std::endl;
			std::cout << "  Ports are specified by numerals only." << std::endl;
			return false;
		}
	}

	// Start from zero so that an empty string is rejected by the range check below
	// no matter what a failed extraction does to its target.
	port_number = 0;

	std::istringstream iss(port_string);
	iss >> port_number;

	// The length check rejects over-long digit strings independently of the parsed value.
	if (!iss || port_string.length() > 5 || port_number > 65535 || port_number == 0)
	{
		std::cout << "  Invalid port: " << port_string << std::endl;
		std::cout << "  Port must be in the range of 1-65535" << std::endl;
		return false;
	}

	return true;
}

// Starts Winsock and checks that version 2.2 was granted.
// Returns true on success; on failure prints a message to standard output and returns false.
bool init_winsock(void)
{
	const WORD wanted_version = MAKEWORD(2, 2);
	WSADATA startup_info;

	// Any non-zero result from WSAStartup is treated as failure.
	const int startup_status = WSAStartup(wanted_version, &startup_info);

	if (0 != startup_status)
	{
		std::cout << "Could not initialize Winsock 2.2.";
		return false;
	}

	// Both the major and the minor byte of the reported version must be 2.
	const bool is_version_2_2 = (2 == LOBYTE(startup_info.wVersion)) && (2 == HIBYTE(startup_info.wVersion));

	if (!is_version_2_2)
	{
		std::cout << "Required version of Winsock (2.2) not available.";
		return false;
	}

	return true;
}

// Initialises Winsock and interprets the command line.
//
// argc / argv:        the arguments given to main().
// mode:               set to receive_mode for "PORT" and to send_mode for "HOST PORT".
// target_host_string: set to the host argument in send mode only.
// port_number:        receives the parsed port through verify_port().
//
// Returns false when Winsock cannot be started, when the argument count is wrong
// (the usage text is printed in that case), or when the port is invalid.
bool init_options(const int& argc, char** argv, enum program_mode& mode, std::string& target_host_string, long unsigned int& port_number)
{
	if (false == init_winsock())
		return false;

	std::string port_text;

	switch (argc)
	{
	case 2:
		// Program name + port
		mode = receive_mode;
		port_text = argv[1];
		break;

	case 3:
		// Program name + host + port
		mode = send_mode;
		target_host_string = argv[1];
		port_text = argv[2];
		break;

	default:
		print_usage();
		return false;
	}

	// Floating-point output is printed in fixed notation with two decimals from here on.
	std::cout.precision(2);
	std::cout.setf(std::ios::fixed, std::ios::floatfield);

	return verify_port(port_text, port_number);
}

// Releases the global socket (when one exists) and shuts Winsock down.
void cleanup(void)
{
	// Only a socket that was actually created needs closing
	const bool socket_is_open = (udp_socket != INVALID_SOCKET);

	if (socket_is_open)
	{
		closesocket(udp_socket);
	}

	// Shut Winsock down
	WSACleanup();
}


// One received datagram: its payload bytes plus the time stamp main() assigned to it.
struct packet
{
	// Payload, sized by the caller to the number of bytes received
	std::vector<char> packet_buf;

	// Time stamp set by the code that fills in the packet
	std::chrono::high_resolution_clock::time_point time_stamp;
};


// Per-sender counters. Every field starts at zero. In this listing only
// total_bytes_received is updated (by thread_func); the other fields are not touched.
struct stats
{
	unsigned long long int total_elapsed_ticks{ 0 };
	unsigned long long int total_bytes_received{ 0 };
	unsigned long long int last_reported_at_ticks{ 0 };
	unsigned long long int last_reported_total_bytes_received{ 0 };

	double bytes_per_second{ 0 };

	double record_bps{ 0 };
};


void thread_func(atomic_bool& stop, atomic_bool& thread_done, vector<packet>& vc, mutex& m, stats &s, string &ip_addr)
{
	thread_done = false;

	while (!stop)
	{
		m.lock();

		if (vc.size() > 0)
		{
//			cout << ip_addr << endl;

			for (size_t i = 0; i < vc.size(); i++)
			{
				// Do stuff with packet here
				s.total_bytes_received += vc[i].packet_buf.size();
			}

			vc.clear();
		}

		m.unlock();
	}

	thread_done = true;
}


// State for one sender plus the worker thread that consumes its packets.
//
// The worker runs thread_func() on this object's members from construction until
// destruction. The members are handed to the thread by reference, so an instance
// must stay at a fixed address; copying is therefore disabled.
class recv_stats
{
public:

	stats s;
	std::thread t;
	std::atomic_bool stop{ false };        // set to true to ask the worker to finish
	std::atomic_bool thread_done{ false }; // written by thread_func on entry and exit
	std::mutex m;                          // lock this before touching ip_addr or packets
	std::string ip_addr;

	std::vector<packet> packets;

	recv_stats(void)
	{
		// Started in the constructor body, i.e. after every member (including the ones
		// declared after t) has been initialised.
		t = std::thread([this] { thread_func(stop, thread_done, packets, m, s, ip_addr); });
	}

	recv_stats(const recv_stats&) = delete;
	recv_stats& operator=(const recv_stats&) = delete;

	~recv_stats(void)
	{
		stop = true;

		// join() already blocks until the worker has returned, so no separate
		// busy-wait on thread_done is needed.
		if (t.joinable())
			t.join();
	}
};


// Entry point of the UDP speed tester.
//
// "udpspeed PORT" receives datagrams on PORT and queues each one on the recv_stats
// entry of the IPv4 address it came from. "udpspeed HOST PORT" sends fixed-size
// datagrams to HOST:PORT in an endless loop.
//
// Return codes: 1 option/initialisation failure, 2 getaddrinfo, 3 socket (send mode),
// 4 sendto, 5 socket (receive mode), 6 bind, 7 select, 8 recvfrom. Without an error
// both modes loop until the process is terminated.
int main(int argc, char** argv)
{
	std::cout << std::endl << "udpspeed_2 1.0 - UDP speed tester" << std::endl << "Copyright 2021, Shawn Halayka" << std::endl << std::endl;

	program_mode mode = receive_mode;

	std::string target_host_string = "";
	long unsigned int port_number = 0;

	const long unsigned int tx_buf_size = 1450;
	std::vector<char> tx_buf(tx_buf_size, 0);

	const long unsigned int rx_buf_size = 8196;
	std::vector<char> rx_buf(rx_buf_size, 0);

	if (!init_options(argc, argv, mode, target_host_string, port_number))
	{
		cleanup();
		return 1;
	}

	if (send_mode == mode)
	{
		std::cout << "  Sending on port " << port_number << " - CTRL+C to exit." << std::endl;

		// Value-initialised so every field not set below is zero.
		struct addrinfo hints {};
		hints.ai_family = AF_INET;
		hints.ai_socktype = SOCK_DGRAM;
		hints.ai_flags = 0;
		hints.ai_protocol = IPPROTO_UDP;

		struct addrinfo* result = nullptr;

		std::ostringstream oss;
		oss << port_number;

		if (0 != getaddrinfo(target_host_string.c_str(), oss.str().c_str(), &hints, &result))
		{
			// No list was produced on failure, so there is nothing to free here.
			std::cout << "  getaddrinfo error." << std::endl;
			cleanup();
			return 2;
		}

		if (INVALID_SOCKET == (udp_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)))
		{
			std::cout << "  Could not allocate a new socket." << std::endl;
			freeaddrinfo(result);
			cleanup();
			return 3;
		}

		// Runs until sendto fails or the process is terminated.
		while (1)
		{
			// Use the address length reported by getaddrinfo instead of assuming one.
			if (SOCKET_ERROR == sendto(udp_socket, &tx_buf[0], static_cast<int>(tx_buf_size), 0, result->ai_addr, static_cast<int>(result->ai_addrlen)))
			{
				std::cout << "  Socket sendto error." << std::endl;
				freeaddrinfo(result);
				cleanup();
				return 4;
			}
		}
	}
	else if (receive_mode == mode)
	{
		std::cout << "  Receiving on UDP port " << port_number << " - CTRL+C to exit." << std::endl;

		// Value-initialised, which also zeroes sin_zero.
		struct sockaddr_in my_addr {};
		my_addr.sin_family = AF_INET;
		my_addr.sin_port = htons(static_cast<unsigned short int>(port_number));
		my_addr.sin_addr.s_addr = INADDR_ANY;

		if (INVALID_SOCKET == (udp_socket = socket(AF_INET, SOCK_DGRAM, 0)))
		{
			std::cout << "  Could not allocate a new socket." << std::endl;
			cleanup();
			return 5;
		}

		if (SOCKET_ERROR == bind(udp_socket, reinterpret_cast<struct sockaddr*>(&my_addr), static_cast<int>(sizeof(my_addr))))
		{
			std::cout << "  Could not bind socket to port " << port_number << "." << std::endl;
			cleanup();
			return 6;
		}

		// One entry (and therefore one worker thread) per sender IP address.
		std::map<std::string, recv_stats> senders;

		while (1)
		{
			const std::chrono::high_resolution_clock::time_point start_loop_time = std::chrono::high_resolution_clock::now();

			timeval timeout;
			timeout.tv_sec = 0;
			timeout.tv_usec = 100000; // one hundred thousand microseconds is one-tenth of a second

			fd_set fds;
			FD_ZERO(&fds);
			FD_SET(udp_socket, &fds);

			const int select_ret = select(0, &fds, 0, 0, &timeout);

			if (SOCKET_ERROR == select_ret)
			{
				std::cout << "  Socket select error." << std::endl;
				cleanup();
				return 7;
			}

			// Timed out with nothing to read
			if (0 == select_ret)
				continue;

			// recvfrom treats the length as in/out, so it is reset for every call.
			struct sockaddr_in their_addr {};
			int addr_len = static_cast<int>(sizeof(their_addr));

			const int bytes_received = recvfrom(udp_socket, &rx_buf[0], static_cast<int>(rx_buf_size), 0, reinterpret_cast<struct sockaddr*>(&their_addr), &addr_len);

			if (SOCKET_ERROR == bytes_received)
			{
				std::cout << "  Socket recvfrom error." << std::endl;
				cleanup();
				return 8;
			}

			// Dotted-quad text of the sender address; used as the map key.
			std::ostringstream oss;
			oss << static_cast<int>(their_addr.sin_addr.S_un.S_un_b.s_b1) << ".";
			oss << static_cast<int>(their_addr.sin_addr.S_un.S_un_b.s_b2) << ".";
			oss << static_cast<int>(their_addr.sin_addr.S_un.S_un_b.s_b3) << ".";
			oss << static_cast<int>(their_addr.sin_addr.S_un.S_un_b.s_b4);
			const std::string sender_ip = oss.str();

			// Copy only the bytes that were received, not the whole receive buffer.
			packet p;
			p.packet_buf.assign(rx_buf.begin(), rx_buf.begin() + bytes_received);
			p.time_stamp = start_loop_time;

			// A single lookup; operator[] creates the entry the first time an address is seen.
			recv_stats& sender = senders[sender_ip];

			std::lock_guard<std::mutex> guard(sender.m);
			sender.ip_addr = sender_ip;
			sender.packets.push_back(std::move(p));
		}
	}

	cleanup();

	return 0;
}

on linux the “best” (for some definition thereof) approach is to open a new socket for each thread, using SO_REUSEPORT

I agree!

That being said, the “best” approach matters if you try to do things like saturate a 10 Gbps or bigger network pipe. Amazon Nitro instances will now give you 40 Gbps bandwidth, even – and to keep up with that, that kind of approach is necessary.

For smaller deployments, using a single socket, and having a bunch of threads piling on with recvfrom() all at the same time, is likely going to work fine! A third option is to have a single thread, which just calls recvfrom() and sticks the received packets into a FIFO (ideally non-blocking,) and then other threads pick up the messages from there to handle them. This option is likely to scale well enough for almost anything you can realistically do as an indie game developer, and may be easier to write and test – it all depends on what specifically you're doing!

enum Bool { True, False, FileNotFound };

hplus0603 said:

on linux the “best” (for some definition thereof) approach is to open a new socket for each thread, using SO_REUSEPORT

I agree!

That being said, the “best” approach matters if you try to do things like saturate a 10 Gbps or bigger network pipe. Amazon Nitro instances will now give you 40 Gbps bandwidth, even – and to keep up with that, that kind of approach is necessary.

For smaller deployments, using a single socket, and having a bunch of threads piling on with recvfrom() all at the same time, is likely going to work fine! A third option is to have a single thread, which just calls recvfrom() and sticks the received packets into a FIFO (ideally non-blocking,) and then other threads pick up the messages from there to handle them. This option is likely to scale well enough for almost anything you can realistically do as an indie game developer, and may be easier to write and test – it all depends on what specifically you're doing!

I am actually starting with approach 3 - one listener thread (maybe adding SO_REUSEPORT if it is necessary) that grabs packets and passes them into what is supposed to be a pool of threads for processing.

But one consideration i have here is :

1 listener thread for, let's say, 1-5000 users that are ‘connected’; it does nothing more than dequeue the packets as fast as possible and pass them on.

But the worker thread(s) need to send packets to all the ‘connected’ users - what is the best approach for the send part when you need to send to, let's say, worst case all connected users? I assume that if I do a for loop from user 1 to user 5000, that will be slow for user no. 5000 - what ideas can be used here? Launching a thread for each send sounds scary - having a thread for each ‘connected’ user actually also sounds a bit scary; it would be nice and clean coding-wise if that is possible, but I assume it's not good practice?

[quote]what is the best approach for the send part when you need to send to lets say worst case all users connected ?[/quote]

Typically, a server will have a map of all sessions that are currently going (generally equivalent to all players that are currently connected.)

For each player, there will be a queue of messages that are waiting to be sent. Whenever the server needs to “send a thing to a player,” it doesn't immediately form a packet and send it, but instead puts that message into the queue for the destination player/s.

On a timer, anything between 10 and 60 times a second (depending on game type,) the server will bundle up all outgoing messages for a player, and send them in a single packet to the player. This way, the packet overhead (sequence numbers, timing, encryption, session identifiers, etc) is amortized across all messages to the player. Also, if later messages “overwrite” earlier messages (such as “this is your current hitpoints” or whatever) then those messages can collapse into only the latest instance of the message, saving some space.

There's no getting around it. With 10,000 players connected, each sending inputs 30 times a second, and each receiving updates 30 times a second, you will be receiving 300,000 packets per second, and you will be sending 300,000 packets per second. This is moderately-high load, but should be totally reasonable on modern server hardware.

Another question is that of packet size – the maximum size of a UDP datagram is just under 64KB. If you collect more than 64 KB of data to send in one network timer interval, you have the option of sending more packets ("force flush" the queue ahead of time) or spreading the payload across multiple packets sent at the same time using some fragmentation scheme (RakNet does this) or dropping packets to limit the maximum packet and bandwidth rate. Which you choose depends on your specific game.

When you have 5,000 players, each wanting to send a message to each of the 5,000 other players, each player will receive 5,000 messages, which ends up being 25,000,000 messages. There's no way around that, other than designing your game to not use “broadcast to everyone” very much. Interest management and sharded game design are the two main tools to use to avoid that.

enum Bool { True, False, FileNotFound };

Having a map of the threads using the IP address as the key is the way I went. Threads are created on the fly, not from a pool. Hmm.

The final UDP server code is at: https://github.com/sjhalayka/udpspeed3_multithreaded

@hplus0603

When you have 5,000 players, each wanting to send a message to each of the 5,000 other players, each player will receive 5,000 messages, which ends up being 25,000,000 messages. There's no way around that, other than designing your game to not use “broadcast to everyone” very much. Interest management and sharded game design are the two main tools to use to avoid that.

Yes, sharding is something I am looking into for the architecture here as well, as I don't know and can't predict what the maximum number of players on a single instance is.

In reality the full broadcast from 1 player to 4999 players probably won't happen, but it could be that all the player groups that are associated with other players are doing it, so in theory there is full activity on all players - also, I would have a ping functionality that needs to go to or from the client to keep the route alive & be able to spot disconnected players, so there will be continuous traffic.

But if I'm able to make an engine that works with 1000 and my target is 5000, then I would be OK with splitting the ‘streamers’ out over more instances by using some kind of central mechanism, e.g. Redis, that knows on which server a player is located, and then doing inter-server communication and then sending from that server to the player (this of course requires that the servers are a pool on their own network, for the speed of inter-server comms).

But I am trying to understand this from a heavy-traffic perspective, so that at least I don't create a bottleneck in my code when I start coding.

One question though :

When I'm sending data to players - let's say in this worst-case scenario where I have a queue of packets built up for all 5000 players - then I can do (pseudo-code):

#1

for (i=0;i<5000;i++)
  if ( playerConnection.packetstosend > 0)
      send (playerConnection[i],,,,) 

which processes sending data to client 1, 2, 3, 4 up to 5000 - meaning client #5000 will always be the unlucky one whose data is sent last, as it takes time to get through that loop from client 1 to client 5000.

Alternative could be :

#2 
ThreadClient1()
{
  if (GotData( client1 ) & itsTimeToSend )
    {
        sendto (playerConnection[1],,,,)
    }
}

And then have 5000 threads (one sending thread per client/player) that each process autonomously.

My worry here is the overload of threads, and I assume that at some point it is not so scalable - or maybe this isn't a problem on Linux? (I grew up learning not to overuse threads and to keep them to a minimum, based on the number of processors if possible.)

A third alternative could be to pass the sends on to a pool of worker threads - a bit more dynamic than the example here, but to keep the pseudo-code simple we could use 5 worker threads:

#3:

WorkerThread0001_1000()
{
for (i=0;i<1000;i++)
  if ( playerConnection.packetstosend > 0)
      send (playerConnection[i],,,,) 
}
WorkerThread1001_2000()
{
for (i=1001;i<2000;i++)
  if ( playerConnection.packetstosend > 0)
      send (playerConnection[i],,,,) 
}
WorkerThread2001_3000()
{
for (i=2001;i<3000;i++)
  if ( playerConnection.packetstosend > 0)
      send (playerConnection[i],,,,) 
}
WorkerThread3001_4000()
{
for (i=3001;i<4000;i++)
  if ( playerConnection.packetstosend > 0)
      send (playerConnection[i],,,,) 
}
WorkerThread4001_5000()
{
for (i=4001;i<5000;i++)
  if ( playerConnection.packetstosend > 0)
      send (playerConnection[i],,,,) 
}

Then we split up the handling of the send in lots of 1000.

Question here is :

a)

Would #1 be faster than #3? Is there some sort of bottleneck in the network stack anyway when sending UDP data to clients - or would I be dividing the time spent handling all players by 5?

b)

Back in the day threads were to be used with caution, so I'm nervous about doing a model like #2, and I also don't feel it's scalable. BUT if this is perfectly OK (on Linux) and threads are lightweight, then of course it makes for nice and easy code - I'm simply not sure whether it's the right way to do it, and whether I have the freedom to just spawn threads like candy?

Thanks a lot - you & others here are really helping me understand the mechanisms, both opening some doors I hadn't thought of and closing some that I had. I hope you guys don't get tired of me, as I will continue asking, and I will also try to post a diagram soon of the idea I have for a sharded setup, but I just need to get the concepts right here for the actual transport part.

This topic is closed to new replies.

Advertisement