From 200ceccf5e9307e31088bf8e18e78f5c60d91c67 Mon Sep 17 00:00:00 2001 From: Sky Johnson Date: Fri, 27 Jun 2025 19:06:26 -0500 Subject: [PATCH] fix utilities 2 --- lib/Utilities.hpp | 284 +++++++++------------------------------ tests/test_utilities.cpp | 123 +++-------------- 2 files changed, 88 insertions(+), 319 deletions(-) diff --git a/lib/Utilities.hpp b/lib/Utilities.hpp index d30ae0a..05ea62f 100644 --- a/lib/Utilities.hpp +++ b/lib/Utilities.hpp @@ -1,6 +1,5 @@ #pragma once -#include #include #include #include @@ -9,19 +8,40 @@ #include #include #include +#include +#include #include #include #include #include #include -#include -#include #include -#include +#include +#include +#include +#include -namespace reactor { +namespace reactor +{ -// NonCopyable base class +// Use compiler intrinsics for efficient byte swapping +#if defined(_MSC_VER) + #include + #define bswap_64(x) _byteswap_uint64(x) +#elif defined(__GNUC__) || defined(__clang__) + #define bswap_64(x) __builtin_bswap64(x) +#else + // Generic fallback implementation + inline uint64_t bswap_64(uint64_t val) + { + uint64_t temp = val; + char* ptr = reinterpret_cast(&temp); + std::reverse(ptr, ptr + sizeof(uint64_t)); + return temp; + } +#endif + +// NonCopyable base class (unchanged, follows modern practice) class NonCopyable { protected: @@ -33,197 +53,54 @@ protected: NonCopyable& operator=(NonCopyable&&) noexcept = default; }; -// Network byte order utilities -inline uint64_t hton64(uint64_t n) +// C++20 Network byte order utilities +inline uint64_t hton64(uint64_t host_uint64) { - static const int one = 1; - static const char sig = *(char*)&one; - if (sig == 0) return n; - char* ptr = reinterpret_cast(&n); - std::reverse(ptr, ptr + sizeof(uint64_t)); - return n; + if constexpr (std::endian::native == std::endian::little) { + return bswap_64(host_uint64); + } else { + return host_uint64; + } } -inline uint64_t ntoh64(uint64_t n) { return hton64(n); } - -// Lock-free MPSC queue using type erasure -class LockFreeQueue : public NonCopyable +inline uint64_t ntoh64(uint64_t net_uint64) { -private: - struct Node - { - Node() = default; + return hton64(net_uint64); +} - template - Node(T&& data) : data_(std::make_unique>(std::forward(data))) {} - struct Task { - virtual ~Task() = default; - virtual void call() = 0; - }; - - template - struct ConcreteTask : Task { - F func_; - ConcreteTask(F&& f) : func_(std::forward(f)) {} - void call() override { func_(); } - }; - - std::unique_ptr data_; - std::atomic next_{nullptr}; - }; - - std::atomic head_; - std::atomic tail_; - -public: - LockFreeQueue() : head_(new Node), tail_(head_.load()) {} - - ~LockFreeQueue() - { - while (dequeue()) {} - delete head_.load(); - } - - template - void enqueue(F&& input) - { - Node* node = new Node(std::forward(input)); - Node* prevhead = head_.exchange(node, std::memory_order_acq_rel); - prevhead->next_.store(node, std::memory_order_release); - } - - bool dequeue() - { - Node* tail = tail_.load(std::memory_order_relaxed); - Node* next = tail->next_.load(std::memory_order_acquire); - - if (next == nullptr) return false; - - if (next->data_) { - next->data_->call(); - } - tail_.store(next, std::memory_order_release); - delete tail; - return true; - } - - bool empty() - { - Node* tail = tail_.load(std::memory_order_relaxed); - Node* next = tail->next_.load(std::memory_order_acquire); - return next == nullptr; - } -}; - -// Type-safe lock-free queue for other use cases -template -class LockFreeQueueTyped : public NonCopyable -{ -private: - struct Node - { - std::atomic data_{nullptr}; - std::atomic next_{nullptr}; - }; - - std::atomic head_; - std::atomic tail_; - -public: - LockFreeQueueTyped() - { - Node* dummy = new Node; - head_.store(dummy); - tail_.store(dummy); - } - - ~LockFreeQueueTyped() - { - T output; - while (dequeue(output)) {} - delete head_.load(); - } - - void enqueue(T&& input) - { - Node* newNode = new Node; - T* data = new T(std::move(input)); - newNode->data_.store(data); - - Node* prevHead = head_.exchange(newNode); - prevHead->next_.store(newNode); - } - - void enqueue(const T& input) - { - Node* newNode = new Node; - T* data = new T(input); - newNode->data_.store(data); - - Node* prevHead = head_.exchange(newNode); - prevHead->next_.store(newNode); - } - - bool dequeue(T& output) - { - Node* tail = tail_.load(); - Node* next = tail->next_.load(); - - if (next == nullptr) { - return false; - } - - T* data = next->data_.load(); - if (data == nullptr) { - return false; - } - - output = *data; - delete data; - tail_.store(next); - delete tail; - return true; - } - - bool empty() - { - Node* tail = tail_.load(); - Node* next = tail->next_.load(); - return next == nullptr; - } -}; - -// Object Pool +// Object Pool (unchanged, this is a standard pattern) template class ObjectPool : public NonCopyable, public std::enable_shared_from_this> { private: - std::vector objects_; + std::vector> objects_; std::mutex mutex_; public: std::shared_ptr getObject() { - static_assert(!std::is_pointer_v, "ObjectPool type cannot be pointer"); + static_assert(!std::is_pointer_v, "ObjectPool type cannot be a pointer"); - T* p = nullptr; + std::unique_ptr p = nullptr; { std::lock_guard lock(mutex_); if (!objects_.empty()) { - p = objects_.back(); + p = std::move(objects_.back()); objects_.pop_back(); } } - if (!p) p = new T; + if (!p) { + p = std::make_unique(); + } std::weak_ptr> weakPtr = this->shared_from_this(); - return std::shared_ptr(p, [weakPtr](T* ptr) { + return std::shared_ptr(p.release(), [weakPtr](T* ptr) { auto self = weakPtr.lock(); if (self) { std::lock_guard lock(self->mutex_); - self->objects_.push_back(ptr); + self->objects_.push_back(std::unique_ptr(ptr)); } else { delete ptr; } @@ -231,7 +108,6 @@ public: } }; -// Simple Logger enum class LogLevel { TRACE, DEBUG, INFO, WARN, ERROR, FATAL }; class Logger : public NonCopyable @@ -239,7 +115,6 @@ class Logger : public NonCopyable private: static inline LogLevel level_ = LogLevel::INFO; static inline std::unique_ptr file_; - static inline std::mutex mutex_; std::ostringstream stream_; LogLevel msgLevel_; @@ -281,12 +156,11 @@ public: { if (msgLevel_ >= level_) { stream_ << "\n"; - std::lock_guard lock(mutex_); + // std::osyncstream handles synchronized, atomic writes if (file_ && file_->is_open()) { - *file_ << stream_.str(); - file_->flush(); + std::osyncstream(*file_) << stream_.str(); } else { - std::cout << stream_.str(); + std::osyncstream(std::cout) << stream_.str(); } } } @@ -303,39 +177,30 @@ public: static void setLevel(LogLevel level) { level_ = level; } static void setLogFile(const std::string& filename) { - std::lock_guard lock(mutex_); file_ = std::make_unique(filename, std::ios::app); } }; +// C++20 Concurrent Task Queue using jthread and stop_token class ConcurrentTaskQueue : public NonCopyable { private: - std::vector threads_; + std::vector threads_; std::queue> taskQueue_; std::mutex mutex_; - std::condition_variable taskCond_; - std::atomic stop_; + std::condition_variable_any taskCond_; + std::stop_source stopSource_; std::string name_; - /* - * Worker thread function. - * Waits for tasks and executes them. - */ - void workerThread() + void workerThread(std::stop_token token) { - while (true) - { + while (!token.stop_requested()) { std::function task; { std::unique_lock lock(mutex_); - taskCond_.wait(lock, [this] - { - return stop_.load() || !taskQueue_.empty(); - }); + taskCond_.wait(lock, token, [this] { return !taskQueue_.empty(); }); - if (stop_.load() && taskQueue_.empty()) - { + if (token.stop_requested() && taskQueue_.empty()) { return; } @@ -348,36 +213,26 @@ private: public: ConcurrentTaskQueue(size_t threadNum, const std::string& name = "ConcurrentTaskQueue") - : stop_(false), name_(name) + : name_(name) { - for (size_t i = 0; i < threadNum; ++i) - { - threads_.emplace_back(&ConcurrentTaskQueue::workerThread, this); + for (size_t i = 0; i < threadNum; ++i) { + threads_.emplace_back(&ConcurrentTaskQueue::workerThread, this, stopSource_.get_token()); } } ~ConcurrentTaskQueue() { - { - std::lock_guard lock(mutex_); - stop_ = true; - } + stopSource_.request_stop(); taskCond_.notify_all(); - for (auto& t : threads_) - { - if (t.joinable()) t.join(); - } + // std::jthread destructors automatically join } - /* - * Add a task to the queue. - */ template void runTaskInQueue(F&& task) { { std::lock_guard lock(mutex_); - if (stop_) return; + if (stopSource_.stop_requested()) return; taskQueue_.emplace(std::forward(task)); } taskCond_.notify_one(); @@ -405,7 +260,7 @@ public: } }; -// Logging macros +// Logging macros (unchanged) #define LOG_TRACE reactor::Logger(reactor::LogLevel::TRACE) #define LOG_DEBUG reactor::Logger(reactor::LogLevel::DEBUG) #define LOG_INFO reactor::Logger(reactor::LogLevel::INFO) @@ -421,18 +276,13 @@ void hashCombine(std::size_t& seed, const T& value) seed ^= hasher(value) + 0x9e3779b9 + (seed << 6) + (seed >> 2); } +// C++20 string splitting using ranges inline std::vector splitString(const std::string& s, const std::string& delimiter) { std::vector result; - size_t start = 0; - size_t end = s.find(delimiter); - - while (end != std::string::npos) { - result.push_back(s.substr(start, end - start)); - start = end + delimiter.length(); - end = s.find(delimiter, start); + for (const auto& range : std::views::split(s, delimiter)) { + result.emplace_back(range.begin(), range.end()); } - result.push_back(s.substr(start)); return result; } diff --git a/tests/test_utilities.cpp b/tests/test_utilities.cpp index ac04301..41b9830 100644 --- a/tests/test_utilities.cpp +++ b/tests/test_utilities.cpp @@ -5,87 +5,8 @@ #include #include #include - -void test_lock_free_queue() -{ - std::cout << "Testing lock-free queue...\n"; - - reactor::LockFreeQueueTyped queue; - assert(queue.empty()); - - queue.enqueue(1); - queue.enqueue(2); - queue.enqueue(3); - assert(!queue.empty()); - - int val; - assert(queue.dequeue(val) && val == 1); - assert(queue.dequeue(val) && val == 2); - assert(queue.dequeue(val) && val == 3); - assert(queue.empty()); - assert(!queue.dequeue(val)); - - std::cout << "✓ Lock-free queue basic operations passed\n"; -} - -void test_lock_free_queue_concurrent() -{ - std::cout << "Testing lock-free queue concurrency...\n"; - - reactor::LockFreeQueueTyped queue; - constexpr int num_items = 50; // Reduced further - constexpr int num_producers = 1; // Simplified to 1 producer - constexpr int num_consumers = 1; // Simplified to 1 consumer - - std::vector producers; - std::vector consumers; - std::atomic consumed_count{0}; - std::atomic stop_consumers{false}; - - for (int p = 0; p < num_producers; ++p) { - producers.emplace_back([&queue, p, num_items]() { - for (int i = 0; i < num_items; ++i) { - queue.enqueue(p * num_items + i); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - }); - } - - for (int c = 0; c < num_consumers; ++c) { - consumers.emplace_back([&queue, &consumed_count, &stop_consumers]() { - int val; - while (!stop_consumers) { - if (queue.dequeue(val)) { - consumed_count++; - } else { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - } - }); - } - - for (auto& p : producers) { - p.join(); - } - - // Wait longer for all items to be consumed - auto start = std::chrono::steady_clock::now(); - while (consumed_count < num_producers * num_items) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - auto now = std::chrono::steady_clock::now(); - if (std::chrono::duration_cast(now - start).count() > 5) { - break; // Timeout after 5 seconds - } - } - - stop_consumers = true; - for (auto& c : consumers) { - c.join(); - } - - assert(consumed_count == num_producers * num_items); - std::cout << "✓ Lock-free queue concurrency passed\n"; -} +#include // For std::latch +#include void test_object_pool() { @@ -94,8 +15,6 @@ void test_object_pool() struct TestObject { int value = 42; - TestObject() { std::cout << "TestObject constructed\n"; } - ~TestObject() { std::cout << "TestObject destructed\n"; } }; auto pool = std::make_shared>(); @@ -141,24 +60,26 @@ void test_concurrent_task_queue() { std::cout << "Testing concurrent task queue...\n"; - reactor::ConcurrentTaskQueue queue(2, "TestQueue"); + reactor::ConcurrentTaskQueue queue(4, "TestQueue"); assert(queue.getName() == "TestQueue"); + constexpr int num_tasks = 50; std::atomic counter{0}; - std::promise all_done; - auto future = all_done.get_future(); + std::latch all_tasks_done(num_tasks); // C++20 latch for synchronization - for (int i = 0; i < 10; ++i) { - queue.runTaskInQueue([&counter, i, &all_done]() { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - if (++counter == 10) { - all_done.set_value(); - } + for (int i = 0; i < num_tasks; ++i) + { + queue.runTaskInQueue([&counter, &all_tasks_done]() + { + counter++; + // Signal that one task is complete + all_tasks_done.count_down(); }); } - future.wait(); - assert(counter == 10); + // Wait for all tasks to complete + all_tasks_done.wait(); + assert(counter.load() == num_tasks); std::cout << "✓ Concurrent task queue passed\n"; } @@ -168,15 +89,15 @@ void test_sync_task() std::cout << "Testing sync task execution...\n"; reactor::ConcurrentTaskQueue queue(1, "SyncTest"); + bool task_executed = false; - // Use shared_ptr to avoid move issues - auto executed_flag = std::make_shared(false); - queue.syncTaskInQueue([executed_flag]() { + queue.syncTaskInQueue([&task_executed]() + { std::this_thread::sleep_for(std::chrono::milliseconds(50)); - *executed_flag = true; + task_executed = true; }); - assert(*executed_flag); + assert(task_executed); std::cout << "✓ Sync task execution passed\n"; } @@ -239,10 +160,8 @@ void test_non_copyable() int main() { - std::cout << "=== Utilities Tests ===\n"; + std::cout << "=== Utilities Tests (C++20 Version) ===\n"; - test_lock_free_queue(); - test_lock_free_queue_concurrent(); test_object_pool(); test_logger(); test_concurrent_task_queue();