diff --git a/source/common/EQStreamFactory.cpp b/source/common/EQStreamFactory.cpp index 965865e..4abe59e 100644 --- a/source/common/EQStreamFactory.cpp +++ b/source/common/EQStreamFactory.cpp @@ -1,279 +1,346 @@ -/* - EQ2Emulator: Everquest II Server Emulator - Copyright (C) 2007 EQ2EMulator Development Team (http://www.eq2emulator.net) - - This file is part of EQ2Emulator. - - EQ2Emulator is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - EQ2Emulator is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with EQ2Emulator. If not, see . -*/ +// EQ2Emulator: Everquest II Server Emulator +// Copyright (C) 2007 EQ2EMulator Development Team +// Licensed under GPL v3 - see #include "EQStreamFactory.h" #include "Log.h" -#ifdef WIN32 - #include - #include - #include - #include - #include -#else - #include - #include - #include - #include - #include - #include -#endif +// Unix/Linux networking headers +#include +#include +#include +#include +#include +#include #include +#include + +// Standard library headers #include #include +#include +#include +#include +#include + +// Project headers #include "op_codes.h" #include "EQStream.h" #include "packet_dump.h" #ifdef WORLD #include "../WorldServer/client.h" #endif + using namespace std; #ifdef WORLD - extern ClientList client_list; +extern ClientList client_list; #endif -ThreadReturnType EQStreamFactoryReaderLoop(void *eqfs) + +/** + * Thread entry point for the packet reader loop. + * + * @param eqfs - Pointer to EQStreamFactory instance + * @return Thread return value + */ +ThreadReturnType EQStreamFactoryReaderLoop(void* eqfs) { - if(eqfs){ - EQStreamFactory *fs=(EQStreamFactory *)eqfs; + if (eqfs) { + auto fs = static_cast(eqfs); fs->ReaderLoop(); } - THREAD_RETURN(NULL); + THREAD_RETURN(nullptr); } -ThreadReturnType EQStreamFactoryWriterLoop(void *eqfs) +/** + * Thread entry point for the packet writer loop. + * + * @param eqfs - Pointer to EQStreamFactory instance + * @return Thread return value + */ +ThreadReturnType EQStreamFactoryWriterLoop(void* eqfs) { - if(eqfs){ - EQStreamFactory *fs=(EQStreamFactory *)eqfs; + if (eqfs) { + auto fs = static_cast(eqfs); fs->WriterLoop(); } - THREAD_RETURN(NULL); + THREAD_RETURN(nullptr); } -ThreadReturnType EQStreamFactoryCombinePacketLoop(void *eqfs) +/** + * Thread entry point for the packet combining loop. + * + * @param eqfs - Pointer to EQStreamFactory instance + * @return Thread return value + */ +ThreadReturnType EQStreamFactoryCombinePacketLoop(void* eqfs) { - if(eqfs){ - EQStreamFactory *fs=(EQStreamFactory *)eqfs; + if (eqfs) { + auto fs = static_cast(eqfs); fs->CombinePacketLoop(); } - THREAD_RETURN(NULL); + THREAD_RETURN(nullptr); } +/** + * EQStreamFactory constructor with stream type and port. + * + * @param type - Type of streams to create (login/world) + * @param port - Port number to listen on + */ EQStreamFactory::EQStreamFactory(EQStreamType type, int port) { - StreamType=type; - Port=port; - listen_ip_address = 0; + StreamType = type; + Port = port; + listen_ip_address = nullptr; + sock = -1; + ReaderRunning = false; + WriterRunning = false; + CombinePacketRunning = false; + DecayTimer = nullptr; } +/** + * Close the stream factory and clean up all resources. + * Stops all threads, closes socket, and removes all streams. + */ void EQStreamFactory::Close() { CheckTimeout(true); Stop(); if (sock != -1) { -#ifdef WIN32 - closesocket(sock); -#else close(sock); -#endif sock = -1; } } + +/** + * Open the UDP socket and start worker threads. + * Creates reader, writer, and packet combiner threads. + * + * @return true if successful, false on error + */ bool EQStreamFactory::Open() { -struct sockaddr_in address; -#ifndef WIN32 + struct sockaddr_in address; pthread_t t1, t2, t3; -#endif - /* Setup internet address information. - This is used with the bind() call */ - memset((char *) &address, 0, sizeof(address)); + + // Setup socket address structure + memset(reinterpret_cast(&address), 0, sizeof(address)); address.sin_family = AF_INET; address.sin_port = htons(Port); + + // Set bind address based on configuration #if defined(LOGIN) || defined(MINILOGIN) - if(listen_ip_address) + if (listen_ip_address) { address.sin_addr.s_addr = inet_addr(listen_ip_address); - else + } else { address.sin_addr.s_addr = htonl(INADDR_ANY); + } #else address.sin_addr.s_addr = htonl(INADDR_ANY); #endif - /* Setting up UDP port for new clients */ + + // Create UDP socket sock = socket(AF_INET, SOCK_DGRAM, 0); if (sock < 0) { return false; } - if (::bind(sock, (struct sockaddr *) &address, sizeof(address)) < 0) { - //close(sock); - sock=-1; + // Bind socket to address + if (::bind(sock, reinterpret_cast(&address), sizeof(address)) < 0) { + close(sock); + sock = -1; return false; } - #ifdef WIN32 - unsigned long nonblock = 1; - ioctlsocket(sock, FIONBIO, &nonblock); - #else - fcntl(sock, F_SETFL, O_NONBLOCK); - #endif - //moved these because on windows the output was delayed and causing the console window to look bad + + // Set socket to non-blocking mode + fcntl(sock, F_SETFL, O_NONBLOCK); + + // Log thread startup #ifdef LOGIN - LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Reader"); - LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Writer"); + LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Reader"); + LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Writer"); #elif WORLD - LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Reader"); - LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Writer"); + LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Reader"); + LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Writer"); #endif - #ifdef WIN32 - _beginthread(EQStreamFactoryReaderLoop,0, this); - _beginthread(EQStreamFactoryWriterLoop,0, this); - _beginthread(EQStreamFactoryCombinePacketLoop,0, this); - #else - pthread_create(&t1,NULL,EQStreamFactoryReaderLoop,this); - pthread_create(&t2,NULL,EQStreamFactoryWriterLoop,this); - pthread_create(&t3,NULL,EQStreamFactoryCombinePacketLoop,this); - pthread_detach(t1); - pthread_detach(t2); - pthread_detach(t3); - #endif + + // Create and detach worker threads + pthread_create(&t1, nullptr, EQStreamFactoryReaderLoop, this); + pthread_create(&t2, nullptr, EQStreamFactoryWriterLoop, this); + pthread_create(&t3, nullptr, EQStreamFactoryCombinePacketLoop, this); + pthread_detach(t1); + pthread_detach(t2); + pthread_detach(t3); + return true; } -EQStream *EQStreamFactory::Pop() +/** + * Get the next new stream from the queue. + * Thread-safe method to retrieve newly created streams. + * + * @return Next EQStream from queue, or nullptr if none available + */ +EQStream* EQStreamFactory::Pop() { - if (!NewStreams.size()) - return NULL; + if (!NewStreams.size()) { + return nullptr; + } -EQStream *s=NULL; - //cout << "Pop():Locking MNewStreams" << endl; + EQStream* s = nullptr; MNewStreams.lock(); if (NewStreams.size()) { - s=NewStreams.front(); + s = NewStreams.front(); NewStreams.pop(); s->PutInUse(); } MNewStreams.unlock(); - //cout << "Pop(): Unlocking MNewStreams" << endl; return s; } -void EQStreamFactory::Push(EQStream *s) +/** + * Add a new stream to the queue for processing. + * Thread-safe method to queue newly created streams. + * + * @param s - EQStream to add to queue + */ +void EQStreamFactory::Push(EQStream* s) { - //cout << "Push():Locking MNewStreams" << endl; MNewStreams.lock(); NewStreams.push(s); MNewStreams.unlock(); - //cout << "Push(): Unlocking MNewStreams" << endl; } +/** + * Main packet reading loop - runs in separate thread. + * Receives UDP packets and routes them to appropriate streams. + */ void EQStreamFactory::ReaderLoop() { -fd_set readset; -map::iterator stream_itr; -int num; -int length; -unsigned char buffer[2048]; -sockaddr_in from; -int socklen=sizeof(sockaddr_in); -timeval sleep_time; - ReaderRunning=true; - while(sock!=-1) { + fd_set readset; + map::iterator stream_itr; + int num; + int length; + unsigned char buffer[2048]; + sockaddr_in from; + socklen_t socklen = sizeof(sockaddr_in); + timeval sleep_time; + + ReaderRunning = true; + while (sock != -1) { MReaderRunning.lock(); - if (!ReaderRunning) + if (!ReaderRunning) { + MReaderRunning.unlock(); break; + } MReaderRunning.unlock(); + // Setup select() for socket monitoring FD_ZERO(&readset); - FD_SET(sock,&readset); + FD_SET(sock, &readset); - sleep_time.tv_sec=30; - sleep_time.tv_usec=0; - if ((num=select(sock+1,&readset,NULL,NULL,&sleep_time))<0) { - // What do we wanna do? - } else if (num==0) + sleep_time.tv_sec = 30; + sleep_time.tv_usec = 0; + + // Wait for incoming data or timeout + num = select(sock + 1, &readset, nullptr, nullptr, &sleep_time); + if (num < 0) { + // Select error - could log this continue; + } else if (num == 0) { + // Timeout - continue loop + continue; + } - if (FD_ISSET(sock,&readset)) { -#ifdef WIN32 - if ((length=recvfrom(sock,(char*)buffer,sizeof(buffer),0,(struct sockaddr*)&from,(int *)&socklen))<2) -#else - if ((length=recvfrom(sock,buffer,2048,0,(struct sockaddr *)&from,(socklen_t *)&socklen))<2) -#endif - { - // What do we wanna do? - } else { - char temp[25]; - sprintf(temp,"%u.%d",ntohl(from.sin_addr.s_addr),ntohs(from.sin_port)); - MStreams.lock(); - if ((stream_itr=Streams.find(temp))==Streams.end() || buffer[1]==OP_SessionRequest) { - MStreams.unlock(); - if (buffer[1]==OP_SessionRequest) { - if(stream_itr != Streams.end() && stream_itr->second) - stream_itr->second->SetState(CLOSED); - EQStream *s=new EQStream(from); - s->SetFactory(this); - s->SetStreamType(StreamType); - Streams[temp]=s; - WriterWork.Signal(); - Push(s); - s->Process(buffer,length); - s->SetLastPacketTime(Timer::GetCurrentTime2()); + // Check if our socket has data + if (FD_ISSET(sock, &readset)) { + length = recvfrom(sock, buffer, 2048, 0, + reinterpret_cast(&from), + &socklen); + + if (length < 2) { + // Packet too small - ignore + continue; + } + + // Create address:port string for stream identification + char temp[25]; + snprintf(temp, sizeof(temp), "%u.%d", + ntohl(from.sin_addr.s_addr), ntohs(from.sin_port)); + + MStreams.lock(); + stream_itr = Streams.find(temp); + + // Handle new connections or session requests + if (stream_itr == Streams.end() || buffer[1] == OP_SessionRequest) { + MStreams.unlock(); + + if (buffer[1] == OP_SessionRequest) { + // Close existing stream if present + if (stream_itr != Streams.end() && stream_itr->second) { + stream_itr->second->SetState(CLOSED); } - } else { - EQStream *curstream = stream_itr->second; - //dont bother processing incoming packets for closed connections - if(curstream->CheckClosed()) - curstream = NULL; - else - curstream->PutInUse(); - MStreams.unlock(); - if(curstream) { - curstream->Process(buffer,length); - curstream->SetLastPacketTime(Timer::GetCurrentTime2()); - curstream->ReleaseFromUse(); - } + // Create new stream + auto s = new EQStream(from); + s->SetFactory(this); + s->SetStreamType(StreamType); + Streams[temp] = s; + WriterWork.Signal(); + Push(s); + s->Process(buffer, length); + s->SetLastPacketTime(Timer::GetCurrentTime2()); + } + } else { + // Route packet to existing stream + EQStream* curstream = stream_itr->second; + + // Skip closed connections + if (curstream->CheckClosed()) { + curstream = nullptr; + } else { + curstream->PutInUse(); + } + MStreams.unlock(); + + if (curstream) { + curstream->Process(buffer, length); + curstream->SetLastPacketTime(Timer::GetCurrentTime2()); + curstream->ReleaseFromUse(); } } } } } +/** + * Check for timed out streams and clean up closed connections. + * + * @param remove_all - If true, remove all streams regardless of state + */ void EQStreamFactory::CheckTimeout(bool remove_all) { - //lock streams the entire time were checking timeouts, it should be fast. + // Lock streams for the entire timeout check - should be fast MStreams.lock(); - unsigned long now=Timer::GetCurrentTime2(); - map::iterator stream_itr; + unsigned long now = Timer::GetCurrentTime2(); + map::iterator stream_itr; - for(stream_itr=Streams.begin();stream_itr!=Streams.end();) { - EQStream *s = stream_itr->second; + for (stream_itr = Streams.begin(); stream_itr != Streams.end();) { + EQStream* s = stream_itr->second; EQStreamState state = s->GetState(); - if (state==CLOSING && !s->HasOutgoingData()) { + // Transition CLOSING streams to CLOSED when no outgoing data + if (state == CLOSING && !s->HasOutgoingData()) { stream_itr->second->SetState(CLOSED); state = CLOSED; - } else if (s->CheckTimeout(now, STREAM_TIMEOUT)) { + } else if (s->CheckTimeout(now, STREAM_TIMEOUT)) { + // Handle timeout based on current state const char* stateString; - switch (state){ + switch (state) { case ESTABLISHED: stateString = "Established"; break; @@ -290,153 +357,188 @@ void EQStreamFactory::CheckTimeout(bool remove_all) stateString = "Unknown"; break; } - LogWrite(WORLD__DEBUG, 0, "World", "Timeout up!, state=%s (%u)", stateString, state); - if (state==ESTABLISHED) { + + LogWrite(WORLD__DEBUG, 0, "World", "Timeout up!, state=%s (%u)", + stateString, state); + + if (state == ESTABLISHED) { s->Close(); - } - else if (state == WAIT_CLOSE) { + } else if (state == WAIT_CLOSE) { s->SetState(CLOSING); state = CLOSING; - } - else if (state == CLOSING) { - //if we time out in the closing state, just give up + } else if (state == CLOSING) { + // If we timeout in closing state, force close s->SetState(CLOSED); state = CLOSED; } } - //not part of the else so we check it right away on state change - if (remove_all || state==CLOSED) { - if (!remove_all && s->getTimeoutDelays()<2) { + + // Remove closed streams (check immediately after state changes) + if (remove_all || state == CLOSED) { + if (!remove_all && s->getTimeoutDelays() < 2) { s->addTimeoutDelay(); - //give it a little time for everybody to finish with it + // Give other threads time to finish with this stream + ++stream_itr; } else { - //everybody is done, we can delete it now - + // Safe to delete now #ifdef LOGIN LogWrite(LOGIN__DEBUG, 0, "Login", "Removing connection..."); #else LogWrite(WORLD__DEBUG, 0, "World", "Removing connection..."); #endif - map::iterator temp=stream_itr; - stream_itr++; - //let whoever has the stream outside delete it - #ifdef WORLD + auto temp = stream_itr; + ++stream_itr; + + // Let client list handle cleanup if in world server +#ifdef WORLD client_list.RemoveConnection(temp->second); - #endif +#endif EQStream* stream = temp->second; Streams.erase(temp); delete stream; continue; } + } else { + ++stream_itr; } - - stream_itr++; } + MStreams.unlock(); } -void EQStreamFactory::CombinePacketLoop(){ +/** + * Packet combining optimization loop - runs in separate thread. + * Combines multiple small packets for efficient transmission. + */ +void EQStreamFactory::CombinePacketLoop() +{ deque combine_que; CombinePacketRunning = true; bool packets_waiting = false; - while(sock!=-1) { - if (!CombinePacketRunning) + + while (sock != -1) { + if (!CombinePacketRunning) { break; + } + MStreams.lock(); - map::iterator stream_itr; - for(stream_itr=Streams.begin();stream_itr!=Streams.end();stream_itr++) { - if(!stream_itr->second){ + + // Check all streams for combine timer expiration + for (auto& stream_pair : Streams) { + if (!stream_pair.second) { continue; } - if(stream_itr->second->combine_timer && stream_itr->second->combine_timer->Check()) - combine_que.push_back(stream_itr->second); + + if (stream_pair.second->combine_timer && + stream_pair.second->combine_timer->Check()) { + combine_que.push_back(stream_pair.second); + } } - EQStream* stream = 0; + + // Process streams that need packet combining packets_waiting = false; - while(combine_que.size()){ - stream = combine_que.front(); - if(stream->CheckActive()){ - if(!stream->CheckCombineQueue()) + while (!combine_que.empty()) { + EQStream* stream = combine_que.front(); + if (stream->CheckActive()) { + if (!stream->CheckCombineQueue()) { packets_waiting = true; + } } combine_que.pop_front(); } + MStreams.unlock(); - if(!packets_waiting) - Sleep(25); + + // Sleep longer if no packets are waiting + if (!packets_waiting) { + usleep(25000); // 25ms + } - Sleep(1); + usleep(1000); // 1ms } } +/** + * Main packet writing loop - runs in separate thread. + * Handles outgoing packet transmission and resends. + */ void EQStreamFactory::WriterLoop() { -map::iterator stream_itr; -vector wants_write; -vector::iterator cur,end; -deque resend_que; -bool decay=false; -uint32 stream_count; - -Timer DecayTimer(20); + map::iterator stream_itr; + vector wants_write; + vector::iterator cur, end; + deque resend_que; + bool decay = false; + uint32 stream_count; - WriterRunning=true; + Timer DecayTimer(20); + + WriterRunning = true; DecayTimer.Enable(); - while(sock!=-1) { + + while (sock != -1) { Timer::SetCurrentTime(); - //if (!havework) { - //WriterWork.Wait(); - //} + MWriterRunning.lock(); - if (!WriterRunning) + if (!WriterRunning) { + MWriterRunning.unlock(); break; + } MWriterRunning.unlock(); wants_write.clear(); - - decay=DecayTimer.Check(); + resend_que.clear(); - //copy streams into a seperate list so we dont have to keep - //MStreams locked while we are writting + decay = DecayTimer.Check(); + + // Copy streams into separate list to minimize lock time MStreams.lock(); - for(stream_itr=Streams.begin();stream_itr!=Streams.end();stream_itr++) { - // If it's time to decay the bytes sent, then let's do it before we try to write - if(!stream_itr->second){ - Streams.erase(stream_itr); - break; + for (stream_itr = Streams.begin(); stream_itr != Streams.end(); ++stream_itr) { + if (!stream_itr->second) { + // This shouldn't happen, but handle gracefully + continue; } - if (decay) + + // Apply bandwidth decay if it's time + if (decay) { stream_itr->second->Decay(); + } + // Queue streams with outgoing data if (stream_itr->second->HasOutgoingData()) { stream_itr->second->PutInUse(); wants_write.push_back(stream_itr->second); } - if(stream_itr->second->resend_que_timer->Check()) + + // Queue streams that need resend processing + if (stream_itr->second->resend_que_timer->Check()) { resend_que.push_back(stream_itr->second); + } } MStreams.unlock(); - //do the actual writes - cur = wants_write.begin(); - end = wants_write.end(); - for(; cur != end; cur++) { + // Perform actual packet writes + for (cur = wants_write.begin(), end = wants_write.end(); + cur != end; ++cur) { (*cur)->Write(sock); (*cur)->ReleaseFromUse(); } - while(resend_que.size()){ + + // Handle packet resends + while (!resend_que.empty()) { resend_que.front()->CheckResend(sock); resend_que.pop_front(); } - Sleep(10); + + usleep(10000); // 10ms sleep + // Check if we have any streams - wait if not MStreams.lock(); - stream_count=Streams.size(); + stream_count = Streams.size(); MStreams.unlock(); + if (!stream_count) { - //cout << "No streams, waiting on condition" << endl; WriterWork.Wait(); - //cout << "Awake from condition, must have a stream now" << endl; } } } diff --git a/source/common/EQStreamFactory.h b/source/common/EQStreamFactory.h index 9dce3c7..c886faa 100644 --- a/source/common/EQStreamFactory.h +++ b/source/common/EQStreamFactory.h @@ -1,86 +1,125 @@ -/* - EQ2Emulator: Everquest II Server Emulator - Copyright (C) 2007 EQ2EMulator Development Team (http://www.eq2emulator.net) - - This file is part of EQ2Emulator. - - EQ2Emulator is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - EQ2Emulator is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with EQ2Emulator. If not, see . -*/ +// EQ2Emulator: Everquest II Server Emulator +// Copyright (C) 2007 EQ2EMulator Development Team +// Licensed under GPL v3 - see #ifndef _EQSTREAMFACTORY_H - #define _EQSTREAMFACTORY_H #include #include +#include +#include + #include "../common/EQStream.h" #include "../common/Condition.h" #include "../common/opcodemgr.h" #include "../common/timer.h" -#define STREAM_TIMEOUT 45000 //in ms +#define STREAM_TIMEOUT 45000 // Stream timeout in milliseconds +/** + * Factory class for creating and managing EverQuest network streams. + * Handles UDP socket communication, stream lifecycle, and packet processing + * for both login and world server connections. + */ class EQStreamFactory { - private: - int sock; - int Port; - - bool ReaderRunning; - Mutex MReaderRunning; - bool WriterRunning; - Mutex MWriterRunning; - bool CombinePacketRunning; - Mutex MCombinePacketRunning; - - Condition WriterWork; - - EQStreamType StreamType; - - queue NewStreams; - Mutex MNewStreams; - - map Streams; - Mutex MStreams; - - - - Timer *DecayTimer; - - public: - char* listen_ip_address; - void CheckTimeout(bool remove_all = false); - EQStreamFactory(EQStreamType type) { ReaderRunning=false; WriterRunning=false; StreamType=type; } - EQStreamFactory(EQStreamType type, int port); - ~EQStreamFactory(){ - safe_delete_array(listen_ip_address); - } - - EQStream *Pop(); - void Push(EQStream *s); - - bool loadPublicKey(); - bool Open(); - bool Open(unsigned long port) { Port=port; return Open(); } - void Close(); - void ReaderLoop(); - void WriterLoop(); - void CombinePacketLoop(); - void Stop() { StopReader(); StopWriter(); StopCombinePacket(); } - void StopReader() { MReaderRunning.lock(); ReaderRunning=false; MReaderRunning.unlock(); } - void StopWriter() { MWriterRunning.lock(); WriterRunning=false; MWriterRunning.unlock(); WriterWork.Signal(); } - void StopCombinePacket() { MCombinePacketRunning.lock(); CombinePacketRunning=false; MCombinePacketRunning.unlock(); } - void SignalWriter() { WriterWork.Signal(); } +private: + // Network socket and configuration + int sock; // UDP socket file descriptor + int Port; // Port number to listen on + + // Thread management flags and mutexes + bool ReaderRunning; // Reader thread active flag + Mutex MReaderRunning; // Mutex for reader thread flag + bool WriterRunning; // Writer thread active flag + Mutex MWriterRunning; // Mutex for writer thread flag + bool CombinePacketRunning; // Packet combiner thread active flag + Mutex MCombinePacketRunning; // Mutex for combiner thread flag + + // Thread synchronization + Condition WriterWork; // Condition variable for writer thread + + // Stream management + EQStreamType StreamType; // Type of streams this factory creates + std::queue NewStreams; // Queue of new streams waiting for processing + Mutex MNewStreams; // Mutex for new streams queue + std::map Streams; // Active streams mapped by address:port + Mutex MStreams; // Mutex for streams map + + // Cleanup timer + Timer* DecayTimer; // Timer for periodic cleanup operations +public: + // Network configuration + char* listen_ip_address; // IP address to bind to (nullptr = any) + + // Stream lifecycle management + void CheckTimeout(bool remove_all = false); + + // Constructors and destructor + EQStreamFactory(EQStreamType type) { + ReaderRunning = false; + WriterRunning = false; + CombinePacketRunning = false; + StreamType = type; + sock = -1; + Port = 0; + listen_ip_address = nullptr; + DecayTimer = nullptr; + } + + EQStreamFactory(EQStreamType type, int port); + + ~EQStreamFactory() { + safe_delete_array(listen_ip_address); + } + + // Stream queue management + EQStream* Pop(); // Get next new stream from queue + void Push(EQStream* s); // Add new stream to queue + + // Network operations + bool loadPublicKey(); // Load encryption keys (if needed) + bool Open(); // Open socket and start threads + bool Open(unsigned long port) { + Port = port; + return Open(); + } + void Close(); // Close socket and stop threads + + // Main thread loops + void ReaderLoop(); // Main packet reading loop + void WriterLoop(); // Main packet writing loop + void CombinePacketLoop(); // Packet combining optimization loop + + // Thread control + void Stop() { + StopReader(); + StopWriter(); + StopCombinePacket(); + } + + void StopReader() { + MReaderRunning.lock(); + ReaderRunning = false; + MReaderRunning.unlock(); + } + + void StopWriter() { + MWriterRunning.lock(); + WriterRunning = false; + MWriterRunning.unlock(); + WriterWork.Signal(); + } + + void StopCombinePacket() { + MCombinePacketRunning.lock(); + CombinePacketRunning = false; + MCombinePacketRunning.unlock(); + } + + void SignalWriter() { + WriterWork.Signal(); + } }; #endif