perf op
This commit is contained in:
parent
df7b06507d
commit
2e1793998b
382
sockeye.hpp
382
sockeye.hpp
@ -1,5 +1,6 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <sys/epoll.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#include <netinet/tcp.h>
|
#include <netinet/tcp.h>
|
||||||
@ -7,218 +8,34 @@
|
|||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <array>
|
#include <array>
|
||||||
#include <vector>
|
|
||||||
#include <unordered_map>
|
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
|
||||||
#ifdef __linux__
|
|
||||||
#include <sys/epoll.h>
|
|
||||||
#elif defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
|
|
||||||
#include <sys/event.h>
|
|
||||||
#else
|
|
||||||
#error "Unsupported platform - requires Linux or BSD/macOS"
|
|
||||||
#endif
|
|
||||||
|
|
||||||
namespace sockeye {
|
namespace sockeye {
|
||||||
|
|
||||||
class BufferPool {
|
class Socket
|
||||||
public:
|
{
|
||||||
static constexpr size_t BUFFER_SIZE = 24576;
|
|
||||||
|
|
||||||
explicit BufferPool(size_t pool_size = 1024) {
|
|
||||||
buffers_.reserve(pool_size);
|
|
||||||
for (size_t i = 0; i < pool_size; ++i) {
|
|
||||||
buffers_.emplace_back(BUFFER_SIZE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<char>* acquire() {
|
|
||||||
if (buffers_.empty()) {
|
|
||||||
return new std::vector<char>(BUFFER_SIZE);
|
|
||||||
}
|
|
||||||
auto* buffer = new std::vector<char>(std::move(buffers_.back()));
|
|
||||||
buffers_.pop_back();
|
|
||||||
return buffer;
|
|
||||||
}
|
|
||||||
|
|
||||||
void release(std::vector<char>* buffer) {
|
|
||||||
if (!buffer) return;
|
|
||||||
if (buffers_.size() < buffers_.capacity()) {
|
|
||||||
buffer->clear();
|
|
||||||
buffer->resize(BUFFER_SIZE);
|
|
||||||
buffers_.emplace_back(std::move(*buffer));
|
|
||||||
delete buffer;
|
|
||||||
} else {
|
|
||||||
delete buffer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::vector<std::vector<char>> buffers_;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct ClientBuffer {
|
|
||||||
std::vector<char>* read_buffer = nullptr;
|
|
||||||
size_t bytes_read = 0;
|
|
||||||
};
|
|
||||||
|
|
||||||
class SocketBase {
|
|
||||||
public:
|
public:
|
||||||
using ConnectionHandler = std::function<void(int client_fd)>;
|
using ConnectionHandler = std::function<void(int client_fd)>;
|
||||||
using DataHandler = std::function<void(int client_fd, const char* data, size_t len)>;
|
using DataHandler = std::function<void(int client_fd, const char* data, size_t len)>;
|
||||||
using DisconnectHandler = std::function<void(int client_fd)>;
|
using DisconnectHandler = std::function<void(int client_fd)>;
|
||||||
|
|
||||||
explicit SocketBase(uint16_t port = 8080) : port_(port) {}
|
explicit Socket(uint16_t port = 8080) : port_(port) {}
|
||||||
|
|
||||||
virtual ~SocketBase() {
|
|
||||||
cleanup_resources();
|
|
||||||
}
|
|
||||||
|
|
||||||
virtual bool start() = 0;
|
|
||||||
virtual void run() = 0;
|
|
||||||
void stop() { running_ = false; }
|
|
||||||
|
|
||||||
void on_connection(ConnectionHandler handler) { on_connection_ = std::move(handler); }
|
|
||||||
void on_data(DataHandler handler) { on_data_ = std::move(handler); }
|
|
||||||
void on_disconnect(DisconnectHandler handler) { on_disconnect_ = std::move(handler); }
|
|
||||||
|
|
||||||
protected:
|
|
||||||
static constexpr int LISTEN_BACKLOG = SOMAXCONN;
|
|
||||||
|
|
||||||
uint16_t port_;
|
|
||||||
int server_fd_ = -1;
|
|
||||||
bool running_ = true;
|
|
||||||
|
|
||||||
BufferPool buffer_pool_;
|
|
||||||
std::unordered_map<int, ClientBuffer> client_buffers_;
|
|
||||||
|
|
||||||
ConnectionHandler on_connection_;
|
|
||||||
DataHandler on_data_;
|
|
||||||
DisconnectHandler on_disconnect_;
|
|
||||||
|
|
||||||
void cleanup_resources() {
|
|
||||||
for (auto& [fd, client_buffer] : client_buffers_) {
|
|
||||||
buffer_pool_.release(client_buffer.read_buffer);
|
|
||||||
close(fd);
|
|
||||||
}
|
|
||||||
client_buffers_.clear();
|
|
||||||
|
|
||||||
|
~Socket()
|
||||||
|
{
|
||||||
if (server_fd_ != -1) close(server_fd_);
|
if (server_fd_ != -1) close(server_fd_);
|
||||||
}
|
|
||||||
|
|
||||||
bool create_server_socket() {
|
|
||||||
server_fd_ = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
|
|
||||||
if (server_fd_ == -1) return false;
|
|
||||||
|
|
||||||
int opt = 1;
|
|
||||||
if (setsockopt(server_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1 ||
|
|
||||||
setsockopt(server_fd_, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) == -1 ||
|
|
||||||
setsockopt(server_fd_, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
sockaddr_in addr{};
|
|
||||||
addr.sin_family = AF_INET;
|
|
||||||
addr.sin_addr.s_addr = INADDR_ANY;
|
|
||||||
addr.sin_port = htons(port_);
|
|
||||||
|
|
||||||
return bind(server_fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != -1 &&
|
|
||||||
listen(server_fd_, LISTEN_BACKLOG) != -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool add_client_buffer(int client_fd) {
|
|
||||||
client_buffers_[client_fd] = {buffer_pool_.acquire(), 0};
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void remove_client_common(int client_fd) {
|
|
||||||
auto it = client_buffers_.find(client_fd);
|
|
||||||
if (it != client_buffers_.end()) {
|
|
||||||
buffer_pool_.release(it->second.read_buffer);
|
|
||||||
client_buffers_.erase(it);
|
|
||||||
}
|
|
||||||
|
|
||||||
close(client_fd);
|
|
||||||
if (on_disconnect_) on_disconnect_(client_fd);
|
|
||||||
}
|
|
||||||
|
|
||||||
void accept_connections() {
|
|
||||||
while (true) {
|
|
||||||
sockaddr_in client_addr{};
|
|
||||||
socklen_t client_len = sizeof(client_addr);
|
|
||||||
|
|
||||||
int client_fd = accept4(server_fd_,
|
|
||||||
reinterpret_cast<sockaddr*>(&client_addr),
|
|
||||||
&client_len,
|
|
||||||
SOCK_NONBLOCK | SOCK_CLOEXEC);
|
|
||||||
|
|
||||||
if (client_fd == -1) {
|
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
int opt = 1;
|
|
||||||
setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
|
|
||||||
|
|
||||||
if (add_client_impl(client_fd)) {
|
|
||||||
if (on_connection_) on_connection_(client_fd);
|
|
||||||
} else {
|
|
||||||
close(client_fd);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void handle_client_data(int client_fd) {
|
|
||||||
auto it = client_buffers_.find(client_fd);
|
|
||||||
if (it == client_buffers_.end()) {
|
|
||||||
remove_client_impl(client_fd);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto& client_buffer = it->second;
|
|
||||||
while (true) {
|
|
||||||
ssize_t bytes = recv(client_fd,
|
|
||||||
client_buffer.read_buffer->data() + client_buffer.bytes_read,
|
|
||||||
BufferPool::BUFFER_SIZE - client_buffer.bytes_read, 0);
|
|
||||||
|
|
||||||
if (bytes <= 0) {
|
|
||||||
if (bytes == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) {
|
|
||||||
remove_client_impl(client_fd);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
client_buffer.bytes_read += bytes;
|
|
||||||
|
|
||||||
if (on_data_) {
|
|
||||||
on_data_(client_fd, client_buffer.read_buffer->data(), client_buffer.bytes_read);
|
|
||||||
}
|
|
||||||
|
|
||||||
client_buffer.bytes_read = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
virtual bool add_client_impl(int client_fd) = 0;
|
|
||||||
virtual void remove_client_impl(int client_fd) = 0;
|
|
||||||
};
|
|
||||||
|
|
||||||
#ifdef __linux__
|
|
||||||
|
|
||||||
class EpollSocket : public SocketBase {
|
|
||||||
public:
|
|
||||||
explicit EpollSocket(uint16_t port = 8080) : SocketBase(port) {}
|
|
||||||
|
|
||||||
~EpollSocket() {
|
|
||||||
if (epoll_fd_ != -1) close(epoll_fd_);
|
if (epoll_fd_ != -1) close(epoll_fd_);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool start() override {
|
bool start()
|
||||||
|
{
|
||||||
return create_server_socket() &&
|
return create_server_socket() &&
|
||||||
create_epoll() &&
|
create_epoll() &&
|
||||||
add_server_to_epoll();
|
add_server_to_epoll();
|
||||||
}
|
}
|
||||||
|
|
||||||
void run() override {
|
void run()
|
||||||
|
{
|
||||||
std::array<epoll_event, MAX_EVENTS> events;
|
std::array<epoll_event, MAX_EVENTS> events;
|
||||||
|
|
||||||
while (running_) {
|
while (running_) {
|
||||||
@ -238,126 +55,121 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool add_client(int client_fd) {
|
void stop() { running_ = false; }
|
||||||
|
|
||||||
|
bool add_client(int client_fd)
|
||||||
|
{
|
||||||
epoll_event event{};
|
epoll_event event{};
|
||||||
event.events = EPOLLIN | EPOLLET;
|
event.events = EPOLLIN | EPOLLET;
|
||||||
event.data.fd = client_fd;
|
event.data.fd = client_fd;
|
||||||
|
return epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &event) != -1;
|
||||||
return epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &event) != -1 &&
|
|
||||||
add_client_buffer(client_fd);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void remove_client(int client_fd) {
|
void remove_client(int client_fd)
|
||||||
|
{
|
||||||
epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, nullptr);
|
epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, nullptr);
|
||||||
remove_client_common(client_fd);
|
close(client_fd);
|
||||||
|
if (on_disconnect_) on_disconnect_(client_fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void on_connection(ConnectionHandler handler) { on_connection_ = std::move(handler); }
|
||||||
|
void on_data(DataHandler handler) { on_data_ = std::move(handler); }
|
||||||
|
void on_disconnect(DisconnectHandler handler) { on_disconnect_ = std::move(handler); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static constexpr int MAX_EVENTS = 1024;
|
static constexpr int MAX_EVENTS = 1024;
|
||||||
int epoll_fd_ = -1;
|
static constexpr int BUFFER_SIZE = 8192;
|
||||||
|
|
||||||
bool create_epoll() {
|
uint16_t port_;
|
||||||
|
int server_fd_ = -1;
|
||||||
|
int epoll_fd_ = -1;
|
||||||
|
bool running_ = true;
|
||||||
|
|
||||||
|
ConnectionHandler on_connection_;
|
||||||
|
DataHandler on_data_;
|
||||||
|
DisconnectHandler on_disconnect_;
|
||||||
|
|
||||||
|
bool create_server_socket()
|
||||||
|
{
|
||||||
|
server_fd_ = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
|
||||||
|
if (server_fd_ == -1) return false;
|
||||||
|
|
||||||
|
int opt = 1;
|
||||||
|
if (setsockopt(server_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1 ||
|
||||||
|
setsockopt(server_fd_, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) == -1 ||
|
||||||
|
setsockopt(server_fd_, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
sockaddr_in addr{};
|
||||||
|
addr.sin_family = AF_INET;
|
||||||
|
addr.sin_addr.s_addr = INADDR_ANY;
|
||||||
|
addr.sin_port = htons(port_);
|
||||||
|
|
||||||
|
return bind(server_fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != -1 &&
|
||||||
|
listen(server_fd_, SOMAXCONN) != -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool create_epoll()
|
||||||
|
{
|
||||||
epoll_fd_ = epoll_create1(EPOLL_CLOEXEC);
|
epoll_fd_ = epoll_create1(EPOLL_CLOEXEC);
|
||||||
return epoll_fd_ != -1;
|
return epoll_fd_ != -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool add_server_to_epoll() {
|
bool add_server_to_epoll()
|
||||||
|
{
|
||||||
epoll_event event{};
|
epoll_event event{};
|
||||||
event.events = EPOLLIN | EPOLLET;
|
event.events = EPOLLIN | EPOLLET;
|
||||||
event.data.fd = server_fd_;
|
event.data.fd = server_fd_;
|
||||||
return epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, server_fd_, &event) != -1;
|
return epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, server_fd_, &event) != -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool add_client_impl(int client_fd) override {
|
inline void accept_connections()
|
||||||
return add_client(client_fd);
|
{
|
||||||
|
while (true) {
|
||||||
|
sockaddr_in client_addr{};
|
||||||
|
socklen_t client_len = sizeof(client_addr);
|
||||||
|
|
||||||
|
int client_fd = accept4(server_fd_,
|
||||||
|
reinterpret_cast<sockaddr*>(&client_addr),
|
||||||
|
&client_len,
|
||||||
|
SOCK_NONBLOCK | SOCK_CLOEXEC);
|
||||||
|
|
||||||
|
if (client_fd == -1) {
|
||||||
|
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
void remove_client_impl(int client_fd) override {
|
int opt = 1;
|
||||||
|
setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
|
||||||
|
|
||||||
|
if (add_client(client_fd)) {
|
||||||
|
if (on_connection_) on_connection_(client_fd);
|
||||||
|
} else {
|
||||||
|
close(client_fd);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
inline void handle_client_data(int client_fd)
|
||||||
|
{
|
||||||
|
if (!on_data_) return;
|
||||||
|
|
||||||
|
char buffer[BUFFER_SIZE];
|
||||||
|
while (true) {
|
||||||
|
ssize_t bytes = recv(client_fd, buffer, BUFFER_SIZE, 0);
|
||||||
|
if (bytes > 0) {
|
||||||
|
on_data_(client_fd, buffer, bytes);
|
||||||
|
} else if (bytes == 0) {
|
||||||
|
remove_client(client_fd);
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
|
||||||
remove_client(client_fd);
|
remove_client(client_fd);
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
using Socket = EpollSocket;
|
|
||||||
|
|
||||||
#else // BSD/macOS
|
|
||||||
|
|
||||||
class KqueueSocket : public SocketBase {
|
|
||||||
public:
|
|
||||||
explicit KqueueSocket(uint16_t port = 8080) : SocketBase(port) {}
|
|
||||||
|
|
||||||
~KqueueSocket() {
|
|
||||||
if (kqueue_fd_ != -1) close(kqueue_fd_);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool start() override {
|
|
||||||
return create_server_socket() &&
|
|
||||||
create_kqueue() &&
|
|
||||||
add_server_to_kqueue();
|
|
||||||
}
|
|
||||||
|
|
||||||
void run() override {
|
|
||||||
std::array<struct kevent, MAX_EVENTS> events;
|
|
||||||
|
|
||||||
while (running_) {
|
|
||||||
struct timespec timeout{1, 0};
|
|
||||||
int num_events = kevent(kqueue_fd_, nullptr, 0, events.data(), MAX_EVENTS, &timeout);
|
|
||||||
if (num_events == -1) {
|
|
||||||
if (errno == EINTR) continue;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < num_events; ++i) {
|
|
||||||
int fd = static_cast<int>(events[i].ident);
|
|
||||||
if (fd == server_fd_) {
|
|
||||||
accept_connections();
|
|
||||||
} else {
|
|
||||||
handle_client_data(fd);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool add_client(int client_fd) {
|
|
||||||
struct kevent event;
|
|
||||||
EV_SET(&event, client_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, nullptr);
|
|
||||||
|
|
||||||
return kevent(kqueue_fd_, &event, 1, nullptr, 0, nullptr) != -1 &&
|
|
||||||
add_client_buffer(client_fd);
|
|
||||||
}
|
|
||||||
|
|
||||||
void remove_client(int client_fd) {
|
|
||||||
struct kevent event;
|
|
||||||
EV_SET(&event, client_fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
|
|
||||||
kevent(kqueue_fd_, &event, 1, nullptr, 0, nullptr);
|
|
||||||
remove_client_common(client_fd);
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
static constexpr int MAX_EVENTS = 1024;
|
|
||||||
int kqueue_fd_ = -1;
|
|
||||||
|
|
||||||
bool create_kqueue() {
|
|
||||||
kqueue_fd_ = kqueue();
|
|
||||||
return kqueue_fd_ != -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool add_server_to_kqueue() {
|
|
||||||
struct kevent event;
|
|
||||||
EV_SET(&event, server_fd_, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, nullptr);
|
|
||||||
return kevent(kqueue_fd_, &event, 1, nullptr, 0, nullptr) != -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool add_client_impl(int client_fd) override {
|
|
||||||
return add_client(client_fd);
|
|
||||||
}
|
|
||||||
|
|
||||||
void remove_client_impl(int client_fd) override {
|
|
||||||
remove_client(client_fd);
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
using Socket = KqueueSocket;
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
} // namespace sockeye
|
} // namespace sockeye
|
||||||
|
Loading…
x
Reference in New Issue
Block a user