From 9cc3273a81db99c67fc6e5da2ca6585479fb483f Mon Sep 17 00:00:00 2001 From: Sky Johnson Date: Wed, 3 Sep 2025 13:00:54 -0500 Subject: [PATCH] work on stream --- stream/stream.go | 741 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 601 insertions(+), 140 deletions(-) diff --git a/stream/stream.go b/stream/stream.go index 1553708..f2bd68d 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -2,6 +2,7 @@ package stream import ( "encoding/binary" + "fmt" "sync" "sync/atomic" "time" @@ -26,20 +27,25 @@ type Stream struct { decodeKey int // Sequence management - seqOut uint16 - seqIn uint16 - seqLastAck uint16 + seqOut uint16 + seqIn uint16 + seqLastAck uint16 + seqExpected uint16 // Acknowledgment tracking pendingAcks map[uint16]*pendingPacket ackTimer *time.Timer lastAckSent time.Time ackThreshold time.Duration + ackQueue []uint16 // Sequences needing ACK // Fragment assembly fragments map[uint16]*fragmentBuffer nextFragID uint16 + // Out of order handling + outOfOrder map[uint16][]byte + // Packet queues reliableQueue []*packets.ProtoPacket unreliableQueue []*packets.ProtoPacket @@ -56,11 +62,20 @@ type Stream struct { keepAliveTimer *time.Timer timeoutTimer *time.Timer + // Retransmission settings + rtt time.Duration + rttVar time.Duration + rto time.Duration + minRTO time.Duration + maxRTO time.Duration + maxRetries int + // Stats - packetsOut uint64 - packetsIn uint64 - bytesOut uint64 - bytesIn uint64 + packetsOut uint64 + packetsIn uint64 + bytesOut uint64 + bytesIn uint64 + retransmits uint64 // Callbacks onPacket func(*packets.AppPacket) @@ -120,8 +135,17 @@ func NewStream(conn gnet.Conn, cfg *Config) *Stream { ackThreshold: cfg.AckThreshold, pendingAcks: make(map[uint16]*pendingPacket), fragments: make(map[uint16]*fragmentBuffer), + outOfOrder: make(map[uint16][]byte), + ackQueue: make([]uint16, 0), seqOut: 0, seqIn: 0, + seqExpected: 0, + rtt: time.Second, + rttVar: 500 * time.Millisecond, + rto: time.Second, + minRTO: 200 * time.Millisecond, + maxRTO: 10 * time.Second, + maxRetries: 10, } if s.maxLen == 0 { @@ -144,6 +168,11 @@ func NewStream(conn gnet.Conn, cfg *Config) *Stream { s.keepAliveTimer = time.AfterFunc(cfg.KeepaliveTime, s.sendKeepalive) } + // Start timeout timer + if cfg.TimeoutDuration > 0 { + s.timeoutTimer = time.AfterFunc(cfg.TimeoutDuration, s.handleTimeout) + } + return s } @@ -195,6 +224,8 @@ func (s *Stream) Process(data []byte) error { return nil case opcodes.OP_SessionDisconnect: return s.handleDisconnect() + case opcodes.OP_OutOfOrderAck: + return s.handleOutOfOrderAck(data[2:]) } atomic.AddUint64(&s.packetsIn, 1) @@ -203,6 +234,62 @@ func (s *Stream) Process(data []byte) error { return nil } +// handleSessionRequest processes session establishment request +func (s *Stream) handleSessionRequest(data []byte) error { + if len(data) < 10 { + return fmt.Errorf("session request too small") + } + + // Parse session request + // Format: version(4) + sessionID(4) + maxLen(2) + version := binary.BigEndian.Uint32(data[:4]) + sessionID := binary.BigEndian.Uint32(data[4:8]) + maxLen := binary.BigEndian.Uint16(data[8:10]) + + // Update session info + s.mu.Lock() + s.sessionID = sessionID + if maxLen < s.maxLen { + s.maxLen = maxLen // Use smaller of the two + } + s.mu.Unlock() + + // Send session response + response := make([]byte, 14) + binary.BigEndian.PutUint32(response[0:4], sessionID) + binary.BigEndian.PutUint32(response[4:8], s.crcKey) + response[8] = 2 // Encoding type + binary.BigEndian.PutUint16(response[9:11], s.maxLen) + binary.BigEndian.PutUint32(response[11:15], version) + + s.state.Store(StateEstablished) + return s.sendRaw(opcodes.OP_SessionResponse, response) +} + +// handleSessionResponse processes session establishment response +func (s *Stream) handleSessionResponse(data []byte) error { + if len(data) < 14 { + return fmt.Errorf("session response too small") + } + + // Parse response + sessionID := binary.BigEndian.Uint32(data[0:4]) + crcKey := binary.BigEndian.Uint32(data[4:8]) + // encodingType := data[8] + maxLen := binary.BigEndian.Uint16(data[9:11]) + + s.mu.Lock() + s.sessionID = sessionID + s.crcKey = crcKey + if maxLen < s.maxLen { + s.maxLen = maxLen + } + s.mu.Unlock() + + s.state.Store(StateEstablished) + return nil +} + // handlePacket processes reliable packets func (s *Stream) handlePacket(data []byte) error { if len(data) < 2 { @@ -215,47 +302,70 @@ func (s *Stream) handlePacket(data []byte) error { s.mu.Lock() defer s.mu.Unlock() - // Check sequence - if seq != s.seqIn { - // Out of order - send immediate ACK - s.sendAckImmediate(seq) - return nil + // Check if this is the expected sequence + if seq == s.seqExpected { + // Process in-order packet + s.seqExpected++ + s.ackQueue = append(s.ackQueue, seq) + + // Process packet data + if err := s.processPacketData(data); err != nil { + return err + } + + // Check for buffered out-of-order packets + for { + if buffered, exists := s.outOfOrder[s.seqExpected]; exists { + delete(s.outOfOrder, s.seqExpected) + s.ackQueue = append(s.ackQueue, s.seqExpected) + s.seqExpected++ + if err := s.processPacketData(buffered); err != nil { + return err + } + } else { + break + } + } + } else if seq > s.seqExpected { + // Out of order - buffer it + s.outOfOrder[seq] = append([]byte(nil), data...) + // Send out-of-order ACK + go s.sendOutOfOrderAck(seq) + } else { + // Duplicate packet - just ACK it + go s.sendAckImmediate(seq) } - // Update sequence - s.seqIn++ + // Schedule grouped ACK + s.scheduleAck() - // Schedule ACK - s.scheduleAck(seq) - - // Process packet data - return s.processPacketData(data) + return nil } // handleFragment assembles fragmented packets func (s *Stream) handleFragment(data []byte) error { - if len(data) < 8 { + if len(data) < 10 { return nil } seq := binary.BigEndian.Uint16(data[:2]) - fragSeq := binary.BigEndian.Uint16(data[2:4]) - fragTotal := binary.BigEndian.Uint16(data[4:6]) - fragCur := binary.BigEndian.Uint16(data[6:8]) - data = data[8:] + fragID := binary.BigEndian.Uint32(data[2:6]) + fragTotal := binary.BigEndian.Uint16(data[6:8]) + fragCur := binary.BigEndian.Uint16(data[8:10]) + data = data[10:] s.mu.Lock() defer s.mu.Unlock() // Get or create fragment buffer - frag, exists := s.fragments[fragSeq] + frag, exists := s.fragments[uint16(fragID)] if !exists { frag = &fragmentBuffer{ totalSize: uint32(fragTotal), chunks: make(map[uint16][]byte), startTime: time.Now(), } - s.fragments[fragSeq] = frag + s.fragments[uint16(fragID)] = frag } // Store chunk @@ -264,21 +374,25 @@ func (s *Stream) handleFragment(data []byte) error { // Check if complete if frag.received == uint32(fragTotal) { - // Reassemble + // Reassemble in order complete := make([]byte, 0) for i := uint16(0); i < fragTotal; i++ { if chunk, ok := frag.chunks[i]; ok { complete = append(complete, chunk...) + } else { + // Missing chunk - wait for retransmit + return nil } } - delete(s.fragments, fragSeq) + delete(s.fragments, uint16(fragID)) // Process reassembled packet return s.processPacketData(complete) } - // Schedule ACK for fragment - s.scheduleAck(seq) + // ACK the fragment + s.ackQueue = append(s.ackQueue, seq) + s.scheduleAck() return nil } @@ -294,16 +408,138 @@ func (s *Stream) handleAck(data []byte) error { s.mu.Lock() defer s.mu.Unlock() - // Remove from pending + // Remove from pending and update RTT if pending, exists := s.pendingAcks[ackSeq]; exists { delete(s.pendingAcks, ackSeq) - // Update RTT stats here if needed - _ = pending + + // Update RTT estimates (TCP-like algorithm) + sample := time.Since(pending.sentTime) + if s.rtt == 0 { + s.rtt = sample + s.rttVar = sample / 2 + } else { + alpha := 0.125 + beta := 0.25 + s.rttVar = time.Duration((1-beta)*float64(s.rttVar) + beta*float64(absTime(s.rtt-sample))) + s.rtt = time.Duration((1-alpha)*float64(s.rtt) + alpha*float64(sample)) + } + s.rto = s.rtt + 4*s.rttVar + if s.rto < s.minRTO { + s.rto = s.minRTO + } + if s.rto > s.maxRTO { + s.rto = s.maxRTO + } } - // Update last ACK seen + // Fast retransmit: if ACK is higher than pending packets, trigger retransmit if ackSeq > s.seqLastAck { s.seqLastAck = ackSeq + + // Check for gaps that need immediate retransmit + for seq, pending := range s.pendingAcks { + if seq < ackSeq { + // This packet was likely lost, retransmit immediately + s.resendQueue = append(s.resendQueue, pending) + } + } + } + + return nil +} + +// handleOutOfOrderAck handles out-of-order acknowledgments +func (s *Stream) handleOutOfOrderAck(data []byte) error { + if len(data) < 2 { + return nil + } + + seq := binary.BigEndian.Uint16(data[:2]) + + s.mu.Lock() + defer s.mu.Unlock() + + // Immediately retransmit if we have this packet pending + if pending, exists := s.pendingAcks[seq]; exists { + s.resendQueue = append(s.resendQueue, pending) + } + + return nil +} + +// handleCombined processes combined protocol packets +func (s *Stream) handleCombined(data []byte) error { + pos := 0 + + for pos < len(data) { + if pos+1 > len(data) { + break + } + + // Read packet size + size := uint16(data[pos]) + pos++ + + // Check for oversized marker + if size == 0xff { + if pos+2 > len(data) { + break + } + size = binary.BigEndian.Uint16(data[pos : pos+2]) + pos += 2 + } + + // Extract packet + if pos+int(size) > len(data) { + break + } + + packet := data[pos : pos+int(size)] + pos += int(size) + + // Process each sub-packet + if err := s.Process(packet); err != nil { + return err + } + } + + return nil +} + +// handleAppCombined processes combined application packets +func (s *Stream) handleAppCombined(data []byte) error { + pos := 0 + + for pos < len(data) { + if pos+1 > len(data) { + break + } + + // Read packet size + size := uint16(data[pos]) + pos++ + + // Check for oversized marker + if size == 0xff { + if pos+2 > len(data) { + break + } + size = binary.BigEndian.Uint16(data[pos : pos+2]) + pos += 2 + } + + // Extract packet + if pos+int(size) > len(data) { + break + } + + packet := data[pos : pos+int(size)] + pos += int(size) + + // Process as application packet + if err := s.processPacketData(packet); err != nil { + return err + } } return nil @@ -311,29 +547,79 @@ func (s *Stream) handleAck(data []byte) error { // SendPacket sends an application packet func (s *Stream) SendPacket(app *packets.AppPacket) error { + // Check state + if s.state.Load() != StateEstablished { + return fmt.Errorf("stream not established") + } + // Convert to protocol packet proto := s.appToProto(app) // Check if needs fragmentation - if proto.Size() > uint32(s.maxLen) { + if proto.Size() > uint32(s.maxLen-10) { // Reserve space for headers return s.sendFragmented(proto) } - // Add to appropriate queue + // Add to appropriate queue based on reliability + s.mu.Lock() if app.Priority > packets.PriorityNormal { - s.mu.Lock() s.reliableQueue = append(s.reliableQueue, proto) - s.mu.Unlock() } else { - s.mu.Lock() s.unreliableQueue = append(s.unreliableQueue, proto) - s.mu.Unlock() } + s.mu.Unlock() // Process queues return s.processQueues() } +// sendFragmented fragments and sends large packets +func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error { + data := make([]byte, proto.Size()) + proto.Serialize(data, 0) + + // Calculate fragment size (leave room for headers) + fragSize := int(s.maxLen) - 12 // Fragment header is 10 bytes + 2 for protocol + numFrags := (len(data) + fragSize - 1) / fragSize + + if numFrags > 0xFFFF { + return fmt.Errorf("packet too large to fragment") + } + + s.mu.Lock() + fragID := s.nextFragID + s.nextFragID++ + s.mu.Unlock() + + // Send each fragment + for i := 0; i < numFrags; i++ { + start := i * fragSize + end := start + fragSize + if end > len(data) { + end = len(data) + } + + // Build fragment header + fragHeader := make([]byte, 10) + // Sequence added by sendReliable + binary.BigEndian.PutUint32(fragHeader[0:4], uint32(fragID)) + binary.BigEndian.PutUint16(fragHeader[4:6], uint16(numFrags)) + binary.BigEndian.PutUint16(fragHeader[6:8], uint16(i)) + binary.BigEndian.PutUint16(fragHeader[8:10], uint16(end-start)) + + // Combine header and data + fragment := append(fragHeader, data[start:end]...) + + // Send as reliable packet + fragProto := packets.NewProtoPacket(opcodes.OP_Fragment, fragment, s.opcodeManager) + if err := s.sendReliable(fragProto); err != nil { + return err + } + } + + return nil +} + // sendReliable sends a reliable packet with sequence number func (s *Stream) sendReliable(proto *packets.ProtoPacket) error { s.mu.Lock() @@ -348,11 +634,11 @@ func (s *Stream) sendReliable(proto *packets.ProtoPacket) error { // Track for retransmission pending := &pendingPacket{ - packet: proto, + packet: proto.Copy(), seq: seq, sentTime: time.Now(), attempts: 1, - nextRetry: time.Now().Add(time.Second), + nextRetry: time.Now().Add(s.rto), } s.mu.Lock() @@ -363,6 +649,183 @@ func (s *Stream) sendReliable(proto *packets.ProtoPacket) error { return s.sendRaw(opcodes.OP_Packet, data) } +// processQueues processes all packet queues +func (s *Stream) processQueues() error { + s.mu.Lock() + + // Process resends first (highest priority) + for len(s.resendQueue) > 0 { + pending := s.resendQueue[0] + s.resendQueue = s.resendQueue[1:] + + if time.Now().After(pending.nextRetry) { + s.mu.Unlock() + + // Rebuild packet + data := make([]byte, pending.packet.Size()+2) + binary.BigEndian.PutUint16(data[:2], pending.seq) + pending.packet.Serialize(data[2:], 0) + + // Update pending info + pending.attempts++ + pending.sentTime = time.Now() + pending.nextRetry = time.Now().Add(s.rto * time.Duration(pending.attempts)) + + // Check max retries + if pending.attempts > s.maxRetries { + s.mu.Lock() + delete(s.pendingAcks, pending.seq) + s.mu.Unlock() + // Connection likely dead + s.handleTimeout() + return fmt.Errorf("max retransmissions exceeded") + } + + atomic.AddUint64(&s.retransmits, 1) + s.sendRaw(opcodes.OP_Packet, data) + + s.mu.Lock() + } else { + // Not time yet, re-queue + s.resendQueue = append(s.resendQueue, pending) + } + } + + // Process reliable queue + for len(s.reliableQueue) > 0 { + proto := s.reliableQueue[0] + s.reliableQueue = s.reliableQueue[1:] + s.mu.Unlock() + + if err := s.sendReliable(proto); err != nil { + return err + } + + s.mu.Lock() + } + + // Process unreliable queue + for len(s.unreliableQueue) > 0 { + proto := s.unreliableQueue[0] + s.unreliableQueue = s.unreliableQueue[1:] + s.mu.Unlock() + + // Send unreliable (no sequence/retransmit) + data := make([]byte, proto.Size()) + proto.Serialize(data, 0) + s.sendRaw(proto.Opcode, data) + + s.mu.Lock() + } + + // Check for retransmissions needed + now := time.Now() + for _, pending := range s.pendingAcks { + if now.After(pending.nextRetry) { + s.resendQueue = append(s.resendQueue, pending) + } + } + + s.mu.Unlock() + + // Process any pending ACKs + s.sendPendingAcks() + + return nil +} + +// Helper methods + +func (s *Stream) processPacketData(data []byte) error { + // Create ProtoPacket from raw data + proto := packets.NewProtoPacketFromRaw(data, -1, s.opcodeManager) + + // Decompress if needed + if err := proto.DecompressPacket(); err != nil { + return err + } + + // Convert to AppPacket + app := proto.MakeApplicationPacket(s.opcodeSize) + + // Deliver to callback + if s.onPacket != nil { + go s.onPacket(app) + } + + return nil +} + +func (s *Stream) scheduleAck() { + if s.ackTimer == nil { + s.ackTimer = time.AfterFunc(s.ackThreshold, func() { + s.sendPendingAcks() + }) + } +} + +func (s *Stream) sendPendingAcks() { + s.mu.Lock() + if len(s.ackQueue) == 0 { + s.mu.Unlock() + return + } + + // Send all pending ACKs + for _, seq := range s.ackQueue { + data := make([]byte, 2) + binary.BigEndian.PutUint16(data, seq) + s.sendRaw(opcodes.OP_Ack, data) + } + + s.ackQueue = s.ackQueue[:0] + s.lastAckSent = time.Now() + s.mu.Unlock() +} + +func (s *Stream) sendAckImmediate(seq uint16) error { + data := make([]byte, 2) + binary.BigEndian.PutUint16(data, seq) + return s.sendRaw(opcodes.OP_Ack, data) +} + +func (s *Stream) sendOutOfOrderAck(seq uint16) error { + data := make([]byte, 2) + binary.BigEndian.PutUint16(data, seq) + return s.sendRaw(opcodes.OP_OutOfOrderAck, data) +} + +func (s *Stream) sendKeepalive() { + if s.state.Load() == StateEstablished { + s.sendRaw(opcodes.OP_KeepAlive, nil) + } + // Reschedule + if s.keepAliveTimer != nil { + s.keepAliveTimer.Reset(10 * time.Second) + } +} + +func (s *Stream) resetTimeout() { + if s.timeoutTimer != nil { + s.timeoutTimer.Reset(30 * time.Second) + } +} + +func (s *Stream) handleTimeout() { + s.state.Store(StateClosed) + if s.onDisconnect != nil { + s.onDisconnect() + } +} + +func (s *Stream) handleDisconnect() error { + s.state.Store(StateClosed) + if s.onDisconnect != nil { + go s.onDisconnect() + } + return nil +} + // sendRaw sends raw data with protocol opcode func (s *Stream) sendRaw(opcode uint16, data []byte) error { // Build packet: opcode + data + CRC @@ -386,121 +849,96 @@ func (s *Stream) sendRaw(opcode uint16, data []byte) error { return s.conn.AsyncWrite(packet, nil) } -// Helper methods - -func (s *Stream) processPacketData(data []byte) error { - // Create ProtoPacket from raw data - proto := packets.NewProtoPacketFromRaw(data, -1, s.opcodeManager) - - // Decompress if needed - proto.DecompressPacket() - - // Convert to AppPacket - app := proto.MakeApplicationPacket(s.opcodeSize) - - // Deliver to callback - if s.onPacket != nil { - s.onPacket(app) - } - - return nil -} - -func (s *Stream) scheduleAck(seq uint16) { - if s.ackTimer == nil { - s.ackTimer = time.AfterFunc(s.ackThreshold, func() { - s.sendAckImmediate(seq) - }) - } -} - -func (s *Stream) sendAckImmediate(seq uint16) error { - data := make([]byte, 2) - binary.BigEndian.PutUint16(data, seq) - return s.sendRaw(opcodes.OP_Ack, data) -} - -func (s *Stream) sendKeepalive() { - s.sendRaw(opcodes.OP_KeepAlive, nil) - // Reschedule - if s.keepAliveTimer != nil { - s.keepAliveTimer.Reset(10 * time.Second) - } -} - -func (s *Stream) resetTimeout() { - if s.timeoutTimer != nil { - s.timeoutTimer.Reset(30 * time.Second) - } -} - func (s *Stream) appToProto(app *packets.AppPacket) *packets.ProtoPacket { proto := packets.NewProtoPacket(app.Opcode, app.Buffer, s.opcodeManager) proto.CopyInfo(app.Packet) + proto.CompressThreshold = 100 + proto.EncodeKey = s.encodeKey return proto } -func (s *Stream) processQueues() error { - // Process reliable queue first - s.mu.Lock() - for len(s.reliableQueue) > 0 { - proto := s.reliableQueue[0] - s.reliableQueue = s.reliableQueue[1:] - s.mu.Unlock() - if err := s.sendReliable(proto); err != nil { - return err - } - s.mu.Lock() - } - s.mu.Unlock() - - return nil -} - -func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error { - // Fragment implementation - return nil // TODO -} - -func (s *Stream) handleSessionRequest(data []byte) error { - // Session setup - s.state.Store(StateConnecting) - return nil -} - -func (s *Stream) handleSessionResponse(data []byte) error { - s.state.Store(StateEstablished) - return nil -} - -func (s *Stream) handleCombined(data []byte) error { - // Process combined packets - return nil -} - -func (s *Stream) handleAppCombined(data []byte) error { - // Process app-combined packets - return nil -} - -func (s *Stream) handleDisconnect() error { - s.state.Store(StateClosed) - if s.onDisconnect != nil { - s.onDisconnect() - } - return nil -} +// Public methods // SetPacketCallback sets the callback for received packets func (s *Stream) SetPacketCallback(fn func(*packets.AppPacket)) { s.onPacket = fn } +// SetDisconnectCallback sets the callback for disconnection +func (s *Stream) SetDisconnectCallback(fn func()) { + s.onDisconnect = fn +} + +// GetState returns current stream state +func (s *Stream) GetState() StreamState { + return s.state.Load().(StreamState) +} + +// IsConnected returns true if stream is established +func (s *Stream) IsConnected() bool { + return s.GetState() == StateEstablished +} + +// SendSessionRequest initiates client connection +func (s *Stream) SendSessionRequest() error { + // Build session request packet + // Format: version(4) + sessionID(4) + maxLen(2) + data := make([]byte, 10) + binary.BigEndian.PutUint32(data[0:4], 2) // Protocol version + binary.BigEndian.PutUint32(data[4:8], s.sessionID) + binary.BigEndian.PutUint16(data[8:10], s.maxLen) + + s.state.Store(StateConnecting) + return s.sendRaw(opcodes.OP_SessionRequest, data) +} + +// validateSession checks if packet belongs to this session +func (s *Stream) validateSession(sessionID uint32) bool { + if s.sessionID == 0 { + // Not yet established, accept any + return true + } + if sessionID != s.sessionID { + // Wrong session - send out of session response + go s.sendRaw(opcodes.OP_OutOfSession, nil) + return false + } + return true +} + +// GetSessionID returns current session ID +func (s *Stream) GetSessionID() uint32 { + s.mu.RLock() + defer s.mu.RUnlock() + return s.sessionID +} + +// SetSessionID sets the session ID (for client mode) +func (s *Stream) SetSessionID(id uint32) { + s.mu.Lock() + s.sessionID = id + s.mu.Unlock() +} + +// GetRemoteAddr returns remote address +func (s *Stream) GetRemoteAddr() string { + if s.conn != nil { + return s.conn.RemoteAddr().String() + } + return "" +} + // Close closes the stream func (s *Stream) Close() error { + if s.state.Load() == StateClosed { + return nil + } + s.state.Store(StateClosed) + // Send disconnect s.sendRaw(opcodes.OP_SessionDisconnect, nil) + // Clean up timers if s.keepAliveTimer != nil { s.keepAliveTimer.Stop() @@ -508,5 +946,28 @@ func (s *Stream) Close() error { if s.ackTimer != nil { s.ackTimer.Stop() } + if s.timeoutTimer != nil { + s.timeoutTimer.Stop() + } + + // Clear queues + s.mu.Lock() + s.reliableQueue = nil + s.unreliableQueue = nil + s.resendQueue = nil + s.pendingAcks = nil + s.fragments = nil + s.outOfOrder = nil + s.mu.Unlock() + return nil } + +// Utility functions + +func absTime(d time.Duration) time.Duration { + if d < 0 { + return -d + } + return d +}