1
0

Fixed polling issues when a server is erroring out, and fixed peering so that it uses the right id

This commit is contained in:
Emagi 2024-12-02 20:01:21 -05:00
parent 140d58d0da
commit 405e43f672
2 changed files with 156 additions and 154 deletions

View File

@ -511,157 +511,150 @@ void HTTPSClientPool::pollPeerHealth(const std::string& server, const std::strin
LogWrite(PEERING__ERROR, 0, "Peering", "%s: Error finding peer %s:%s.", __FUNCTION__, server.c_str(), port.c_str()); LogWrite(PEERING__ERROR, 0, "Peering", "%s: Error finding peer %s:%s.", __FUNCTION__, server.c_str(), port.c_str());
} }
else { else {
auto client = getOrCreateClient(id, port, server + ":" + port); auto client = getOrCreateClient(id, server, port);
int16 interval = pollingInterval; int16 interval = pollingInterval;
while (running.load()) { HealthStatus curStatus = peer_manager.getPeerStatus(server, web_worldport);
interval++; id = peer_manager.isPeer(server, web_worldport);
if (interval > pollingInterval) { try {
HealthStatus curStatus = peer_manager.getPeerStatus(server, web_worldport); auto response = client->sendRequest(server, port, "/status"); // Assumes HTTPSClient has a get method
id = peer_manager.isPeer(server, web_worldport); //std::cout << "Health check response from " << server << ":" << port << " - " << response << std::endl;
try {
auto response = client->sendRequest(server, port, "/status"); // Assumes HTTPSClient has a get method
//std::cout << "Health check response from " << server << ":" << port << " - " << response << std::endl;
boost::property_tree::ptree json_tree; boost::property_tree::ptree json_tree;
std::istringstream json_stream(response); std::istringstream json_stream(response);
boost::property_tree::read_json(json_stream, json_tree); boost::property_tree::read_json(json_stream, json_tree);
std::string online_status; std::string online_status;
int16 peer_priority = 65535; int16 peer_priority = 65535;
bool peer_primary = false; bool peer_primary = false;
if (auto status = json_tree.get_optional<std::string>("world_status")) { if (auto status = json_tree.get_optional<std::string>("world_status")) {
online_status = status.get(); online_status = status.get();
} }
if (auto priority = json_tree.get_optional<int16>("peer_priority")) { if (auto priority = json_tree.get_optional<int16>("peer_priority")) {
peer_priority = priority.get(); peer_priority = priority.get();
} }
if (auto isprimary = json_tree.get_optional<bool>("peer_primary")) { if (auto isprimary = json_tree.get_optional<bool>("peer_primary")) {
peer_primary = isprimary.get(); peer_primary = isprimary.get();
} }
peer_manager.updatePriority(id, peer_priority); peer_manager.updatePriority(id, peer_priority);
if (peer_primary && net.is_primary) { if (peer_primary && net.is_primary) {
peer_manager.handlePrimaryConflict(id); peer_manager.handlePrimaryConflict(id);
std::shared_ptr<Peer> hasPrimary = peer_manager.getHealthyPrimaryPeerPtr(); std::shared_ptr<Peer> hasPrimary = peer_manager.getHealthyPrimaryPeerPtr();
if (hasPrimary) { // demote self if (hasPrimary) { // demote self
LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS PRIMARY status, demoting self.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str()); LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS PRIMARY status, demoting self.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str());
net.SetPrimary(false); net.SetPrimary(false);
} }
} }
else if(!peer_manager.hasPrimary() && peer_primary) { else if (!peer_manager.hasPrimary() && peer_primary) {
peer_manager.setPrimary(id); peer_manager.setPrimary(id);
} }
switch (curStatus) { switch (curStatus) {
case HealthStatus::STARTUP: { case HealthStatus::STARTUP: {
pollPeerHealthData(client, id, server, port); pollPeerHealthData(client, id, server, port);
if (online_status == "offline") { if (online_status == "offline") {
std::shared_ptr<Peer> peer = peer_manager.getPeerById(id); std::shared_ptr<Peer> peer = peer_manager.getPeerById(id);
if (peer) { if (peer) {
peer->wasOffline = true; peer->wasOffline = true;
if (peer->sentInitialPeerData) { if (peer->sentInitialPeerData) {
peer->sentInitialPeerData = false; peer->sentInitialPeerData = false;
} }
} }
} }
if (online_status == "online") { if (online_status == "online") {
peer_manager.updateHealth(id, HealthStatus::OK); peer_manager.updateHealth(id, HealthStatus::OK);
std::shared_ptr<Peer> peer = peer_manager.getPeerById(id); std::shared_ptr<Peer> peer = peer_manager.getPeerById(id);
if (net.is_primary) { if (net.is_primary) {
if (peer) { if (peer) {
if (peer->wasOffline && !peer->sentInitialPeerData) { if (peer->wasOffline && !peer->sentInitialPeerData) {
world.GetGroupManager()->SendPeerGroupData(id); world.GetGroupManager()->SendPeerGroupData(id);
} }
peer->sentInitialPeerData = true; peer->sentInitialPeerData = true;
} }
} }
else if (peer) { // set as if we already sent the data since if we take over we don't want the peer trying to resubmit all groups else if (peer) { // set as if we already sent the data since if we take over we don't want the peer trying to resubmit all groups
peer->wasOffline = false; peer->wasOffline = false;
peer->sentInitialPeerData = true; peer->sentInitialPeerData = true;
} }
LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS OK/UP state.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str()); LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS OK/UP state.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str());
} }
if (!net.is_primary && !peer_manager.hasPrimary() && peer_priority < net.GetPeerPriority()) { if (!net.is_primary && !peer_manager.hasPrimary() && peer_priority < net.GetPeerPriority()) {
LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS PRIMARY.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str()); LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS PRIMARY.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str());
peer_manager.setPrimary(id); peer_manager.setPrimary(id);
net.SetPrimary(false); net.SetPrimary(false);
} }
else if (!peer_manager.hasPrimary() && world.world_loaded && !net.is_primary && net.GetPeerPriority() <= peer_priority) { else if (!peer_manager.hasPrimary() && world.world_loaded && !net.is_primary && net.GetPeerPriority() <= peer_priority) {
LogWrite(PEERING__INFO, 0, "Peering", "%s: I AM PRIMARY!", __FUNCTION__); LogWrite(PEERING__INFO, 0, "Peering", "%s: I AM PRIMARY!", __FUNCTION__);
net.SetPrimary(); net.SetPrimary();
} }
break; break;
} }
case HealthStatus::OK: { case HealthStatus::OK: {
pollPeerHealthData(client, id, server, port); pollPeerHealthData(client, id, server, port);
if (!net.is_primary && !peer_manager.hasPrimary() && peer_priority < net.GetPeerPriority()) { if (!net.is_primary && !peer_manager.hasPrimary() && peer_priority < net.GetPeerPriority()) {
LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS PRIMARY.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str()); LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS PRIMARY.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str());
peer_manager.setPrimary(id); peer_manager.setPrimary(id);
net.SetPrimary(false); net.SetPrimary(false);
} }
else if (!peer_manager.hasPrimary() && !net.is_primary && net.GetPeerPriority() <= peer_priority) { else if (!peer_manager.hasPrimary() && !net.is_primary && net.GetPeerPriority() <= peer_priority) {
LogWrite(PEERING__INFO, 0, "Peering", "%s: I AM PRIMARY!!", __FUNCTION__); LogWrite(PEERING__INFO, 0, "Peering", "%s: I AM PRIMARY!!", __FUNCTION__);
net.SetPrimary(); net.SetPrimary();
} }
break; break;
} }
case HealthStatus::WARN: case HealthStatus::WARN:
case HealthStatus::ERROR: case HealthStatus::ERROR:
case HealthStatus::SHUTDOWN: { case HealthStatus::SHUTDOWN: {
peer_manager.updateHealth(id, HealthStatus::STARTUP); peer_manager.updateHealth(id, HealthStatus::STARTUP);
LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS ENTERED STARTUP state.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str()); LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS ENTERED STARTUP state.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str());
if (net.is_primary) { if (net.is_primary) {
std::shared_ptr<Peer> peer = peer_manager.getPeerById(id); std::shared_ptr<Peer> peer = peer_manager.getPeerById(id);
if (peer && peer->sentInitialPeerData == true) { if (peer && peer->sentInitialPeerData == true) {
peer->sentInitialPeerData = false; peer->sentInitialPeerData = false;
} }
} }
break; break;
} }
}
}
catch (const std::exception& e) {
HealthStatus curStatus = peer_manager.getPeerStatus(server, web_worldport);
switch (curStatus) {
case HealthStatus::WARN: {
peer_manager.updateHealth(id, HealthStatus::ERROR);
break;
}
case HealthStatus::ERROR: {
LogWrite(PEERING__ERROR, 0, "Peering", "%s: Peer %s at %s:%s - HAS ERROR->SHUTDOWN state.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str());
peer_manager.updateHealth(id, HealthStatus::SHUTDOWN);
if (peer_manager.getHealthyPeer() == std::nullopt) {
if (!net.is_primary && world.world_loaded) {
LogWrite(PEERING__INFO, 0, "Peering", "%s: TAKING OVER AS PRIMARY, NO PEERS AVAILABLE TO CHECK", __FUNCTION__);
net.SetPrimary();
}
}
else if (!peer_manager.hasPrimary()) {
std::string newPrimary = peer_manager.getPriorityPeer();
if (newPrimary.size() > 0) {
LogWrite(PEERING__INFO, 0, "Peering", "%s: NEW PRIMARY %s", __FUNCTION__, newPrimary);
peer_manager.setPrimary(newPrimary);
net.SetPrimary(false);
}
else {
LogWrite(PEERING__ERROR, 0, "Peering", "%s: NEW PRIMARY CANNOT BE ESTABLISHED!", __FUNCTION__);
}
}
break;
}
default: {
peer_manager.updateHealth(id, HealthStatus::WARN);
break;
}
}
LogWrite(PEERING__ERROR, 0, "Peering", "%s: ERROR POLLING %s:%s reason: %s", __FUNCTION__, server.c_str(), port.c_str(), e.what() ? e.what() : "??");
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
interval = 0;
} }
std::this_thread::sleep_for(std::chrono::milliseconds(50));
} }
catch (const std::exception& e) {
HealthStatus curStatus = peer_manager.getPeerStatus(server, web_worldport);
switch (curStatus) {
case HealthStatus::WARN: {
peer_manager.updateHealth(id, HealthStatus::ERROR);
break;
}
case HealthStatus::ERROR: {
LogWrite(PEERING__ERROR, 0, "Peering", "%s: Peer %s at %s:%s - HAS ERROR->SHUTDOWN state.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str());
peer_manager.updateHealth(id, HealthStatus::SHUTDOWN);
if (peer_manager.getHealthyPeer() == std::nullopt) {
if (!net.is_primary && world.world_loaded) {
LogWrite(PEERING__INFO, 0, "Peering", "%s: TAKING OVER AS PRIMARY, NO PEERS AVAILABLE TO CHECK", __FUNCTION__);
net.SetPrimary();
}
}
else if (!peer_manager.hasPrimary()) {
std::string newPrimary = peer_manager.getPriorityPeer();
if (newPrimary.size() > 0) {
LogWrite(PEERING__INFO, 0, "Peering", "%s: NEW PRIMARY %s", __FUNCTION__, newPrimary);
peer_manager.setPrimary(newPrimary);
net.SetPrimary(false);
}
else {
LogWrite(PEERING__ERROR, 0, "Peering", "%s: NEW PRIMARY CANNOT BE ESTABLISHED!", __FUNCTION__);
}
}
break;
}
default: {
peer_manager.updateHealth(id, HealthStatus::WARN);
break;
}
}
LogWrite(PEERING__ERROR, 0, "Peering", "%s: ERROR POLLING %s:%s reason: %s", __FUNCTION__, server.c_str(), port.c_str(), e.what() ? e.what() : "??");
}
interval = 0;
} }
} }
@ -724,15 +717,19 @@ void HTTPSClientPool::startPolling() {
std::async(std::launch::async, [this, server, port]() { std::async(std::launch::async, [this, server, port]() {
try { try {
pollPeerHealth(server, port); pollPeerHealth(server, port);
} catch (const std::exception& e) { }
catch (const std::exception& e) {
LogWrite(PEERING__DEBUG, 1, "Peering", "Exception in pollPeerHealth for %s:%s: %s", server.c_str(), port.c_str(), e.what()); LogWrite(PEERING__DEBUG, 1, "Peering", "Exception in pollPeerHealth for %s:%s: %s", server.c_str(), port.c_str(), e.what());
} catch (...) { }
catch (...) {
LogWrite(PEERING__DEBUG, 1, "Peering", "Unknown exception in pollPeerHealth for %s:%s.", server.c_str(), port.c_str()); LogWrite(PEERING__DEBUG, 1, "Peering", "Unknown exception in pollPeerHealth for %s:%s.", server.c_str(), port.c_str());
} }
}); });
} catch (const std::exception& e) { }
catch (const std::exception& e) {
LogWrite(PEERING__DEBUG, 1, "Peering", "Failed to start async task for %s:%s: %s", server.c_str(), port.c_str(), e.what()); LogWrite(PEERING__DEBUG, 1, "Peering", "Failed to start async task for %s:%s: %s", server.c_str(), port.c_str(), e.what());
} catch (...) { }
catch (...) {
LogWrite(PEERING__DEBUG, 1, "Peering", "Unknown exception when starting async task for %s:%s.", server.c_str(), port.c_str()); LogWrite(PEERING__DEBUG, 1, "Peering", "Unknown exception when starting async task for %s:%s.", server.c_str(), port.c_str());
} }
} }

View File

@ -634,8 +634,13 @@ ThreadReturnType AchievmentLoad (void* tmp)
ThreadReturnType StartPeerPoll (void* tmp) ThreadReturnType StartPeerPoll (void* tmp)
{ {
LogWrite(WORLD__WARNING, 0, "Thread", "Start Polling...");
peer_https_pool.startPolling(); while( RunLoops )
{
LogWrite(WORLD__WARNING, 0, "Thread", "Start Polling...");
peer_https_pool.startPolling();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
THREAD_RETURN(NULL); THREAD_RETURN(NULL);
} }