diff --git a/gcc/arrival_group_accumulator.go b/gcc/arrival_group_accumulator.go new file mode 100644 index 0000000..9cc8719 --- /dev/null +++ b/gcc/arrival_group_accumulator.go @@ -0,0 +1,85 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + +package gcc + +import ( + "time" +) + +type arrivalGroupItem struct { + SequenceNumber uint64 + Departure time.Time + Arrival time.Time + Size int +} + +type arrivalGroup []arrivalGroupItem + +type arrivalGroupAccumulator struct { + next arrivalGroup + burstInterval time.Duration + maxBurstDuration time.Duration +} + +func newArrivalGroupAccumulator() *arrivalGroupAccumulator { + return &arrivalGroupAccumulator{ + next: make([]arrivalGroupItem, 0), + burstInterval: 5 * time.Millisecond, + maxBurstDuration: 5 * time.Millisecond, + } +} + +func (a *arrivalGroupAccumulator) onPacketAcked( + sequenceNumber uint64, + size int, + departure, arrival time.Time, +) arrivalGroup { + if len(a.next) == 0 { + a.next = append(a.next, arrivalGroupItem{ + SequenceNumber: sequenceNumber, + Size: size, + Departure: departure, + Arrival: arrival, + }) + + return nil + } + + sendTimeDelta := departure.Sub(a.next[0].Departure) + if sendTimeDelta < a.burstInterval { + a.next = append(a.next, arrivalGroupItem{ + SequenceNumber: sequenceNumber, + Size: size, + Departure: departure, + Arrival: arrival, + }) + + return nil + } + + arrivalTimeDeltaFirst := arrival.Sub(a.next[0].Arrival) + propagationDelta := arrivalTimeDeltaFirst - sendTimeDelta + + if propagationDelta < 0 && arrivalTimeDeltaFirst < a.maxBurstDuration { + a.next = append(a.next, arrivalGroupItem{ + SequenceNumber: sequenceNumber, + Size: size, + Departure: departure, + Arrival: arrival, + }) + + return nil + } + + group := make(arrivalGroup, len(a.next)) + copy(group, a.next) + a.next = arrivalGroup{arrivalGroupItem{ + SequenceNumber: sequenceNumber, + Size: size, + Departure: departure, + Arrival: arrival, + }} + + return group +} diff --git a/gcc/arrival_group_accumulator_test.go b/gcc/arrival_group_accumulator_test.go new file mode 100644 index 0000000..213856b --- /dev/null +++ b/gcc/arrival_group_accumulator_test.go @@ -0,0 +1,244 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + +package gcc + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestArrivalGroupAccumulator(t *testing.T) { + type logItem struct { + SequenceNumber uint64 + Departure time.Time + Arrival time.Time + } + triggerNewGroupElement := logItem{ + Departure: time.Time{}.Add(time.Second), + Arrival: time.Time{}.Add(time.Second), + } + cases := []struct { + name string + log []logItem + exp []arrivalGroup + }{ + { + name: "emptyCreatesNoGroups", + log: []logItem{}, + exp: []arrivalGroup{}, + }, + { + name: "createsSingleElementGroup", + log: []logItem{ + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(time.Millisecond), + }, + triggerNewGroupElement, + }, + exp: []arrivalGroup{ + { + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(time.Millisecond), + }, + }, + }, + }, + { + name: "createsTwoElementGroup", + log: []logItem{ + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(20 * time.Millisecond), + }, + triggerNewGroupElement, + }, + exp: []arrivalGroup{{ + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(20 * time.Millisecond), + }, + }}, + }, + { + name: "createsTwoArrivalGroups1", + log: []logItem{ + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(20 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(9 * time.Millisecond), + Arrival: time.Time{}.Add(24 * time.Millisecond), + }, + triggerNewGroupElement, + }, + exp: []arrivalGroup{ + { + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(20 * time.Millisecond), + }, + }, + { + { + Departure: time.Time{}.Add(9 * time.Millisecond), + Arrival: time.Time{}.Add(24 * time.Millisecond), + }, + }, + }, + }, + { + name: "ignoresOutOfOrderPackets", + log: []logItem{ + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(6 * time.Millisecond), + Arrival: time.Time{}.Add(34 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(8 * time.Millisecond), + Arrival: time.Time{}.Add(30 * time.Millisecond), + }, + triggerNewGroupElement, + }, + exp: []arrivalGroup{ + { + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + }, + { + { + Departure: time.Time{}.Add(6 * time.Millisecond), + Arrival: time.Time{}.Add(34 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(8 * time.Millisecond), + Arrival: time.Time{}.Add(30 * time.Millisecond), + }, + }, + }, + }, + { + name: "newGroupBecauseOfInterDepartureTime", + log: []logItem{ + { + SequenceNumber: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(4 * time.Millisecond), + }, + { + SequenceNumber: 1, + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(4 * time.Millisecond), + }, + { + SequenceNumber: 2, + Departure: time.Time{}.Add(6 * time.Millisecond), + Arrival: time.Time{}.Add(10 * time.Millisecond), + }, + { + SequenceNumber: 3, + Departure: time.Time{}.Add(9 * time.Millisecond), + Arrival: time.Time{}.Add(10 * time.Millisecond), + }, + triggerNewGroupElement, + }, + exp: []arrivalGroup{ + { + { + SequenceNumber: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(4 * time.Millisecond), + }, + { + SequenceNumber: 1, + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(4 * time.Millisecond), + }, + }, + { + { + SequenceNumber: 2, + Departure: time.Time{}.Add(6 * time.Millisecond), + Arrival: time.Time{}.Add(10 * time.Millisecond), + }, + { + SequenceNumber: 3, + Departure: time.Time{}.Add(9 * time.Millisecond), + Arrival: time.Time{}.Add(10 * time.Millisecond), + }, + }, + }, + }, + { + name: "createsSingleGroupArrivalBurst", + log: []logItem{ + { + SequenceNumber: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(10 * time.Millisecond), + }, + { + SequenceNumber: 1, + Departure: time.Time{}.Add(10 * time.Millisecond), + Arrival: time.Time{}.Add(12 * time.Millisecond), + }, + triggerNewGroupElement, + }, + exp: []arrivalGroup{ + { + { + SequenceNumber: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(10 * time.Millisecond), + }, + { + SequenceNumber: 1, + Departure: time.Time{}.Add(10 * time.Millisecond), + Arrival: time.Time{}.Add(12 * time.Millisecond), + }, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + aga := newArrivalGroupAccumulator() + received := []arrivalGroup{} + for _, ack := range tc.log { + next := aga.onPacketAcked(ack.SequenceNumber, 0, ack.Departure, ack.Arrival) + if next != nil { + received = append(received, next) + } + } + assert.Equal(t, tc.exp, received) + }) + } +}