500 lines
12 KiB
C++
500 lines
12 KiB
C++
// Copyright (C) 2007 EQ2EMulator Development Team - GPL v3 License
|
|
|
|
#pragma once
|
|
|
|
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <map>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
|
|
|
|
#include "log.hpp"
|
|
#include "timer.hpp"
|
|
#include "eq_stream.hpp"
|
|
#include "../packet/packet_dump.hpp"
|
|
#include "../opcodes/opcodes.hpp"
|
|
#include "../opcodes/opcode_manager.hpp"
|
|
|
|
#ifdef WORLD
|
|
#include "../../WorldServer/client.h"
|
|
extern ClientList client_list;
|
|
#endif
|
|
|
|
#define STREAM_TIMEOUT 45000
|
|
|
|
class EQStreamFactory
|
|
{
|
|
public:
|
|
char* listen_ip_address;
|
|
|
|
// Default constructor - initializes factory with stream type only
|
|
EQStreamFactory(EQStreamType type)
|
|
{
|
|
ReaderRunning = false;
|
|
WriterRunning = false;
|
|
CombinePacketRunning = false;
|
|
StreamType = type;
|
|
Port = 0;
|
|
sock = -1;
|
|
listen_ip_address = nullptr;
|
|
}
|
|
|
|
// Constructor with port - initializes factory with stream type and port
|
|
EQStreamFactory(EQStreamType type, int port)
|
|
{
|
|
StreamType = type;
|
|
Port = port;
|
|
listen_ip_address = nullptr;
|
|
sock = -1;
|
|
ReaderRunning = false;
|
|
WriterRunning = false;
|
|
CombinePacketRunning = false;
|
|
}
|
|
|
|
// Destructor - cleans up allocated resources
|
|
~EQStreamFactory()
|
|
{
|
|
if (listen_ip_address) {
|
|
delete[] listen_ip_address;
|
|
}
|
|
}
|
|
|
|
// Retrieves the next available stream from the new streams queue
|
|
EQStream* Pop()
|
|
{
|
|
if (!NewStreams.size())
|
|
return nullptr;
|
|
|
|
EQStream* s = nullptr;
|
|
std::lock_guard<std::mutex> lock(MNewStreams);
|
|
if (NewStreams.size()) {
|
|
s = NewStreams.front();
|
|
NewStreams.pop();
|
|
s->PutInUse();
|
|
}
|
|
return s;
|
|
}
|
|
|
|
// Adds a new stream to the new streams queue
|
|
void Push(EQStream* s)
|
|
{
|
|
std::lock_guard<std::mutex> lock(MNewStreams);
|
|
NewStreams.push(s);
|
|
}
|
|
|
|
// Opens the socket and starts the reader/writer/combine packet threads
|
|
bool Open()
|
|
{
|
|
struct sockaddr_in address;
|
|
|
|
// Setup internet address information for bind() call
|
|
memset(&address, 0, sizeof(address));
|
|
address.sin_family = AF_INET;
|
|
address.sin_port = htons(Port);
|
|
|
|
#if defined(LOGIN) || defined(MINILOGIN)
|
|
if (listen_ip_address)
|
|
address.sin_addr.s_addr = inet_addr(listen_ip_address);
|
|
else
|
|
address.sin_addr.s_addr = htonl(INADDR_ANY);
|
|
#else
|
|
address.sin_addr.s_addr = htonl(INADDR_ANY);
|
|
#endif
|
|
|
|
// Setup UDP socket for new clients
|
|
sock = socket(AF_INET, SOCK_DGRAM, 0);
|
|
if (sock < 0) {
|
|
return false;
|
|
}
|
|
|
|
if (::bind(sock, (struct sockaddr*)&address, sizeof(address)) < 0) {
|
|
sock = -1;
|
|
return false;
|
|
}
|
|
|
|
// Set socket to non-blocking mode
|
|
fcntl(sock, F_SETFL, O_NONBLOCK);
|
|
|
|
#ifdef LOGIN
|
|
LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Reader");
|
|
LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Writer");
|
|
#elif WORLD
|
|
LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Reader");
|
|
LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Writer");
|
|
#endif
|
|
|
|
// Start threads for packet processing
|
|
std::thread(&EQStreamFactory::ReaderLoop, this).detach();
|
|
std::thread(&EQStreamFactory::WriterLoop, this).detach();
|
|
std::thread(&EQStreamFactory::CombinePacketLoop, this).detach();
|
|
|
|
return true;
|
|
}
|
|
|
|
// Opens with specified port override
|
|
bool Open(unsigned long port)
|
|
{
|
|
Port = port;
|
|
return Open();
|
|
}
|
|
|
|
// Closes the factory and cleans up all connections
|
|
void Close()
|
|
{
|
|
CheckTimeout(true);
|
|
Stop();
|
|
if (sock != -1) {
|
|
close(sock);
|
|
sock = -1;
|
|
}
|
|
}
|
|
|
|
// Main reader loop - processes incoming packets from clients
|
|
void ReaderLoop()
|
|
{
|
|
fd_set readset;
|
|
std::map<std::string, EQStream*>::iterator stream_itr;
|
|
int num;
|
|
int length;
|
|
unsigned char buffer[2048];
|
|
sockaddr_in from;
|
|
socklen_t socklen = sizeof(sockaddr_in);
|
|
timeval sleep_time;
|
|
|
|
ReaderRunning = true;
|
|
while (sock != -1) {
|
|
{
|
|
std::lock_guard<std::mutex> lock(MReaderRunning);
|
|
if (!ReaderRunning)
|
|
break;
|
|
}
|
|
|
|
FD_ZERO(&readset);
|
|
FD_SET(sock, &readset);
|
|
|
|
sleep_time.tv_sec = 30;
|
|
sleep_time.tv_usec = 0;
|
|
|
|
if ((num = select(sock + 1, &readset, nullptr, nullptr, &sleep_time)) < 0) {
|
|
// Socket error occurred
|
|
}
|
|
else if (num == 0) {
|
|
continue; // Timeout, no data available
|
|
}
|
|
|
|
if (FD_ISSET(sock, &readset)) {
|
|
if ((length = recvfrom(sock, buffer, 2048, 0, (struct sockaddr*)&from, &socklen)) < 2) {
|
|
// Invalid packet received
|
|
}
|
|
else {
|
|
char temp[25];
|
|
sprintf(temp, "%u.%d", ntohl(from.sin_addr.s_addr), ntohs(from.sin_port));
|
|
|
|
std::lock_guard<std::mutex> streams_lock(MStreams);
|
|
if ((stream_itr = Streams.find(temp)) == Streams.end() || buffer[1] == OP_SessionRequest) {
|
|
// New session request or existing session requesting new connection
|
|
if (buffer[1] == OP_SessionRequest) {
|
|
if (stream_itr != Streams.end() && stream_itr->second)
|
|
stream_itr->second->SetState(CLOSED);
|
|
|
|
EQStream* s = new EQStream(from);
|
|
s->SetFactory(this);
|
|
s->SetStreamType(StreamType);
|
|
Streams[temp] = s;
|
|
WriterWork.notify_one();
|
|
Push(s);
|
|
s->Process(buffer, length);
|
|
s->SetLastPacketTime(Timer::GetCurrentTime2());
|
|
}
|
|
}
|
|
else {
|
|
EQStream* curstream = stream_itr->second;
|
|
// Don't process packets for closed connections
|
|
if (curstream->CheckClosed()) {
|
|
curstream = nullptr;
|
|
}
|
|
else {
|
|
curstream->PutInUse();
|
|
}
|
|
|
|
if (curstream) {
|
|
curstream->Process(buffer, length);
|
|
curstream->SetLastPacketTime(Timer::GetCurrentTime2());
|
|
curstream->ReleaseFromUse();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Checks for timed out connections and removes them
|
|
void CheckTimeout(bool remove_all = false)
|
|
{
|
|
// Lock streams for the entire timeout check operation
|
|
std::lock_guard<std::mutex> lock(MStreams);
|
|
|
|
unsigned long now = Timer::GetCurrentTime2();
|
|
std::map<std::string, EQStream*>::iterator stream_itr;
|
|
|
|
for (stream_itr = Streams.begin(); stream_itr != Streams.end();) {
|
|
EQStream* s = stream_itr->second;
|
|
EQStreamState state = s->GetState();
|
|
|
|
if (state == CLOSING && !s->HasOutgoingData()) {
|
|
stream_itr->second->SetState(CLOSED);
|
|
state = CLOSED;
|
|
}
|
|
else if (s->CheckTimeout(now, STREAM_TIMEOUT)) {
|
|
const char* stateString;
|
|
switch (state) {
|
|
case ESTABLISHED:
|
|
stateString = "Established";
|
|
break;
|
|
case CLOSING:
|
|
stateString = "Closing";
|
|
break;
|
|
case CLOSED:
|
|
stateString = "Closed";
|
|
break;
|
|
case WAIT_CLOSE:
|
|
stateString = "Wait-Close";
|
|
break;
|
|
default:
|
|
stateString = "Unknown";
|
|
break;
|
|
}
|
|
LogWrite(WORLD__DEBUG, 0, "World", "Timeout up!, state=%s (%u)", stateString, state);
|
|
|
|
if (state == ESTABLISHED) {
|
|
s->Close();
|
|
}
|
|
else if (state == WAIT_CLOSE) {
|
|
s->SetState(CLOSING);
|
|
state = CLOSING;
|
|
}
|
|
else if (state == CLOSING) {
|
|
// Timeout in closing state, force close
|
|
s->SetState(CLOSED);
|
|
state = CLOSED;
|
|
}
|
|
}
|
|
|
|
// Check for closed connections to remove
|
|
if (remove_all || state == CLOSED) {
|
|
if (!remove_all && s->getTimeoutDelays() < 2) {
|
|
s->addTimeoutDelay();
|
|
// Give time for other threads to finish with stream
|
|
}
|
|
else {
|
|
// Safe to delete the stream now
|
|
#ifdef LOGIN
|
|
LogWrite(LOGIN__DEBUG, 0, "Login", "Removing connection...");
|
|
#else
|
|
LogWrite(WORLD__DEBUG, 0, "World", "Removing connection...");
|
|
#endif
|
|
std::map<std::string, EQStream*>::iterator temp = stream_itr;
|
|
stream_itr++;
|
|
|
|
#ifdef WORLD
|
|
client_list.RemoveConnection(temp->second);
|
|
#endif
|
|
EQStream* stream = temp->second;
|
|
Streams.erase(temp);
|
|
delete stream;
|
|
continue;
|
|
}
|
|
}
|
|
stream_itr++;
|
|
}
|
|
}
|
|
|
|
// Processes packet combining for streams that need it
|
|
void CombinePacketLoop()
|
|
{
|
|
std::deque<EQStream*> combine_que;
|
|
CombinePacketRunning = true;
|
|
bool packets_waiting = false;
|
|
|
|
while (sock != -1) {
|
|
if (!CombinePacketRunning)
|
|
break;
|
|
|
|
{
|
|
std::lock_guard<std::mutex> lock(MStreams);
|
|
std::map<std::string, EQStream*>::iterator stream_itr;
|
|
for (stream_itr = Streams.begin(); stream_itr != Streams.end(); stream_itr++) {
|
|
if (!stream_itr->second) {
|
|
continue;
|
|
}
|
|
if (stream_itr->second->combine_timer && stream_itr->second->combine_timer->Check())
|
|
combine_que.push_back(stream_itr->second);
|
|
}
|
|
}
|
|
|
|
EQStream* stream = nullptr;
|
|
packets_waiting = false;
|
|
while (combine_que.size()) {
|
|
stream = combine_que.front();
|
|
if (stream->CheckActive()) {
|
|
if (!stream->CheckCombineQueue())
|
|
packets_waiting = true;
|
|
}
|
|
combine_que.pop_front();
|
|
}
|
|
|
|
if (!packets_waiting)
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(25));
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
|
}
|
|
}
|
|
|
|
// Main writer loop - sends outgoing packets to clients
|
|
void WriterLoop()
|
|
{
|
|
std::map<std::string, EQStream*>::iterator stream_itr;
|
|
std::vector<EQStream*> wants_write;
|
|
std::vector<EQStream*>::iterator cur, end;
|
|
std::deque<EQStream*> resend_que;
|
|
bool decay = false;
|
|
uint32_t stream_count;
|
|
|
|
Timer DecayTimer(20);
|
|
|
|
WriterRunning = true;
|
|
DecayTimer.Enable();
|
|
|
|
while (sock != -1) {
|
|
Timer::SetCurrentTime();
|
|
|
|
{
|
|
std::lock_guard<std::mutex> lock(MWriterRunning);
|
|
if (!WriterRunning)
|
|
break;
|
|
}
|
|
|
|
wants_write.clear();
|
|
decay = DecayTimer.Check();
|
|
|
|
// Copy streams into separate list to avoid keeping MStreams locked during writes
|
|
{
|
|
std::lock_guard<std::mutex> lock(MStreams);
|
|
for (stream_itr = Streams.begin(); stream_itr != Streams.end(); stream_itr++) {
|
|
if (!stream_itr->second) {
|
|
Streams.erase(stream_itr);
|
|
break;
|
|
}
|
|
|
|
// Decay bytes sent if timer expired
|
|
if (decay)
|
|
stream_itr->second->Decay();
|
|
|
|
if (stream_itr->second->HasOutgoingData()) {
|
|
stream_itr->second->PutInUse();
|
|
wants_write.push_back(stream_itr->second);
|
|
}
|
|
|
|
if (stream_itr->second->resend_que_timer->Check())
|
|
resend_que.push_back(stream_itr->second);
|
|
}
|
|
}
|
|
|
|
// Perform actual packet writes
|
|
cur = wants_write.begin();
|
|
end = wants_write.end();
|
|
for (; cur != end; cur++) {
|
|
(*cur)->Write(sock);
|
|
(*cur)->ReleaseFromUse();
|
|
}
|
|
|
|
// Process resend queue
|
|
while (resend_que.size()) {
|
|
resend_que.front()->CheckResend(sock);
|
|
resend_que.pop_front();
|
|
}
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
|
|
|
{
|
|
std::lock_guard<std::mutex> lock(MStreams);
|
|
stream_count = Streams.size();
|
|
}
|
|
|
|
if (!stream_count) {
|
|
// No streams available, wait for work signal
|
|
std::unique_lock<std::mutex> writer_lock(WriterWorkMutex);
|
|
WriterWork.wait(writer_lock);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stops all factory operations
|
|
void Stop()
|
|
{
|
|
StopReader();
|
|
StopWriter();
|
|
StopCombinePacket();
|
|
}
|
|
|
|
// Stops the reader thread
|
|
void StopReader()
|
|
{
|
|
std::lock_guard<std::mutex> lock(MReaderRunning);
|
|
ReaderRunning = false;
|
|
}
|
|
|
|
// Stops the writer thread
|
|
void StopWriter()
|
|
{
|
|
{
|
|
std::lock_guard<std::mutex> lock(MWriterRunning);
|
|
WriterRunning = false;
|
|
}
|
|
WriterWork.notify_one();
|
|
}
|
|
|
|
// Stops the combine packet thread
|
|
void StopCombinePacket()
|
|
{
|
|
std::lock_guard<std::mutex> lock(MCombinePacketRunning);
|
|
CombinePacketRunning = false;
|
|
}
|
|
|
|
// Signals the writer thread that work is available
|
|
void SignalWriter()
|
|
{
|
|
WriterWork.notify_one();
|
|
}
|
|
|
|
private:
|
|
int sock; // Socket file descriptor
|
|
int Port; // Port number to listen on
|
|
|
|
bool ReaderRunning; // Reader thread running flag
|
|
std::mutex MReaderRunning; // Mutex for reader thread state
|
|
bool WriterRunning; // Writer thread running flag
|
|
std::mutex MWriterRunning; // Mutex for writer thread state
|
|
bool CombinePacketRunning; // Combine packet thread running flag
|
|
std::mutex MCombinePacketRunning; // Mutex for combine packet thread state
|
|
|
|
std::condition_variable WriterWork; // Condition variable for writer thread work signal
|
|
std::mutex WriterWorkMutex; // Mutex for writer work condition variable
|
|
|
|
EQStreamType StreamType; // Type of streams this factory creates
|
|
|
|
std::queue<EQStream*> NewStreams; // Queue of newly created streams
|
|
std::mutex MNewStreams; // Mutex for new streams queue
|
|
|
|
std::map<std::string, EQStream*> Streams; // Map of active streams by IP:Port
|
|
std::mutex MStreams; // Mutex for streams map
|
|
}; |