Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions gcc/arrival_group_accumulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package gcc

import (
"time"
)

type arrivalGroupItem struct {
SequenceNumber uint64
Departure time.Time
Arrival time.Time
Size int
}

type arrivalGroup []arrivalGroupItem

type arrivalGroupAccumulator struct {
next arrivalGroup
burstInterval time.Duration
maxBurstDuration time.Duration
}

func newArrivalGroupAccumulator() *arrivalGroupAccumulator {
return &arrivalGroupAccumulator{
next: make([]arrivalGroupItem, 0),
burstInterval: 5 * time.Millisecond,
maxBurstDuration: 5 * time.Millisecond,
}
}

func (a *arrivalGroupAccumulator) onPacketAcked(
sequenceNumber uint64,
size int,
departure, arrival time.Time,
) arrivalGroup {
if len(a.next) == 0 {
a.next = append(a.next, arrivalGroupItem{
SequenceNumber: sequenceNumber,
Size: size,
Departure: departure,
Arrival: arrival,
})

return nil
}

sendTimeDelta := departure.Sub(a.next[0].Departure)
if sendTimeDelta < a.burstInterval {
a.next = append(a.next, arrivalGroupItem{
SequenceNumber: sequenceNumber,
Size: size,
Departure: departure,
Arrival: arrival,
})

return nil
}

arrivalTimeDeltaFirst := arrival.Sub(a.next[0].Arrival)
propagationDelta := arrivalTimeDeltaFirst - sendTimeDelta

if propagationDelta < 0 && arrivalTimeDeltaFirst < a.maxBurstDuration {
a.next = append(a.next, arrivalGroupItem{
SequenceNumber: sequenceNumber,
Size: size,
Departure: departure,
Arrival: arrival,
})

return nil
}

group := make(arrivalGroup, len(a.next))
copy(group, a.next)
a.next = arrivalGroup{arrivalGroupItem{
SequenceNumber: sequenceNumber,
Size: size,
Departure: departure,
Arrival: arrival,
}}

return group
}
244 changes: 244 additions & 0 deletions gcc/arrival_group_accumulator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package gcc

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestArrivalGroupAccumulator(t *testing.T) {
type logItem struct {
SequenceNumber uint64
Departure time.Time
Arrival time.Time
}
triggerNewGroupElement := logItem{
Departure: time.Time{}.Add(time.Second),
Arrival: time.Time{}.Add(time.Second),
}
cases := []struct {
name string
log []logItem
exp []arrivalGroup
}{
{
name: "emptyCreatesNoGroups",
log: []logItem{},
exp: []arrivalGroup{},
},
{
name: "createsSingleElementGroup",
log: []logItem{
{
Departure: time.Time{},
Arrival: time.Time{}.Add(time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{
{
{
Departure: time.Time{},
Arrival: time.Time{}.Add(time.Millisecond),
},
},
},
},
{
name: "createsTwoElementGroup",
log: []logItem{
{
Departure: time.Time{},
Arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
Departure: time.Time{}.Add(3 * time.Millisecond),
Arrival: time.Time{}.Add(20 * time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{{
{
Departure: time.Time{},
Arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
Departure: time.Time{}.Add(3 * time.Millisecond),
Arrival: time.Time{}.Add(20 * time.Millisecond),
},
}},
},
{
name: "createsTwoArrivalGroups1",
log: []logItem{
{
Departure: time.Time{},
Arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
Departure: time.Time{}.Add(3 * time.Millisecond),
Arrival: time.Time{}.Add(20 * time.Millisecond),
},
{
Departure: time.Time{}.Add(9 * time.Millisecond),
Arrival: time.Time{}.Add(24 * time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{
{
{
Departure: time.Time{},
Arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
Departure: time.Time{}.Add(3 * time.Millisecond),
Arrival: time.Time{}.Add(20 * time.Millisecond),
},
},
{
{
Departure: time.Time{}.Add(9 * time.Millisecond),
Arrival: time.Time{}.Add(24 * time.Millisecond),
},
},
},
},
{
name: "ignoresOutOfOrderPackets",
log: []logItem{
{
Departure: time.Time{},
Arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
Departure: time.Time{}.Add(6 * time.Millisecond),
Arrival: time.Time{}.Add(34 * time.Millisecond),
},
{
Departure: time.Time{}.Add(8 * time.Millisecond),
Arrival: time.Time{}.Add(30 * time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{
{
{
Departure: time.Time{},
Arrival: time.Time{}.Add(15 * time.Millisecond),
},
},
{
{
Departure: time.Time{}.Add(6 * time.Millisecond),
Arrival: time.Time{}.Add(34 * time.Millisecond),
},
{
Departure: time.Time{}.Add(8 * time.Millisecond),
Arrival: time.Time{}.Add(30 * time.Millisecond),
},
},
},
},
{
name: "newGroupBecauseOfInterDepartureTime",
log: []logItem{
{
SequenceNumber: 0,
Departure: time.Time{},
Arrival: time.Time{}.Add(4 * time.Millisecond),
},
{
SequenceNumber: 1,
Departure: time.Time{}.Add(3 * time.Millisecond),
Arrival: time.Time{}.Add(4 * time.Millisecond),
},
{
SequenceNumber: 2,
Departure: time.Time{}.Add(6 * time.Millisecond),
Arrival: time.Time{}.Add(10 * time.Millisecond),
},
{
SequenceNumber: 3,
Departure: time.Time{}.Add(9 * time.Millisecond),
Arrival: time.Time{}.Add(10 * time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{
{
{
SequenceNumber: 0,
Departure: time.Time{},
Arrival: time.Time{}.Add(4 * time.Millisecond),
},
{
SequenceNumber: 1,
Departure: time.Time{}.Add(3 * time.Millisecond),
Arrival: time.Time{}.Add(4 * time.Millisecond),
},
},
{
{
SequenceNumber: 2,
Departure: time.Time{}.Add(6 * time.Millisecond),
Arrival: time.Time{}.Add(10 * time.Millisecond),
},
{
SequenceNumber: 3,
Departure: time.Time{}.Add(9 * time.Millisecond),
Arrival: time.Time{}.Add(10 * time.Millisecond),
},
},
},
},
{
name: "createsSingleGroupArrivalBurst",
log: []logItem{
{
SequenceNumber: 0,
Departure: time.Time{},
Arrival: time.Time{}.Add(10 * time.Millisecond),
},
{
SequenceNumber: 1,
Departure: time.Time{}.Add(10 * time.Millisecond),
Arrival: time.Time{}.Add(12 * time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{
{
{
SequenceNumber: 0,
Departure: time.Time{},
Arrival: time.Time{}.Add(10 * time.Millisecond),
},
{
SequenceNumber: 1,
Departure: time.Time{}.Add(10 * time.Millisecond),
Arrival: time.Time{}.Add(12 * time.Millisecond),
},
},
},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
aga := newArrivalGroupAccumulator()
received := []arrivalGroup{}
for _, ack := range tc.log {
next := aga.onPacketAcked(ack.SequenceNumber, 0, ack.Departure, ack.Arrival)
if next != nil {
received = append(received, next)
}
}
assert.Equal(t, tc.exp, received)
})
}
}
Loading