392 lines
10 KiB
C++
392 lines
10 KiB
C++
// Copyright (C) 2007 EQ2EMulator Development Team - GPL v3 License
|
|
|
|
#pragma once
|
|
|
|
#include <condition_variable>
#include <deque>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
|
|
|
|
#include "database_core.hpp"
|
|
#include "query.hpp"
|
|
#include "../stream/eq_stream.hpp"
|
|
#include "../packet/packet_functions.hpp"
|
|
#include "../opcodes/emu_opcodes.hpp"
|
|
#include "../eq_emu_error.hpp"
|
|
#include "../packet/packet_dump.hpp"
|
|
|
|
#ifdef WORLD
|
|
#include "../../WorldServer/WorldDatabase.h"
|
|
#endif
|
|
#ifdef LOGIN
|
|
#include "../../LoginServer/login_database.hpp"
|
|
#endif
|
|
#ifdef PARSER
|
|
#include "../PacketParser/ParserDatabase.h"
|
|
#endif
|
|
#ifdef PATCHER
|
|
#include "../PatchServer/PatcherDatabase.h"
|
|
#endif
|
|
|
|
|
|
using namespace std;
|
|
|
|
// Forward declaration
|
|
class Query;
|
|
|
|
// Structure for async database operations
|
|
typedef struct
|
|
{
|
|
int32 queryid;
|
|
} DBStruct;
|
|
|
|
// Advanced database class with async processing and game-specific functionality
|
|
class Database : public DatabaseCore
|
|
{
|
|
public:
|
|
// Constructor - initializes database variables
|
|
Database()
|
|
{
|
|
#ifdef WORLD
|
|
continueAsync = false;
|
|
#endif
|
|
}
|
|
|
|
// Destructor - cleans up resources and async queries
|
|
~Database()
|
|
{
|
|
#ifdef WORLD
|
|
DBQueryMutex.writelock(__FUNCTION__, __LINE__);
|
|
activeQuerySessions.clear();
|
|
DBQueryMutex.releasewritelock(__FUNCTION__, __LINE__);
|
|
|
|
DBAsyncMutex.writelock();
|
|
continueAsync = false;
|
|
for (auto& itr : asyncQueries) {
|
|
asyncQueriesMutex[itr.first]->writelock();
|
|
auto& queries = itr.second;
|
|
while (queries.size() > 0) {
|
|
Query* cur = queries.front();
|
|
queries.pop_front();
|
|
safe_delete(cur);
|
|
}
|
|
asyncQueriesMutex[itr.first]->releasewritelock();
|
|
Mutex* mutex = asyncQueriesMutex[itr.first];
|
|
asyncQueriesMutex.erase(itr.first);
|
|
safe_delete(mutex);
|
|
}
|
|
asyncQueries.clear();
|
|
asyncQueriesMutex.clear();
|
|
DBAsyncMutex.releasewritelock();
|
|
|
|
PurgeDBInstances();
|
|
#endif
|
|
}
|
|
|
|
// Initialize database connection with optional silent loading
|
|
bool Init(bool silentLoad = false)
|
|
{
|
|
return Connect();
|
|
}
|
|
|
|
// Comprehensive query execution method - inherits from DatabaseCore but ensures Database access
|
|
using DatabaseCore::RunQuery;
|
|
|
|
// Retrieve opcode mappings for specified client version
|
|
map<string, uint16> GetOpcodes(int16 version)
|
|
{
|
|
map<string, uint16> opcodes;
|
|
DatabaseResult result;
|
|
if (Select(&result, "select name, opcode from opcodes where %i between version_range1 and version_range2 order by version_range1, id", version)) {
|
|
while (result.Next()) {
|
|
opcodes[result.GetStringStr("name")] = result.GetInt16Str("opcode");
|
|
}
|
|
}
|
|
return opcodes;
|
|
}
|
|
|
|
// Get available client version ranges
|
|
map<int16, int16> GetVersions()
|
|
{
|
|
map<int16, int16> versions;
|
|
DatabaseResult result;
|
|
if (Select(&result, "select distinct version_range1, version_range2 from opcodes")) {
|
|
while (result.Next()) {
|
|
versions[result.GetInt16Str("version_range1")] = result.GetInt16Str("version_range2");
|
|
}
|
|
}
|
|
return versions;
|
|
}
|
|
|
|
// Authenticate web user credentials and return user ID
|
|
int32 AuthenticateWebUser(char* userName, char* passwd, int32* status = 0)
|
|
{
|
|
if (status) {
|
|
*status = 0;
|
|
}
|
|
int32 id = 0;
|
|
DatabaseResult result;
|
|
if (Select(&result, "select id, status from web_users where username='%s' and passwd = sha2('%s', 512)", EscapeStr(userName).c_str(), EscapeStr(passwd).c_str())) {
|
|
if (result.Next()) {
|
|
id = result.GetInt32Str("id");
|
|
if (status) {
|
|
*status = result.GetInt32Str("status");
|
|
}
|
|
}
|
|
}
|
|
return id;
|
|
}
|
|
|
|
// Check if route requires authentication
|
|
int32 NoAuthRoute(char* route)
|
|
{
|
|
int32 status = 0xFFFFFFFF;
|
|
DatabaseResult result;
|
|
if (Select(&result, "select status from web_routes where route='%s'", EscapeStr(route).c_str())) {
|
|
if (result.Next()) {
|
|
status = result.GetInt32Str("status");
|
|
}
|
|
}
|
|
return status;
|
|
}
|
|
|
|
// Handle MySQL error codes with appropriate logging
|
|
void HandleMysqlError(int32 errnum)
|
|
{
|
|
switch (errnum) {
|
|
case 0:
|
|
break;
|
|
case 1045: // Access Denied
|
|
case 2001: {
|
|
AddEQEMuError(EQEMuError_Mysql_1405, true);
|
|
break;
|
|
}
|
|
case 2003: { // Unable to connect
|
|
AddEQEMuError(EQEMuError_Mysql_2003, true);
|
|
break;
|
|
}
|
|
case 2005: { // Unable to connect
|
|
AddEQEMuError(EQEMuError_Mysql_2005, true);
|
|
break;
|
|
}
|
|
case 2007: { // Unable to connect
|
|
AddEQEMuError(EQEMuError_Mysql_2007, true);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
#ifdef WORLD
|
|
// Add query to async processing queue
|
|
void AddAsyncQuery(Query* query)
|
|
{
|
|
DBAsyncMutex.writelock();
|
|
map<int32, Mutex*>::iterator mutexItr = asyncQueriesMutex.find(query->GetQueryID());
|
|
if (mutexItr == asyncQueriesMutex.end()) {
|
|
Mutex* queryMutex = new Mutex();
|
|
queryMutex->SetName("AsyncQuery" + query->GetQueryID());
|
|
asyncQueriesMutex.insert(make_pair(query->GetQueryID(), queryMutex));
|
|
}
|
|
map<int32, deque<Query*>>::iterator itr = asyncQueries.find(query->GetQueryID());
|
|
asyncQueriesMutex[query->GetQueryID()]->writelock();
|
|
|
|
if (itr != asyncQueries.end())
|
|
itr->second.push_back(query);
|
|
else {
|
|
deque<Query*> queue;
|
|
queue.push_back(query);
|
|
asyncQueries.insert(make_pair(query->GetQueryID(), queue));
|
|
}
|
|
|
|
AddActiveQuery(query);
|
|
|
|
asyncQueriesMutex[query->GetQueryID()]->releasewritelock();
|
|
DBAsyncMutex.releasewritelock();
|
|
|
|
bool isActive = IsActiveQuery(query->GetQueryID(), query);
|
|
if (!isActive) {
|
|
continueAsync = true;
|
|
DBStruct* tmp = new DBStruct;
|
|
tmp->queryid = query->GetQueryID();
|
|
|
|
thread asyncThread(DBAsyncQueries, (void*)tmp);
|
|
asyncThread.detach();
|
|
}
|
|
}
|
|
|
|
// Process all async queries for specified query ID
|
|
void RunAsyncQueries(int32 queryid)
|
|
{
|
|
Database* asyncdb = FindFreeInstance();
|
|
DBAsyncMutex.writelock();
|
|
map<int32, deque<Query*>>::iterator itr = asyncQueries.find(queryid);
|
|
if (itr == asyncQueries.end()) {
|
|
DBAsyncMutex.releasewritelock();
|
|
return;
|
|
}
|
|
|
|
asyncQueriesMutex[queryid]->writelock();
|
|
deque<Query*> queries;
|
|
while (itr->second.size()) {
|
|
Query* cur = itr->second.front();
|
|
queries.push_back(cur);
|
|
itr->second.pop_front();
|
|
}
|
|
itr->second.clear();
|
|
asyncQueries.erase(itr);
|
|
DBAsyncMutex.releasewritelock();
|
|
asyncQueriesMutex[queryid]->releasewritelock();
|
|
|
|
while (queries.size() > 0) {
|
|
Query* cur = queries.front();
|
|
cur->RunQueryAsync(asyncdb);
|
|
this->RemoveActiveQuery(cur);
|
|
queries.pop_front();
|
|
safe_delete(cur);
|
|
}
|
|
FreeDBInstance(asyncdb);
|
|
|
|
bool isActive = IsActiveQuery(queryid);
|
|
if (isActive) {
|
|
continueAsync = true;
|
|
DBStruct* tmp = new DBStruct;
|
|
tmp->queryid = queryid;
|
|
|
|
thread asyncThread(DBAsyncQueries, (void*)tmp);
|
|
asyncThread.detach();
|
|
}
|
|
}
|
|
|
|
// Find available database instance from pool
|
|
Database* FindFreeInstance()
|
|
{
|
|
Database* db_inst = 0;
|
|
map<Database*, bool>::iterator itr;
|
|
DBInstanceMutex.writelock(__FUNCTION__, __LINE__);
|
|
for (itr = dbInstances.begin(); itr != dbInstances.end(); itr++) {
|
|
if (!itr->second) {
|
|
db_inst = itr->first;
|
|
itr->second = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!db_inst) {
|
|
WorldDatabase* tmp = new WorldDatabase();
|
|
db_inst = (Database*)tmp;
|
|
tmp->Init();
|
|
tmp->ConnectNewDatabase();
|
|
dbInstances.insert(make_pair(db_inst, true));
|
|
}
|
|
DBInstanceMutex.releasewritelock(__FUNCTION__, __LINE__);
|
|
|
|
return db_inst;
|
|
}
|
|
|
|
// Remove query from active session tracking
|
|
void RemoveActiveQuery(Query* query)
|
|
{
|
|
DBQueryMutex.writelock(__FUNCTION__, __LINE__);
|
|
|
|
vector<Query*>::iterator itr;
|
|
for (itr = activeQuerySessions.begin(); itr != activeQuerySessions.end(); itr++) {
|
|
Query* curQuery = *itr;
|
|
if (query == curQuery) {
|
|
activeQuerySessions.erase(itr);
|
|
break;
|
|
}
|
|
}
|
|
DBQueryMutex.releasewritelock(__FUNCTION__, __LINE__);
|
|
}
|
|
|
|
// Add query to active session tracking
|
|
void AddActiveQuery(Query* query)
|
|
{
|
|
DBQueryMutex.writelock(__FUNCTION__, __LINE__);
|
|
activeQuerySessions.push_back(query);
|
|
DBQueryMutex.releasewritelock(__FUNCTION__, __LINE__);
|
|
}
|
|
|
|
// Check if query ID has active operations
|
|
bool IsActiveQuery(int32 id, Query* skip = 0)
|
|
{
|
|
bool isActive = false;
|
|
|
|
DBQueryMutex.readlock(__FUNCTION__, __LINE__);
|
|
vector<Query*>::iterator itr;
|
|
for (itr = activeQuerySessions.begin(); itr != activeQuerySessions.end(); itr++) {
|
|
Query* query = *itr;
|
|
if (query == skip)
|
|
continue;
|
|
|
|
if (query->GetQueryID() == id) {
|
|
isActive = true;
|
|
break;
|
|
}
|
|
}
|
|
DBQueryMutex.releasereadlock(__FUNCTION__, __LINE__);
|
|
|
|
return isActive;
|
|
}
|
|
|
|
// Ping all async database connections
|
|
void PingAsyncDatabase()
|
|
{
|
|
map<Database*, bool>::iterator itr;
|
|
DBInstanceMutex.readlock(__FUNCTION__, __LINE__);
|
|
for (itr = dbInstances.begin(); itr != dbInstances.end(); itr++) {
|
|
Database* tmpInst = itr->first;
|
|
tmpInst->Ping();
|
|
}
|
|
DBInstanceMutex.releasereadlock(__FUNCTION__, __LINE__);
|
|
}
|
|
|
|
private:
|
|
// Clean up all database instances in pool
|
|
void PurgeDBInstances()
|
|
{
|
|
map<Database*, bool>::iterator itr;
|
|
DBInstanceMutex.writelock(__FUNCTION__, __LINE__);
|
|
for (itr = dbInstances.begin(); itr != dbInstances.end(); itr++) {
|
|
WorldDatabase* tmpInst = (WorldDatabase*)itr->first;
|
|
safe_delete(tmpInst);
|
|
}
|
|
dbInstances.clear();
|
|
DBInstanceMutex.releasewritelock(__FUNCTION__, __LINE__);
|
|
}
|
|
|
|
// Return database instance to available pool
|
|
void FreeDBInstance(Database* cur)
|
|
{
|
|
DBInstanceMutex.writelock(__FUNCTION__, __LINE__);
|
|
dbInstances[cur] = false;
|
|
DBInstanceMutex.releasewritelock(__FUNCTION__, __LINE__);
|
|
}
|
|
|
|
bool continueAsync; // Flag to continue async processing
|
|
map<int32, deque<Query*>> asyncQueries; // Queue of async queries by ID
|
|
map<int32, Mutex*> asyncQueriesMutex; // Mutex per query ID for thread safety
|
|
map<Database*, bool> dbInstances; // Pool of database instances
|
|
vector<Query*> activeQuerySessions; // Currently active query sessions
|
|
Mutex DBAsyncMutex; // Mutex for async operations
|
|
Mutex DBInstanceMutex; // Mutex for instance pool management
|
|
Mutex DBQueryMutex; // Mutex for query session tracking
|
|
#endif
|
|
};
|
|
|
|
#ifdef WORLD
|
|
// Global database instance declaration - would be defined elsewhere
|
|
extern Database database;
|
|
|
|
// Thread function for processing async database queries
|
|
ThreadReturnType DBAsyncQueries(void* str)
|
|
{
|
|
this_thread::sleep_for(chrono::milliseconds(10));
|
|
DBStruct* data = (DBStruct*)str;
|
|
database.RunAsyncQueries(data->queryid);
|
|
delete data;
|
|
THREAD_RETURN(NULL);
|
|
}
|
|
#endif |