#pragma once #include "Utilities.hpp" #include #include #include #include #include #include namespace reactor { class Buffer : public NonCopyable { private: std::vector buffer_; size_t readIndex_; size_t writeIndex_; size_t initialCap_; static constexpr size_t kBufferOffset = 8; public: static constexpr size_t kInitialSize = 1024; explicit Buffer(size_t initialSize = kInitialSize) : buffer_(initialSize + kBufferOffset), readIndex_(kBufferOffset), writeIndex_(kBufferOffset), initialCap_(initialSize) { LOG_TRACE << "Buffer created with size " << initialSize; } ~Buffer() { LOG_TRACE << "Buffer destroyed, had " << readableBytes() << " readable bytes"; } size_t readableBytes() const { return writeIndex_ - readIndex_; } size_t writableBytes() const { return buffer_.size() - writeIndex_; } size_t prependableBytes() const { return readIndex_; } const char* peek() const { return &buffer_[readIndex_]; } char* beginWrite() { return &buffer_[writeIndex_]; } const char* beginWrite() const { return &buffer_[writeIndex_]; } void retrieve(size_t len) { if (len < readableBytes()) { readIndex_ += len; } else { retrieveAll(); } } void retrieveAll() { if (buffer_.size() > (initialCap_ * 2)) { buffer_.resize(initialCap_ + kBufferOffset); LOG_DEBUG << "Buffer shrunk to " << buffer_.size(); } readIndex_ = kBufferOffset; writeIndex_ = kBufferOffset; } void hasWritten(size_t len) { writeIndex_ += len; LOG_TRACE << "Buffer written " << len << " bytes, total readable: " << readableBytes(); } void append(const char* data, size_t len) { ensureWritableBytes(len); std::copy(data, data + len, beginWrite()); hasWritten(len); } void append(const std::string& str) { append(str.data(), str.size()); } void ensureWritableBytes(size_t len) { if (writableBytes() >= len) return; if (readIndex_ + writableBytes() >= (len + kBufferOffset)) { // Move readable data to front std::copy(&buffer_[readIndex_], &buffer_[writeIndex_], &buffer_[kBufferOffset]); writeIndex_ = kBufferOffset + (writeIndex_ - readIndex_); readIndex_ = kBufferOffset; LOG_TRACE << "Buffer compacted, readable bytes: " << readableBytes(); } else { // Grow buffer size_t newLen = std::max(buffer_.size() * 2, kBufferOffset + readableBytes() + len); buffer_.resize(newLen); LOG_DEBUG << "Buffer grown to " << newLen; } } // Network byte order append functions void appendInt8(uint8_t x) { append(reinterpret_cast(&x), sizeof(x)); } void appendInt16(uint16_t x) { uint16_t be = hton16(x); append(reinterpret_cast(&be), sizeof(be)); } void appendInt32(uint32_t x) { uint32_t be = hton32(x); append(reinterpret_cast(&be), sizeof(be)); } void appendInt64(uint64_t x) { uint64_t be = hton64(x); append(reinterpret_cast(&be), sizeof(be)); } // Network byte order read functions uint8_t readInt8() { uint8_t result = peekInt8(); retrieve(sizeof(result)); return result; } uint16_t readInt16() { uint16_t result = peekInt16(); retrieve(sizeof(result)); return result; } uint32_t readInt32() { uint32_t result = peekInt32(); retrieve(sizeof(result)); return result; } uint64_t readInt64() { uint64_t result = peekInt64(); retrieve(sizeof(result)); return result; } // Network byte order peek functions uint8_t peekInt8() const { assert(readableBytes() >= sizeof(uint8_t)); return *reinterpret_cast(peek()); } uint16_t peekInt16() const { assert(readableBytes() >= sizeof(uint16_t)); uint16_t be = *reinterpret_cast(peek()); return ntoh16(be); } uint32_t peekInt32() const { assert(readableBytes() >= sizeof(uint32_t)); uint32_t be = *reinterpret_cast(peek()); return ntoh32(be); } uint64_t peekInt64() const { assert(readableBytes() >= sizeof(uint64_t)); uint64_t be = *reinterpret_cast(peek()); return ntoh64(be); } // Prepend functions for efficient header insertion void prepend(const char* data, size_t len) { assert(len <= prependableBytes()); readIndex_ -= len; std::copy(data, data + len, &buffer_[readIndex_]); } void prependInt8(uint8_t x) { prepend(reinterpret_cast(&x), sizeof(x)); } void prependInt16(uint16_t x) { uint16_t be = hton16(x); prepend(reinterpret_cast(&be), sizeof(be)); } void prependInt32(uint32_t x) { uint32_t be = hton32(x); prepend(reinterpret_cast(&be), sizeof(be)); } void prependInt64(uint64_t x) { uint64_t be = hton64(x); prepend(reinterpret_cast(&be), sizeof(be)); } std::string read(size_t len) { if (len > readableBytes()) len = readableBytes(); std::string result(peek(), len); retrieve(len); return result; } std::string readAll() { return read(readableBytes()); } ssize_t readFd(int fd, int* savedErrno = nullptr) { char extrabuf[65536]; iovec vec[2]; size_t writable = writableBytes(); vec[0].iov_base = beginWrite(); vec[0].iov_len = writable; vec[1].iov_base = extrabuf; vec[1].iov_len = sizeof(extrabuf); const int iovcnt = (writable < sizeof(extrabuf)) ? 2 : 1; ssize_t n = readv(fd, vec, iovcnt); if (n < 0) { if (savedErrno) *savedErrno = errno; LOG_ERROR << "readFd error: " << strerror(errno); } else if (static_cast(n) <= writable) { writeIndex_ += n; } else { writeIndex_ = buffer_.size(); append(extrabuf, n - writable); } LOG_TRACE << "readFd returned " << n << " bytes, buffer now has " << readableBytes(); return n; } void swap(Buffer& other) noexcept { buffer_.swap(other.buffer_); std::swap(readIndex_, other.readIndex_); std::swap(writeIndex_, other.writeIndex_); std::swap(initialCap_, other.initialCap_); } }; } // namespace reactor