// Copyright (C) 2007 EQ2EMulator Development Team - GPL v3 License #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "log.hpp" #include "timer.hpp" #include "eq_stream.hpp" #include "../packet/packet_dump.hpp" #include "../opcodes/opcodes.hpp" #include "../opcodes/opcode_manager.hpp" #ifdef WORLD #include "../../WorldServer/client.h" extern ClientList client_list; #endif #define STREAM_TIMEOUT 45000 class EQStreamFactory { public: char* listen_ip_address; // Default constructor - initializes factory with stream type only EQStreamFactory(EQStreamType type) { ReaderRunning = false; WriterRunning = false; CombinePacketRunning = false; StreamType = type; Port = 0; sock = -1; listen_ip_address = nullptr; } // Constructor with port - initializes factory with stream type and port EQStreamFactory(EQStreamType type, int port) { StreamType = type; Port = port; listen_ip_address = nullptr; sock = -1; ReaderRunning = false; WriterRunning = false; CombinePacketRunning = false; } // Destructor - cleans up allocated resources ~EQStreamFactory() { if (listen_ip_address) { delete[] listen_ip_address; } } // Retrieves the next available stream from the new streams queue EQStream* Pop() { if (!NewStreams.size()) return nullptr; EQStream* s = nullptr; std::lock_guard lock(MNewStreams); if (NewStreams.size()) { s = NewStreams.front(); NewStreams.pop(); s->PutInUse(); } return s; } // Adds a new stream to the new streams queue void Push(EQStream* s) { std::lock_guard lock(MNewStreams); NewStreams.push(s); } // Opens the socket and starts the reader/writer/combine packet threads bool Open() { struct sockaddr_in address; // Setup internet address information for bind() call memset(&address, 0, sizeof(address)); address.sin_family = AF_INET; address.sin_port = htons(Port); #if defined(LOGIN) || defined(MINILOGIN) if (listen_ip_address) address.sin_addr.s_addr = inet_addr(listen_ip_address); else address.sin_addr.s_addr = htonl(INADDR_ANY); #else address.sin_addr.s_addr = htonl(INADDR_ANY); #endif // Setup UDP socket for new clients sock = socket(AF_INET, SOCK_DGRAM, 0); if (sock < 0) { return false; } if (::bind(sock, (struct sockaddr*)&address, sizeof(address)) < 0) { sock = -1; return false; } // Set socket to non-blocking mode fcntl(sock, F_SETFL, O_NONBLOCK); #ifdef LOGIN LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Reader"); LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Writer"); #elif WORLD LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Reader"); LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Writer"); #endif // Start threads for packet processing std::thread(&EQStreamFactory::ReaderLoop, this).detach(); std::thread(&EQStreamFactory::WriterLoop, this).detach(); std::thread(&EQStreamFactory::CombinePacketLoop, this).detach(); return true; } // Opens with specified port override bool Open(unsigned long port) { Port = port; return Open(); } // Closes the factory and cleans up all connections void Close() { CheckTimeout(true); Stop(); if (sock != -1) { close(sock); sock = -1; } } // Main reader loop - processes incoming packets from clients void ReaderLoop() { fd_set readset; std::map::iterator stream_itr; int num; int length; unsigned char buffer[2048]; sockaddr_in from; socklen_t socklen = sizeof(sockaddr_in); timeval sleep_time; ReaderRunning = true; while (sock != -1) { { std::lock_guard lock(MReaderRunning); if (!ReaderRunning) break; } FD_ZERO(&readset); FD_SET(sock, &readset); sleep_time.tv_sec = 30; sleep_time.tv_usec = 0; if ((num = select(sock + 1, &readset, nullptr, nullptr, &sleep_time)) < 0) { // Socket error occurred } else if (num == 0) { continue; // Timeout, no data available } if (FD_ISSET(sock, &readset)) { if ((length = recvfrom(sock, buffer, 2048, 0, (struct sockaddr*)&from, &socklen)) < 2) { // Invalid packet received } else { char temp[25]; sprintf(temp, "%u.%d", ntohl(from.sin_addr.s_addr), ntohs(from.sin_port)); std::lock_guard streams_lock(MStreams); if ((stream_itr = Streams.find(temp)) == Streams.end() || buffer[1] == OP_SessionRequest) { // New session request or existing session requesting new connection if (buffer[1] == OP_SessionRequest) { if (stream_itr != Streams.end() && stream_itr->second) stream_itr->second->SetState(CLOSED); EQStream* s = new EQStream(from); s->SetFactory(this); s->SetStreamType(StreamType); Streams[temp] = s; WriterWork.notify_one(); Push(s); s->Process(buffer, length); s->SetLastPacketTime(Timer::GetCurrentTime2()); } } else { EQStream* curstream = stream_itr->second; // Don't process packets for closed connections if (curstream->CheckClosed()) { curstream = nullptr; } else { curstream->PutInUse(); } if (curstream) { curstream->Process(buffer, length); curstream->SetLastPacketTime(Timer::GetCurrentTime2()); curstream->ReleaseFromUse(); } } } } } } // Checks for timed out connections and removes them void CheckTimeout(bool remove_all = false) { // Lock streams for the entire timeout check operation std::lock_guard lock(MStreams); unsigned long now = Timer::GetCurrentTime2(); std::map::iterator stream_itr; for (stream_itr = Streams.begin(); stream_itr != Streams.end();) { EQStream* s = stream_itr->second; EQStreamState state = s->GetState(); if (state == CLOSING && !s->HasOutgoingData()) { stream_itr->second->SetState(CLOSED); state = CLOSED; } else if (s->CheckTimeout(now, STREAM_TIMEOUT)) { const char* stateString; switch (state) { case ESTABLISHED: stateString = "Established"; break; case CLOSING: stateString = "Closing"; break; case CLOSED: stateString = "Closed"; break; case WAIT_CLOSE: stateString = "Wait-Close"; break; default: stateString = "Unknown"; break; } LogWrite(WORLD__DEBUG, 0, "World", "Timeout up!, state=%s (%u)", stateString, state); if (state == ESTABLISHED) { s->Close(); } else if (state == WAIT_CLOSE) { s->SetState(CLOSING); state = CLOSING; } else if (state == CLOSING) { // Timeout in closing state, force close s->SetState(CLOSED); state = CLOSED; } } // Check for closed connections to remove if (remove_all || state == CLOSED) { if (!remove_all && s->getTimeoutDelays() < 2) { s->addTimeoutDelay(); // Give time for other threads to finish with stream } else { // Safe to delete the stream now #ifdef LOGIN LogWrite(LOGIN__DEBUG, 0, "Login", "Removing connection..."); #else LogWrite(WORLD__DEBUG, 0, "World", "Removing connection..."); #endif std::map::iterator temp = stream_itr; stream_itr++; #ifdef WORLD client_list.RemoveConnection(temp->second); #endif EQStream* stream = temp->second; Streams.erase(temp); delete stream; continue; } } stream_itr++; } } // Processes packet combining for streams that need it void CombinePacketLoop() { std::deque combine_que; CombinePacketRunning = true; bool packets_waiting = false; while (sock != -1) { if (!CombinePacketRunning) break; { std::lock_guard lock(MStreams); std::map::iterator stream_itr; for (stream_itr = Streams.begin(); stream_itr != Streams.end(); stream_itr++) { if (!stream_itr->second) { continue; } if (stream_itr->second->combine_timer && stream_itr->second->combine_timer->Check()) combine_que.push_back(stream_itr->second); } } EQStream* stream = nullptr; packets_waiting = false; while (combine_que.size()) { stream = combine_que.front(); if (stream->CheckActive()) { if (!stream->CheckCombineQueue()) packets_waiting = true; } combine_que.pop_front(); } if (!packets_waiting) std::this_thread::sleep_for(std::chrono::milliseconds(25)); std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } // Main writer loop - sends outgoing packets to clients void WriterLoop() { std::map::iterator stream_itr; std::vector wants_write; std::vector::iterator cur, end; std::deque resend_que; bool decay = false; uint32_t stream_count; Timer DecayTimer(20); WriterRunning = true; DecayTimer.Enable(); while (sock != -1) { Timer::SetCurrentTime(); { std::lock_guard lock(MWriterRunning); if (!WriterRunning) break; } wants_write.clear(); decay = DecayTimer.Check(); // Copy streams into separate list to avoid keeping MStreams locked during writes { std::lock_guard lock(MStreams); for (stream_itr = Streams.begin(); stream_itr != Streams.end(); stream_itr++) { if (!stream_itr->second) { Streams.erase(stream_itr); break; } // Decay bytes sent if timer expired if (decay) stream_itr->second->Decay(); if (stream_itr->second->HasOutgoingData()) { stream_itr->second->PutInUse(); wants_write.push_back(stream_itr->second); } if (stream_itr->second->resend_que_timer->Check()) resend_que.push_back(stream_itr->second); } } // Perform actual packet writes cur = wants_write.begin(); end = wants_write.end(); for (; cur != end; cur++) { (*cur)->Write(sock); (*cur)->ReleaseFromUse(); } // Process resend queue while (resend_que.size()) { resend_que.front()->CheckResend(sock); resend_que.pop_front(); } std::this_thread::sleep_for(std::chrono::milliseconds(10)); { std::lock_guard lock(MStreams); stream_count = Streams.size(); } if (!stream_count) { // No streams available, wait for work signal std::unique_lock writer_lock(WriterWorkMutex); WriterWork.wait(writer_lock); } } } // Stops all factory operations void Stop() { StopReader(); StopWriter(); StopCombinePacket(); } // Stops the reader thread void StopReader() { std::lock_guard lock(MReaderRunning); ReaderRunning = false; } // Stops the writer thread void StopWriter() { { std::lock_guard lock(MWriterRunning); WriterRunning = false; } WriterWork.notify_one(); } // Stops the combine packet thread void StopCombinePacket() { std::lock_guard lock(MCombinePacketRunning); CombinePacketRunning = false; } // Signals the writer thread that work is available void SignalWriter() { WriterWork.notify_one(); } private: int sock; // Socket file descriptor int Port; // Port number to listen on bool ReaderRunning; // Reader thread running flag std::mutex MReaderRunning; // Mutex for reader thread state bool WriterRunning; // Writer thread running flag std::mutex MWriterRunning; // Mutex for writer thread state bool CombinePacketRunning; // Combine packet thread running flag std::mutex MCombinePacketRunning; // Mutex for combine packet thread state std::condition_variable WriterWork; // Condition variable for writer thread work signal std::mutex WriterWorkMutex; // Mutex for writer work condition variable EQStreamType StreamType; // Type of streams this factory creates std::queue NewStreams; // Queue of newly created streams std::mutex MNewStreams; // Mutex for new streams queue std::map Streams; // Map of active streams by IP:Port std::mutex MStreams; // Mutex for streams map };