fix utilities 2

This commit is contained in:
Sky Johnson 2025-06-27 19:06:26 -05:00
parent 78401fe84e
commit 200ceccf5e
2 changed files with 88 additions and 319 deletions

View File

@ -1,6 +1,5 @@
#pragma once
#include <future>
#include <string>
#include <memory>
#include <functional>
@ -9,19 +8,40 @@
#include <mutex>
#include <thread>
#include <condition_variable>
#include <queue>
#include <stop_token>
#include <iostream>
#include <fstream>
#include <chrono>
#include <type_traits>
#include <algorithm>
#include <cstring>
#include <netinet/in.h>
#include <sstream>
#include <queue>
#include <ranges>
#include <bit>
#include <syncstream>
#include <future>
namespace reactor {
namespace reactor
{
// NonCopyable base class
// Use compiler intrinsics for efficient byte swapping
#if defined(_MSC_VER)
#include <stdlib.h>
#define bswap_64(x) _byteswap_uint64(x)
#elif defined(__GNUC__) || defined(__clang__)
#define bswap_64(x) __builtin_bswap64(x)
#else
// Generic fallback implementation
inline uint64_t bswap_64(uint64_t val)
{
uint64_t temp = val;
char* ptr = reinterpret_cast<char*>(&temp);
std::reverse(ptr, ptr + sizeof(uint64_t));
return temp;
}
#endif
// NonCopyable base class (unchanged, follows modern practice)
class NonCopyable
{
protected:
@ -33,197 +53,54 @@ protected:
NonCopyable& operator=(NonCopyable&&) noexcept = default;
};
// Network byte order utilities
inline uint64_t hton64(uint64_t n)
// C++20 Network byte order utilities
inline uint64_t hton64(uint64_t host_uint64)
{
static const int one = 1;
static const char sig = *(char*)&one;
if (sig == 0) return n;
char* ptr = reinterpret_cast<char*>(&n);
std::reverse(ptr, ptr + sizeof(uint64_t));
return n;
if constexpr (std::endian::native == std::endian::little) {
return bswap_64(host_uint64);
} else {
return host_uint64;
}
}
inline uint64_t ntoh64(uint64_t n) { return hton64(n); }
// Lock-free MPSC queue using type erasure
class LockFreeQueue : public NonCopyable
inline uint64_t ntoh64(uint64_t net_uint64)
{
private:
struct Node
{
Node() = default;
return hton64(net_uint64);
}
template<typename T>
Node(T&& data) : data_(std::make_unique<ConcreteTask<T>>(std::forward<T>(data))) {}
struct Task {
virtual ~Task() = default;
virtual void call() = 0;
};
template<typename F>
struct ConcreteTask : Task {
F func_;
ConcreteTask(F&& f) : func_(std::forward<F>(f)) {}
void call() override { func_(); }
};
std::unique_ptr<Task> data_;
std::atomic<Node*> next_{nullptr};
};
std::atomic<Node*> head_;
std::atomic<Node*> tail_;
public:
LockFreeQueue() : head_(new Node), tail_(head_.load()) {}
~LockFreeQueue()
{
while (dequeue()) {}
delete head_.load();
}
template<typename F>
void enqueue(F&& input)
{
Node* node = new Node(std::forward<F>(input));
Node* prevhead = head_.exchange(node, std::memory_order_acq_rel);
prevhead->next_.store(node, std::memory_order_release);
}
bool dequeue()
{
Node* tail = tail_.load(std::memory_order_relaxed);
Node* next = tail->next_.load(std::memory_order_acquire);
if (next == nullptr) return false;
if (next->data_) {
next->data_->call();
}
tail_.store(next, std::memory_order_release);
delete tail;
return true;
}
bool empty()
{
Node* tail = tail_.load(std::memory_order_relaxed);
Node* next = tail->next_.load(std::memory_order_acquire);
return next == nullptr;
}
};
// Type-safe lock-free queue for other use cases
template<typename T>
class LockFreeQueueTyped : public NonCopyable
{
private:
struct Node
{
std::atomic<T*> data_{nullptr};
std::atomic<Node*> next_{nullptr};
};
std::atomic<Node*> head_;
std::atomic<Node*> tail_;
public:
LockFreeQueueTyped()
{
Node* dummy = new Node;
head_.store(dummy);
tail_.store(dummy);
}
~LockFreeQueueTyped()
{
T output;
while (dequeue(output)) {}
delete head_.load();
}
void enqueue(T&& input)
{
Node* newNode = new Node;
T* data = new T(std::move(input));
newNode->data_.store(data);
Node* prevHead = head_.exchange(newNode);
prevHead->next_.store(newNode);
}
void enqueue(const T& input)
{
Node* newNode = new Node;
T* data = new T(input);
newNode->data_.store(data);
Node* prevHead = head_.exchange(newNode);
prevHead->next_.store(newNode);
}
bool dequeue(T& output)
{
Node* tail = tail_.load();
Node* next = tail->next_.load();
if (next == nullptr) {
return false;
}
T* data = next->data_.load();
if (data == nullptr) {
return false;
}
output = *data;
delete data;
tail_.store(next);
delete tail;
return true;
}
bool empty()
{
Node* tail = tail_.load();
Node* next = tail->next_.load();
return next == nullptr;
}
};
// Object Pool
// Object Pool (unchanged, this is a standard pattern)
template<typename T>
class ObjectPool : public NonCopyable, public std::enable_shared_from_this<ObjectPool<T>>
{
private:
std::vector<T*> objects_;
std::vector<std::unique_ptr<T>> objects_;
std::mutex mutex_;
public:
std::shared_ptr<T> getObject()
{
static_assert(!std::is_pointer_v<T>, "ObjectPool type cannot be pointer");
static_assert(!std::is_pointer_v<T>, "ObjectPool type cannot be a pointer");
T* p = nullptr;
std::unique_ptr<T> p = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
if (!objects_.empty()) {
p = objects_.back();
p = std::move(objects_.back());
objects_.pop_back();
}
}
if (!p) p = new T;
if (!p) {
p = std::make_unique<T>();
}
std::weak_ptr<ObjectPool<T>> weakPtr = this->shared_from_this();
return std::shared_ptr<T>(p, [weakPtr](T* ptr) {
return std::shared_ptr<T>(p.release(), [weakPtr](T* ptr) {
auto self = weakPtr.lock();
if (self) {
std::lock_guard<std::mutex> lock(self->mutex_);
self->objects_.push_back(ptr);
self->objects_.push_back(std::unique_ptr<T>(ptr));
} else {
delete ptr;
}
@ -231,7 +108,6 @@ public:
}
};
// Simple Logger
enum class LogLevel { TRACE, DEBUG, INFO, WARN, ERROR, FATAL };
class Logger : public NonCopyable
@ -239,7 +115,6 @@ class Logger : public NonCopyable
private:
static inline LogLevel level_ = LogLevel::INFO;
static inline std::unique_ptr<std::ofstream> file_;
static inline std::mutex mutex_;
std::ostringstream stream_;
LogLevel msgLevel_;
@ -281,12 +156,11 @@ public:
{
if (msgLevel_ >= level_) {
stream_ << "\n";
std::lock_guard<std::mutex> lock(mutex_);
// std::osyncstream handles synchronized, atomic writes
if (file_ && file_->is_open()) {
*file_ << stream_.str();
file_->flush();
std::osyncstream(*file_) << stream_.str();
} else {
std::cout << stream_.str();
std::osyncstream(std::cout) << stream_.str();
}
}
}
@ -303,39 +177,30 @@ public:
static void setLevel(LogLevel level) { level_ = level; }
static void setLogFile(const std::string& filename)
{
std::lock_guard<std::mutex> lock(mutex_);
file_ = std::make_unique<std::ofstream>(filename, std::ios::app);
}
};
// C++20 Concurrent Task Queue using jthread and stop_token
class ConcurrentTaskQueue : public NonCopyable
{
private:
std::vector<std::thread> threads_;
std::vector<std::jthread> threads_;
std::queue<std::function<void()>> taskQueue_;
std::mutex mutex_;
std::condition_variable taskCond_;
std::atomic<bool> stop_;
std::condition_variable_any taskCond_;
std::stop_source stopSource_;
std::string name_;
/*
* Worker thread function.
* Waits for tasks and executes them.
*/
void workerThread()
{
while (true)
void workerThread(std::stop_token token)
{
while (!token.stop_requested()) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mutex_);
taskCond_.wait(lock, [this]
{
return stop_.load() || !taskQueue_.empty();
});
taskCond_.wait(lock, token, [this] { return !taskQueue_.empty(); });
if (stop_.load() && taskQueue_.empty())
{
if (token.stop_requested() && taskQueue_.empty()) {
return;
}
@ -348,36 +213,26 @@ private:
public:
ConcurrentTaskQueue(size_t threadNum, const std::string& name = "ConcurrentTaskQueue")
: stop_(false), name_(name)
: name_(name)
{
for (size_t i = 0; i < threadNum; ++i)
{
threads_.emplace_back(&ConcurrentTaskQueue::workerThread, this);
for (size_t i = 0; i < threadNum; ++i) {
threads_.emplace_back(&ConcurrentTaskQueue::workerThread, this, stopSource_.get_token());
}
}
~ConcurrentTaskQueue()
{
{
std::lock_guard<std::mutex> lock(mutex_);
stop_ = true;
}
stopSource_.request_stop();
taskCond_.notify_all();
for (auto& t : threads_)
{
if (t.joinable()) t.join();
}
// std::jthread destructors automatically join
}
/*
* Add a task to the queue.
*/
template<typename F>
void runTaskInQueue(F&& task)
{
{
std::lock_guard<std::mutex> lock(mutex_);
if (stop_) return;
if (stopSource_.stop_requested()) return;
taskQueue_.emplace(std::forward<F>(task));
}
taskCond_.notify_one();
@ -405,7 +260,7 @@ public:
}
};
// Logging macros
// Logging macros (unchanged)
#define LOG_TRACE reactor::Logger(reactor::LogLevel::TRACE)
#define LOG_DEBUG reactor::Logger(reactor::LogLevel::DEBUG)
#define LOG_INFO reactor::Logger(reactor::LogLevel::INFO)
@ -421,18 +276,13 @@ void hashCombine(std::size_t& seed, const T& value)
seed ^= hasher(value) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}
// C++20 string splitting using ranges
inline std::vector<std::string> splitString(const std::string& s, const std::string& delimiter)
{
std::vector<std::string> result;
size_t start = 0;
size_t end = s.find(delimiter);
while (end != std::string::npos) {
result.push_back(s.substr(start, end - start));
start = end + delimiter.length();
end = s.find(delimiter, start);
for (const auto& range : std::views::split(s, delimiter)) {
result.emplace_back(range.begin(), range.end());
}
result.push_back(s.substr(start));
return result;
}

View File

@ -5,87 +5,8 @@
#include <chrono>
#include <vector>
#include <future>
void test_lock_free_queue()
{
std::cout << "Testing lock-free queue...\n";
reactor::LockFreeQueueTyped<int> queue;
assert(queue.empty());
queue.enqueue(1);
queue.enqueue(2);
queue.enqueue(3);
assert(!queue.empty());
int val;
assert(queue.dequeue(val) && val == 1);
assert(queue.dequeue(val) && val == 2);
assert(queue.dequeue(val) && val == 3);
assert(queue.empty());
assert(!queue.dequeue(val));
std::cout << "✓ Lock-free queue basic operations passed\n";
}
void test_lock_free_queue_concurrent()
{
std::cout << "Testing lock-free queue concurrency...\n";
reactor::LockFreeQueueTyped<int> queue;
constexpr int num_items = 50; // Reduced further
constexpr int num_producers = 1; // Simplified to 1 producer
constexpr int num_consumers = 1; // Simplified to 1 consumer
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
std::atomic<int> consumed_count{0};
std::atomic<bool> stop_consumers{false};
for (int p = 0; p < num_producers; ++p) {
producers.emplace_back([&queue, p, num_items]() {
for (int i = 0; i < num_items; ++i) {
queue.enqueue(p * num_items + i);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
});
}
for (int c = 0; c < num_consumers; ++c) {
consumers.emplace_back([&queue, &consumed_count, &stop_consumers]() {
int val;
while (!stop_consumers) {
if (queue.dequeue(val)) {
consumed_count++;
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
});
}
for (auto& p : producers) {
p.join();
}
// Wait longer for all items to be consumed
auto start = std::chrono::steady_clock::now();
while (consumed_count < num_producers * num_items) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
auto now = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::seconds>(now - start).count() > 5) {
break; // Timeout after 5 seconds
}
}
stop_consumers = true;
for (auto& c : consumers) {
c.join();
}
assert(consumed_count == num_producers * num_items);
std::cout << "✓ Lock-free queue concurrency passed\n";
}
#include <latch> // For std::latch
#include <atomic>
void test_object_pool()
{
@ -94,8 +15,6 @@ void test_object_pool()
struct TestObject
{
int value = 42;
TestObject() { std::cout << "TestObject constructed\n"; }
~TestObject() { std::cout << "TestObject destructed\n"; }
};
auto pool = std::make_shared<reactor::ObjectPool<TestObject>>();
@ -141,24 +60,26 @@ void test_concurrent_task_queue()
{
std::cout << "Testing concurrent task queue...\n";
reactor::ConcurrentTaskQueue queue(2, "TestQueue");
reactor::ConcurrentTaskQueue queue(4, "TestQueue");
assert(queue.getName() == "TestQueue");
constexpr int num_tasks = 50;
std::atomic<int> counter{0};
std::promise<void> all_done;
auto future = all_done.get_future();
std::latch all_tasks_done(num_tasks); // C++20 latch for synchronization
for (int i = 0; i < 10; ++i) {
queue.runTaskInQueue([&counter, i, &all_done]() {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (++counter == 10) {
all_done.set_value();
}
for (int i = 0; i < num_tasks; ++i)
{
queue.runTaskInQueue([&counter, &all_tasks_done]()
{
counter++;
// Signal that one task is complete
all_tasks_done.count_down();
});
}
future.wait();
assert(counter == 10);
// Wait for all tasks to complete
all_tasks_done.wait();
assert(counter.load() == num_tasks);
std::cout << "✓ Concurrent task queue passed\n";
}
@ -168,15 +89,15 @@ void test_sync_task()
std::cout << "Testing sync task execution...\n";
reactor::ConcurrentTaskQueue queue(1, "SyncTest");
bool task_executed = false;
// Use shared_ptr to avoid move issues
auto executed_flag = std::make_shared<bool>(false);
queue.syncTaskInQueue([executed_flag]() {
queue.syncTaskInQueue([&task_executed]()
{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
*executed_flag = true;
task_executed = true;
});
assert(*executed_flag);
assert(task_executed);
std::cout << "✓ Sync task execution passed\n";
}
@ -239,10 +160,8 @@ void test_non_copyable()
int main()
{
std::cout << "=== Utilities Tests ===\n";
std::cout << "=== Utilities Tests (C++20 Version) ===\n";
test_lock_free_queue();
test_lock_free_queue_concurrent();
test_object_pool();
test_logger();
test_concurrent_task_queue();