diff --git a/packets/apppacket.go b/packets/apppacket.go index c5c3aab..cd11c0c 100644 --- a/packets/apppacket.go +++ b/packets/apppacket.go @@ -9,8 +9,7 @@ import ( // DefaultOpcodeSize is the default opcode size for application packets var DefaultOpcodeSize uint8 = 2 -// AppPacket handles high-level game opcodes and application data -// This is the main packet type used by game logic +// AppPacket handles high-level game opcodes (matches EQApplicationPacket) type AppPacket struct { *Packet @@ -29,7 +28,7 @@ func NewAppPacket(manager opcodes.Manager) *AppPacket { } } -// NewAppPacketWithOp creates a new application packet with opcode +// NewAppPacketWithOp creates with opcode func NewAppPacketWithOp(op opcodes.EmuOpcode, manager opcodes.Manager) *AppPacket { app := &AppPacket{ Packet: NewPacket(0, nil), @@ -40,7 +39,7 @@ func NewAppPacketWithOp(op opcodes.EmuOpcode, manager opcodes.Manager) *AppPacke return app } -// NewAppPacketWithSize creates a new application packet with opcode and size +// NewAppPacketWithSize creates with opcode and size func NewAppPacketWithSize(op opcodes.EmuOpcode, size uint32, manager opcodes.Manager) *AppPacket { app := &AppPacket{ Packet: NewPacket(0, make([]byte, size)), @@ -51,7 +50,7 @@ func NewAppPacketWithSize(op opcodes.EmuOpcode, size uint32, manager opcodes.Man return app } -// NewAppPacketWithData creates a new application packet with opcode and data +// NewAppPacketWithData creates with opcode and data func NewAppPacketWithData(op opcodes.EmuOpcode, data []byte, manager opcodes.Manager) *AppPacket { app := &AppPacket{ Packet: NewPacket(0, data), @@ -62,8 +61,7 @@ func NewAppPacketWithData(op opcodes.EmuOpcode, data []byte, manager opcodes.Man return app } -// NewAppPacketFromRaw creates app packet from raw buffer (used by ProtoPacket) -// Assumes first bytes are opcode based on opcodeSize +// NewAppPacketFromRaw creates from raw buffer (matches EQApplicationPacket constructor) func NewAppPacketFromRaw(buf []byte, opcodeSize uint8, manager opcodes.Manager) *AppPacket { if opcodeSize == 0 { opcodeSize = DefaultOpcodeSize @@ -75,23 +73,28 @@ func NewAppPacketFromRaw(buf []byte, opcodeSize uint8, manager opcodes.Manager) manager: manager, } + offset := 0 + + // Extract opcode based on size if opcodeSize == 1 && len(buf) >= 1 { app.Opcode = uint16(buf[0]) - if len(buf) > 1 { - app.Buffer = make([]byte, len(buf)-1) - copy(app.Buffer, buf[1:]) - } + offset = 1 } else if len(buf) >= 2 { app.Opcode = binary.BigEndian.Uint16(buf[:2]) - if len(buf) > 2 { - app.Buffer = make([]byte, len(buf)-2) - copy(app.Buffer, buf[2:]) - } + offset = 2 + } + + // Copy remaining data as payload + if len(buf) > offset { + app.Buffer = make([]byte, len(buf)-offset) + copy(app.Buffer, buf[offset:]) } // Convert EQ opcode to emulator opcode if app.manager != nil { app.emuOpcode = app.manager.EQToEmu(app.Opcode) + } else { + app.emuOpcode = opcodes.EmuOpcode(app.Opcode) } return app @@ -99,21 +102,12 @@ func NewAppPacketFromRaw(buf []byte, opcodeSize uint8, manager opcodes.Manager) // Size returns total packet size including opcode func (a *AppPacket) Size() uint32 { - return uint32(len(a.Buffer)) + uint32(a.opcodeSize) -} - -// SetOpcodeSize sets the opcode size -func (a *AppPacket) SetOpcodeSize(size uint8) { - a.opcodeSize = size -} - -// SetManager sets the opcode manager for translation -func (a *AppPacket) SetManager(manager opcodes.Manager) { - a.manager = manager - // Re-translate if we have an opcode - if a.emuOpcode != opcodes.OP_Unknown && a.manager != nil { - a.Opcode = a.manager.EmuToEQ(a.emuOpcode) + // Handle special encoding where low byte = 0x00 needs extra byte + extraBytes := uint32(0) + if a.opcodeSize == 2 && (a.Opcode&0x00ff) == 0 { + extraBytes = 1 } + return uint32(len(a.Buffer)) + uint32(a.opcodeSize) + extraBytes } // SetOpcode sets the emulator opcode and translates to EQ opcode @@ -122,15 +116,13 @@ func (a *AppPacket) SetOpcode(op opcodes.EmuOpcode) { if a.manager != nil { a.Opcode = a.manager.EmuToEQ(op) } else { - // Fallback to direct assignment if no manager a.Opcode = uint16(op) } } -// GetOpcode returns the emulator opcode (with caching) +// GetOpcode returns the emulator opcode func (a *AppPacket) GetOpcode() opcodes.EmuOpcode { if a.emuOpcode == opcodes.OP_Unknown && a.manager != nil { - // Convert from protocol opcode a.emuOpcode = a.manager.EQToEmu(a.Opcode) } return a.emuOpcode @@ -144,20 +136,16 @@ func (a *AppPacket) GetOpcodeName() string { return "OP_Unknown" } -// Copy creates a deep copy of this application packet -func (a *AppPacket) Copy() *AppPacket { - newApp := &AppPacket{ - Packet: NewPacket(a.Opcode, a.Buffer), - emuOpcode: a.emuOpcode, - opcodeSize: a.opcodeSize, - manager: a.manager, +// SetManager sets the opcode manager +func (a *AppPacket) SetManager(manager opcodes.Manager) { + a.manager = manager + // Re-translate if we have an opcode + if a.emuOpcode != opcodes.OP_Unknown && a.manager != nil { + a.Opcode = a.manager.EmuToEQ(a.emuOpcode) } - newApp.Packet.CopyInfo(a.Packet) - return newApp } -// Serialize writes the application packet to a destination buffer -// Handles special opcode encoding rules for application-level packets +// Serialize writes to destination (matches EQApplicationPacket::serialize) func (a *AppPacket) Serialize(dest []byte) uint32 { opcodeBytes := a.opcodeSize pos := 0 @@ -187,84 +175,34 @@ func (a *AppPacket) Serialize(dest []byte) uint32 { return uint32(len(a.Buffer)) + uint32(opcodeBytes) } -// Combine combines this packet with another application packet +// Combine combines with another app packet (matches EQApplicationPacket::combine) func (a *AppPacket) Combine(rhs *AppPacket) bool { - const opAppCombined = 0x19 // OP_AppCombined - - // Check if we can combine these packets - totalSize := a.Size() + rhs.Size() - if totalSize > 512 { // Reasonable max combined packet size - return false - } - - // If this isn't already a combined packet, convert it - if a.Opcode != opAppCombined { - // Create combined packet structure - oldSize := a.Size() - newSize := oldSize + rhs.Size() + 2 // +2 for size headers - - newBuffer := make([]byte, newSize) - pos := 0 - - // Write first packet size and data - newBuffer[pos] = byte(oldSize) - pos++ - - // Serialize first packet - tmpBuf := make([]byte, oldSize) - a.Serialize(tmpBuf) - copy(newBuffer[pos:], tmpBuf) - pos += int(oldSize) - - // Write second packet size and data - rhsSize := rhs.Size() - newBuffer[pos] = byte(rhsSize) - pos++ - - // Serialize second packet - tmpBuf = make([]byte, rhsSize) - rhs.Serialize(tmpBuf) - copy(newBuffer[pos:], tmpBuf) - - // Update this packet to be a combined packet - a.Opcode = opAppCombined - a.Buffer = newBuffer - return true - } - - // This is already a combined packet, add to it - rhsSize := rhs.Size() - if rhsSize >= 255 { - // Oversized packet handling - newBuffer := make([]byte, len(a.Buffer)+int(rhsSize)+3) - copy(newBuffer, a.Buffer) - pos := len(a.Buffer) - - newBuffer[pos] = 255 // Oversized marker - pos++ - binary.BigEndian.PutUint16(newBuffer[pos:], uint16(rhsSize)) - pos += 2 - - tmpBuf := make([]byte, rhsSize) - rhs.Serialize(tmpBuf) - copy(newBuffer[pos:], tmpBuf) - - a.Buffer = newBuffer - } else { - // Normal sized addition - newBuffer := make([]byte, len(a.Buffer)+int(rhsSize)+1) - copy(newBuffer, a.Buffer) - pos := len(a.Buffer) - - newBuffer[pos] = byte(rhsSize) - pos++ - - tmpBuf := make([]byte, rhsSize) - rhs.Serialize(tmpBuf) - copy(newBuffer[pos:], tmpBuf) - - a.Buffer = newBuffer - } - - return true + // Application packet combining is not implemented in original + // Use protocol-level combining instead + return false +} + +// Copy creates a deep copy +func (a *AppPacket) Copy() *AppPacket { + newApp := &AppPacket{ + Packet: NewPacket(a.Opcode, a.Buffer), + emuOpcode: a.emuOpcode, + opcodeSize: a.opcodeSize, + manager: a.manager, + } + newApp.Packet.CopyInfo(a.Packet) + return newApp +} + +// ToProto converts to protocol packet +func (a *AppPacket) ToProto() *ProtoPacket { + // Serialize the application packet + tmpBuf := make([]byte, a.Size()) + a.Serialize(tmpBuf) + + proto := NewProtoPacket(opcodes.OP_Packet, tmpBuf, a.manager) + proto.LoginOp = a.emuOpcode + proto.CopyInfo(a.Packet) + + return proto } diff --git a/packets/helpers.go b/packets/helpers.go index 3899b02..dce1803 100644 --- a/packets/helpers.go +++ b/packets/helpers.go @@ -5,33 +5,42 @@ import ( "compress/zlib" "encoding/binary" "fmt" - "hash/crc32" "io" "git.sharkk.net/EQ2/Protocol/crypto" ) -// ValidateCRC validates packet CRC using EQ's CRC32 implementation +// ValidateCRC validates packet CRC using EQ2's custom CRC16 func ValidateCRC(buffer []byte, key uint32) bool { - if len(buffer) < 4 { + if len(buffer) < 2 { return false } - // Extract CRC from last 4 bytes - packetCRC := binary.BigEndian.Uint32(buffer[len(buffer)-4:]) + // Extract CRC from last 2 bytes (EQ2 uses CRC16) + packetCRC := binary.BigEndian.Uint16(buffer[len(buffer)-2:]) // Calculate CRC on data portion (excluding CRC bytes) - data := buffer[:len(buffer)-4] - calculatedCRC := CalculateCRC(data, key) + data := buffer[:len(buffer)-2] + calculatedCRC := crypto.CalculateCRC(data, key) return packetCRC == calculatedCRC } -// CalculateCRC calculates CRC32 for packet data -func CalculateCRC(data []byte, key uint32) uint32 { - // EQ uses standard CRC32 with XOR key - crc := crc32.ChecksumIEEE(data) - return crc ^ key +// AppendCRC appends CRC16 to packet buffer using EQ2's custom CRC +func AppendCRC(buffer []byte, key uint32) []byte { + crc := crypto.CalculateCRC(buffer, key) + result := make([]byte, len(buffer)+2) + copy(result, buffer) + binary.BigEndian.PutUint16(result[len(buffer):], crc) + return result +} + +// StripCRC removes CRC16 from packet buffer +func StripCRC(buffer []byte) []byte { + if len(buffer) < 2 { + return buffer + } + return buffer[:len(buffer)-2] } // Compress compresses packet data using zlib (matches EQ compression) @@ -42,16 +51,13 @@ func Compress(src []byte) ([]byte, error) { var buf bytes.Buffer - // EQ uses default compression level - w := zlib.NewWriter(&buf) - // Write uncompressed length first (4 bytes) - EQ protocol requirement - uncompressedLen := uint32(len(src)) - if err := binary.Write(&buf, binary.BigEndian, uncompressedLen); err != nil { + if err := binary.Write(&buf, binary.BigEndian, uint32(len(src))); err != nil { return nil, err } // Compress the data + w := zlib.NewWriter(&buf) if _, err := w.Write(src); err != nil { w.Close() return nil, err @@ -73,7 +79,7 @@ func Decompress(src []byte) ([]byte, error) { // Read uncompressed length (first 4 bytes) uncompressedLen := binary.BigEndian.Uint32(src[:4]) - // Sanity check - prevent decompression bombs + // Sanity check if uncompressedLen > MaxPacketSize { return nil, fmt.Errorf("uncompressed size %d exceeds max packet size", uncompressedLen) } @@ -95,16 +101,13 @@ func Decompress(src []byte) ([]byte, error) { } // ChatEncode encodes chat data using EQ's XOR-based encoding -// EQ uses a simple rotating XOR with the encode key func ChatEncode(buffer []byte, encodeKey int) { if len(buffer) == 0 || encodeKey == 0 { return } - // EQ chat encoding algorithm key := byte(encodeKey & 0xFF) for i := range buffer { - // XOR with rotating key based on position buffer[i] ^= key // Rotate key for next byte key = ((key << 1) | (key >> 7)) & 0xFF @@ -115,23 +118,38 @@ func ChatEncode(buffer []byte, encodeKey int) { } } -// ChatDecode decodes chat data using EQ's XOR-based encoding -// Decoding is the same as encoding for XOR +// ChatDecode decodes chat data (XOR is symmetric) func ChatDecode(buffer []byte, decodeKey int) { - // XOR encoding is symmetric - encode and decode are the same operation ChatEncode(buffer, decodeKey) } +// IsChatPacket checks if opcode is a chat-related packet +func IsChatPacket(opcode uint16) bool { + chatOpcodes := map[uint16]bool{ + 0x0300: true, // OP_ChatMsg + 0x0302: true, // OP_TellMsg + 0x0307: true, // OP_ChatLeaveChannelMsg + 0x0308: true, // OP_ChatTellChannelMsg + 0x0309: true, // OP_ChatTellUserMsg + 0x0e07: true, // OP_GuildsayMsg + } + return chatOpcodes[opcode] +} + +// longToIP converts uint32 IP to string +func longToIP(ip uint32) string { + return fmt.Sprintf("%d.%d.%d.%d", + byte(ip>>24), byte(ip>>16), byte(ip>>8), byte(ip)) +} + // IsProtocolPacket checks if buffer contains a valid protocol packet -func IsProtocolPacket(buffer []byte, trimCRC bool) bool { +func IsProtocolPacket(buffer []byte) bool { if len(buffer) < 2 { return false } - // Check for valid protocol opcodes opcode := binary.BigEndian.Uint16(buffer[:2]) - // Protocol opcodes from protocol.go validOpcodes := map[uint16]bool{ 0x0001: true, // OP_SessionRequest 0x0002: true, // OP_SessionResponse @@ -148,95 +166,5 @@ func IsProtocolPacket(buffer []byte, trimCRC bool) bool { 0x001e: true, // OP_OutOfSession } - if !validOpcodes[opcode] { - return false - } - - // If checking CRC, validate it - if trimCRC && len(buffer) >= 6 { - // Protocol packets have 2-byte opcode + data + 4-byte CRC - return ValidateCRC(buffer, 0) - } - - return true -} - -// EncodePacket applies encoding/compression based on flags -func EncodePacket(packet *ProtoPacket, compressThreshold int, encodeKey int) error { - // Apply compression if packet is large enough - if len(packet.Buffer) > compressThreshold && !packet.IsCompressed() { - compressed, err := Compress(packet.Buffer) - if err != nil { - return err - } - packet.Buffer = compressed - packet.SetCompressed(true) - } - - // Apply chat encoding if this is a chat packet - if IsChatPacket(packet.Opcode) && encodeKey != 0 { - ChatEncode(packet.Buffer, encodeKey) - packet.SetEncrypted(true) - } - - return nil -} - -// DecodePacket reverses encoding/compression -func DecodePacket(packet *ProtoPacket, decodeKey int) error { - // Decrypt if encrypted - if packet.IsEncrypted() && decodeKey != 0 { - ChatDecode(packet.Buffer, decodeKey) - packet.SetEncrypted(false) - } - - // Decompress if compressed - if packet.IsCompressed() { - decompressed, err := Decompress(packet.Buffer) - if err != nil { - return err - } - packet.Buffer = decompressed - packet.SetCompressed(false) - } - - return nil -} - -// IsChatPacket checks if opcode is a chat-related packet -func IsChatPacket(opcode uint16) bool { - // Chat-related opcodes that need encoding - // These would map to OP_ChatMsg, OP_TellMsg, etc in the opcodes package - chatOpcodes := map[uint16]bool{ - 0x0300: true, // OP_ChatMsg - 0x0302: true, // OP_TellMsg - 0x0307: true, // OP_ChatLeaveChannelMsg - 0x0308: true, // OP_ChatTellChannelMsg - 0x0309: true, // OP_ChatTellUserMsg - 0x0e07: true, // OP_GuildsayMsg - } - return chatOpcodes[opcode] -} - -// Helper function to convert uint32 IP to string -func longToIP(ip uint32) string { - return fmt.Sprintf("%d.%d.%d.%d", - byte(ip>>24), byte(ip>>16), byte(ip>>8), byte(ip)) -} - -// AppendCRC appends CRC16 to packet buffer using EQ2's custom CRC -func AppendCRC(buffer []byte, key uint32) []byte { - crc := crypto.CalculateCRC(buffer, key) - result := make([]byte, len(buffer)+2) - copy(result, buffer) - binary.BigEndian.PutUint16(result[len(buffer):], crc) - return result -} - -// StripCRC removes CRC16 from packet buffer -func StripCRC(buffer []byte) []byte { - if len(buffer) < 2 { - return buffer - } - return buffer[:len(buffer)-2] + return validOpcodes[opcode] } diff --git a/packets/protopacket.go b/packets/protopacket.go index 5480b1b..14b2662 100644 --- a/packets/protopacket.go +++ b/packets/protopacket.go @@ -7,12 +7,15 @@ import ( "git.sharkk.net/EQ2/Protocol/opcodes" ) -// ProtoPacket handles low-level protocol features including EQ2-specific operations +// ProtoPacket handles low-level protocol features (matches EQProtocolPacket/EQ2Packet) type ProtoPacket struct { *Packet - // Protocol state flags (using bitfield) - flags uint8 // bit 0: compressed, bit 1: prepared, bit 2: encrypted, bit 3: acked + // Protocol state flags + eq2Compressed bool + packetPrepared bool + packetEncrypted bool + acked bool // EQ2-specific LoginOp opcodes.EmuOpcode @@ -30,15 +33,6 @@ type ProtoPacket struct { EncodeKey int } -// Protocol flag constants -const ( - FlagCompressed = 1 << iota - FlagPrepared - FlagEncrypted - FlagAcked -) - -// Default compression threshold const DefaultCompressThreshold = 100 // NewProtoPacket creates a protocol packet with opcode and buffer @@ -50,7 +44,7 @@ func NewProtoPacket(op uint16, buf []byte, manager opcodes.Manager) *ProtoPacket } } -// NewProtoPacketFromRaw creates a protocol packet from raw buffer +// NewProtoPacketFromRaw creates from raw buffer (matches EQProtocolPacket constructor) func NewProtoPacketFromRaw(buf []byte, opcodeOverride int, manager opcodes.Manager) *ProtoPacket { var offset uint32 var opcode uint16 @@ -78,7 +72,6 @@ func NewProtoPacketFromRaw(buf []byte, opcodeOverride int, manager opcodes.Manag CompressThreshold: DefaultCompressThreshold, } - // Convert EQ opcode to emulator opcode if manager available if pp.manager != nil { pp.LoginOp = pp.manager.EQToEmu(opcode) } @@ -86,249 +79,63 @@ func NewProtoPacketFromRaw(buf []byte, opcodeOverride int, manager opcodes.Manag return pp } -// SetManager sets the opcode manager for translation -func (p *ProtoPacket) SetManager(manager opcodes.Manager) { - p.manager = manager -} - -// IsCompressed returns true if packet is compressed -func (p *ProtoPacket) IsCompressed() bool { - return p.flags&FlagCompressed != 0 -} - -// SetCompressed sets the compressed flag -func (p *ProtoPacket) SetCompressed(compressed bool) { - if compressed { - p.flags |= FlagCompressed - } else { - p.flags &^= FlagCompressed - } -} - -// IsPrepared returns true if packet has been prepared for sending -func (p *ProtoPacket) IsPrepared() bool { - return p.flags&FlagPrepared != 0 -} - -// SetPrepared sets the prepared flag -func (p *ProtoPacket) SetPrepared(prepared bool) { - if prepared { - p.flags |= FlagPrepared - } else { - p.flags &^= FlagPrepared - } -} - -// IsEncrypted returns true if packet is encrypted -func (p *ProtoPacket) IsEncrypted() bool { - return p.flags&FlagEncrypted != 0 -} - -// SetEncrypted sets the encrypted flag -func (p *ProtoPacket) SetEncrypted(encrypted bool) { - if encrypted { - p.flags |= FlagEncrypted - } else { - p.flags &^= FlagEncrypted - } -} - -// IsAcked returns true if packet has been acknowledged -func (p *ProtoPacket) IsAcked() bool { - return p.flags&FlagAcked != 0 -} - -// SetAcked sets the acknowledged flag -func (p *ProtoPacket) SetAcked(acked bool) { - if acked { - p.flags |= FlagAcked - } else { - p.flags &^= FlagAcked - } -} - -// CompressPacket compresses the packet data if needed -func (p *ProtoPacket) CompressPacket() error { - if p.IsCompressed() || len(p.Buffer) <= p.CompressThreshold { - return nil - } - - compressed, err := Compress(p.Buffer) - if err != nil { - return err - } - - // Only use compression if it actually saves space - if len(compressed) < len(p.Buffer) { - p.Buffer = compressed - p.SetCompressed(true) - } - - return nil -} - -// DecompressPacket decompresses the packet data -func (p *ProtoPacket) DecompressPacket() error { - if !p.IsCompressed() { - return nil - } - - decompressed, err := Decompress(p.Buffer) - if err != nil { - return err - } - - p.Buffer = decompressed - p.SetCompressed(false) - return nil -} - -// EncodeChat applies chat encoding if this is a chat packet -func (p *ProtoPacket) EncodeChat() { - if p.EncodeKey == 0 || p.IsEncrypted() { - return - } - - if p.IsChatPacket() { - ChatEncode(p.Buffer, p.EncodeKey) - p.SetEncrypted(true) - } -} - -// DecodeChat reverses chat encoding -func (p *ProtoPacket) DecodeChat() { - if p.EncodeKey == 0 || !p.IsEncrypted() { - return - } - - ChatDecode(p.Buffer, p.EncodeKey) - p.SetEncrypted(false) -} - -// IsChatPacket checks if this is a chat packet using opcode manager -func (p *ProtoPacket) IsChatPacket() bool { - if p.manager == nil { - return false - } - - // Get emulator opcode - emuOp := p.manager.EQToEmu(p.Opcode) - - // Check against known chat opcodes - switch emuOp { - case opcodes.OP_ChatMsg, - opcodes.OP_TellMsg, - opcodes.OP_ChatLeaveChannelMsg, - opcodes.OP_ChatTellChannelMsg, - opcodes.OP_ChatTellUserMsg, - opcodes.OP_GuildsayMsg: - return true - default: - return false - } -} - -// Copy creates a deep copy of this protocol packet -func (p *ProtoPacket) Copy() *ProtoPacket { - newPacket := &ProtoPacket{ - Packet: NewPacket(p.Opcode, p.Buffer), - flags: p.flags, - LoginOp: p.LoginOp, - Sequence: p.Sequence, - SentTime: p.SentTime, - AttemptCount: p.AttemptCount, - manager: p.manager, - CompressThreshold: p.CompressThreshold, - EncodeKey: p.EncodeKey, - } - newPacket.Packet.CopyInfo(p.Packet) - return newPacket -} - -// Serialize writes the protocol packet to a destination buffer -func (p *ProtoPacket) Serialize(dest []byte, offset int8) uint32 { - // Apply compression before serialization - p.CompressPacket() - - // Apply chat encoding if needed - p.EncodeChat() - - // Write compression flag if compressed - pos := 0 - if p.IsCompressed() { - dest[pos] = 0x5a // EQ compression flag - pos++ - } - - // Write opcode (2 bytes) - if p.Opcode > 0xff { - binary.BigEndian.PutUint16(dest[pos:], p.Opcode) - } else { - dest[pos] = 0 - dest[pos+1] = byte(p.Opcode) - } - pos += 2 - - // Copy packet data after opcode - if offset < int8(len(p.Buffer)) { - copy(dest[pos:], p.Buffer[offset:]) - } - - return uint32(len(p.Buffer)-int(offset)) + uint32(pos) -} - -// PreparePacket prepares an EQ2 packet for transmission +// PreparePacket prepares an EQ2 packet for transmission (matches EQ2Packet::PreparePacket) func (p *ProtoPacket) PreparePacket(maxLen int16) int8 { - if p.IsPrepared() { + if p.packetPrepared { return 0 } - p.SetPrepared(true) - - // Apply compression if needed - if err := p.CompressPacket(); err != nil { + if p.manager == nil { return -1 } - // Apply chat encoding if needed - p.EncodeChat() + p.packetPrepared = true - // Convert emulator opcode to network opcode using manager - var loginOpcode uint16 - if p.manager != nil { - loginOpcode = p.manager.EmuToEQ(p.LoginOp) - } else { - loginOpcode = uint16(p.LoginOp) - } + // Convert emulator opcode to network opcode + loginOpcode := p.manager.EmuToEQ(p.LoginOp) - var offset int8 - newSize := len(p.Buffer) + 2 + 1 // sequence(2) + compressed_flag(1) - oversized := false - - // Add compression flag space if compressed - if p.IsCompressed() { - newSize++ - } - - // Handle different opcode sizes and formats - if loginOpcode != 2 { - newSize++ // opcode type byte - if loginOpcode >= 255 { - newSize += 2 // oversized opcode - oversized = true + // Apply compression if needed + if !p.eq2Compressed && len(p.Buffer) > p.CompressThreshold { + compressed, err := Compress(p.Buffer) + if err == nil && len(compressed) < len(p.Buffer) { + p.Buffer = compressed + p.eq2Compressed = true } } - // Build new packet buffer - newBuffer := make([]byte, newSize) - ptr := 2 // Skip sequence field + // Build new packet buffer with proper headers + var offset int8 + newSize := len(p.Buffer) + 2 // Base size with sequence - // Add compression flag if needed - if p.IsCompressed() { - newBuffer[ptr] = 0x5a // EQ compression flag + // Add compression flag space if compressed + if p.eq2Compressed { + newSize++ + } + + // Handle opcode encoding (matches C++ implementation) + oversized := false + if loginOpcode != 2 { // Not OP_SessionResponse + if loginOpcode >= 255 { + newSize += 3 // oversized opcode (0xFF + 2 bytes) + oversized = true + } else { + newSize += 2 // normal opcode + } + } else { + newSize++ // single byte for OP_SessionResponse + } + + // Build new buffer + newBuffer := make([]byte, newSize) + ptr := 2 // Skip sequence field (filled by stream) + + // Add compression flag + if p.eq2Compressed { + newBuffer[ptr] = 0x5a // EQ2 compression flag ptr++ } + // Encode opcode if loginOpcode != 2 { if oversized { newBuffer[ptr] = 0xff // Oversized marker @@ -348,52 +155,94 @@ func (p *ProtoPacket) PreparePacket(maxLen int16) int8 { copy(newBuffer[ptr:], p.Buffer) p.Buffer = newBuffer - offset = int8(newSize - len(p.Buffer) - 1) + offset = int8(ptr - 2) // Return offset past sequence field return offset } -// MakeApplicationPacket converts protocol packet to application packet -func (p *ProtoPacket) MakeApplicationPacket(opcodeSize uint8) *AppPacket { - // Decompress if needed - p.DecompressPacket() +// Serialize writes the protocol packet to a destination buffer +func (p *ProtoPacket) Serialize(dest []byte, offset int8) uint32 { + pos := 0 - // Decode chat if needed - p.DecodeChat() - - if opcodeSize == 0 { - opcodeSize = DefaultOpcodeSize + // Write compression flag if compressed + if p.eq2Compressed { + dest[pos] = 0x5a + pos++ } - app := &AppPacket{ - Packet: NewPacket(0, p.Buffer), - opcodeSize: opcodeSize, - manager: p.manager, - } - app.CopyInfo(p.Packet) + // Write opcode (2 bytes) + binary.BigEndian.PutUint16(dest[pos:], p.Opcode) + pos += 2 - // Parse opcode from buffer based on size - if opcodeSize == 1 && len(p.Buffer) >= 1 { - app.Opcode = uint16(p.Buffer[0]) - app.Buffer = p.Buffer[1:] - } else if len(p.Buffer) >= 2 { - app.Opcode = binary.BigEndian.Uint16(p.Buffer[:2]) - app.Buffer = p.Buffer[2:] + // Copy packet data after opcode + if offset < int8(len(p.Buffer)) { + copy(dest[pos:], p.Buffer[offset:]) + return uint32(len(p.Buffer)-int(offset)) + uint32(pos) } - // Convert EQ opcode to emulator opcode if manager available - if app.manager != nil { - app.emuOpcode = app.manager.EQToEmu(app.Opcode) - } - - return app + return uint32(pos) } -// AppCombine combines this packet with another for efficient transmission +// Combine combines this protocol packet with another (matches EQProtocolPacket::combine) +func (p *ProtoPacket) Combine(rhs *ProtoPacket) bool { + const opCombined = 0x03 // OP_Combined + + // Case 1: This packet is already combined - append to it + if p.Opcode == opCombined && p.Size()+rhs.Size()+3 < 256 { + newSize := len(p.Buffer) + int(rhs.Size()) + 1 + newBuffer := make([]byte, newSize) + + // Copy existing combined data + copy(newBuffer, p.Buffer) + offset := len(p.Buffer) + + // Add size prefix for new packet + newBuffer[offset] = byte(rhs.Size()) + offset++ + + // Serialize and append new packet + tmpBuf := make([]byte, rhs.Size()) + rhs.Serialize(tmpBuf, 0) + copy(newBuffer[offset:], tmpBuf) + + p.Buffer = newBuffer + return true + } + + // Case 2: Neither packet is combined - create new combined packet + if p.Size()+rhs.Size()+4 < 256 { + totalSize := int(p.Size()) + int(rhs.Size()) + 2 + newBuffer := make([]byte, totalSize) + offset := 0 + + // Add first packet with size prefix + newBuffer[offset] = byte(p.Size()) + offset++ + tmpBuf := make([]byte, p.Size()) + p.Serialize(tmpBuf, 0) + copy(newBuffer[offset:], tmpBuf) + offset += int(p.Size()) + + // Add second packet with size prefix + newBuffer[offset] = byte(rhs.Size()) + offset++ + tmpBuf = make([]byte, rhs.Size()) + rhs.Serialize(tmpBuf, 0) + copy(newBuffer[offset:], tmpBuf) + + // Update buffer and mark as combined + p.Buffer = newBuffer + p.Opcode = opCombined + return true + } + + return false +} + +// AppCombine combines app-level packets (matches EQ2Packet::AppCombine) func (p *ProtoPacket) AppCombine(rhs *ProtoPacket) bool { const opAppCombined = 0x19 // OP_AppCombined - // Calculate sizes lhsSize := p.Size() rhsSize := rhs.Size() @@ -402,109 +251,136 @@ func (p *ProtoPacket) AppCombine(rhs *ProtoPacket) bool { return false } - // If this packet is already combined, add to it + // If already combined, add to it if p.Opcode == opAppCombined { - // Calculate new size + tmpSize := rhsSize - 2 // Subtract opcode bytes var newSize int - if rhsSize >= 255 { - newSize = len(p.Buffer) + int(rhsSize) + 3 // oversized header + + if tmpSize >= 255 { + newSize = len(p.Buffer) + int(tmpSize) + 3 // oversized header } else { - newSize = len(p.Buffer) + int(rhsSize) + 1 // normal header + newSize = len(p.Buffer) + int(tmpSize) + 1 // normal header } - // Check size limit - if newSize > MaxCombinedSize { - return false - } - - // Create new buffer newBuffer := make([]byte, newSize) - pos := 0 - - // Copy existing combined data copy(newBuffer, p.Buffer) - pos = len(p.Buffer) + pos := len(p.Buffer) - // Add new packet with size header - if rhsSize >= 255 { + // Add size header + if tmpSize >= 255 { newBuffer[pos] = 255 // Oversized marker pos++ - binary.BigEndian.PutUint16(newBuffer[pos:], uint16(rhsSize)) + binary.BigEndian.PutUint16(newBuffer[pos:], uint16(tmpSize)) pos += 2 } else { - newBuffer[pos] = byte(rhsSize) + newBuffer[pos] = byte(tmpSize) pos++ } - // Serialize rhs packet - tmpBuf := make([]byte, rhsSize) - rhs.Serialize(tmpBuf, 0) - copy(newBuffer[pos:], tmpBuf) + // Serialize rhs packet (skip first 2 bytes - opcode) + if len(rhs.Buffer) > 2 { + copy(newBuffer[pos:], rhs.Buffer[2:]) + } p.Buffer = newBuffer return true } - // Create new combined packet from two non-combined packets - // Calculate total size: size1(1-3) + packet1 + size2(1-3) + packet2 - var totalSize int - if lhsSize >= 255 { - totalSize += 3 + int(lhsSize) + // Create new combined packet + lSize := lhsSize - 2 + rSize := rhsSize - 2 + totalSize := 0 + + if lSize >= 255 { + totalSize += 3 + int(lSize) } else { - totalSize += 1 + int(lhsSize) - } - if rhsSize >= 255 { - totalSize += 3 + int(rhsSize) - } else { - totalSize += 1 + int(rhsSize) + totalSize += 1 + int(lSize) + } + + if rSize >= 255 { + totalSize += 3 + int(rSize) + } else { + totalSize += 1 + int(rSize) } - // Check size limit if totalSize > MaxCombinedSize { return false } - // Build combined packet newBuffer := make([]byte, totalSize) pos := 0 - // Add first packet with size header - if lhsSize >= 255 { + // Add first packet + if lSize >= 255 { newBuffer[pos] = 255 pos++ - binary.BigEndian.PutUint16(newBuffer[pos:], uint16(lhsSize)) + binary.BigEndian.PutUint16(newBuffer[pos:], uint16(lSize)) pos += 2 } else { - newBuffer[pos] = byte(lhsSize) + newBuffer[pos] = byte(lSize) pos++ } + if len(p.Buffer) > 2 { + copy(newBuffer[pos:], p.Buffer[2:]) + pos += int(lSize) + } - // Serialize first packet - tmpBuf := make([]byte, lhsSize) - p.Serialize(tmpBuf, 0) - copy(newBuffer[pos:], tmpBuf) - pos += int(lhsSize) - - // Add second packet with size header - if rhsSize >= 255 { + // Add second packet + if rSize >= 255 { newBuffer[pos] = 255 pos++ - binary.BigEndian.PutUint16(newBuffer[pos:], uint16(rhsSize)) + binary.BigEndian.PutUint16(newBuffer[pos:], uint16(rSize)) pos += 2 } else { - newBuffer[pos] = byte(rhsSize) + newBuffer[pos] = byte(rSize) pos++ } + if len(rhs.Buffer) > 2 { + copy(newBuffer[pos:], rhs.Buffer[2:]) + } - // Serialize second packet - tmpBuf = make([]byte, rhsSize) - rhs.Serialize(tmpBuf, 0) - copy(newBuffer[pos:], tmpBuf) - - // Update this packet to be combined p.Opcode = opAppCombined p.Buffer = newBuffer - p.SetPrepared(false) // Need to re-prepare + p.packetPrepared = false return true } + +// MakeApplicationPacket converts to app packet (matches EQProtocolPacket::MakeApplicationPacket) +func (p *ProtoPacket) MakeApplicationPacket(opcodeSize uint8) *AppPacket { + // Decompress if needed + if p.eq2Compressed { + if decompressed, err := Decompress(p.Buffer); err == nil { + p.Buffer = decompressed + p.eq2Compressed = false + } + } + + // Decode chat if needed + if p.packetEncrypted && p.EncodeKey != 0 { + ChatDecode(p.Buffer, p.EncodeKey) + p.packetEncrypted = false + } + + return NewAppPacketFromRaw(p.Buffer, opcodeSize, p.manager) +} + +// Copy creates a deep copy +func (p *ProtoPacket) Copy() *ProtoPacket { + newPacket := &ProtoPacket{ + Packet: NewPacket(p.Opcode, p.Buffer), + eq2Compressed: p.eq2Compressed, + packetPrepared: p.packetPrepared, + packetEncrypted: p.packetEncrypted, + acked: p.acked, + LoginOp: p.LoginOp, + Sequence: p.Sequence, + SentTime: p.SentTime, + AttemptCount: p.AttemptCount, + manager: p.manager, + CompressThreshold: p.CompressThreshold, + EncodeKey: p.EncodeKey, + } + newPacket.Packet.CopyInfo(p.Packet) + return newPacket +} diff --git a/stream/stream.go b/stream/stream.go index 24f1931..6287139 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -13,7 +13,7 @@ import ( "github.com/panjf2000/gnet/v2" ) -// Stream implements EQ2's reliable UDP protocol +// Stream implements EQ2's reliable UDP protocol (matches EQStream) type Stream struct { conn gnet.Conn mu sync.RWMutex @@ -41,12 +41,15 @@ type Stream struct { duplicateAckCnt map[uint16]int // Fragment assembly with expiry - fragments map[uint16]*fragmentBuffer - nextFragID uint16 + fragments map[uint16]*fragmentBuffer + currentFragment *fragmentBuffer // Out of order handling with expiry outOfOrder map[uint16]*outOfOrderPacket + // Combined packet for efficient sending + combinedPacket *packets.ProtoPacket + // Packet queues reliableQueue []*packets.ProtoPacket unreliableQueue []*packets.ProtoPacket @@ -64,6 +67,7 @@ type Stream struct { timeoutTimer *time.Timer retransmitTimer *time.Timer cleanupTimer *time.Timer + combineTimer *time.Timer // Retransmission settings rtt time.Duration @@ -121,6 +125,7 @@ const ( fragmentTimeout = 30 * time.Second outOfOrderTimeout = 10 * time.Second duplicateAckThreshold = 3 + combineFlushInterval = 10 * time.Millisecond ) // Config holds stream configuration @@ -188,24 +193,23 @@ func NewStream(conn gnet.Conn, cfg *Config) *Stream { } s.retransmitTimer = time.AfterFunc(retransmitInterval, s.processRetransmits) s.cleanupTimer = time.AfterFunc(cleanupInterval, s.cleanup) + s.combineTimer = time.AfterFunc(combineFlushInterval, s.flushCombined) return s } -// Process handles incoming data from gnet +// Process handles incoming data with proper CRC validation func (s *Stream) Process(data []byte) error { if len(data) < 2 { return nil } - // Check for CRC + // Validate and strip CRC16 if len(data) > 2 { - providedCRC := binary.BigEndian.Uint16(data[len(data)-2:]) - calculatedCRC := crypto.CalculateCRC(data[:len(data)-2], s.crcKey) - if providedCRC != calculatedCRC { + if !packets.ValidateCRC(data, s.crcKey) { return nil } - data = data[:len(data)-2] + data = packets.StripCRC(data) } // Decrypt if needed @@ -248,32 +252,71 @@ func (s *Stream) Process(data []byte) error { return nil } -// SendPacket sends an application packet +// SendPacket sends an application packet with proper preparation func (s *Stream) SendPacket(app *packets.AppPacket) error { if s.state.Load() != StateEstablished { return fmt.Errorf("stream not established") } - // Validate packet size - if app.Size() > uint32(s.maxLen) { - proto := s.appToProto(app) + // Convert to protocol packet + proto := app.ToProto() + proto.CompressThreshold = 100 + proto.EncodeKey = s.encodeKey + + // Check if packet needs fragmentation + if proto.Size() > uint32(s.maxLen-8) { return s.sendFragmented(proto) } - proto := s.appToProto(app) + // Try to combine with pending combined packet + s.mu.Lock() + defer s.mu.Unlock() + + if s.combinedPacket != nil { + if s.combinedPacket.AppCombine(proto) { + if s.combinedPacket.Size() > uint32(s.maxLen/2) { + s.flushCombinedPacketLocked() + } + return nil + } + s.flushCombinedPacketLocked() + } + + // Queue based on reliability isUnreliable := s.isUnreliableOpcode(app.GetOpcode()) - s.mu.Lock() if isUnreliable { s.unreliableQueue = append(s.unreliableQueue, proto) } else { - s.reliableQueue = append(s.reliableQueue, proto) + // Start new combined packet if small enough + if proto.Size() < uint32(s.maxLen/4) { + s.combinedPacket = proto + s.combineTimer.Reset(combineFlushInterval) + } else { + s.reliableQueue = append(s.reliableQueue, proto) + } } - s.mu.Unlock() return s.processQueues() } +// flushCombined timer callback +func (s *Stream) flushCombined() { + s.mu.Lock() + s.flushCombinedPacketLocked() + s.mu.Unlock() + s.processQueues() + s.combineTimer.Reset(combineFlushInterval) +} + +// flushCombinedPacketLocked sends any pending combined packet (must hold lock) +func (s *Stream) flushCombinedPacketLocked() { + if s.combinedPacket != nil { + s.reliableQueue = append(s.reliableQueue, s.combinedPacket) + s.combinedPacket = nil + } +} + // processQueues processes all packet queues func (s *Stream) processQueues() error { s.mu.Lock() @@ -340,6 +383,75 @@ func (s *Stream) processQueues() error { return nil } +// sendFragmented sends a fragmented packet (matches EQStream::SendPacket) +func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error { + // Prepare packet first + offset := proto.PreparePacket(int16(s.maxLen)) + if offset < 0 { + return fmt.Errorf("failed to prepare packet") + } + + data := proto.Buffer + length := uint32(len(data)) + + // First fragment with total length + out := packets.NewProtoPacket(opcodes.OP_Fragment, nil, s.opcodeManager) + out.Buffer = make([]byte, s.maxLen-4) + + binary.BigEndian.PutUint32(out.Buffer[:4], length) + + used := copy(out.Buffer[4:], data) + out.Buffer = out.Buffer[:4+used] + + if err := s.sendReliable(out); err != nil { + return err + } + + // Send remaining fragments + pos := used + for pos < int(length) { + chunkSize := int(length) - pos + if chunkSize > int(s.maxLen)-4 { + chunkSize = int(s.maxLen) - 4 + } + + frag := packets.NewProtoPacket(opcodes.OP_Fragment, data[pos:pos+chunkSize], s.opcodeManager) + if err := s.sendReliable(frag); err != nil { + return err + } + + pos += chunkSize + } + + return nil +} + +// sendReliable sends a packet reliably with sequencing +func (s *Stream) sendReliable(proto *packets.ProtoPacket) error { + s.mu.Lock() + seq := s.seqOut + s.seqOut = s.incrementSequence(s.seqOut) + s.mu.Unlock() + + data := make([]byte, proto.Size()+2) + binary.BigEndian.PutUint16(data[:2], seq) + proto.Serialize(data[2:], 0) + + pending := &pendingPacket{ + packet: proto.Copy(), + seq: seq, + sentTime: time.Now(), + attempts: 1, + nextRetry: time.Now().Add(s.rto), + } + + s.mu.Lock() + s.pendingAcks[seq] = pending + s.mu.Unlock() + + return s.sendRaw(opcodes.OP_Packet, data) +} + // Helper methods func (s *Stream) isUnreliableOpcode(emuOp opcodes.EmuOpcode) bool { switch emuOp { @@ -363,6 +475,7 @@ func (s *Stream) isSequenceAhead(seq, base uint16) bool { return diff > 0 && diff < 0x8000 } +// Protocol handlers func (s *Stream) handleSessionRequest(data []byte) error { if len(data) < 10 { return fmt.Errorf("session request too small") @@ -430,6 +543,7 @@ func (s *Stream) handlePacket(data []byte) error { return err } + // Process any buffered out-of-order packets for { if buffered, exists := s.outOfOrder[s.seqExpected]; exists { delete(s.outOfOrder, s.seqExpected) @@ -443,12 +557,14 @@ func (s *Stream) handlePacket(data []byte) error { } } } else if s.isSequenceAhead(seq, s.seqExpected) { + // Out of order - buffer it s.outOfOrder[seq] = &outOfOrderPacket{ data: append([]byte(nil), data...), timestamp: time.Now(), } go s.sendOutOfOrderAck(seq) } else { + // Duplicate packet - just ack it go s.sendAckImmediate(seq) } @@ -457,43 +573,46 @@ func (s *Stream) handlePacket(data []byte) error { } func (s *Stream) handleFragment(data []byte) error { - if len(data) < 10 { + if len(data) < 2 { return nil } seq := binary.BigEndian.Uint16(data[:2]) - fragID := binary.BigEndian.Uint32(data[2:6]) - fragTotal := binary.BigEndian.Uint16(data[6:8]) - fragCur := binary.BigEndian.Uint16(data[8:10]) - data = data[10:] + data = data[2:] s.mu.Lock() defer s.mu.Unlock() - frag, exists := s.fragments[uint16(fragID)] - if !exists { - frag = &fragmentBuffer{ - totalSize: uint32(fragTotal), + // Check if this is start of new fragment stream + if s.currentFragment == nil && len(data) >= 4 { + totalLen := binary.BigEndian.Uint32(data[:4]) + s.currentFragment = &fragmentBuffer{ + totalSize: totalLen, chunks: make(map[uint16][]byte), startTime: time.Now(), } - s.fragments[uint16(fragID)] = frag - } - frag.chunks[fragCur] = append([]byte(nil), data...) - frag.received++ - - if frag.received == uint32(fragTotal) { - complete := make([]byte, 0) - for i := uint16(0); i < fragTotal; i++ { - if chunk, ok := frag.chunks[i]; ok { - complete = append(complete, chunk...) - } else { - return nil - } + if len(data) > 4 { + s.currentFragment.chunks[0] = append([]byte(nil), data[4:]...) + s.currentFragment.received = uint32(len(data) - 4) + } + } else if s.currentFragment != nil { + // Continuation fragment + chunkNum := uint16(len(s.currentFragment.chunks)) + s.currentFragment.chunks[chunkNum] = append([]byte(nil), data...) + s.currentFragment.received += uint32(len(data)) + + // Check if complete + if s.currentFragment.received >= s.currentFragment.totalSize { + complete := make([]byte, 0, s.currentFragment.totalSize) + for i := uint16(0); i < uint16(len(s.currentFragment.chunks)); i++ { + if chunk, ok := s.currentFragment.chunks[i]; ok { + complete = append(complete, chunk...) + } + } + s.currentFragment = nil + return s.processPacketData(complete) } - delete(s.fragments, uint16(fragID)) - return s.processPacketData(complete) } s.ackQueue = append(s.ackQueue, seq) @@ -515,6 +634,7 @@ func (s *Stream) handleAck(data []byte) error { delete(s.pendingAcks, ackSeq) delete(s.duplicateAckCnt, ackSeq) + // Update RTT sample := time.Since(pending.sentTime) if s.rtt == 0 { s.rtt = sample @@ -533,9 +653,11 @@ func (s *Stream) handleAck(data []byte) error { s.rto = s.maxRTO } } else { + // Duplicate ACK s.duplicateAckCnt[ackSeq]++ if s.duplicateAckCnt[ackSeq] >= duplicateAckThreshold { + // Fast retransmit for seq, pending := range s.pendingAcks { if s.isSequenceAhead(seq, ackSeq) { s.resendQueue = append(s.resendQueue, pending) @@ -641,83 +763,19 @@ func (s *Stream) handleDisconnect() error { return nil } -func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error { - data := make([]byte, proto.Size()) - proto.Serialize(data, 0) - - fragSize := int(s.maxLen) - 12 - numFrags := (len(data) + fragSize - 1) / fragSize - - if numFrags > 0xFFFF { - return fmt.Errorf("packet too large to fragment") - } - - s.mu.Lock() - fragID := s.nextFragID - s.nextFragID++ - s.mu.Unlock() - - for i := 0; i < numFrags; i++ { - start := i * fragSize - end := start + fragSize - if end > len(data) { - end = len(data) - } - - fragHeader := make([]byte, 10) - binary.BigEndian.PutUint32(fragHeader[0:4], uint32(fragID)) - binary.BigEndian.PutUint16(fragHeader[4:6], uint16(numFrags)) - binary.BigEndian.PutUint16(fragHeader[6:8], uint16(i)) - binary.BigEndian.PutUint16(fragHeader[8:10], uint16(end-start)) - - fragment := append(fragHeader, data[start:end]...) - fragProto := packets.NewProtoPacket(opcodes.OP_Fragment, fragment, s.opcodeManager) - - if err := s.sendReliable(fragProto); err != nil { - return err - } - } - - return nil -} - -func (s *Stream) sendReliable(proto *packets.ProtoPacket) error { - s.mu.Lock() - seq := s.seqOut - s.seqOut = s.incrementSequence(s.seqOut) - s.mu.Unlock() - - data := make([]byte, proto.Size()+2) - binary.BigEndian.PutUint16(data[:2], seq) - proto.Serialize(data[2:], 0) - - pending := &pendingPacket{ - packet: proto.Copy(), - seq: seq, - sentTime: time.Now(), - attempts: 1, - nextRetry: time.Now().Add(s.rto), - } - - s.mu.Lock() - s.pendingAcks[seq] = pending - s.mu.Unlock() - - return s.sendRaw(opcodes.OP_Packet, data) -} - +// processPacketData processes received packet data func (s *Stream) processPacketData(data []byte) error { proto := packets.NewProtoPacketFromRaw(data, -1, s.opcodeManager) - if err := proto.DecompressPacket(); err != nil { - return err - } app := proto.MakeApplicationPacket(s.opcodeSize) + if s.onPacket != nil { go s.onPacket(app) } + return nil } +// Timer handlers func (s *Stream) processRetransmits() { if s.state.Load() != StateEstablished { return @@ -757,18 +815,21 @@ func (s *Stream) cleanup() { s.mu.Lock() now := time.Now() + // Clean up old fragments for id, frag := range s.fragments { if now.Sub(frag.startTime) > fragmentTimeout { delete(s.fragments, id) } } + // Clean up old out-of-order packets for seq, oop := range s.outOfOrder { if now.Sub(oop.timestamp) > outOfOrderTimeout { delete(s.outOfOrder, seq) } } + // Clean up duplicate ACK counts for seq := range s.duplicateAckCnt { if _, exists := s.pendingAcks[seq]; !exists { delete(s.duplicateAckCnt, seq) @@ -841,14 +902,16 @@ func (s *Stream) handleTimeout() { } } +// sendRaw sends raw packet with CRC16 func (s *Stream) sendRaw(opcode uint16, data []byte) error { - packet := make([]byte, 2+len(data)+2) + packet := make([]byte, 2+len(data)) binary.BigEndian.PutUint16(packet[:2], opcode) copy(packet[2:], data) - crc := crypto.CalculateCRC(packet[:len(packet)-2], s.crcKey) - binary.BigEndian.PutUint16(packet[len(packet)-2:], crc) + // Add CRC16 + packet = packets.AppendCRC(packet, s.crcKey) + // Encrypt if needed if s.cipher != nil { s.cipher.Encrypt(packet) } @@ -859,14 +922,6 @@ func (s *Stream) sendRaw(opcode uint16, data []byte) error { return s.conn.AsyncWrite(packet, nil) } -func (s *Stream) appToProto(app *packets.AppPacket) *packets.ProtoPacket { - proto := packets.NewProtoPacket(app.Opcode, app.Buffer, s.opcodeManager) - proto.CopyInfo(app.Packet) - proto.CompressThreshold = 100 - proto.EncodeKey = s.encodeKey - return proto -} - // Public methods func (s *Stream) SetPacketCallback(fn func(*packets.AppPacket)) { s.onPacket = fn @@ -886,7 +941,7 @@ func (s *Stream) IsConnected() bool { func (s *Stream) SendSessionRequest() error { data := make([]byte, 10) - binary.BigEndian.PutUint32(data[0:4], 2) + binary.BigEndian.PutUint32(data[0:4], 2) // protocol version binary.BigEndian.PutUint32(data[4:8], s.sessionID) binary.BigEndian.PutUint16(data[8:10], s.maxLen) @@ -921,6 +976,7 @@ func (s *Stream) Close() error { s.state.Store(StateClosed) s.sendRaw(opcodes.OP_SessionDisconnect, nil) + // Stop all timers if s.keepAliveTimer != nil { s.keepAliveTimer.Stop() } @@ -936,7 +992,11 @@ func (s *Stream) Close() error { if s.cleanupTimer != nil { s.cleanupTimer.Stop() } + if s.combineTimer != nil { + s.combineTimer.Stop() + } + // Clear queues s.mu.Lock() s.reliableQueue = nil s.unreliableQueue = nil @@ -944,11 +1004,32 @@ func (s *Stream) Close() error { s.pendingAcks = nil s.fragments = nil s.outOfOrder = nil + s.combinedPacket = nil + s.currentFragment = nil s.mu.Unlock() return nil } +// GetStats returns stream statistics +func (s *Stream) GetStats() map[string]interface{} { + s.mu.RLock() + defer s.mu.RUnlock() + + return map[string]interface{}{ + "packets_out": atomic.LoadUint64(&s.packetsOut), + "packets_in": atomic.LoadUint64(&s.packetsIn), + "bytes_out": atomic.LoadUint64(&s.bytesOut), + "bytes_in": atomic.LoadUint64(&s.bytesIn), + "retransmits": atomic.LoadUint64(&s.retransmits), + "pending_acks": len(s.pendingAcks), + "out_of_order": len(s.outOfOrder), + "fragments": len(s.fragments), + "rtt": s.rtt, + "rto": s.rto, + } +} + func absTime(d time.Duration) time.Duration { if d < 0 { return -d