eq2go/old/common/database/database.hpp
2025-08-06 19:00:30 -05:00

392 lines
10 KiB
C++

// Copyright (C) 2007 EQ2EMulator Development Team - GPL v3 License
#pragma once
#include <map>
#include <vector>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
#include "database_core.hpp"
#include "query.hpp"
#include "../stream/eq_stream.hpp"
#include "../packet/packet_functions.hpp"
#include "../opcodes/emu_opcodes.hpp"
#include "../eq_emu_error.hpp"
#include "../packet/packet_dump.hpp"
#ifdef WORLD
#include "../../WorldServer/WorldDatabase.h"
#endif
#ifdef LOGIN
#include "../../LoginServer/login_database.hpp"
#endif
#ifdef PARSER
#include "../PacketParser/ParserDatabase.h"
#endif
#ifdef PATCHER
#include "../PatchServer/PatcherDatabase.h"
#endif
using namespace std;
// Forward declaration
class Query;
// Structure for async database operations
typedef struct
{
int32 queryid;
} DBStruct;
// Advanced database class with async processing and game-specific functionality
class Database : public DatabaseCore
{
public:
// Constructor - initializes database variables
Database()
{
#ifdef WORLD
continueAsync = false;
#endif
}
// Destructor - cleans up resources and async queries
~Database()
{
#ifdef WORLD
DBQueryMutex.writelock(__FUNCTION__, __LINE__);
activeQuerySessions.clear();
DBQueryMutex.releasewritelock(__FUNCTION__, __LINE__);
DBAsyncMutex.writelock();
continueAsync = false;
for (auto& itr : asyncQueries) {
asyncQueriesMutex[itr.first]->writelock();
auto& queries = itr.second;
while (queries.size() > 0) {
Query* cur = queries.front();
queries.pop_front();
safe_delete(cur);
}
asyncQueriesMutex[itr.first]->releasewritelock();
Mutex* mutex = asyncQueriesMutex[itr.first];
asyncQueriesMutex.erase(itr.first);
safe_delete(mutex);
}
asyncQueries.clear();
asyncQueriesMutex.clear();
DBAsyncMutex.releasewritelock();
PurgeDBInstances();
#endif
}
// Initialize database connection with optional silent loading
bool Init(bool silentLoad = false)
{
return Connect();
}
// Comprehensive query execution method - inherits from DatabaseCore but ensures Database access
using DatabaseCore::RunQuery;
// Retrieve opcode mappings for specified client version
map<string, uint16> GetOpcodes(int16 version)
{
map<string, uint16> opcodes;
DatabaseResult result;
if (Select(&result, "select name, opcode from opcodes where %i between version_range1 and version_range2 order by version_range1, id", version)) {
while (result.Next()) {
opcodes[result.GetStringStr("name")] = result.GetInt16Str("opcode");
}
}
return opcodes;
}
// Get available client version ranges
map<int16, int16> GetVersions()
{
map<int16, int16> versions;
DatabaseResult result;
if (Select(&result, "select distinct version_range1, version_range2 from opcodes")) {
while (result.Next()) {
versions[result.GetInt16Str("version_range1")] = result.GetInt16Str("version_range2");
}
}
return versions;
}
// Authenticate web user credentials and return user ID
int32 AuthenticateWebUser(char* userName, char* passwd, int32* status = 0)
{
if (status) {
*status = 0;
}
int32 id = 0;
DatabaseResult result;
if (Select(&result, "select id, status from web_users where username='%s' and passwd = sha2('%s', 512)", EscapeStr(userName).c_str(), EscapeStr(passwd).c_str())) {
if (result.Next()) {
id = result.GetInt32Str("id");
if (status) {
*status = result.GetInt32Str("status");
}
}
}
return id;
}
// Check if route requires authentication
int32 NoAuthRoute(char* route)
{
int32 status = 0xFFFFFFFF;
DatabaseResult result;
if (Select(&result, "select status from web_routes where route='%s'", EscapeStr(route).c_str())) {
if (result.Next()) {
status = result.GetInt32Str("status");
}
}
return status;
}
// Handle MySQL error codes with appropriate logging
void HandleMysqlError(int32 errnum)
{
switch (errnum) {
case 0:
break;
case 1045: // Access Denied
case 2001: {
AddEQEMuError(EQEMuError_Mysql_1405, true);
break;
}
case 2003: { // Unable to connect
AddEQEMuError(EQEMuError_Mysql_2003, true);
break;
}
case 2005: { // Unable to connect
AddEQEMuError(EQEMuError_Mysql_2005, true);
break;
}
case 2007: { // Unable to connect
AddEQEMuError(EQEMuError_Mysql_2007, true);
break;
}
}
}
#ifdef WORLD
// Add query to async processing queue
void AddAsyncQuery(Query* query)
{
DBAsyncMutex.writelock();
map<int32, Mutex*>::iterator mutexItr = asyncQueriesMutex.find(query->GetQueryID());
if (mutexItr == asyncQueriesMutex.end()) {
Mutex* queryMutex = new Mutex();
queryMutex->SetName("AsyncQuery" + query->GetQueryID());
asyncQueriesMutex.insert(make_pair(query->GetQueryID(), queryMutex));
}
map<int32, deque<Query*>>::iterator itr = asyncQueries.find(query->GetQueryID());
asyncQueriesMutex[query->GetQueryID()]->writelock();
if (itr != asyncQueries.end())
itr->second.push_back(query);
else {
deque<Query*> queue;
queue.push_back(query);
asyncQueries.insert(make_pair(query->GetQueryID(), queue));
}
AddActiveQuery(query);
asyncQueriesMutex[query->GetQueryID()]->releasewritelock();
DBAsyncMutex.releasewritelock();
bool isActive = IsActiveQuery(query->GetQueryID(), query);
if (!isActive) {
continueAsync = true;
DBStruct* tmp = new DBStruct;
tmp->queryid = query->GetQueryID();
thread asyncThread(DBAsyncQueries, (void*)tmp);
asyncThread.detach();
}
}
// Process all async queries for specified query ID
void RunAsyncQueries(int32 queryid)
{
Database* asyncdb = FindFreeInstance();
DBAsyncMutex.writelock();
map<int32, deque<Query*>>::iterator itr = asyncQueries.find(queryid);
if (itr == asyncQueries.end()) {
DBAsyncMutex.releasewritelock();
return;
}
asyncQueriesMutex[queryid]->writelock();
deque<Query*> queries;
while (itr->second.size()) {
Query* cur = itr->second.front();
queries.push_back(cur);
itr->second.pop_front();
}
itr->second.clear();
asyncQueries.erase(itr);
DBAsyncMutex.releasewritelock();
asyncQueriesMutex[queryid]->releasewritelock();
while (queries.size() > 0) {
Query* cur = queries.front();
cur->RunQueryAsync(asyncdb);
this->RemoveActiveQuery(cur);
queries.pop_front();
safe_delete(cur);
}
FreeDBInstance(asyncdb);
bool isActive = IsActiveQuery(queryid);
if (isActive) {
continueAsync = true;
DBStruct* tmp = new DBStruct;
tmp->queryid = queryid;
thread asyncThread(DBAsyncQueries, (void*)tmp);
asyncThread.detach();
}
}
// Find available database instance from pool
Database* FindFreeInstance()
{
Database* db_inst = 0;
map<Database*, bool>::iterator itr;
DBInstanceMutex.writelock(__FUNCTION__, __LINE__);
for (itr = dbInstances.begin(); itr != dbInstances.end(); itr++) {
if (!itr->second) {
db_inst = itr->first;
itr->second = true;
break;
}
}
if (!db_inst) {
WorldDatabase* tmp = new WorldDatabase();
db_inst = (Database*)tmp;
tmp->Init();
tmp->ConnectNewDatabase();
dbInstances.insert(make_pair(db_inst, true));
}
DBInstanceMutex.releasewritelock(__FUNCTION__, __LINE__);
return db_inst;
}
// Remove query from active session tracking
void RemoveActiveQuery(Query* query)
{
DBQueryMutex.writelock(__FUNCTION__, __LINE__);
vector<Query*>::iterator itr;
for (itr = activeQuerySessions.begin(); itr != activeQuerySessions.end(); itr++) {
Query* curQuery = *itr;
if (query == curQuery) {
activeQuerySessions.erase(itr);
break;
}
}
DBQueryMutex.releasewritelock(__FUNCTION__, __LINE__);
}
// Add query to active session tracking
void AddActiveQuery(Query* query)
{
DBQueryMutex.writelock(__FUNCTION__, __LINE__);
activeQuerySessions.push_back(query);
DBQueryMutex.releasewritelock(__FUNCTION__, __LINE__);
}
// Check if query ID has active operations
bool IsActiveQuery(int32 id, Query* skip = 0)
{
bool isActive = false;
DBQueryMutex.readlock(__FUNCTION__, __LINE__);
vector<Query*>::iterator itr;
for (itr = activeQuerySessions.begin(); itr != activeQuerySessions.end(); itr++) {
Query* query = *itr;
if (query == skip)
continue;
if (query->GetQueryID() == id) {
isActive = true;
break;
}
}
DBQueryMutex.releasereadlock(__FUNCTION__, __LINE__);
return isActive;
}
// Ping all async database connections
void PingAsyncDatabase()
{
map<Database*, bool>::iterator itr;
DBInstanceMutex.readlock(__FUNCTION__, __LINE__);
for (itr = dbInstances.begin(); itr != dbInstances.end(); itr++) {
Database* tmpInst = itr->first;
tmpInst->Ping();
}
DBInstanceMutex.releasereadlock(__FUNCTION__, __LINE__);
}
private:
// Clean up all database instances in pool
void PurgeDBInstances()
{
map<Database*, bool>::iterator itr;
DBInstanceMutex.writelock(__FUNCTION__, __LINE__);
for (itr = dbInstances.begin(); itr != dbInstances.end(); itr++) {
WorldDatabase* tmpInst = (WorldDatabase*)itr->first;
safe_delete(tmpInst);
}
dbInstances.clear();
DBInstanceMutex.releasewritelock(__FUNCTION__, __LINE__);
}
// Return database instance to available pool
void FreeDBInstance(Database* cur)
{
DBInstanceMutex.writelock(__FUNCTION__, __LINE__);
dbInstances[cur] = false;
DBInstanceMutex.releasewritelock(__FUNCTION__, __LINE__);
}
bool continueAsync; // Flag to continue async processing
map<int32, deque<Query*>> asyncQueries; // Queue of async queries by ID
map<int32, Mutex*> asyncQueriesMutex; // Mutex per query ID for thread safety
map<Database*, bool> dbInstances; // Pool of database instances
vector<Query*> activeQuerySessions; // Currently active query sessions
Mutex DBAsyncMutex; // Mutex for async operations
Mutex DBInstanceMutex; // Mutex for instance pool management
Mutex DBQueryMutex; // Mutex for query session tracking
#endif
};
#ifdef WORLD
// Global database instance declaration - would be defined elsewhere
extern Database database;
// Thread function for processing async database queries
ThreadReturnType DBAsyncQueries(void* str)
{
this_thread::sleep_for(chrono::milliseconds(10));
DBStruct* data = (DBStruct*)str;
database.RunAsyncQueries(data->queryid);
delete data;
THREAD_RETURN(NULL);
}
#endif