1
0

compat pass 3

This commit is contained in:
Sky Johnson 2025-09-03 13:50:03 -05:00
parent c692b25ac0
commit d70daa98be
4 changed files with 513 additions and 690 deletions

View File

@ -9,8 +9,7 @@ import (
// DefaultOpcodeSize is the default opcode size for application packets
var DefaultOpcodeSize uint8 = 2
// AppPacket handles high-level game opcodes and application data
// This is the main packet type used by game logic
// AppPacket handles high-level game opcodes (matches EQApplicationPacket)
type AppPacket struct {
*Packet
@ -29,7 +28,7 @@ func NewAppPacket(manager opcodes.Manager) *AppPacket {
}
}
// NewAppPacketWithOp creates a new application packet with opcode
// NewAppPacketWithOp creates with opcode
func NewAppPacketWithOp(op opcodes.EmuOpcode, manager opcodes.Manager) *AppPacket {
app := &AppPacket{
Packet: NewPacket(0, nil),
@ -40,7 +39,7 @@ func NewAppPacketWithOp(op opcodes.EmuOpcode, manager opcodes.Manager) *AppPacke
return app
}
// NewAppPacketWithSize creates a new application packet with opcode and size
// NewAppPacketWithSize creates with opcode and size
func NewAppPacketWithSize(op opcodes.EmuOpcode, size uint32, manager opcodes.Manager) *AppPacket {
app := &AppPacket{
Packet: NewPacket(0, make([]byte, size)),
@ -51,7 +50,7 @@ func NewAppPacketWithSize(op opcodes.EmuOpcode, size uint32, manager opcodes.Man
return app
}
// NewAppPacketWithData creates a new application packet with opcode and data
// NewAppPacketWithData creates with opcode and data
func NewAppPacketWithData(op opcodes.EmuOpcode, data []byte, manager opcodes.Manager) *AppPacket {
app := &AppPacket{
Packet: NewPacket(0, data),
@ -62,8 +61,7 @@ func NewAppPacketWithData(op opcodes.EmuOpcode, data []byte, manager opcodes.Man
return app
}
// NewAppPacketFromRaw creates app packet from raw buffer (used by ProtoPacket)
// Assumes first bytes are opcode based on opcodeSize
// NewAppPacketFromRaw creates from raw buffer (matches EQApplicationPacket constructor)
func NewAppPacketFromRaw(buf []byte, opcodeSize uint8, manager opcodes.Manager) *AppPacket {
if opcodeSize == 0 {
opcodeSize = DefaultOpcodeSize
@ -75,23 +73,28 @@ func NewAppPacketFromRaw(buf []byte, opcodeSize uint8, manager opcodes.Manager)
manager: manager,
}
offset := 0
// Extract opcode based on size
if opcodeSize == 1 && len(buf) >= 1 {
app.Opcode = uint16(buf[0])
if len(buf) > 1 {
app.Buffer = make([]byte, len(buf)-1)
copy(app.Buffer, buf[1:])
}
offset = 1
} else if len(buf) >= 2 {
app.Opcode = binary.BigEndian.Uint16(buf[:2])
if len(buf) > 2 {
app.Buffer = make([]byte, len(buf)-2)
copy(app.Buffer, buf[2:])
}
offset = 2
}
// Copy remaining data as payload
if len(buf) > offset {
app.Buffer = make([]byte, len(buf)-offset)
copy(app.Buffer, buf[offset:])
}
// Convert EQ opcode to emulator opcode
if app.manager != nil {
app.emuOpcode = app.manager.EQToEmu(app.Opcode)
} else {
app.emuOpcode = opcodes.EmuOpcode(app.Opcode)
}
return app
@ -99,21 +102,12 @@ func NewAppPacketFromRaw(buf []byte, opcodeSize uint8, manager opcodes.Manager)
// Size returns total packet size including opcode
func (a *AppPacket) Size() uint32 {
return uint32(len(a.Buffer)) + uint32(a.opcodeSize)
}
// SetOpcodeSize sets the opcode size
func (a *AppPacket) SetOpcodeSize(size uint8) {
a.opcodeSize = size
}
// SetManager sets the opcode manager for translation
func (a *AppPacket) SetManager(manager opcodes.Manager) {
a.manager = manager
// Re-translate if we have an opcode
if a.emuOpcode != opcodes.OP_Unknown && a.manager != nil {
a.Opcode = a.manager.EmuToEQ(a.emuOpcode)
// Handle special encoding where low byte = 0x00 needs extra byte
extraBytes := uint32(0)
if a.opcodeSize == 2 && (a.Opcode&0x00ff) == 0 {
extraBytes = 1
}
return uint32(len(a.Buffer)) + uint32(a.opcodeSize) + extraBytes
}
// SetOpcode sets the emulator opcode and translates to EQ opcode
@ -122,15 +116,13 @@ func (a *AppPacket) SetOpcode(op opcodes.EmuOpcode) {
if a.manager != nil {
a.Opcode = a.manager.EmuToEQ(op)
} else {
// Fallback to direct assignment if no manager
a.Opcode = uint16(op)
}
}
// GetOpcode returns the emulator opcode (with caching)
// GetOpcode returns the emulator opcode
func (a *AppPacket) GetOpcode() opcodes.EmuOpcode {
if a.emuOpcode == opcodes.OP_Unknown && a.manager != nil {
// Convert from protocol opcode
a.emuOpcode = a.manager.EQToEmu(a.Opcode)
}
return a.emuOpcode
@ -144,20 +136,16 @@ func (a *AppPacket) GetOpcodeName() string {
return "OP_Unknown"
}
// Copy creates a deep copy of this application packet
func (a *AppPacket) Copy() *AppPacket {
newApp := &AppPacket{
Packet: NewPacket(a.Opcode, a.Buffer),
emuOpcode: a.emuOpcode,
opcodeSize: a.opcodeSize,
manager: a.manager,
// SetManager sets the opcode manager
func (a *AppPacket) SetManager(manager opcodes.Manager) {
a.manager = manager
// Re-translate if we have an opcode
if a.emuOpcode != opcodes.OP_Unknown && a.manager != nil {
a.Opcode = a.manager.EmuToEQ(a.emuOpcode)
}
newApp.Packet.CopyInfo(a.Packet)
return newApp
}
// Serialize writes the application packet to a destination buffer
// Handles special opcode encoding rules for application-level packets
// Serialize writes to destination (matches EQApplicationPacket::serialize)
func (a *AppPacket) Serialize(dest []byte) uint32 {
opcodeBytes := a.opcodeSize
pos := 0
@ -187,84 +175,34 @@ func (a *AppPacket) Serialize(dest []byte) uint32 {
return uint32(len(a.Buffer)) + uint32(opcodeBytes)
}
// Combine combines this packet with another application packet
// Combine combines with another app packet (matches EQApplicationPacket::combine)
func (a *AppPacket) Combine(rhs *AppPacket) bool {
const opAppCombined = 0x19 // OP_AppCombined
// Check if we can combine these packets
totalSize := a.Size() + rhs.Size()
if totalSize > 512 { // Reasonable max combined packet size
return false
}
// If this isn't already a combined packet, convert it
if a.Opcode != opAppCombined {
// Create combined packet structure
oldSize := a.Size()
newSize := oldSize + rhs.Size() + 2 // +2 for size headers
newBuffer := make([]byte, newSize)
pos := 0
// Write first packet size and data
newBuffer[pos] = byte(oldSize)
pos++
// Serialize first packet
tmpBuf := make([]byte, oldSize)
a.Serialize(tmpBuf)
copy(newBuffer[pos:], tmpBuf)
pos += int(oldSize)
// Write second packet size and data
rhsSize := rhs.Size()
newBuffer[pos] = byte(rhsSize)
pos++
// Serialize second packet
tmpBuf = make([]byte, rhsSize)
rhs.Serialize(tmpBuf)
copy(newBuffer[pos:], tmpBuf)
// Update this packet to be a combined packet
a.Opcode = opAppCombined
a.Buffer = newBuffer
return true
}
// This is already a combined packet, add to it
rhsSize := rhs.Size()
if rhsSize >= 255 {
// Oversized packet handling
newBuffer := make([]byte, len(a.Buffer)+int(rhsSize)+3)
copy(newBuffer, a.Buffer)
pos := len(a.Buffer)
newBuffer[pos] = 255 // Oversized marker
pos++
binary.BigEndian.PutUint16(newBuffer[pos:], uint16(rhsSize))
pos += 2
tmpBuf := make([]byte, rhsSize)
rhs.Serialize(tmpBuf)
copy(newBuffer[pos:], tmpBuf)
a.Buffer = newBuffer
} else {
// Normal sized addition
newBuffer := make([]byte, len(a.Buffer)+int(rhsSize)+1)
copy(newBuffer, a.Buffer)
pos := len(a.Buffer)
newBuffer[pos] = byte(rhsSize)
pos++
tmpBuf := make([]byte, rhsSize)
rhs.Serialize(tmpBuf)
copy(newBuffer[pos:], tmpBuf)
a.Buffer = newBuffer
}
return true
// Application packet combining is not implemented in original
// Use protocol-level combining instead
return false
}
// Copy creates a deep copy
func (a *AppPacket) Copy() *AppPacket {
newApp := &AppPacket{
Packet: NewPacket(a.Opcode, a.Buffer),
emuOpcode: a.emuOpcode,
opcodeSize: a.opcodeSize,
manager: a.manager,
}
newApp.Packet.CopyInfo(a.Packet)
return newApp
}
// ToProto converts to protocol packet
func (a *AppPacket) ToProto() *ProtoPacket {
// Serialize the application packet
tmpBuf := make([]byte, a.Size())
a.Serialize(tmpBuf)
proto := NewProtoPacket(opcodes.OP_Packet, tmpBuf, a.manager)
proto.LoginOp = a.emuOpcode
proto.CopyInfo(a.Packet)
return proto
}

View File

@ -5,33 +5,42 @@ import (
"compress/zlib"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"git.sharkk.net/EQ2/Protocol/crypto"
)
// ValidateCRC validates packet CRC using EQ's CRC32 implementation
// ValidateCRC validates packet CRC using EQ2's custom CRC16
func ValidateCRC(buffer []byte, key uint32) bool {
if len(buffer) < 4 {
if len(buffer) < 2 {
return false
}
// Extract CRC from last 4 bytes
packetCRC := binary.BigEndian.Uint32(buffer[len(buffer)-4:])
// Extract CRC from last 2 bytes (EQ2 uses CRC16)
packetCRC := binary.BigEndian.Uint16(buffer[len(buffer)-2:])
// Calculate CRC on data portion (excluding CRC bytes)
data := buffer[:len(buffer)-4]
calculatedCRC := CalculateCRC(data, key)
data := buffer[:len(buffer)-2]
calculatedCRC := crypto.CalculateCRC(data, key)
return packetCRC == calculatedCRC
}
// CalculateCRC calculates CRC32 for packet data
func CalculateCRC(data []byte, key uint32) uint32 {
// EQ uses standard CRC32 with XOR key
crc := crc32.ChecksumIEEE(data)
return crc ^ key
// AppendCRC appends CRC16 to packet buffer using EQ2's custom CRC
func AppendCRC(buffer []byte, key uint32) []byte {
crc := crypto.CalculateCRC(buffer, key)
result := make([]byte, len(buffer)+2)
copy(result, buffer)
binary.BigEndian.PutUint16(result[len(buffer):], crc)
return result
}
// StripCRC removes CRC16 from packet buffer
func StripCRC(buffer []byte) []byte {
if len(buffer) < 2 {
return buffer
}
return buffer[:len(buffer)-2]
}
// Compress compresses packet data using zlib (matches EQ compression)
@ -42,16 +51,13 @@ func Compress(src []byte) ([]byte, error) {
var buf bytes.Buffer
// EQ uses default compression level
w := zlib.NewWriter(&buf)
// Write uncompressed length first (4 bytes) - EQ protocol requirement
uncompressedLen := uint32(len(src))
if err := binary.Write(&buf, binary.BigEndian, uncompressedLen); err != nil {
if err := binary.Write(&buf, binary.BigEndian, uint32(len(src))); err != nil {
return nil, err
}
// Compress the data
w := zlib.NewWriter(&buf)
if _, err := w.Write(src); err != nil {
w.Close()
return nil, err
@ -73,7 +79,7 @@ func Decompress(src []byte) ([]byte, error) {
// Read uncompressed length (first 4 bytes)
uncompressedLen := binary.BigEndian.Uint32(src[:4])
// Sanity check - prevent decompression bombs
// Sanity check
if uncompressedLen > MaxPacketSize {
return nil, fmt.Errorf("uncompressed size %d exceeds max packet size", uncompressedLen)
}
@ -95,16 +101,13 @@ func Decompress(src []byte) ([]byte, error) {
}
// ChatEncode encodes chat data using EQ's XOR-based encoding
// EQ uses a simple rotating XOR with the encode key
func ChatEncode(buffer []byte, encodeKey int) {
if len(buffer) == 0 || encodeKey == 0 {
return
}
// EQ chat encoding algorithm
key := byte(encodeKey & 0xFF)
for i := range buffer {
// XOR with rotating key based on position
buffer[i] ^= key
// Rotate key for next byte
key = ((key << 1) | (key >> 7)) & 0xFF
@ -115,23 +118,38 @@ func ChatEncode(buffer []byte, encodeKey int) {
}
}
// ChatDecode decodes chat data using EQ's XOR-based encoding
// Decoding is the same as encoding for XOR
// ChatDecode decodes chat data (XOR is symmetric)
func ChatDecode(buffer []byte, decodeKey int) {
// XOR encoding is symmetric - encode and decode are the same operation
ChatEncode(buffer, decodeKey)
}
// IsChatPacket checks if opcode is a chat-related packet
func IsChatPacket(opcode uint16) bool {
chatOpcodes := map[uint16]bool{
0x0300: true, // OP_ChatMsg
0x0302: true, // OP_TellMsg
0x0307: true, // OP_ChatLeaveChannelMsg
0x0308: true, // OP_ChatTellChannelMsg
0x0309: true, // OP_ChatTellUserMsg
0x0e07: true, // OP_GuildsayMsg
}
return chatOpcodes[opcode]
}
// longToIP converts uint32 IP to string
func longToIP(ip uint32) string {
return fmt.Sprintf("%d.%d.%d.%d",
byte(ip>>24), byte(ip>>16), byte(ip>>8), byte(ip))
}
// IsProtocolPacket checks if buffer contains a valid protocol packet
func IsProtocolPacket(buffer []byte, trimCRC bool) bool {
func IsProtocolPacket(buffer []byte) bool {
if len(buffer) < 2 {
return false
}
// Check for valid protocol opcodes
opcode := binary.BigEndian.Uint16(buffer[:2])
// Protocol opcodes from protocol.go
validOpcodes := map[uint16]bool{
0x0001: true, // OP_SessionRequest
0x0002: true, // OP_SessionResponse
@ -148,95 +166,5 @@ func IsProtocolPacket(buffer []byte, trimCRC bool) bool {
0x001e: true, // OP_OutOfSession
}
if !validOpcodes[opcode] {
return false
}
// If checking CRC, validate it
if trimCRC && len(buffer) >= 6 {
// Protocol packets have 2-byte opcode + data + 4-byte CRC
return ValidateCRC(buffer, 0)
}
return true
}
// EncodePacket applies encoding/compression based on flags
func EncodePacket(packet *ProtoPacket, compressThreshold int, encodeKey int) error {
// Apply compression if packet is large enough
if len(packet.Buffer) > compressThreshold && !packet.IsCompressed() {
compressed, err := Compress(packet.Buffer)
if err != nil {
return err
}
packet.Buffer = compressed
packet.SetCompressed(true)
}
// Apply chat encoding if this is a chat packet
if IsChatPacket(packet.Opcode) && encodeKey != 0 {
ChatEncode(packet.Buffer, encodeKey)
packet.SetEncrypted(true)
}
return nil
}
// DecodePacket reverses encoding/compression
func DecodePacket(packet *ProtoPacket, decodeKey int) error {
// Decrypt if encrypted
if packet.IsEncrypted() && decodeKey != 0 {
ChatDecode(packet.Buffer, decodeKey)
packet.SetEncrypted(false)
}
// Decompress if compressed
if packet.IsCompressed() {
decompressed, err := Decompress(packet.Buffer)
if err != nil {
return err
}
packet.Buffer = decompressed
packet.SetCompressed(false)
}
return nil
}
// IsChatPacket checks if opcode is a chat-related packet
func IsChatPacket(opcode uint16) bool {
// Chat-related opcodes that need encoding
// These would map to OP_ChatMsg, OP_TellMsg, etc in the opcodes package
chatOpcodes := map[uint16]bool{
0x0300: true, // OP_ChatMsg
0x0302: true, // OP_TellMsg
0x0307: true, // OP_ChatLeaveChannelMsg
0x0308: true, // OP_ChatTellChannelMsg
0x0309: true, // OP_ChatTellUserMsg
0x0e07: true, // OP_GuildsayMsg
}
return chatOpcodes[opcode]
}
// Helper function to convert uint32 IP to string
func longToIP(ip uint32) string {
return fmt.Sprintf("%d.%d.%d.%d",
byte(ip>>24), byte(ip>>16), byte(ip>>8), byte(ip))
}
// AppendCRC appends CRC16 to packet buffer using EQ2's custom CRC
func AppendCRC(buffer []byte, key uint32) []byte {
crc := crypto.CalculateCRC(buffer, key)
result := make([]byte, len(buffer)+2)
copy(result, buffer)
binary.BigEndian.PutUint16(result[len(buffer):], crc)
return result
}
// StripCRC removes CRC16 from packet buffer
func StripCRC(buffer []byte) []byte {
if len(buffer) < 2 {
return buffer
}
return buffer[:len(buffer)-2]
return validOpcodes[opcode]
}

View File

@ -7,12 +7,15 @@ import (
"git.sharkk.net/EQ2/Protocol/opcodes"
)
// ProtoPacket handles low-level protocol features including EQ2-specific operations
// ProtoPacket handles low-level protocol features (matches EQProtocolPacket/EQ2Packet)
type ProtoPacket struct {
*Packet
// Protocol state flags (using bitfield)
flags uint8 // bit 0: compressed, bit 1: prepared, bit 2: encrypted, bit 3: acked
// Protocol state flags
eq2Compressed bool
packetPrepared bool
packetEncrypted bool
acked bool
// EQ2-specific
LoginOp opcodes.EmuOpcode
@ -30,15 +33,6 @@ type ProtoPacket struct {
EncodeKey int
}
// Protocol flag constants
const (
FlagCompressed = 1 << iota
FlagPrepared
FlagEncrypted
FlagAcked
)
// Default compression threshold
const DefaultCompressThreshold = 100
// NewProtoPacket creates a protocol packet with opcode and buffer
@ -50,7 +44,7 @@ func NewProtoPacket(op uint16, buf []byte, manager opcodes.Manager) *ProtoPacket
}
}
// NewProtoPacketFromRaw creates a protocol packet from raw buffer
// NewProtoPacketFromRaw creates from raw buffer (matches EQProtocolPacket constructor)
func NewProtoPacketFromRaw(buf []byte, opcodeOverride int, manager opcodes.Manager) *ProtoPacket {
var offset uint32
var opcode uint16
@ -78,7 +72,6 @@ func NewProtoPacketFromRaw(buf []byte, opcodeOverride int, manager opcodes.Manag
CompressThreshold: DefaultCompressThreshold,
}
// Convert EQ opcode to emulator opcode if manager available
if pp.manager != nil {
pp.LoginOp = pp.manager.EQToEmu(opcode)
}
@ -86,249 +79,63 @@ func NewProtoPacketFromRaw(buf []byte, opcodeOverride int, manager opcodes.Manag
return pp
}
// SetManager sets the opcode manager for translation
func (p *ProtoPacket) SetManager(manager opcodes.Manager) {
p.manager = manager
}
// IsCompressed returns true if packet is compressed
func (p *ProtoPacket) IsCompressed() bool {
return p.flags&FlagCompressed != 0
}
// SetCompressed sets the compressed flag
func (p *ProtoPacket) SetCompressed(compressed bool) {
if compressed {
p.flags |= FlagCompressed
} else {
p.flags &^= FlagCompressed
}
}
// IsPrepared returns true if packet has been prepared for sending
func (p *ProtoPacket) IsPrepared() bool {
return p.flags&FlagPrepared != 0
}
// SetPrepared sets the prepared flag
func (p *ProtoPacket) SetPrepared(prepared bool) {
if prepared {
p.flags |= FlagPrepared
} else {
p.flags &^= FlagPrepared
}
}
// IsEncrypted returns true if packet is encrypted
func (p *ProtoPacket) IsEncrypted() bool {
return p.flags&FlagEncrypted != 0
}
// SetEncrypted sets the encrypted flag
func (p *ProtoPacket) SetEncrypted(encrypted bool) {
if encrypted {
p.flags |= FlagEncrypted
} else {
p.flags &^= FlagEncrypted
}
}
// IsAcked returns true if packet has been acknowledged
func (p *ProtoPacket) IsAcked() bool {
return p.flags&FlagAcked != 0
}
// SetAcked sets the acknowledged flag
func (p *ProtoPacket) SetAcked(acked bool) {
if acked {
p.flags |= FlagAcked
} else {
p.flags &^= FlagAcked
}
}
// CompressPacket compresses the packet data if needed
func (p *ProtoPacket) CompressPacket() error {
if p.IsCompressed() || len(p.Buffer) <= p.CompressThreshold {
return nil
}
compressed, err := Compress(p.Buffer)
if err != nil {
return err
}
// Only use compression if it actually saves space
if len(compressed) < len(p.Buffer) {
p.Buffer = compressed
p.SetCompressed(true)
}
return nil
}
// DecompressPacket decompresses the packet data
func (p *ProtoPacket) DecompressPacket() error {
if !p.IsCompressed() {
return nil
}
decompressed, err := Decompress(p.Buffer)
if err != nil {
return err
}
p.Buffer = decompressed
p.SetCompressed(false)
return nil
}
// EncodeChat applies chat encoding if this is a chat packet
func (p *ProtoPacket) EncodeChat() {
if p.EncodeKey == 0 || p.IsEncrypted() {
return
}
if p.IsChatPacket() {
ChatEncode(p.Buffer, p.EncodeKey)
p.SetEncrypted(true)
}
}
// DecodeChat reverses chat encoding
func (p *ProtoPacket) DecodeChat() {
if p.EncodeKey == 0 || !p.IsEncrypted() {
return
}
ChatDecode(p.Buffer, p.EncodeKey)
p.SetEncrypted(false)
}
// IsChatPacket checks if this is a chat packet using opcode manager
func (p *ProtoPacket) IsChatPacket() bool {
if p.manager == nil {
return false
}
// Get emulator opcode
emuOp := p.manager.EQToEmu(p.Opcode)
// Check against known chat opcodes
switch emuOp {
case opcodes.OP_ChatMsg,
opcodes.OP_TellMsg,
opcodes.OP_ChatLeaveChannelMsg,
opcodes.OP_ChatTellChannelMsg,
opcodes.OP_ChatTellUserMsg,
opcodes.OP_GuildsayMsg:
return true
default:
return false
}
}
// Copy creates a deep copy of this protocol packet
func (p *ProtoPacket) Copy() *ProtoPacket {
newPacket := &ProtoPacket{
Packet: NewPacket(p.Opcode, p.Buffer),
flags: p.flags,
LoginOp: p.LoginOp,
Sequence: p.Sequence,
SentTime: p.SentTime,
AttemptCount: p.AttemptCount,
manager: p.manager,
CompressThreshold: p.CompressThreshold,
EncodeKey: p.EncodeKey,
}
newPacket.Packet.CopyInfo(p.Packet)
return newPacket
}
// Serialize writes the protocol packet to a destination buffer
func (p *ProtoPacket) Serialize(dest []byte, offset int8) uint32 {
// Apply compression before serialization
p.CompressPacket()
// Apply chat encoding if needed
p.EncodeChat()
// Write compression flag if compressed
pos := 0
if p.IsCompressed() {
dest[pos] = 0x5a // EQ compression flag
pos++
}
// Write opcode (2 bytes)
if p.Opcode > 0xff {
binary.BigEndian.PutUint16(dest[pos:], p.Opcode)
} else {
dest[pos] = 0
dest[pos+1] = byte(p.Opcode)
}
pos += 2
// Copy packet data after opcode
if offset < int8(len(p.Buffer)) {
copy(dest[pos:], p.Buffer[offset:])
}
return uint32(len(p.Buffer)-int(offset)) + uint32(pos)
}
// PreparePacket prepares an EQ2 packet for transmission
// PreparePacket prepares an EQ2 packet for transmission (matches EQ2Packet::PreparePacket)
func (p *ProtoPacket) PreparePacket(maxLen int16) int8 {
if p.IsPrepared() {
if p.packetPrepared {
return 0
}
p.SetPrepared(true)
// Apply compression if needed
if err := p.CompressPacket(); err != nil {
if p.manager == nil {
return -1
}
// Apply chat encoding if needed
p.EncodeChat()
p.packetPrepared = true
// Convert emulator opcode to network opcode using manager
var loginOpcode uint16
if p.manager != nil {
loginOpcode = p.manager.EmuToEQ(p.LoginOp)
} else {
loginOpcode = uint16(p.LoginOp)
}
// Convert emulator opcode to network opcode
loginOpcode := p.manager.EmuToEQ(p.LoginOp)
var offset int8
newSize := len(p.Buffer) + 2 + 1 // sequence(2) + compressed_flag(1)
oversized := false
// Add compression flag space if compressed
if p.IsCompressed() {
newSize++
}
// Handle different opcode sizes and formats
if loginOpcode != 2 {
newSize++ // opcode type byte
if loginOpcode >= 255 {
newSize += 2 // oversized opcode
oversized = true
// Apply compression if needed
if !p.eq2Compressed && len(p.Buffer) > p.CompressThreshold {
compressed, err := Compress(p.Buffer)
if err == nil && len(compressed) < len(p.Buffer) {
p.Buffer = compressed
p.eq2Compressed = true
}
}
// Build new packet buffer
newBuffer := make([]byte, newSize)
ptr := 2 // Skip sequence field
// Build new packet buffer with proper headers
var offset int8
newSize := len(p.Buffer) + 2 // Base size with sequence
// Add compression flag if needed
if p.IsCompressed() {
newBuffer[ptr] = 0x5a // EQ compression flag
// Add compression flag space if compressed
if p.eq2Compressed {
newSize++
}
// Handle opcode encoding (matches C++ implementation)
oversized := false
if loginOpcode != 2 { // Not OP_SessionResponse
if loginOpcode >= 255 {
newSize += 3 // oversized opcode (0xFF + 2 bytes)
oversized = true
} else {
newSize += 2 // normal opcode
}
} else {
newSize++ // single byte for OP_SessionResponse
}
// Build new buffer
newBuffer := make([]byte, newSize)
ptr := 2 // Skip sequence field (filled by stream)
// Add compression flag
if p.eq2Compressed {
newBuffer[ptr] = 0x5a // EQ2 compression flag
ptr++
}
// Encode opcode
if loginOpcode != 2 {
if oversized {
newBuffer[ptr] = 0xff // Oversized marker
@ -348,52 +155,94 @@ func (p *ProtoPacket) PreparePacket(maxLen int16) int8 {
copy(newBuffer[ptr:], p.Buffer)
p.Buffer = newBuffer
offset = int8(newSize - len(p.Buffer) - 1)
offset = int8(ptr - 2) // Return offset past sequence field
return offset
}
// MakeApplicationPacket converts protocol packet to application packet
func (p *ProtoPacket) MakeApplicationPacket(opcodeSize uint8) *AppPacket {
// Decompress if needed
p.DecompressPacket()
// Serialize writes the protocol packet to a destination buffer
func (p *ProtoPacket) Serialize(dest []byte, offset int8) uint32 {
pos := 0
// Decode chat if needed
p.DecodeChat()
if opcodeSize == 0 {
opcodeSize = DefaultOpcodeSize
// Write compression flag if compressed
if p.eq2Compressed {
dest[pos] = 0x5a
pos++
}
app := &AppPacket{
Packet: NewPacket(0, p.Buffer),
opcodeSize: opcodeSize,
manager: p.manager,
}
app.CopyInfo(p.Packet)
// Write opcode (2 bytes)
binary.BigEndian.PutUint16(dest[pos:], p.Opcode)
pos += 2
// Parse opcode from buffer based on size
if opcodeSize == 1 && len(p.Buffer) >= 1 {
app.Opcode = uint16(p.Buffer[0])
app.Buffer = p.Buffer[1:]
} else if len(p.Buffer) >= 2 {
app.Opcode = binary.BigEndian.Uint16(p.Buffer[:2])
app.Buffer = p.Buffer[2:]
// Copy packet data after opcode
if offset < int8(len(p.Buffer)) {
copy(dest[pos:], p.Buffer[offset:])
return uint32(len(p.Buffer)-int(offset)) + uint32(pos)
}
// Convert EQ opcode to emulator opcode if manager available
if app.manager != nil {
app.emuOpcode = app.manager.EQToEmu(app.Opcode)
}
return app
return uint32(pos)
}
// AppCombine combines this packet with another for efficient transmission
// Combine combines this protocol packet with another (matches EQProtocolPacket::combine)
func (p *ProtoPacket) Combine(rhs *ProtoPacket) bool {
const opCombined = 0x03 // OP_Combined
// Case 1: This packet is already combined - append to it
if p.Opcode == opCombined && p.Size()+rhs.Size()+3 < 256 {
newSize := len(p.Buffer) + int(rhs.Size()) + 1
newBuffer := make([]byte, newSize)
// Copy existing combined data
copy(newBuffer, p.Buffer)
offset := len(p.Buffer)
// Add size prefix for new packet
newBuffer[offset] = byte(rhs.Size())
offset++
// Serialize and append new packet
tmpBuf := make([]byte, rhs.Size())
rhs.Serialize(tmpBuf, 0)
copy(newBuffer[offset:], tmpBuf)
p.Buffer = newBuffer
return true
}
// Case 2: Neither packet is combined - create new combined packet
if p.Size()+rhs.Size()+4 < 256 {
totalSize := int(p.Size()) + int(rhs.Size()) + 2
newBuffer := make([]byte, totalSize)
offset := 0
// Add first packet with size prefix
newBuffer[offset] = byte(p.Size())
offset++
tmpBuf := make([]byte, p.Size())
p.Serialize(tmpBuf, 0)
copy(newBuffer[offset:], tmpBuf)
offset += int(p.Size())
// Add second packet with size prefix
newBuffer[offset] = byte(rhs.Size())
offset++
tmpBuf = make([]byte, rhs.Size())
rhs.Serialize(tmpBuf, 0)
copy(newBuffer[offset:], tmpBuf)
// Update buffer and mark as combined
p.Buffer = newBuffer
p.Opcode = opCombined
return true
}
return false
}
// AppCombine combines app-level packets (matches EQ2Packet::AppCombine)
func (p *ProtoPacket) AppCombine(rhs *ProtoPacket) bool {
const opAppCombined = 0x19 // OP_AppCombined
// Calculate sizes
lhsSize := p.Size()
rhsSize := rhs.Size()
@ -402,109 +251,136 @@ func (p *ProtoPacket) AppCombine(rhs *ProtoPacket) bool {
return false
}
// If this packet is already combined, add to it
// If already combined, add to it
if p.Opcode == opAppCombined {
// Calculate new size
tmpSize := rhsSize - 2 // Subtract opcode bytes
var newSize int
if rhsSize >= 255 {
newSize = len(p.Buffer) + int(rhsSize) + 3 // oversized header
if tmpSize >= 255 {
newSize = len(p.Buffer) + int(tmpSize) + 3 // oversized header
} else {
newSize = len(p.Buffer) + int(rhsSize) + 1 // normal header
newSize = len(p.Buffer) + int(tmpSize) + 1 // normal header
}
// Check size limit
if newSize > MaxCombinedSize {
return false
}
// Create new buffer
newBuffer := make([]byte, newSize)
pos := 0
// Copy existing combined data
copy(newBuffer, p.Buffer)
pos = len(p.Buffer)
pos := len(p.Buffer)
// Add new packet with size header
if rhsSize >= 255 {
// Add size header
if tmpSize >= 255 {
newBuffer[pos] = 255 // Oversized marker
pos++
binary.BigEndian.PutUint16(newBuffer[pos:], uint16(rhsSize))
binary.BigEndian.PutUint16(newBuffer[pos:], uint16(tmpSize))
pos += 2
} else {
newBuffer[pos] = byte(rhsSize)
newBuffer[pos] = byte(tmpSize)
pos++
}
// Serialize rhs packet
tmpBuf := make([]byte, rhsSize)
rhs.Serialize(tmpBuf, 0)
copy(newBuffer[pos:], tmpBuf)
// Serialize rhs packet (skip first 2 bytes - opcode)
if len(rhs.Buffer) > 2 {
copy(newBuffer[pos:], rhs.Buffer[2:])
}
p.Buffer = newBuffer
return true
}
// Create new combined packet from two non-combined packets
// Calculate total size: size1(1-3) + packet1 + size2(1-3) + packet2
var totalSize int
if lhsSize >= 255 {
totalSize += 3 + int(lhsSize)
// Create new combined packet
lSize := lhsSize - 2
rSize := rhsSize - 2
totalSize := 0
if lSize >= 255 {
totalSize += 3 + int(lSize)
} else {
totalSize += 1 + int(lhsSize)
}
if rhsSize >= 255 {
totalSize += 3 + int(rhsSize)
} else {
totalSize += 1 + int(rhsSize)
totalSize += 1 + int(lSize)
}
if rSize >= 255 {
totalSize += 3 + int(rSize)
} else {
totalSize += 1 + int(rSize)
}
// Check size limit
if totalSize > MaxCombinedSize {
return false
}
// Build combined packet
newBuffer := make([]byte, totalSize)
pos := 0
// Add first packet with size header
if lhsSize >= 255 {
// Add first packet
if lSize >= 255 {
newBuffer[pos] = 255
pos++
binary.BigEndian.PutUint16(newBuffer[pos:], uint16(lhsSize))
binary.BigEndian.PutUint16(newBuffer[pos:], uint16(lSize))
pos += 2
} else {
newBuffer[pos] = byte(lhsSize)
newBuffer[pos] = byte(lSize)
pos++
}
if len(p.Buffer) > 2 {
copy(newBuffer[pos:], p.Buffer[2:])
pos += int(lSize)
}
// Serialize first packet
tmpBuf := make([]byte, lhsSize)
p.Serialize(tmpBuf, 0)
copy(newBuffer[pos:], tmpBuf)
pos += int(lhsSize)
// Add second packet with size header
if rhsSize >= 255 {
// Add second packet
if rSize >= 255 {
newBuffer[pos] = 255
pos++
binary.BigEndian.PutUint16(newBuffer[pos:], uint16(rhsSize))
binary.BigEndian.PutUint16(newBuffer[pos:], uint16(rSize))
pos += 2
} else {
newBuffer[pos] = byte(rhsSize)
newBuffer[pos] = byte(rSize)
pos++
}
if len(rhs.Buffer) > 2 {
copy(newBuffer[pos:], rhs.Buffer[2:])
}
// Serialize second packet
tmpBuf = make([]byte, rhsSize)
rhs.Serialize(tmpBuf, 0)
copy(newBuffer[pos:], tmpBuf)
// Update this packet to be combined
p.Opcode = opAppCombined
p.Buffer = newBuffer
p.SetPrepared(false) // Need to re-prepare
p.packetPrepared = false
return true
}
// MakeApplicationPacket converts to app packet (matches EQProtocolPacket::MakeApplicationPacket)
func (p *ProtoPacket) MakeApplicationPacket(opcodeSize uint8) *AppPacket {
// Decompress if needed
if p.eq2Compressed {
if decompressed, err := Decompress(p.Buffer); err == nil {
p.Buffer = decompressed
p.eq2Compressed = false
}
}
// Decode chat if needed
if p.packetEncrypted && p.EncodeKey != 0 {
ChatDecode(p.Buffer, p.EncodeKey)
p.packetEncrypted = false
}
return NewAppPacketFromRaw(p.Buffer, opcodeSize, p.manager)
}
// Copy creates a deep copy
func (p *ProtoPacket) Copy() *ProtoPacket {
newPacket := &ProtoPacket{
Packet: NewPacket(p.Opcode, p.Buffer),
eq2Compressed: p.eq2Compressed,
packetPrepared: p.packetPrepared,
packetEncrypted: p.packetEncrypted,
acked: p.acked,
LoginOp: p.LoginOp,
Sequence: p.Sequence,
SentTime: p.SentTime,
AttemptCount: p.AttemptCount,
manager: p.manager,
CompressThreshold: p.CompressThreshold,
EncodeKey: p.EncodeKey,
}
newPacket.Packet.CopyInfo(p.Packet)
return newPacket
}

View File

@ -13,7 +13,7 @@ import (
"github.com/panjf2000/gnet/v2"
)
// Stream implements EQ2's reliable UDP protocol
// Stream implements EQ2's reliable UDP protocol (matches EQStream)
type Stream struct {
conn gnet.Conn
mu sync.RWMutex
@ -41,12 +41,15 @@ type Stream struct {
duplicateAckCnt map[uint16]int
// Fragment assembly with expiry
fragments map[uint16]*fragmentBuffer
nextFragID uint16
fragments map[uint16]*fragmentBuffer
currentFragment *fragmentBuffer
// Out of order handling with expiry
outOfOrder map[uint16]*outOfOrderPacket
// Combined packet for efficient sending
combinedPacket *packets.ProtoPacket
// Packet queues
reliableQueue []*packets.ProtoPacket
unreliableQueue []*packets.ProtoPacket
@ -64,6 +67,7 @@ type Stream struct {
timeoutTimer *time.Timer
retransmitTimer *time.Timer
cleanupTimer *time.Timer
combineTimer *time.Timer
// Retransmission settings
rtt time.Duration
@ -121,6 +125,7 @@ const (
fragmentTimeout = 30 * time.Second
outOfOrderTimeout = 10 * time.Second
duplicateAckThreshold = 3
combineFlushInterval = 10 * time.Millisecond
)
// Config holds stream configuration
@ -188,24 +193,23 @@ func NewStream(conn gnet.Conn, cfg *Config) *Stream {
}
s.retransmitTimer = time.AfterFunc(retransmitInterval, s.processRetransmits)
s.cleanupTimer = time.AfterFunc(cleanupInterval, s.cleanup)
s.combineTimer = time.AfterFunc(combineFlushInterval, s.flushCombined)
return s
}
// Process handles incoming data from gnet
// Process handles incoming data with proper CRC validation
func (s *Stream) Process(data []byte) error {
if len(data) < 2 {
return nil
}
// Check for CRC
// Validate and strip CRC16
if len(data) > 2 {
providedCRC := binary.BigEndian.Uint16(data[len(data)-2:])
calculatedCRC := crypto.CalculateCRC(data[:len(data)-2], s.crcKey)
if providedCRC != calculatedCRC {
if !packets.ValidateCRC(data, s.crcKey) {
return nil
}
data = data[:len(data)-2]
data = packets.StripCRC(data)
}
// Decrypt if needed
@ -248,32 +252,71 @@ func (s *Stream) Process(data []byte) error {
return nil
}
// SendPacket sends an application packet
// SendPacket sends an application packet with proper preparation
func (s *Stream) SendPacket(app *packets.AppPacket) error {
if s.state.Load() != StateEstablished {
return fmt.Errorf("stream not established")
}
// Validate packet size
if app.Size() > uint32(s.maxLen) {
proto := s.appToProto(app)
// Convert to protocol packet
proto := app.ToProto()
proto.CompressThreshold = 100
proto.EncodeKey = s.encodeKey
// Check if packet needs fragmentation
if proto.Size() > uint32(s.maxLen-8) {
return s.sendFragmented(proto)
}
proto := s.appToProto(app)
// Try to combine with pending combined packet
s.mu.Lock()
defer s.mu.Unlock()
if s.combinedPacket != nil {
if s.combinedPacket.AppCombine(proto) {
if s.combinedPacket.Size() > uint32(s.maxLen/2) {
s.flushCombinedPacketLocked()
}
return nil
}
s.flushCombinedPacketLocked()
}
// Queue based on reliability
isUnreliable := s.isUnreliableOpcode(app.GetOpcode())
s.mu.Lock()
if isUnreliable {
s.unreliableQueue = append(s.unreliableQueue, proto)
} else {
s.reliableQueue = append(s.reliableQueue, proto)
// Start new combined packet if small enough
if proto.Size() < uint32(s.maxLen/4) {
s.combinedPacket = proto
s.combineTimer.Reset(combineFlushInterval)
} else {
s.reliableQueue = append(s.reliableQueue, proto)
}
}
s.mu.Unlock()
return s.processQueues()
}
// flushCombined timer callback
func (s *Stream) flushCombined() {
s.mu.Lock()
s.flushCombinedPacketLocked()
s.mu.Unlock()
s.processQueues()
s.combineTimer.Reset(combineFlushInterval)
}
// flushCombinedPacketLocked sends any pending combined packet (must hold lock)
func (s *Stream) flushCombinedPacketLocked() {
if s.combinedPacket != nil {
s.reliableQueue = append(s.reliableQueue, s.combinedPacket)
s.combinedPacket = nil
}
}
// processQueues processes all packet queues
func (s *Stream) processQueues() error {
s.mu.Lock()
@ -340,6 +383,75 @@ func (s *Stream) processQueues() error {
return nil
}
// sendFragmented sends a fragmented packet (matches EQStream::SendPacket)
func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error {
// Prepare packet first
offset := proto.PreparePacket(int16(s.maxLen))
if offset < 0 {
return fmt.Errorf("failed to prepare packet")
}
data := proto.Buffer
length := uint32(len(data))
// First fragment with total length
out := packets.NewProtoPacket(opcodes.OP_Fragment, nil, s.opcodeManager)
out.Buffer = make([]byte, s.maxLen-4)
binary.BigEndian.PutUint32(out.Buffer[:4], length)
used := copy(out.Buffer[4:], data)
out.Buffer = out.Buffer[:4+used]
if err := s.sendReliable(out); err != nil {
return err
}
// Send remaining fragments
pos := used
for pos < int(length) {
chunkSize := int(length) - pos
if chunkSize > int(s.maxLen)-4 {
chunkSize = int(s.maxLen) - 4
}
frag := packets.NewProtoPacket(opcodes.OP_Fragment, data[pos:pos+chunkSize], s.opcodeManager)
if err := s.sendReliable(frag); err != nil {
return err
}
pos += chunkSize
}
return nil
}
// sendReliable sends a packet reliably with sequencing
func (s *Stream) sendReliable(proto *packets.ProtoPacket) error {
s.mu.Lock()
seq := s.seqOut
s.seqOut = s.incrementSequence(s.seqOut)
s.mu.Unlock()
data := make([]byte, proto.Size()+2)
binary.BigEndian.PutUint16(data[:2], seq)
proto.Serialize(data[2:], 0)
pending := &pendingPacket{
packet: proto.Copy(),
seq: seq,
sentTime: time.Now(),
attempts: 1,
nextRetry: time.Now().Add(s.rto),
}
s.mu.Lock()
s.pendingAcks[seq] = pending
s.mu.Unlock()
return s.sendRaw(opcodes.OP_Packet, data)
}
// Helper methods
func (s *Stream) isUnreliableOpcode(emuOp opcodes.EmuOpcode) bool {
switch emuOp {
@ -363,6 +475,7 @@ func (s *Stream) isSequenceAhead(seq, base uint16) bool {
return diff > 0 && diff < 0x8000
}
// Protocol handlers
func (s *Stream) handleSessionRequest(data []byte) error {
if len(data) < 10 {
return fmt.Errorf("session request too small")
@ -430,6 +543,7 @@ func (s *Stream) handlePacket(data []byte) error {
return err
}
// Process any buffered out-of-order packets
for {
if buffered, exists := s.outOfOrder[s.seqExpected]; exists {
delete(s.outOfOrder, s.seqExpected)
@ -443,12 +557,14 @@ func (s *Stream) handlePacket(data []byte) error {
}
}
} else if s.isSequenceAhead(seq, s.seqExpected) {
// Out of order - buffer it
s.outOfOrder[seq] = &outOfOrderPacket{
data: append([]byte(nil), data...),
timestamp: time.Now(),
}
go s.sendOutOfOrderAck(seq)
} else {
// Duplicate packet - just ack it
go s.sendAckImmediate(seq)
}
@ -457,43 +573,46 @@ func (s *Stream) handlePacket(data []byte) error {
}
func (s *Stream) handleFragment(data []byte) error {
if len(data) < 10 {
if len(data) < 2 {
return nil
}
seq := binary.BigEndian.Uint16(data[:2])
fragID := binary.BigEndian.Uint32(data[2:6])
fragTotal := binary.BigEndian.Uint16(data[6:8])
fragCur := binary.BigEndian.Uint16(data[8:10])
data = data[10:]
data = data[2:]
s.mu.Lock()
defer s.mu.Unlock()
frag, exists := s.fragments[uint16(fragID)]
if !exists {
frag = &fragmentBuffer{
totalSize: uint32(fragTotal),
// Check if this is start of new fragment stream
if s.currentFragment == nil && len(data) >= 4 {
totalLen := binary.BigEndian.Uint32(data[:4])
s.currentFragment = &fragmentBuffer{
totalSize: totalLen,
chunks: make(map[uint16][]byte),
startTime: time.Now(),
}
s.fragments[uint16(fragID)] = frag
}
frag.chunks[fragCur] = append([]byte(nil), data...)
frag.received++
if frag.received == uint32(fragTotal) {
complete := make([]byte, 0)
for i := uint16(0); i < fragTotal; i++ {
if chunk, ok := frag.chunks[i]; ok {
complete = append(complete, chunk...)
} else {
return nil
}
if len(data) > 4 {
s.currentFragment.chunks[0] = append([]byte(nil), data[4:]...)
s.currentFragment.received = uint32(len(data) - 4)
}
} else if s.currentFragment != nil {
// Continuation fragment
chunkNum := uint16(len(s.currentFragment.chunks))
s.currentFragment.chunks[chunkNum] = append([]byte(nil), data...)
s.currentFragment.received += uint32(len(data))
// Check if complete
if s.currentFragment.received >= s.currentFragment.totalSize {
complete := make([]byte, 0, s.currentFragment.totalSize)
for i := uint16(0); i < uint16(len(s.currentFragment.chunks)); i++ {
if chunk, ok := s.currentFragment.chunks[i]; ok {
complete = append(complete, chunk...)
}
}
s.currentFragment = nil
return s.processPacketData(complete)
}
delete(s.fragments, uint16(fragID))
return s.processPacketData(complete)
}
s.ackQueue = append(s.ackQueue, seq)
@ -515,6 +634,7 @@ func (s *Stream) handleAck(data []byte) error {
delete(s.pendingAcks, ackSeq)
delete(s.duplicateAckCnt, ackSeq)
// Update RTT
sample := time.Since(pending.sentTime)
if s.rtt == 0 {
s.rtt = sample
@ -533,9 +653,11 @@ func (s *Stream) handleAck(data []byte) error {
s.rto = s.maxRTO
}
} else {
// Duplicate ACK
s.duplicateAckCnt[ackSeq]++
if s.duplicateAckCnt[ackSeq] >= duplicateAckThreshold {
// Fast retransmit
for seq, pending := range s.pendingAcks {
if s.isSequenceAhead(seq, ackSeq) {
s.resendQueue = append(s.resendQueue, pending)
@ -641,83 +763,19 @@ func (s *Stream) handleDisconnect() error {
return nil
}
func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error {
data := make([]byte, proto.Size())
proto.Serialize(data, 0)
fragSize := int(s.maxLen) - 12
numFrags := (len(data) + fragSize - 1) / fragSize
if numFrags > 0xFFFF {
return fmt.Errorf("packet too large to fragment")
}
s.mu.Lock()
fragID := s.nextFragID
s.nextFragID++
s.mu.Unlock()
for i := 0; i < numFrags; i++ {
start := i * fragSize
end := start + fragSize
if end > len(data) {
end = len(data)
}
fragHeader := make([]byte, 10)
binary.BigEndian.PutUint32(fragHeader[0:4], uint32(fragID))
binary.BigEndian.PutUint16(fragHeader[4:6], uint16(numFrags))
binary.BigEndian.PutUint16(fragHeader[6:8], uint16(i))
binary.BigEndian.PutUint16(fragHeader[8:10], uint16(end-start))
fragment := append(fragHeader, data[start:end]...)
fragProto := packets.NewProtoPacket(opcodes.OP_Fragment, fragment, s.opcodeManager)
if err := s.sendReliable(fragProto); err != nil {
return err
}
}
return nil
}
func (s *Stream) sendReliable(proto *packets.ProtoPacket) error {
s.mu.Lock()
seq := s.seqOut
s.seqOut = s.incrementSequence(s.seqOut)
s.mu.Unlock()
data := make([]byte, proto.Size()+2)
binary.BigEndian.PutUint16(data[:2], seq)
proto.Serialize(data[2:], 0)
pending := &pendingPacket{
packet: proto.Copy(),
seq: seq,
sentTime: time.Now(),
attempts: 1,
nextRetry: time.Now().Add(s.rto),
}
s.mu.Lock()
s.pendingAcks[seq] = pending
s.mu.Unlock()
return s.sendRaw(opcodes.OP_Packet, data)
}
// processPacketData processes received packet data
func (s *Stream) processPacketData(data []byte) error {
proto := packets.NewProtoPacketFromRaw(data, -1, s.opcodeManager)
if err := proto.DecompressPacket(); err != nil {
return err
}
app := proto.MakeApplicationPacket(s.opcodeSize)
if s.onPacket != nil {
go s.onPacket(app)
}
return nil
}
// Timer handlers
func (s *Stream) processRetransmits() {
if s.state.Load() != StateEstablished {
return
@ -757,18 +815,21 @@ func (s *Stream) cleanup() {
s.mu.Lock()
now := time.Now()
// Clean up old fragments
for id, frag := range s.fragments {
if now.Sub(frag.startTime) > fragmentTimeout {
delete(s.fragments, id)
}
}
// Clean up old out-of-order packets
for seq, oop := range s.outOfOrder {
if now.Sub(oop.timestamp) > outOfOrderTimeout {
delete(s.outOfOrder, seq)
}
}
// Clean up duplicate ACK counts
for seq := range s.duplicateAckCnt {
if _, exists := s.pendingAcks[seq]; !exists {
delete(s.duplicateAckCnt, seq)
@ -841,14 +902,16 @@ func (s *Stream) handleTimeout() {
}
}
// sendRaw sends raw packet with CRC16
func (s *Stream) sendRaw(opcode uint16, data []byte) error {
packet := make([]byte, 2+len(data)+2)
packet := make([]byte, 2+len(data))
binary.BigEndian.PutUint16(packet[:2], opcode)
copy(packet[2:], data)
crc := crypto.CalculateCRC(packet[:len(packet)-2], s.crcKey)
binary.BigEndian.PutUint16(packet[len(packet)-2:], crc)
// Add CRC16
packet = packets.AppendCRC(packet, s.crcKey)
// Encrypt if needed
if s.cipher != nil {
s.cipher.Encrypt(packet)
}
@ -859,14 +922,6 @@ func (s *Stream) sendRaw(opcode uint16, data []byte) error {
return s.conn.AsyncWrite(packet, nil)
}
func (s *Stream) appToProto(app *packets.AppPacket) *packets.ProtoPacket {
proto := packets.NewProtoPacket(app.Opcode, app.Buffer, s.opcodeManager)
proto.CopyInfo(app.Packet)
proto.CompressThreshold = 100
proto.EncodeKey = s.encodeKey
return proto
}
// Public methods
func (s *Stream) SetPacketCallback(fn func(*packets.AppPacket)) {
s.onPacket = fn
@ -886,7 +941,7 @@ func (s *Stream) IsConnected() bool {
func (s *Stream) SendSessionRequest() error {
data := make([]byte, 10)
binary.BigEndian.PutUint32(data[0:4], 2)
binary.BigEndian.PutUint32(data[0:4], 2) // protocol version
binary.BigEndian.PutUint32(data[4:8], s.sessionID)
binary.BigEndian.PutUint16(data[8:10], s.maxLen)
@ -921,6 +976,7 @@ func (s *Stream) Close() error {
s.state.Store(StateClosed)
s.sendRaw(opcodes.OP_SessionDisconnect, nil)
// Stop all timers
if s.keepAliveTimer != nil {
s.keepAliveTimer.Stop()
}
@ -936,7 +992,11 @@ func (s *Stream) Close() error {
if s.cleanupTimer != nil {
s.cleanupTimer.Stop()
}
if s.combineTimer != nil {
s.combineTimer.Stop()
}
// Clear queues
s.mu.Lock()
s.reliableQueue = nil
s.unreliableQueue = nil
@ -944,11 +1004,32 @@ func (s *Stream) Close() error {
s.pendingAcks = nil
s.fragments = nil
s.outOfOrder = nil
s.combinedPacket = nil
s.currentFragment = nil
s.mu.Unlock()
return nil
}
// GetStats returns stream statistics
func (s *Stream) GetStats() map[string]interface{} {
s.mu.RLock()
defer s.mu.RUnlock()
return map[string]interface{}{
"packets_out": atomic.LoadUint64(&s.packetsOut),
"packets_in": atomic.LoadUint64(&s.packetsIn),
"bytes_out": atomic.LoadUint64(&s.bytesOut),
"bytes_in": atomic.LoadUint64(&s.bytesIn),
"retransmits": atomic.LoadUint64(&s.retransmits),
"pending_acks": len(s.pendingAcks),
"out_of_order": len(s.outOfOrder),
"fragments": len(s.fragments),
"rtt": s.rtt,
"rto": s.rto,
}
}
func absTime(d time.Duration) time.Duration {
if d < 0 {
return -d