clean EQStreamFactory

This commit is contained in:
Sky Johnson 2025-08-31 22:18:35 -05:00
parent a8c533f012
commit 7e9f7eb061
2 changed files with 442 additions and 301 deletions

View File

@ -1,279 +1,346 @@
/*
EQ2Emulator: Everquest II Server Emulator
Copyright (C) 2007 EQ2EMulator Development Team (http://www.eq2emulator.net)
This file is part of EQ2Emulator.
EQ2Emulator is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
EQ2Emulator is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with EQ2Emulator. If not, see <http://www.gnu.org/licenses/>.
*/
// EQ2Emulator: Everquest II Server Emulator
// Copyright (C) 2007 EQ2EMulator Development Team
// Licensed under GPL v3 - see <http://www.gnu.org/licenses/>
#include "EQStreamFactory.h"
#include "Log.h"
#ifdef WIN32
#include <WinSock2.h>
#include <windows.h>
#include <process.h>
#include <io.h>
#include <stdio.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <pthread.h>
#endif
// Unix/Linux networking headers
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <pthread.h>
#include <fcntl.h>
#include <unistd.h>
// Standard library headers
#include <fstream>
#include <iostream>
#include <cstring>
#include <deque>
#include <vector>
#include <algorithm>
// Project headers
#include "op_codes.h"
#include "EQStream.h"
#include "packet_dump.h"
#ifdef WORLD
#include "../WorldServer/client.h"
#endif
using namespace std;
#ifdef WORLD
extern ClientList client_list;
extern ClientList client_list;
#endif
ThreadReturnType EQStreamFactoryReaderLoop(void *eqfs)
/**
* Thread entry point for the packet reader loop.
*
* @param eqfs - Pointer to EQStreamFactory instance
* @return Thread return value
*/
ThreadReturnType EQStreamFactoryReaderLoop(void* eqfs)
{
if(eqfs){
EQStreamFactory *fs=(EQStreamFactory *)eqfs;
if (eqfs) {
auto fs = static_cast<EQStreamFactory*>(eqfs);
fs->ReaderLoop();
}
THREAD_RETURN(NULL);
THREAD_RETURN(nullptr);
}
ThreadReturnType EQStreamFactoryWriterLoop(void *eqfs)
/**
* Thread entry point for the packet writer loop.
*
* @param eqfs - Pointer to EQStreamFactory instance
* @return Thread return value
*/
ThreadReturnType EQStreamFactoryWriterLoop(void* eqfs)
{
if(eqfs){
EQStreamFactory *fs=(EQStreamFactory *)eqfs;
if (eqfs) {
auto fs = static_cast<EQStreamFactory*>(eqfs);
fs->WriterLoop();
}
THREAD_RETURN(NULL);
THREAD_RETURN(nullptr);
}
ThreadReturnType EQStreamFactoryCombinePacketLoop(void *eqfs)
/**
* Thread entry point for the packet combining loop.
*
* @param eqfs - Pointer to EQStreamFactory instance
* @return Thread return value
*/
ThreadReturnType EQStreamFactoryCombinePacketLoop(void* eqfs)
{
if(eqfs){
EQStreamFactory *fs=(EQStreamFactory *)eqfs;
if (eqfs) {
auto fs = static_cast<EQStreamFactory*>(eqfs);
fs->CombinePacketLoop();
}
THREAD_RETURN(NULL);
THREAD_RETURN(nullptr);
}
/**
* EQStreamFactory constructor with stream type and port.
*
* @param type - Type of streams to create (login/world)
* @param port - Port number to listen on
*/
EQStreamFactory::EQStreamFactory(EQStreamType type, int port)
{
StreamType=type;
Port=port;
listen_ip_address = 0;
StreamType = type;
Port = port;
listen_ip_address = nullptr;
sock = -1;
ReaderRunning = false;
WriterRunning = false;
CombinePacketRunning = false;
DecayTimer = nullptr;
}
/**
* Close the stream factory and clean up all resources.
* Stops all threads, closes socket, and removes all streams.
*/
void EQStreamFactory::Close()
{
CheckTimeout(true);
Stop();
if (sock != -1) {
#ifdef WIN32
closesocket(sock);
#else
close(sock);
#endif
sock = -1;
}
}
/**
* Open the UDP socket and start worker threads.
* Creates reader, writer, and packet combiner threads.
*
* @return true if successful, false on error
*/
bool EQStreamFactory::Open()
{
struct sockaddr_in address;
#ifndef WIN32
struct sockaddr_in address;
pthread_t t1, t2, t3;
#endif
/* Setup internet address information.
This is used with the bind() call */
memset((char *) &address, 0, sizeof(address));
// Setup socket address structure
memset(reinterpret_cast<char*>(&address), 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(Port);
// Set bind address based on configuration
#if defined(LOGIN) || defined(MINILOGIN)
if(listen_ip_address)
if (listen_ip_address) {
address.sin_addr.s_addr = inet_addr(listen_ip_address);
else
} else {
address.sin_addr.s_addr = htonl(INADDR_ANY);
}
#else
address.sin_addr.s_addr = htonl(INADDR_ANY);
#endif
/* Setting up UDP port for new clients */
// Create UDP socket
sock = socket(AF_INET, SOCK_DGRAM, 0);
if (sock < 0) {
return false;
}
if (::bind(sock, (struct sockaddr *) &address, sizeof(address)) < 0) {
//close(sock);
sock=-1;
// Bind socket to address
if (::bind(sock, reinterpret_cast<struct sockaddr*>(&address), sizeof(address)) < 0) {
close(sock);
sock = -1;
return false;
}
#ifdef WIN32
unsigned long nonblock = 1;
ioctlsocket(sock, FIONBIO, &nonblock);
#else
fcntl(sock, F_SETFL, O_NONBLOCK);
#endif
//moved these because on windows the output was delayed and causing the console window to look bad
// Set socket to non-blocking mode
fcntl(sock, F_SETFL, O_NONBLOCK);
// Log thread startup
#ifdef LOGIN
LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Reader");
LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Writer");
LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Reader");
LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Writer");
#elif WORLD
LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Reader");
LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Writer");
LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Reader");
LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Writer");
#endif
#ifdef WIN32
_beginthread(EQStreamFactoryReaderLoop,0, this);
_beginthread(EQStreamFactoryWriterLoop,0, this);
_beginthread(EQStreamFactoryCombinePacketLoop,0, this);
#else
pthread_create(&t1,NULL,EQStreamFactoryReaderLoop,this);
pthread_create(&t2,NULL,EQStreamFactoryWriterLoop,this);
pthread_create(&t3,NULL,EQStreamFactoryCombinePacketLoop,this);
pthread_detach(t1);
pthread_detach(t2);
pthread_detach(t3);
#endif
// Create and detach worker threads
pthread_create(&t1, nullptr, EQStreamFactoryReaderLoop, this);
pthread_create(&t2, nullptr, EQStreamFactoryWriterLoop, this);
pthread_create(&t3, nullptr, EQStreamFactoryCombinePacketLoop, this);
pthread_detach(t1);
pthread_detach(t2);
pthread_detach(t3);
return true;
}
EQStream *EQStreamFactory::Pop()
/**
* Get the next new stream from the queue.
* Thread-safe method to retrieve newly created streams.
*
* @return Next EQStream from queue, or nullptr if none available
*/
EQStream* EQStreamFactory::Pop()
{
if (!NewStreams.size())
return NULL;
if (!NewStreams.size()) {
return nullptr;
}
EQStream *s=NULL;
//cout << "Pop():Locking MNewStreams" << endl;
EQStream* s = nullptr;
MNewStreams.lock();
if (NewStreams.size()) {
s=NewStreams.front();
s = NewStreams.front();
NewStreams.pop();
s->PutInUse();
}
MNewStreams.unlock();
//cout << "Pop(): Unlocking MNewStreams" << endl;
return s;
}
void EQStreamFactory::Push(EQStream *s)
/**
* Add a new stream to the queue for processing.
* Thread-safe method to queue newly created streams.
*
* @param s - EQStream to add to queue
*/
void EQStreamFactory::Push(EQStream* s)
{
//cout << "Push():Locking MNewStreams" << endl;
MNewStreams.lock();
NewStreams.push(s);
MNewStreams.unlock();
//cout << "Push(): Unlocking MNewStreams" << endl;
}
/**
* Main packet reading loop - runs in separate thread.
* Receives UDP packets and routes them to appropriate streams.
*/
void EQStreamFactory::ReaderLoop()
{
fd_set readset;
map<string,EQStream *>::iterator stream_itr;
int num;
int length;
unsigned char buffer[2048];
sockaddr_in from;
int socklen=sizeof(sockaddr_in);
timeval sleep_time;
ReaderRunning=true;
while(sock!=-1) {
fd_set readset;
map<string, EQStream*>::iterator stream_itr;
int num;
int length;
unsigned char buffer[2048];
sockaddr_in from;
socklen_t socklen = sizeof(sockaddr_in);
timeval sleep_time;
ReaderRunning = true;
while (sock != -1) {
MReaderRunning.lock();
if (!ReaderRunning)
if (!ReaderRunning) {
MReaderRunning.unlock();
break;
}
MReaderRunning.unlock();
// Setup select() for socket monitoring
FD_ZERO(&readset);
FD_SET(sock,&readset);
FD_SET(sock, &readset);
sleep_time.tv_sec=30;
sleep_time.tv_usec=0;
if ((num=select(sock+1,&readset,NULL,NULL,&sleep_time))<0) {
// What do we wanna do?
} else if (num==0)
sleep_time.tv_sec = 30;
sleep_time.tv_usec = 0;
// Wait for incoming data or timeout
num = select(sock + 1, &readset, nullptr, nullptr, &sleep_time);
if (num < 0) {
// Select error - could log this
continue;
} else if (num == 0) {
// Timeout - continue loop
continue;
}
if (FD_ISSET(sock,&readset)) {
#ifdef WIN32
if ((length=recvfrom(sock,(char*)buffer,sizeof(buffer),0,(struct sockaddr*)&from,(int *)&socklen))<2)
#else
if ((length=recvfrom(sock,buffer,2048,0,(struct sockaddr *)&from,(socklen_t *)&socklen))<2)
#endif
{
// What do we wanna do?
} else {
char temp[25];
sprintf(temp,"%u.%d",ntohl(from.sin_addr.s_addr),ntohs(from.sin_port));
MStreams.lock();
if ((stream_itr=Streams.find(temp))==Streams.end() || buffer[1]==OP_SessionRequest) {
MStreams.unlock();
if (buffer[1]==OP_SessionRequest) {
if(stream_itr != Streams.end() && stream_itr->second)
stream_itr->second->SetState(CLOSED);
EQStream *s=new EQStream(from);
s->SetFactory(this);
s->SetStreamType(StreamType);
Streams[temp]=s;
WriterWork.Signal();
Push(s);
s->Process(buffer,length);
s->SetLastPacketTime(Timer::GetCurrentTime2());
// Check if our socket has data
if (FD_ISSET(sock, &readset)) {
length = recvfrom(sock, buffer, 2048, 0,
reinterpret_cast<struct sockaddr*>(&from),
&socklen);
if (length < 2) {
// Packet too small - ignore
continue;
}
// Create address:port string for stream identification
char temp[25];
snprintf(temp, sizeof(temp), "%u.%d",
ntohl(from.sin_addr.s_addr), ntohs(from.sin_port));
MStreams.lock();
stream_itr = Streams.find(temp);
// Handle new connections or session requests
if (stream_itr == Streams.end() || buffer[1] == OP_SessionRequest) {
MStreams.unlock();
if (buffer[1] == OP_SessionRequest) {
// Close existing stream if present
if (stream_itr != Streams.end() && stream_itr->second) {
stream_itr->second->SetState(CLOSED);
}
} else {
EQStream *curstream = stream_itr->second;
//dont bother processing incoming packets for closed connections
if(curstream->CheckClosed())
curstream = NULL;
else
curstream->PutInUse();
MStreams.unlock();
if(curstream) {
curstream->Process(buffer,length);
curstream->SetLastPacketTime(Timer::GetCurrentTime2());
curstream->ReleaseFromUse();
}
// Create new stream
auto s = new EQStream(from);
s->SetFactory(this);
s->SetStreamType(StreamType);
Streams[temp] = s;
WriterWork.Signal();
Push(s);
s->Process(buffer, length);
s->SetLastPacketTime(Timer::GetCurrentTime2());
}
} else {
// Route packet to existing stream
EQStream* curstream = stream_itr->second;
// Skip closed connections
if (curstream->CheckClosed()) {
curstream = nullptr;
} else {
curstream->PutInUse();
}
MStreams.unlock();
if (curstream) {
curstream->Process(buffer, length);
curstream->SetLastPacketTime(Timer::GetCurrentTime2());
curstream->ReleaseFromUse();
}
}
}
}
}
/**
* Check for timed out streams and clean up closed connections.
*
* @param remove_all - If true, remove all streams regardless of state
*/
void EQStreamFactory::CheckTimeout(bool remove_all)
{
//lock streams the entire time were checking timeouts, it should be fast.
// Lock streams for the entire timeout check - should be fast
MStreams.lock();
unsigned long now=Timer::GetCurrentTime2();
map<string,EQStream *>::iterator stream_itr;
unsigned long now = Timer::GetCurrentTime2();
map<string, EQStream*>::iterator stream_itr;
for(stream_itr=Streams.begin();stream_itr!=Streams.end();) {
EQStream *s = stream_itr->second;
for (stream_itr = Streams.begin(); stream_itr != Streams.end();) {
EQStream* s = stream_itr->second;
EQStreamState state = s->GetState();
if (state==CLOSING && !s->HasOutgoingData()) {
// Transition CLOSING streams to CLOSED when no outgoing data
if (state == CLOSING && !s->HasOutgoingData()) {
stream_itr->second->SetState(CLOSED);
state = CLOSED;
} else if (s->CheckTimeout(now, STREAM_TIMEOUT)) {
} else if (s->CheckTimeout(now, STREAM_TIMEOUT)) {
// Handle timeout based on current state
const char* stateString;
switch (state){
switch (state) {
case ESTABLISHED:
stateString = "Established";
break;
@ -290,153 +357,188 @@ void EQStreamFactory::CheckTimeout(bool remove_all)
stateString = "Unknown";
break;
}
LogWrite(WORLD__DEBUG, 0, "World", "Timeout up!, state=%s (%u)", stateString, state);
if (state==ESTABLISHED) {
LogWrite(WORLD__DEBUG, 0, "World", "Timeout up!, state=%s (%u)",
stateString, state);
if (state == ESTABLISHED) {
s->Close();
}
else if (state == WAIT_CLOSE) {
} else if (state == WAIT_CLOSE) {
s->SetState(CLOSING);
state = CLOSING;
}
else if (state == CLOSING) {
//if we time out in the closing state, just give up
} else if (state == CLOSING) {
// If we timeout in closing state, force close
s->SetState(CLOSED);
state = CLOSED;
}
}
//not part of the else so we check it right away on state change
if (remove_all || state==CLOSED) {
if (!remove_all && s->getTimeoutDelays()<2) {
// Remove closed streams (check immediately after state changes)
if (remove_all || state == CLOSED) {
if (!remove_all && s->getTimeoutDelays() < 2) {
s->addTimeoutDelay();
//give it a little time for everybody to finish with it
// Give other threads time to finish with this stream
++stream_itr;
} else {
//everybody is done, we can delete it now
// Safe to delete now
#ifdef LOGIN
LogWrite(LOGIN__DEBUG, 0, "Login", "Removing connection...");
#else
LogWrite(WORLD__DEBUG, 0, "World", "Removing connection...");
#endif
map<string,EQStream *>::iterator temp=stream_itr;
stream_itr++;
//let whoever has the stream outside delete it
#ifdef WORLD
auto temp = stream_itr;
++stream_itr;
// Let client list handle cleanup if in world server
#ifdef WORLD
client_list.RemoveConnection(temp->second);
#endif
#endif
EQStream* stream = temp->second;
Streams.erase(temp);
delete stream;
continue;
}
} else {
++stream_itr;
}
stream_itr++;
}
MStreams.unlock();
}
void EQStreamFactory::CombinePacketLoop(){
/**
* Packet combining optimization loop - runs in separate thread.
* Combines multiple small packets for efficient transmission.
*/
void EQStreamFactory::CombinePacketLoop()
{
deque<EQStream*> combine_que;
CombinePacketRunning = true;
bool packets_waiting = false;
while(sock!=-1) {
if (!CombinePacketRunning)
while (sock != -1) {
if (!CombinePacketRunning) {
break;
}
MStreams.lock();
map<string,EQStream *>::iterator stream_itr;
for(stream_itr=Streams.begin();stream_itr!=Streams.end();stream_itr++) {
if(!stream_itr->second){
// Check all streams for combine timer expiration
for (auto& stream_pair : Streams) {
if (!stream_pair.second) {
continue;
}
if(stream_itr->second->combine_timer && stream_itr->second->combine_timer->Check())
combine_que.push_back(stream_itr->second);
if (stream_pair.second->combine_timer &&
stream_pair.second->combine_timer->Check()) {
combine_que.push_back(stream_pair.second);
}
}
EQStream* stream = 0;
// Process streams that need packet combining
packets_waiting = false;
while(combine_que.size()){
stream = combine_que.front();
if(stream->CheckActive()){
if(!stream->CheckCombineQueue())
while (!combine_que.empty()) {
EQStream* stream = combine_que.front();
if (stream->CheckActive()) {
if (!stream->CheckCombineQueue()) {
packets_waiting = true;
}
}
combine_que.pop_front();
}
MStreams.unlock();
if(!packets_waiting)
Sleep(25);
// Sleep longer if no packets are waiting
if (!packets_waiting) {
usleep(25000); // 25ms
}
Sleep(1);
usleep(1000); // 1ms
}
}
/**
* Main packet writing loop - runs in separate thread.
* Handles outgoing packet transmission and resends.
*/
void EQStreamFactory::WriterLoop()
{
map<string,EQStream *>::iterator stream_itr;
vector<EQStream *> wants_write;
vector<EQStream *>::iterator cur,end;
deque<EQStream*> resend_que;
bool decay=false;
uint32 stream_count;
Timer DecayTimer(20);
map<string, EQStream*>::iterator stream_itr;
vector<EQStream*> wants_write;
vector<EQStream*>::iterator cur, end;
deque<EQStream*> resend_que;
bool decay = false;
uint32 stream_count;
WriterRunning=true;
Timer DecayTimer(20);
WriterRunning = true;
DecayTimer.Enable();
while(sock!=-1) {
while (sock != -1) {
Timer::SetCurrentTime();
//if (!havework) {
//WriterWork.Wait();
//}
MWriterRunning.lock();
if (!WriterRunning)
if (!WriterRunning) {
MWriterRunning.unlock();
break;
}
MWriterRunning.unlock();
wants_write.clear();
decay=DecayTimer.Check();
resend_que.clear();
//copy streams into a seperate list so we dont have to keep
//MStreams locked while we are writting
decay = DecayTimer.Check();
// Copy streams into separate list to minimize lock time
MStreams.lock();
for(stream_itr=Streams.begin();stream_itr!=Streams.end();stream_itr++) {
// If it's time to decay the bytes sent, then let's do it before we try to write
if(!stream_itr->second){
Streams.erase(stream_itr);
break;
for (stream_itr = Streams.begin(); stream_itr != Streams.end(); ++stream_itr) {
if (!stream_itr->second) {
// This shouldn't happen, but handle gracefully
continue;
}
if (decay)
// Apply bandwidth decay if it's time
if (decay) {
stream_itr->second->Decay();
}
// Queue streams with outgoing data
if (stream_itr->second->HasOutgoingData()) {
stream_itr->second->PutInUse();
wants_write.push_back(stream_itr->second);
}
if(stream_itr->second->resend_que_timer->Check())
// Queue streams that need resend processing
if (stream_itr->second->resend_que_timer->Check()) {
resend_que.push_back(stream_itr->second);
}
}
MStreams.unlock();
//do the actual writes
cur = wants_write.begin();
end = wants_write.end();
for(; cur != end; cur++) {
// Perform actual packet writes
for (cur = wants_write.begin(), end = wants_write.end();
cur != end; ++cur) {
(*cur)->Write(sock);
(*cur)->ReleaseFromUse();
}
while(resend_que.size()){
// Handle packet resends
while (!resend_que.empty()) {
resend_que.front()->CheckResend(sock);
resend_que.pop_front();
}
Sleep(10);
usleep(10000); // 10ms sleep
// Check if we have any streams - wait if not
MStreams.lock();
stream_count=Streams.size();
stream_count = Streams.size();
MStreams.unlock();
if (!stream_count) {
//cout << "No streams, waiting on condition" << endl;
WriterWork.Wait();
//cout << "Awake from condition, must have a stream now" << endl;
}
}
}

View File

@ -1,86 +1,125 @@
/*
EQ2Emulator: Everquest II Server Emulator
Copyright (C) 2007 EQ2EMulator Development Team (http://www.eq2emulator.net)
This file is part of EQ2Emulator.
EQ2Emulator is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
EQ2Emulator is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with EQ2Emulator. If not, see <http://www.gnu.org/licenses/>.
*/
// EQ2Emulator: Everquest II Server Emulator
// Copyright (C) 2007 EQ2EMulator Development Team
// Licensed under GPL v3 - see <http://www.gnu.org/licenses/>
#ifndef _EQSTREAMFACTORY_H
#define _EQSTREAMFACTORY_H
#include <queue>
#include <map>
#include <string>
#include <memory>
#include "../common/EQStream.h"
#include "../common/Condition.h"
#include "../common/opcodemgr.h"
#include "../common/timer.h"
#define STREAM_TIMEOUT 45000 //in ms
#define STREAM_TIMEOUT 45000 // Stream timeout in milliseconds
/**
* Factory class for creating and managing EverQuest network streams.
* Handles UDP socket communication, stream lifecycle, and packet processing
* for both login and world server connections.
*/
class EQStreamFactory {
private:
int sock;
int Port;
bool ReaderRunning;
Mutex MReaderRunning;
bool WriterRunning;
Mutex MWriterRunning;
bool CombinePacketRunning;
Mutex MCombinePacketRunning;
Condition WriterWork;
EQStreamType StreamType;
queue<EQStream *> NewStreams;
Mutex MNewStreams;
map<string,EQStream *> Streams;
Mutex MStreams;
Timer *DecayTimer;
public:
char* listen_ip_address;
void CheckTimeout(bool remove_all = false);
EQStreamFactory(EQStreamType type) { ReaderRunning=false; WriterRunning=false; StreamType=type; }
EQStreamFactory(EQStreamType type, int port);
~EQStreamFactory(){
safe_delete_array(listen_ip_address);
}
EQStream *Pop();
void Push(EQStream *s);
bool loadPublicKey();
bool Open();
bool Open(unsigned long port) { Port=port; return Open(); }
void Close();
void ReaderLoop();
void WriterLoop();
void CombinePacketLoop();
void Stop() { StopReader(); StopWriter(); StopCombinePacket(); }
void StopReader() { MReaderRunning.lock(); ReaderRunning=false; MReaderRunning.unlock(); }
void StopWriter() { MWriterRunning.lock(); WriterRunning=false; MWriterRunning.unlock(); WriterWork.Signal(); }
void StopCombinePacket() { MCombinePacketRunning.lock(); CombinePacketRunning=false; MCombinePacketRunning.unlock(); }
void SignalWriter() { WriterWork.Signal(); }
private:
// Network socket and configuration
int sock; // UDP socket file descriptor
int Port; // Port number to listen on
// Thread management flags and mutexes
bool ReaderRunning; // Reader thread active flag
Mutex MReaderRunning; // Mutex for reader thread flag
bool WriterRunning; // Writer thread active flag
Mutex MWriterRunning; // Mutex for writer thread flag
bool CombinePacketRunning; // Packet combiner thread active flag
Mutex MCombinePacketRunning; // Mutex for combiner thread flag
// Thread synchronization
Condition WriterWork; // Condition variable for writer thread
// Stream management
EQStreamType StreamType; // Type of streams this factory creates
std::queue<EQStream*> NewStreams; // Queue of new streams waiting for processing
Mutex MNewStreams; // Mutex for new streams queue
std::map<std::string, EQStream*> Streams; // Active streams mapped by address:port
Mutex MStreams; // Mutex for streams map
// Cleanup timer
Timer* DecayTimer; // Timer for periodic cleanup operations
public:
// Network configuration
char* listen_ip_address; // IP address to bind to (nullptr = any)
// Stream lifecycle management
void CheckTimeout(bool remove_all = false);
// Constructors and destructor
EQStreamFactory(EQStreamType type) {
ReaderRunning = false;
WriterRunning = false;
CombinePacketRunning = false;
StreamType = type;
sock = -1;
Port = 0;
listen_ip_address = nullptr;
DecayTimer = nullptr;
}
EQStreamFactory(EQStreamType type, int port);
~EQStreamFactory() {
safe_delete_array(listen_ip_address);
}
// Stream queue management
EQStream* Pop(); // Get next new stream from queue
void Push(EQStream* s); // Add new stream to queue
// Network operations
bool loadPublicKey(); // Load encryption keys (if needed)
bool Open(); // Open socket and start threads
bool Open(unsigned long port) {
Port = port;
return Open();
}
void Close(); // Close socket and stop threads
// Main thread loops
void ReaderLoop(); // Main packet reading loop
void WriterLoop(); // Main packet writing loop
void CombinePacketLoop(); // Packet combining optimization loop
// Thread control
void Stop() {
StopReader();
StopWriter();
StopCombinePacket();
}
void StopReader() {
MReaderRunning.lock();
ReaderRunning = false;
MReaderRunning.unlock();
}
void StopWriter() {
MWriterRunning.lock();
WriterRunning = false;
MWriterRunning.unlock();
WriterWork.Signal();
}
void StopCombinePacket() {
MCombinePacketRunning.lock();
CombinePacketRunning = false;
MCombinePacketRunning.unlock();
}
void SignalWriter() {
WriterWork.Signal();
}
};
#endif