1
0
Protocol/old/common/EQStreamFactory.cpp
2025-09-02 20:25:42 -05:00

547 lines
12 KiB
C++

// EQ2Emulator: Everquest II Server Emulator
// Copyright (C) 2007 EQ2EMulator Development Team
// Licensed under GPL v3 - see <http://www.gnu.org/licenses/>
#include "EQStreamFactory.h"
#include "Log.h"
// Unix/Linux networking headers
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <pthread.h>
#include <fcntl.h>
#include <unistd.h>
// Standard library headers
#include <fstream>
#include <iostream>
#include <cstring>
#include <deque>
#include <vector>
#include <algorithm>
// Project headers
#include "op_codes.h"
#include "EQStream.h"
#include "packet_dump.h"
#ifdef WORLD
#include "../world/client.h"
#endif
using namespace std;
#ifdef WORLD
extern ClientList client_list;
#endif
/**
 * Thread trampoline for the reader worker (handed to pthread_create by
 * EQStreamFactory::Open()).
 *
 * @param eqfs - Opaque pointer to the EQStreamFactory whose ReaderLoop()
 *               should run; a null pointer makes the call a no-op
 * @return Result of THREAD_RETURN(nullptr)
 */
ThreadReturnType EQStreamFactoryReaderLoop(void* eqfs)
{
    // Declaration-in-condition: the loop only runs for a non-null factory.
    if (auto* factory = static_cast<EQStreamFactory*>(eqfs)) {
        factory->ReaderLoop();
    }
    THREAD_RETURN(nullptr);
}
/**
 * Thread trampoline for the writer worker (handed to pthread_create by
 * EQStreamFactory::Open()).
 *
 * @param eqfs - Opaque pointer to the EQStreamFactory whose WriterLoop()
 *               should run; a null pointer makes the call a no-op
 * @return Result of THREAD_RETURN(nullptr)
 */
ThreadReturnType EQStreamFactoryWriterLoop(void* eqfs)
{
    // Declaration-in-condition: the loop only runs for a non-null factory.
    if (auto* factory = static_cast<EQStreamFactory*>(eqfs)) {
        factory->WriterLoop();
    }
    THREAD_RETURN(nullptr);
}
/**
 * Thread trampoline for the packet-combining worker (handed to
 * pthread_create by EQStreamFactory::Open()).
 *
 * @param eqfs - Opaque pointer to the EQStreamFactory whose
 *               CombinePacketLoop() should run; a null pointer makes the
 *               call a no-op
 * @return Result of THREAD_RETURN(nullptr)
 */
ThreadReturnType EQStreamFactoryCombinePacketLoop(void* eqfs)
{
    // Declaration-in-condition: the loop only runs for a non-null factory.
    if (auto* factory = static_cast<EQStreamFactory*>(eqfs)) {
        factory->CombinePacketLoop();
    }
    THREAD_RETURN(nullptr);
}
/**
 * Build an idle factory: records the stream type and port, and leaves the
 * socket, listen address, decay timer and worker flags in their "not
 * started" values. No socket or thread is created here.
 *
 * @param type - Stream type later applied to every stream this factory creates
 * @param port - UDP port that Open() will bind to
 */
EQStreamFactory::EQStreamFactory(EQStreamType type, int port)
{
    // Caller-supplied configuration
    Port = port;
    StreamType = type;

    // No socket yet (-1 is the "closed" sentinel the worker loops test for)
    sock = -1;
    listen_ip_address = nullptr;
    DecayTimer = nullptr;

    // Each worker loop flips its own flag to true when it starts
    CombinePacketRunning = false;
    WriterRunning = false;
    ReaderRunning = false;
}
/**
 * Shut the factory down: drop every stream, call Stop(), then close the
 * socket if one is open.
 */
void EQStreamFactory::Close()
{
    // remove_all = true: every stream is erased and deleted regardless of state
    CheckTimeout(true);
    Stop();

    if (sock == -1) {
        return; // nothing was opened (or it is already closed)
    }
    close(sock);
    sock = -1; // worker loops run only while sock != -1
}
/**
* Open the UDP socket and start worker threads.
* Creates reader, writer, and packet combiner threads.
*
* @return true if successful, false on error
*/
bool EQStreamFactory::Open()
{
struct sockaddr_in address;
pthread_t t1, t2, t3;
// Setup socket address structure
memset(reinterpret_cast<char*>(&address), 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(Port);
// Set bind address based on configuration
#if defined(LOGIN) || defined(MINILOGIN)
if (listen_ip_address) {
address.sin_addr.s_addr = inet_addr(listen_ip_address);
} else {
address.sin_addr.s_addr = htonl(INADDR_ANY);
}
#else
address.sin_addr.s_addr = htonl(INADDR_ANY);
#endif
// Create UDP socket
sock = socket(AF_INET, SOCK_DGRAM, 0);
if (sock < 0) {
return false;
}
// Bind socket to address
if (::bind(sock, reinterpret_cast<struct sockaddr*>(&address), sizeof(address)) < 0) {
close(sock);
sock = -1;
return false;
}
// Set socket to non-blocking mode
fcntl(sock, F_SETFL, O_NONBLOCK);
// Log thread startup
#ifdef LOGIN
LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Reader");
LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Writer");
#elif WORLD
LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Reader");
LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Writer");
#endif
// Create and detach worker threads
pthread_create(&t1, nullptr, EQStreamFactoryReaderLoop, this);
pthread_create(&t2, nullptr, EQStreamFactoryWriterLoop, this);
pthread_create(&t3, nullptr, EQStreamFactoryCombinePacketLoop, this);
pthread_detach(t1);
pthread_detach(t2);
pthread_detach(t3);
return true;
}
/**
 * Take the oldest newly created stream off the NewStreams queue.
 *
 * The queue is inspected and modified only while MNewStreams is held, so
 * this does not race with Push(). A returned stream has had PutInUse()
 * called on it.
 *
 * @return The front stream of the queue, or nullptr when the queue is empty
 */
EQStream* EQStreamFactory::Pop()
{
    EQStream* s = nullptr;
    MNewStreams.lock();
    if (!NewStreams.empty()) {
        s = NewStreams.front();
        NewStreams.pop();
        if (s) {
            s->PutInUse();
        }
    }
    MNewStreams.unlock();
    return s;
}
/**
 * Append a stream to the NewStreams queue, from which Pop() hands it out.
 *
 * @param s - Stream to enqueue; stored as given
 */
void EQStreamFactory::Push(EQStream* s)
{
    MNewStreams.lock();
    {
        // Enqueue while MNewStreams is held
        NewStreams.push(s);
    }
    MNewStreams.unlock();
}
/**
* Main packet reading loop - runs in separate thread.
* Receives UDP packets and routes them to appropriate streams.
*/
void EQStreamFactory::ReaderLoop()
{
fd_set readset;
map<string, EQStream*>::iterator stream_itr;
int num;
int length;
unsigned char buffer[2048];
sockaddr_in from;
socklen_t socklen = sizeof(sockaddr_in);
timeval sleep_time;
ReaderRunning = true;
while (sock != -1) {
MReaderRunning.lock();
if (!ReaderRunning) {
MReaderRunning.unlock();
break;
}
MReaderRunning.unlock();
// Setup select() for socket monitoring
FD_ZERO(&readset);
FD_SET(sock, &readset);
sleep_time.tv_sec = 30;
sleep_time.tv_usec = 0;
// Wait for incoming data or timeout
num = select(sock + 1, &readset, nullptr, nullptr, &sleep_time);
if (num < 0) {
// Select error - could log this
continue;
} else if (num == 0) {
// Timeout - continue loop
continue;
}
// Check if our socket has data
if (FD_ISSET(sock, &readset)) {
length = recvfrom(sock, buffer, 2048, 0,
reinterpret_cast<struct sockaddr*>(&from),
&socklen);
if (length < 2) {
// Packet too small - ignore
continue;
}
// Create address:port string for stream identification
char temp[25];
snprintf(temp, sizeof(temp), "%u.%d",
ntohl(from.sin_addr.s_addr), ntohs(from.sin_port));
MStreams.lock();
stream_itr = Streams.find(temp);
// Handle new connections or session requests
if (stream_itr == Streams.end() || buffer[1] == OP_SessionRequest) {
MStreams.unlock();
if (buffer[1] == OP_SessionRequest) {
// Close existing stream if present
if (stream_itr != Streams.end() && stream_itr->second) {
stream_itr->second->SetState(CLOSED);
}
// Create new stream
auto s = new EQStream(from);
s->SetFactory(this);
s->SetStreamType(StreamType);
Streams[temp] = s;
WriterWork.Signal();
Push(s);
s->Process(buffer, length);
s->SetLastPacketTime(Timer::GetCurrentTime2());
}
} else {
// Route packet to existing stream
EQStream* curstream = stream_itr->second;
// Skip closed connections
if (curstream->CheckClosed()) {
curstream = nullptr;
} else {
curstream->PutInUse();
}
MStreams.unlock();
if (curstream) {
curstream->Process(buffer, length);
curstream->SetLastPacketTime(Timer::GetCurrentTime2());
curstream->ReleaseFromUse();
}
}
}
}
}
/**
* Check for timed out streams and clean up closed connections.
*
* @param remove_all - If true, remove all streams regardless of state
*/
void EQStreamFactory::CheckTimeout(bool remove_all)
{
// Lock streams for the entire timeout check - should be fast
MStreams.lock();
unsigned long now = Timer::GetCurrentTime2();
map<string, EQStream*>::iterator stream_itr;
for (stream_itr = Streams.begin(); stream_itr != Streams.end();) {
EQStream* s = stream_itr->second;
EQStreamState state = s->GetState();
// Transition CLOSING streams to CLOSED when no outgoing data
if (state == CLOSING && !s->HasOutgoingData()) {
stream_itr->second->SetState(CLOSED);
state = CLOSED;
} else if (s->CheckTimeout(now, STREAM_TIMEOUT)) {
// Handle timeout based on current state
const char* stateString;
switch (state) {
case ESTABLISHED:
stateString = "Established";
break;
case CLOSING:
stateString = "Closing";
break;
case CLOSED:
stateString = "Closed";
break;
case WAIT_CLOSE:
stateString = "Wait-Close";
break;
default:
stateString = "Unknown";
break;
}
LogWrite(WORLD__DEBUG, 0, "World", "Timeout up!, state=%s (%u)",
stateString, state);
if (state == ESTABLISHED) {
s->Close();
} else if (state == WAIT_CLOSE) {
s->SetState(CLOSING);
state = CLOSING;
} else if (state == CLOSING) {
// If we timeout in closing state, force close
s->SetState(CLOSED);
state = CLOSED;
}
}
// Remove closed streams (check immediately after state changes)
if (remove_all || state == CLOSED) {
if (!remove_all && s->getTimeoutDelays() < 2) {
s->addTimeoutDelay();
// Give other threads time to finish with this stream
++stream_itr;
} else {
// Safe to delete now
#ifdef LOGIN
LogWrite(LOGIN__DEBUG, 0, "Login", "Removing connection...");
#else
LogWrite(WORLD__DEBUG, 0, "World", "Removing connection...");
#endif
auto temp = stream_itr;
++stream_itr;
// Let client list handle cleanup if in world server
#ifdef WORLD
client_list.RemoveConnection(temp->second);
#endif
EQStream* stream = temp->second;
Streams.erase(temp);
delete stream;
continue;
}
} else {
++stream_itr;
}
}
MStreams.unlock();
}
/**
* Packet combining optimization loop - runs in separate thread.
* Combines multiple small packets for efficient transmission.
*/
void EQStreamFactory::CombinePacketLoop()
{
deque<EQStream*> combine_que;
CombinePacketRunning = true;
bool packets_waiting = false;
while (sock != -1) {
if (!CombinePacketRunning) {
break;
}
MStreams.lock();
// Check all streams for combine timer expiration
for (auto& stream_pair : Streams) {
if (!stream_pair.second) {
continue;
}
if (stream_pair.second->combine_timer &&
stream_pair.second->combine_timer->Check()) {
combine_que.push_back(stream_pair.second);
}
}
// Process streams that need packet combining
packets_waiting = false;
while (!combine_que.empty()) {
EQStream* stream = combine_que.front();
if (stream->CheckActive()) {
if (!stream->CheckCombineQueue()) {
packets_waiting = true;
}
}
combine_que.pop_front();
}
MStreams.unlock();
// Sleep longer if no packets are waiting
if (!packets_waiting) {
usleep(25000); // 25ms
}
usleep(1000); // 1ms
}
}
/**
* Main packet writing loop - runs in separate thread.
* Handles outgoing packet transmission and resends.
*/
void EQStreamFactory::WriterLoop()
{
map<string, EQStream*>::iterator stream_itr;
vector<EQStream*> wants_write;
vector<EQStream*>::iterator cur, end;
deque<EQStream*> resend_que;
bool decay = false;
uint32 stream_count;
Timer DecayTimer(20);
WriterRunning = true;
DecayTimer.Enable();
while (sock != -1) {
Timer::SetCurrentTime();
MWriterRunning.lock();
if (!WriterRunning) {
MWriterRunning.unlock();
break;
}
MWriterRunning.unlock();
wants_write.clear();
resend_que.clear();
decay = DecayTimer.Check();
// Copy streams into separate list to minimize lock time
MStreams.lock();
for (stream_itr = Streams.begin(); stream_itr != Streams.end(); ++stream_itr) {
if (!stream_itr->second) {
// This shouldn't happen, but handle gracefully
continue;
}
// Apply bandwidth decay if it's time
if (decay) {
stream_itr->second->Decay();
}
// Queue streams with outgoing data
if (stream_itr->second->HasOutgoingData()) {
stream_itr->second->PutInUse();
wants_write.push_back(stream_itr->second);
}
// Queue streams that need resend processing
if (stream_itr->second->resend_que_timer->Check()) {
resend_que.push_back(stream_itr->second);
}
}
MStreams.unlock();
// Perform actual packet writes
for (cur = wants_write.begin(), end = wants_write.end();
cur != end; ++cur) {
(*cur)->Write(sock);
(*cur)->ReleaseFromUse();
}
// Handle packet resends
while (!resend_que.empty()) {
resend_que.front()->CheckResend(sock);
resend_que.pop_front();
}
usleep(10000); // 10ms sleep
// Check if we have any streams - wait if not
MStreams.lock();
stream_count = Streams.size();
MStreams.unlock();
if (!stream_count) {
WriterWork.Wait();
}
}
}