eq2go/old/common/stream/eq_stream_factory.hpp
2025-08-06 19:00:30 -05:00

500 lines
12 KiB
C++

// Copyright (C) 2007 EQ2EMulator Development Team - GPL v3 License
#pragma once
#include <queue>
#include <map>
#include <vector>
#include <deque>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include "log.hpp"
#include "timer.hpp"
#include "eq_stream.hpp"
#include "../packet/packet_dump.hpp"
#include "../opcodes/opcodes.hpp"
#include "../opcodes/opcode_manager.hpp"
#ifdef WORLD
#include "../../WorldServer/client.h"
extern ClientList client_list;
#endif
// Inactivity limit handed to EQStream::CheckTimeout() by EQStreamFactory::CheckTimeout().
// NOTE(review): presumably milliseconds (same clock as Timer::GetCurrentTime2()) - confirm.
#define STREAM_TIMEOUT 45000
class EQStreamFactory
{
public:
char* listen_ip_address;
// Default constructor - initializes factory with stream type only
EQStreamFactory(EQStreamType type)
{
ReaderRunning = false;
WriterRunning = false;
CombinePacketRunning = false;
StreamType = type;
Port = 0;
sock = -1;
listen_ip_address = nullptr;
}
// Constructor with port - initializes factory with stream type and port
EQStreamFactory(EQStreamType type, int port)
{
StreamType = type;
Port = port;
listen_ip_address = nullptr;
sock = -1;
ReaderRunning = false;
WriterRunning = false;
CombinePacketRunning = false;
}
// Destructor - cleans up allocated resources
~EQStreamFactory()
{
if (listen_ip_address) {
delete[] listen_ip_address;
}
}
// Retrieves the next available stream from the new streams queue
EQStream* Pop()
{
if (!NewStreams.size())
return nullptr;
EQStream* s = nullptr;
std::lock_guard<std::mutex> lock(MNewStreams);
if (NewStreams.size()) {
s = NewStreams.front();
NewStreams.pop();
s->PutInUse();
}
return s;
}
// Adds a new stream to the new streams queue
void Push(EQStream* s)
{
std::lock_guard<std::mutex> lock(MNewStreams);
NewStreams.push(s);
}
// Opens the socket and starts the reader/writer/combine packet threads
bool Open()
{
struct sockaddr_in address;
// Setup internet address information for bind() call
memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(Port);
#if defined(LOGIN) || defined(MINILOGIN)
if (listen_ip_address)
address.sin_addr.s_addr = inet_addr(listen_ip_address);
else
address.sin_addr.s_addr = htonl(INADDR_ANY);
#else
address.sin_addr.s_addr = htonl(INADDR_ANY);
#endif
// Setup UDP socket for new clients
sock = socket(AF_INET, SOCK_DGRAM, 0);
if (sock < 0) {
return false;
}
if (::bind(sock, (struct sockaddr*)&address, sizeof(address)) < 0) {
sock = -1;
return false;
}
// Set socket to non-blocking mode
fcntl(sock, F_SETFL, O_NONBLOCK);
#ifdef LOGIN
LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Reader");
LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Writer");
#elif WORLD
LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Reader");
LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Writer");
#endif
// Start threads for packet processing
std::thread(&EQStreamFactory::ReaderLoop, this).detach();
std::thread(&EQStreamFactory::WriterLoop, this).detach();
std::thread(&EQStreamFactory::CombinePacketLoop, this).detach();
return true;
}
// Opens with specified port override
bool Open(unsigned long port)
{
Port = port;
return Open();
}
// Closes the factory and cleans up all connections
void Close()
{
CheckTimeout(true);
Stop();
if (sock != -1) {
close(sock);
sock = -1;
}
}
// Main reader loop - processes incoming packets from clients
void ReaderLoop()
{
fd_set readset;
std::map<std::string, EQStream*>::iterator stream_itr;
int num;
int length;
unsigned char buffer[2048];
sockaddr_in from;
socklen_t socklen = sizeof(sockaddr_in);
timeval sleep_time;
ReaderRunning = true;
while (sock != -1) {
{
std::lock_guard<std::mutex> lock(MReaderRunning);
if (!ReaderRunning)
break;
}
FD_ZERO(&readset);
FD_SET(sock, &readset);
sleep_time.tv_sec = 30;
sleep_time.tv_usec = 0;
if ((num = select(sock + 1, &readset, nullptr, nullptr, &sleep_time)) < 0) {
// Socket error occurred
}
else if (num == 0) {
continue; // Timeout, no data available
}
if (FD_ISSET(sock, &readset)) {
if ((length = recvfrom(sock, buffer, 2048, 0, (struct sockaddr*)&from, &socklen)) < 2) {
// Invalid packet received
}
else {
char temp[25];
sprintf(temp, "%u.%d", ntohl(from.sin_addr.s_addr), ntohs(from.sin_port));
std::lock_guard<std::mutex> streams_lock(MStreams);
if ((stream_itr = Streams.find(temp)) == Streams.end() || buffer[1] == OP_SessionRequest) {
// New session request or existing session requesting new connection
if (buffer[1] == OP_SessionRequest) {
if (stream_itr != Streams.end() && stream_itr->second)
stream_itr->second->SetState(CLOSED);
EQStream* s = new EQStream(from);
s->SetFactory(this);
s->SetStreamType(StreamType);
Streams[temp] = s;
WriterWork.notify_one();
Push(s);
s->Process(buffer, length);
s->SetLastPacketTime(Timer::GetCurrentTime2());
}
}
else {
EQStream* curstream = stream_itr->second;
// Don't process packets for closed connections
if (curstream->CheckClosed()) {
curstream = nullptr;
}
else {
curstream->PutInUse();
}
if (curstream) {
curstream->Process(buffer, length);
curstream->SetLastPacketTime(Timer::GetCurrentTime2());
curstream->ReleaseFromUse();
}
}
}
}
}
}
// Checks for timed out connections and removes them
void CheckTimeout(bool remove_all = false)
{
// Lock streams for the entire timeout check operation
std::lock_guard<std::mutex> lock(MStreams);
unsigned long now = Timer::GetCurrentTime2();
std::map<std::string, EQStream*>::iterator stream_itr;
for (stream_itr = Streams.begin(); stream_itr != Streams.end();) {
EQStream* s = stream_itr->second;
EQStreamState state = s->GetState();
if (state == CLOSING && !s->HasOutgoingData()) {
stream_itr->second->SetState(CLOSED);
state = CLOSED;
}
else if (s->CheckTimeout(now, STREAM_TIMEOUT)) {
const char* stateString;
switch (state) {
case ESTABLISHED:
stateString = "Established";
break;
case CLOSING:
stateString = "Closing";
break;
case CLOSED:
stateString = "Closed";
break;
case WAIT_CLOSE:
stateString = "Wait-Close";
break;
default:
stateString = "Unknown";
break;
}
LogWrite(WORLD__DEBUG, 0, "World", "Timeout up!, state=%s (%u)", stateString, state);
if (state == ESTABLISHED) {
s->Close();
}
else if (state == WAIT_CLOSE) {
s->SetState(CLOSING);
state = CLOSING;
}
else if (state == CLOSING) {
// Timeout in closing state, force close
s->SetState(CLOSED);
state = CLOSED;
}
}
// Check for closed connections to remove
if (remove_all || state == CLOSED) {
if (!remove_all && s->getTimeoutDelays() < 2) {
s->addTimeoutDelay();
// Give time for other threads to finish with stream
}
else {
// Safe to delete the stream now
#ifdef LOGIN
LogWrite(LOGIN__DEBUG, 0, "Login", "Removing connection...");
#else
LogWrite(WORLD__DEBUG, 0, "World", "Removing connection...");
#endif
std::map<std::string, EQStream*>::iterator temp = stream_itr;
stream_itr++;
#ifdef WORLD
client_list.RemoveConnection(temp->second);
#endif
EQStream* stream = temp->second;
Streams.erase(temp);
delete stream;
continue;
}
}
stream_itr++;
}
}
// Processes packet combining for streams that need it
void CombinePacketLoop()
{
std::deque<EQStream*> combine_que;
CombinePacketRunning = true;
bool packets_waiting = false;
while (sock != -1) {
if (!CombinePacketRunning)
break;
{
std::lock_guard<std::mutex> lock(MStreams);
std::map<std::string, EQStream*>::iterator stream_itr;
for (stream_itr = Streams.begin(); stream_itr != Streams.end(); stream_itr++) {
if (!stream_itr->second) {
continue;
}
if (stream_itr->second->combine_timer && stream_itr->second->combine_timer->Check())
combine_que.push_back(stream_itr->second);
}
}
EQStream* stream = nullptr;
packets_waiting = false;
while (combine_que.size()) {
stream = combine_que.front();
if (stream->CheckActive()) {
if (!stream->CheckCombineQueue())
packets_waiting = true;
}
combine_que.pop_front();
}
if (!packets_waiting)
std::this_thread::sleep_for(std::chrono::milliseconds(25));
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
// Main writer loop - sends outgoing packets to clients
void WriterLoop()
{
std::map<std::string, EQStream*>::iterator stream_itr;
std::vector<EQStream*> wants_write;
std::vector<EQStream*>::iterator cur, end;
std::deque<EQStream*> resend_que;
bool decay = false;
uint32_t stream_count;
Timer DecayTimer(20);
WriterRunning = true;
DecayTimer.Enable();
while (sock != -1) {
Timer::SetCurrentTime();
{
std::lock_guard<std::mutex> lock(MWriterRunning);
if (!WriterRunning)
break;
}
wants_write.clear();
decay = DecayTimer.Check();
// Copy streams into separate list to avoid keeping MStreams locked during writes
{
std::lock_guard<std::mutex> lock(MStreams);
for (stream_itr = Streams.begin(); stream_itr != Streams.end(); stream_itr++) {
if (!stream_itr->second) {
Streams.erase(stream_itr);
break;
}
// Decay bytes sent if timer expired
if (decay)
stream_itr->second->Decay();
if (stream_itr->second->HasOutgoingData()) {
stream_itr->second->PutInUse();
wants_write.push_back(stream_itr->second);
}
if (stream_itr->second->resend_que_timer->Check())
resend_que.push_back(stream_itr->second);
}
}
// Perform actual packet writes
cur = wants_write.begin();
end = wants_write.end();
for (; cur != end; cur++) {
(*cur)->Write(sock);
(*cur)->ReleaseFromUse();
}
// Process resend queue
while (resend_que.size()) {
resend_que.front()->CheckResend(sock);
resend_que.pop_front();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
{
std::lock_guard<std::mutex> lock(MStreams);
stream_count = Streams.size();
}
if (!stream_count) {
// No streams available, wait for work signal
std::unique_lock<std::mutex> writer_lock(WriterWorkMutex);
WriterWork.wait(writer_lock);
}
}
}
// Stops all factory operations
void Stop()
{
StopReader();
StopWriter();
StopCombinePacket();
}
// Stops the reader thread
void StopReader()
{
std::lock_guard<std::mutex> lock(MReaderRunning);
ReaderRunning = false;
}
// Stops the writer thread
void StopWriter()
{
{
std::lock_guard<std::mutex> lock(MWriterRunning);
WriterRunning = false;
}
WriterWork.notify_one();
}
// Stops the combine packet thread
void StopCombinePacket()
{
std::lock_guard<std::mutex> lock(MCombinePacketRunning);
CombinePacketRunning = false;
}
// Signals the writer thread that work is available
void SignalWriter()
{
WriterWork.notify_one();
}
private:
int sock; // Socket file descriptor
int Port; // Port number to listen on
bool ReaderRunning; // Reader thread running flag
std::mutex MReaderRunning; // Mutex for reader thread state
bool WriterRunning; // Writer thread running flag
std::mutex MWriterRunning; // Mutex for writer thread state
bool CombinePacketRunning; // Combine packet thread running flag
std::mutex MCombinePacketRunning; // Mutex for combine packet thread state
std::condition_variable WriterWork; // Condition variable for writer thread work signal
std::mutex WriterWorkMutex; // Mutex for writer work condition variable
EQStreamType StreamType; // Type of streams this factory creates
std::queue<EQStream*> NewStreams; // Queue of newly created streams
std::mutex MNewStreams; // Mutex for new streams queue
std::map<std::string, EQStream*> Streams; // Map of active streams by IP:Port
std::mutex MStreams; // Mutex for streams map
};