udp compressor middleware
This commit is contained in:
parent
9b3f113716
commit
5f486173f7
166
internal/udp/middleware/compressor.go
Normal file
166
internal/udp/middleware/compressor.go
Normal file
@ -0,0 +1,166 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/flate"
|
||||
"io"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// CompressorConfig holds configuration for compression
|
||||
type CompressorConfig struct {
|
||||
Level int // Compression level (1-9, flate.BestSpeed to flate.BestCompression)
|
||||
MinSize int // Minimum packet size to compress
|
||||
CompressionMarker byte // Byte marker to identify compressed packets
|
||||
}
|
||||
|
||||
// DefaultCompressorConfig returns default compressor configuration
|
||||
func DefaultCompressorConfig() *CompressorConfig {
|
||||
return &CompressorConfig{
|
||||
Level: flate.DefaultCompression,
|
||||
MinSize: 128,
|
||||
CompressionMarker: 0x5A, // Same as original EQ2 implementation
|
||||
}
|
||||
}
|
||||
|
||||
// Compressor implements compression middleware using DEFLATE
|
||||
type Compressor struct {
|
||||
config *CompressorConfig
|
||||
writerPool sync.Pool
|
||||
readerPool sync.Pool
|
||||
bufferPool sync.Pool
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
// NewCompressor creates a new compression middleware
|
||||
func NewCompressor(config *CompressorConfig) *Compressor {
|
||||
if config == nil {
|
||||
config = DefaultCompressorConfig()
|
||||
}
|
||||
|
||||
c := &Compressor{
|
||||
config: config,
|
||||
}
|
||||
|
||||
// Initialize writer pool
|
||||
c.writerPool.New = func() interface{} {
|
||||
writer, _ := flate.NewWriter(nil, config.Level)
|
||||
return writer
|
||||
}
|
||||
|
||||
// Initialize reader pool
|
||||
c.readerPool.New = func() interface{} {
|
||||
return flate.NewReader(nil)
|
||||
}
|
||||
|
||||
// Initialize buffer pool
|
||||
c.bufferPool.New = func() interface{} {
|
||||
return &bytes.Buffer{}
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// ProcessOutbound implements Middleware.ProcessOutbound
|
||||
func (c *Compressor) ProcessOutbound(data []byte, next func([]byte) (int, error)) (int, error) {
|
||||
if len(data) < c.config.MinSize {
|
||||
return next(data)
|
||||
}
|
||||
|
||||
compressed, err := c.compress(data)
|
||||
if err != nil || len(compressed) >= len(data) {
|
||||
// Compression failed or didn't help - send original
|
||||
return next(data)
|
||||
}
|
||||
|
||||
return next(compressed)
|
||||
}
|
||||
|
||||
// ProcessInbound implements Middleware.ProcessInbound
|
||||
func (c *Compressor) ProcessInbound(data []byte, next func([]byte) (int, error)) (int, error) {
|
||||
if len(data) < 2 {
|
||||
return next(data)
|
||||
}
|
||||
|
||||
// Check for compression marker
|
||||
if data[0] != c.config.CompressionMarker {
|
||||
return next(data)
|
||||
}
|
||||
|
||||
decompressed, err := c.decompress(data[1:])
|
||||
if err != nil {
|
||||
// Decompression failed - pass through original
|
||||
return next(data)
|
||||
}
|
||||
|
||||
return next(decompressed)
|
||||
}
|
||||
|
||||
func (c *Compressor) compress(data []byte) ([]byte, error) {
|
||||
// Get buffer from pool
|
||||
buf := c.bufferPool.Get().(*bytes.Buffer)
|
||||
defer c.bufferPool.Put(buf)
|
||||
buf.Reset()
|
||||
|
||||
// Write compression marker
|
||||
buf.WriteByte(c.config.CompressionMarker)
|
||||
|
||||
// Get writer from pool
|
||||
writer := c.writerPool.Get().(*flate.Writer)
|
||||
defer c.writerPool.Put(writer)
|
||||
|
||||
writer.Reset(buf)
|
||||
|
||||
// Compress data
|
||||
if _, err := writer.Write(data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := writer.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Return copy of compressed data
|
||||
result := make([]byte, buf.Len())
|
||||
copy(result, buf.Bytes())
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *Compressor) decompress(data []byte) ([]byte, error) {
|
||||
// Get reader from pool
|
||||
reader := c.readerPool.Get().(io.ReadCloser)
|
||||
defer c.readerPool.Put(reader)
|
||||
|
||||
// Reset reader with new data
|
||||
if resetter, ok := reader.(flate.Resetter); ok {
|
||||
resetter.Reset(bytes.NewReader(data), nil)
|
||||
} else {
|
||||
reader.Close()
|
||||
reader = flate.NewReader(bytes.NewReader(data))
|
||||
}
|
||||
defer reader.Close()
|
||||
|
||||
// Get buffer from pool
|
||||
buf := c.bufferPool.Get().(*bytes.Buffer)
|
||||
defer c.bufferPool.Put(buf)
|
||||
buf.Reset()
|
||||
|
||||
// Decompress data
|
||||
if _, err := io.Copy(buf, reader); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Return copy of decompressed data
|
||||
result := make([]byte, buf.Len())
|
||||
copy(result, buf.Bytes())
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Close implements Middleware.Close
|
||||
func (c *Compressor) Close() error {
|
||||
c.closeOnce.Do(func() {
|
||||
// Close any active readers/writers in pools
|
||||
// Note: flate writers/readers don't need explicit cleanup
|
||||
})
|
||||
return nil
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user