// EQ2Emulator: Everquest II Server Emulator // Copyright (C) 2007 EQ2EMulator Development Team // Licensed under GPL v3 - see #include "EQStreamFactory.h" #include "Log.h" // Unix/Linux networking headers #include #include #include #include #include #include #include #include // Standard library headers #include #include #include #include #include #include // Project headers #include "op_codes.h" #include "EQStream.h" #include "packet_dump.h" #ifdef WORLD #include "../WorldServer/client.h" #endif using namespace std; #ifdef WORLD extern ClientList client_list; #endif /** * Thread entry point for the packet reader loop. * * @param eqfs - Pointer to EQStreamFactory instance * @return Thread return value */ ThreadReturnType EQStreamFactoryReaderLoop(void* eqfs) { if (eqfs) { auto fs = static_cast(eqfs); fs->ReaderLoop(); } THREAD_RETURN(nullptr); } /** * Thread entry point for the packet writer loop. * * @param eqfs - Pointer to EQStreamFactory instance * @return Thread return value */ ThreadReturnType EQStreamFactoryWriterLoop(void* eqfs) { if (eqfs) { auto fs = static_cast(eqfs); fs->WriterLoop(); } THREAD_RETURN(nullptr); } /** * Thread entry point for the packet combining loop. * * @param eqfs - Pointer to EQStreamFactory instance * @return Thread return value */ ThreadReturnType EQStreamFactoryCombinePacketLoop(void* eqfs) { if (eqfs) { auto fs = static_cast(eqfs); fs->CombinePacketLoop(); } THREAD_RETURN(nullptr); } /** * EQStreamFactory constructor with stream type and port. * * @param type - Type of streams to create (login/world) * @param port - Port number to listen on */ EQStreamFactory::EQStreamFactory(EQStreamType type, int port) { StreamType = type; Port = port; listen_ip_address = nullptr; sock = -1; ReaderRunning = false; WriterRunning = false; CombinePacketRunning = false; DecayTimer = nullptr; } /** * Close the stream factory and clean up all resources. * Stops all threads, closes socket, and removes all streams. 
*/ void EQStreamFactory::Close() { CheckTimeout(true); Stop(); if (sock != -1) { close(sock); sock = -1; } } /** * Open the UDP socket and start worker threads. * Creates reader, writer, and packet combiner threads. * * @return true if successful, false on error */ bool EQStreamFactory::Open() { struct sockaddr_in address; pthread_t t1, t2, t3; // Setup socket address structure memset(reinterpret_cast(&address), 0, sizeof(address)); address.sin_family = AF_INET; address.sin_port = htons(Port); // Set bind address based on configuration #if defined(LOGIN) || defined(MINILOGIN) if (listen_ip_address) { address.sin_addr.s_addr = inet_addr(listen_ip_address); } else { address.sin_addr.s_addr = htonl(INADDR_ANY); } #else address.sin_addr.s_addr = htonl(INADDR_ANY); #endif // Create UDP socket sock = socket(AF_INET, SOCK_DGRAM, 0); if (sock < 0) { return false; } // Bind socket to address if (::bind(sock, reinterpret_cast(&address), sizeof(address)) < 0) { close(sock); sock = -1; return false; } // Set socket to non-blocking mode fcntl(sock, F_SETFL, O_NONBLOCK); // Log thread startup #ifdef LOGIN LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Reader"); LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Writer"); #elif WORLD LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Reader"); LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Writer"); #endif // Create and detach worker threads pthread_create(&t1, nullptr, EQStreamFactoryReaderLoop, this); pthread_create(&t2, nullptr, EQStreamFactoryWriterLoop, this); pthread_create(&t3, nullptr, EQStreamFactoryCombinePacketLoop, this); pthread_detach(t1); pthread_detach(t2); pthread_detach(t3); return true; } /** * Get the next new stream from the queue. * Thread-safe method to retrieve newly created streams. 
* * @return Next EQStream from queue, or nullptr if none available */ EQStream* EQStreamFactory::Pop() { if (!NewStreams.size()) { return nullptr; } EQStream* s = nullptr; MNewStreams.lock(); if (NewStreams.size()) { s = NewStreams.front(); NewStreams.pop(); s->PutInUse(); } MNewStreams.unlock(); return s; } /** * Add a new stream to the queue for processing. * Thread-safe method to queue newly created streams. * * @param s - EQStream to add to queue */ void EQStreamFactory::Push(EQStream* s) { MNewStreams.lock(); NewStreams.push(s); MNewStreams.unlock(); } /** * Main packet reading loop - runs in separate thread. * Receives UDP packets and routes them to appropriate streams. */ void EQStreamFactory::ReaderLoop() { fd_set readset; map::iterator stream_itr; int num; int length; unsigned char buffer[2048]; sockaddr_in from; socklen_t socklen = sizeof(sockaddr_in); timeval sleep_time; ReaderRunning = true; while (sock != -1) { MReaderRunning.lock(); if (!ReaderRunning) { MReaderRunning.unlock(); break; } MReaderRunning.unlock(); // Setup select() for socket monitoring FD_ZERO(&readset); FD_SET(sock, &readset); sleep_time.tv_sec = 30; sleep_time.tv_usec = 0; // Wait for incoming data or timeout num = select(sock + 1, &readset, nullptr, nullptr, &sleep_time); if (num < 0) { // Select error - could log this continue; } else if (num == 0) { // Timeout - continue loop continue; } // Check if our socket has data if (FD_ISSET(sock, &readset)) { length = recvfrom(sock, buffer, 2048, 0, reinterpret_cast(&from), &socklen); if (length < 2) { // Packet too small - ignore continue; } // Create address:port string for stream identification char temp[25]; snprintf(temp, sizeof(temp), "%u.%d", ntohl(from.sin_addr.s_addr), ntohs(from.sin_port)); MStreams.lock(); stream_itr = Streams.find(temp); // Handle new connections or session requests if (stream_itr == Streams.end() || buffer[1] == OP_SessionRequest) { MStreams.unlock(); if (buffer[1] == OP_SessionRequest) { // Close existing 
stream if present if (stream_itr != Streams.end() && stream_itr->second) { stream_itr->second->SetState(CLOSED); } // Create new stream auto s = new EQStream(from); s->SetFactory(this); s->SetStreamType(StreamType); Streams[temp] = s; WriterWork.notify_one(); Push(s); s->Process(buffer, length); s->SetLastPacketTime(Timer::GetCurrentTime2()); } } else { // Route packet to existing stream EQStream* curstream = stream_itr->second; // Skip closed connections if (curstream->CheckClosed()) { curstream = nullptr; } else { curstream->PutInUse(); } MStreams.unlock(); if (curstream) { curstream->Process(buffer, length); curstream->SetLastPacketTime(Timer::GetCurrentTime2()); curstream->ReleaseFromUse(); } } } } } /** * Check for timed out streams and clean up closed connections. * * @param remove_all - If true, remove all streams regardless of state */ void EQStreamFactory::CheckTimeout(bool remove_all) { // Lock streams for the entire timeout check - should be fast MStreams.lock(); unsigned long now = Timer::GetCurrentTime2(); map::iterator stream_itr; for (stream_itr = Streams.begin(); stream_itr != Streams.end();) { EQStream* s = stream_itr->second; EQStreamState state = s->GetState(); // Transition CLOSING streams to CLOSED when no outgoing data if (state == CLOSING && !s->HasOutgoingData()) { stream_itr->second->SetState(CLOSED); state = CLOSED; } else if (s->CheckTimeout(now, STREAM_TIMEOUT)) { // Handle timeout based on current state const char* stateString; switch (state) { case ESTABLISHED: stateString = "Established"; break; case CLOSING: stateString = "Closing"; break; case CLOSED: stateString = "Closed"; break; case WAIT_CLOSE: stateString = "Wait-Close"; break; default: stateString = "Unknown"; break; } LogWrite(WORLD__DEBUG, 0, "World", "Timeout up!, state=%s (%u)", stateString, state); if (state == ESTABLISHED) { s->Close(); } else if (state == WAIT_CLOSE) { s->SetState(CLOSING); state = CLOSING; } else if (state == CLOSING) { // If we timeout in closing 
state, force close s->SetState(CLOSED); state = CLOSED; } } // Remove closed streams (check immediately after state changes) if (remove_all || state == CLOSED) { if (!remove_all && s->getTimeoutDelays() < 2) { s->addTimeoutDelay(); // Give other threads time to finish with this stream ++stream_itr; } else { // Safe to delete now #ifdef LOGIN LogWrite(LOGIN__DEBUG, 0, "Login", "Removing connection..."); #else LogWrite(WORLD__DEBUG, 0, "World", "Removing connection..."); #endif auto temp = stream_itr; ++stream_itr; // Let client list handle cleanup if in world server #ifdef WORLD client_list.RemoveConnection(temp->second); #endif EQStream* stream = temp->second; Streams.erase(temp); delete stream; continue; } } else { ++stream_itr; } } MStreams.unlock(); } /** * Packet combining optimization loop - runs in separate thread. * Combines multiple small packets for efficient transmission. */ void EQStreamFactory::CombinePacketLoop() { deque combine_que; CombinePacketRunning = true; bool packets_waiting = false; while (sock != -1) { if (!CombinePacketRunning) { break; } MStreams.lock(); // Check all streams for combine timer expiration for (auto& stream_pair : Streams) { if (!stream_pair.second) { continue; } if (stream_pair.second->combine_timer && stream_pair.second->combine_timer->Check()) { combine_que.push_back(stream_pair.second); } } // Process streams that need packet combining packets_waiting = false; while (!combine_que.empty()) { EQStream* stream = combine_que.front(); if (stream->CheckActive()) { if (!stream->CheckCombineQueue()) { packets_waiting = true; } } combine_que.pop_front(); } MStreams.unlock(); // Sleep longer if no packets are waiting if (!packets_waiting) { usleep(25000); // 25ms } usleep(1000); // 1ms } } /** * Main packet writing loop - runs in separate thread. * Handles outgoing packet transmission and resends. 
*/
// Each pass: snapshot (under MStreams) the streams that have outgoing data or
// an expired resend timer, then write/resend outside the lock. When no streams
// exist the thread blocks on WriterWork until it is notified. The loop ends
// when sock becomes -1 or WriterRunning is cleared.
void EQStreamFactory::WriterLoop()
{
	vector<EQStream*> wants_write;
	deque<EQStream*> resend_que;
	bool decay = false;
	uint32 stream_count;

	// Local timer; named so it does not shadow the DecayTimer member.
	Timer decay_timer(20);

	WriterRunning = true;
	decay_timer.Enable();

	while (sock != -1) {
		Timer::SetCurrentTime();

		MWriterRunning.lock();
		if (!WriterRunning) {
			MWriterRunning.unlock();
			break;
		}
		MWriterRunning.unlock();

		wants_write.clear();
		resend_que.clear();

		decay = decay_timer.Check();

		// Copy streams into separate list to minimize lock time
		MStreams.lock();
		for (auto& stream_pair : Streams) {
			EQStream* stream = stream_pair.second;
			if (!stream) {
				// This shouldn't happen, but handle gracefully
				continue;
			}

			// Apply bandwidth decay if it's time
			if (decay) {
				stream->Decay();
			}

			// Queue streams with outgoing data; marked in use until written below
			if (stream->HasOutgoingData()) {
				stream->PutInUse();
				wants_write.push_back(stream);
			}

			// Queue streams that need resend processing
			if (stream->resend_que_timer && stream->resend_que_timer->Check()) {
				resend_que.push_back(stream);
			}
		}
		MStreams.unlock();

		// Perform actual packet writes
		for (EQStream* stream : wants_write) {
			stream->Write(sock);
			stream->ReleaseFromUse();
		}

		// Handle packet resends
		while (!resend_que.empty()) {
			resend_que.front()->CheckResend(sock);
			resend_que.pop_front();
		}

		usleep(10000); // 10ms sleep

		// Check if we have any streams - wait if not
		MStreams.lock();
		stream_count = Streams.size();
		MStreams.unlock();

		if (!stream_count) {
			// NOTE(review): the count is sampled before WriterWorkMutex is
			// taken and ReaderLoop() calls notify_one() without that mutex, so
			// a notification landing between the check and wait() is missed -
			// confirm whether a timed or predicate wait is wanted here.
			std::unique_lock lock(WriterWorkMutex);
			WriterWork.wait(lock);
		}
	}
}