Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions pkg/gcc/adaptive_threshold.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ import (
"time"
)

const (
maxDeltas = 60
)

type adaptiveThresholdOption func(*adaptiveThreshold)

func setInitialThreshold(t time.Duration) adaptiveThresholdOption {
Expand Down Expand Up @@ -65,16 +61,16 @@ func (a *adaptiveThreshold) compare(estimate, _ time.Duration) (usage, time.Dura
if a.numDeltas < 2 {
return usageNormal, estimate, a.max
}
t := time.Duration(minInt(a.numDeltas, maxDeltas)) * estimate

use := usageNormal
if t > a.thresh {
if estimate > a.thresh {
use = usageOver
} else if t < -a.thresh {
} else if estimate < -a.thresh {
use = usageUnder
}
thresh := a.thresh
a.update(t)
return use, t, thresh
a.update(estimate)
return use, estimate, thresh
}

func (a *adaptiveThreshold) update(estimate time.Duration) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/gcc/adaptive_threshold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestAdaptiveThreshold(t *testing.T) {
},
expected: []usage{usageNormal, usageOver, usageNormal},
options: []adaptiveThresholdOption{
setInitialThreshold(40 * time.Millisecond),
setInitialThreshold(20 * time.Millisecond),
},
},
{
Expand Down
50 changes: 39 additions & 11 deletions pkg/gcc/kalman.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@ import (
)

const (
chi = 0.001
chi = 0.01
kCount = 25
)

type kalmanOption func(*kalman)

type kalman struct {
gain float64
estimate time.Duration
processUncertainty float64 // Q_i
estimateError float64
measurementUncertainty float64

gain float64
estimate time.Duration
processUncertainty float64 // Q_i
estimateError float64
measurementUncertainty float64
K [kCount]time.Duration
Kmin time.Duration
kIndex int
disableMeasurementUncertaintyUpdates bool
}

Expand Down Expand Up @@ -69,23 +72,48 @@ func newKalman(opts ...kalmanOption) *kalman {
return k
}

func (k *kalman) updateEstimate(measurement time.Duration) time.Duration {
func (k *kalman) updateEstimate(measurement, lastReceiveDelta time.Duration) time.Duration {
z := measurement - k.estimate

zms := float64(z.Microseconds()) / 1000.0

if !k.disableMeasurementUncertaintyUpdates {
alpha := math.Pow((1 - chi), 30.0/(1000.0*5*float64(time.Millisecond)))
index := k.kIndex % kCount

switch {
case k.kIndex == 0:
k.Kmin = lastReceiveDelta
case lastReceiveDelta < k.Kmin:
k.Kmin = lastReceiveDelta
case k.kIndex >= kCount && k.K[index] == k.Kmin:
k.Kmin = lastReceiveDelta

for i := 0; i < k.kIndex && i < kCount; i++ {
if i != index && k.Kmin > k.K[i] {
k.Kmin = k.K[i]
}
}
default:
}

k.K[index] = lastReceiveDelta
kMinms := float64(k.Kmin.Microseconds()) / 1000.0
fmax := 1 / kMinms

alpha := math.Pow((1 - chi), 30.0/(1000.0*fmax))
root := math.Sqrt(k.measurementUncertainty)
root3 := 3 * root
if zms > root3 {
k.measurementUncertainty = math.Max(alpha*k.measurementUncertainty+(1-alpha)*root3*root3, 1)
} else {
k.measurementUncertainty = math.Max(alpha*k.measurementUncertainty+(1-alpha)*zms*zms, 1)
}
k.measurementUncertainty = math.Max(alpha*k.measurementUncertainty+(1-alpha)*zms*zms, 1)

k.kIndex++
}

estimateUncertainty := k.estimateError + k.processUncertainty
k.gain = estimateUncertainty / (estimateUncertainty + k.measurementUncertainty)
k.gain = math.Max(estimateUncertainty/(estimateUncertainty+k.measurementUncertainty), 0.01)

k.estimate += time.Duration(k.gain * zms * float64(time.Millisecond))

Expand Down
2 changes: 1 addition & 1 deletion pkg/gcc/kalman_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestKalman(t *testing.T) {
k := newKalman(append(tc.opts, setDisableMeasurementUncertaintyUpdates(true))...)
estimates := []time.Duration{}
for _, m := range tc.measurements {
estimates = append(estimates, k.updateEstimate(m))
estimates = append(estimates, k.updateEstimate(m, 5*time.Millisecond))
}
assert.Equal(t, tc.expected, estimates, "%v != %v", tc.expected, estimates)
})
Expand Down
32 changes: 28 additions & 4 deletions pkg/gcc/rate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,35 @@ type rateController struct {
}

type exponentialMovingAverage struct {
init bool
average float64
variance float64
stdDeviation float64
lastUpdate time.Time
}

func (a *exponentialMovingAverage) reset() {
a.init = false
a.average = 0
a.variance = 0
a.stdDeviation = 0
}

func (a *exponentialMovingAverage) update(value float64) {
if a.average == 0.0 {
if !a.init {
a.average = value
a.init = true
} else {
x := value - a.average
a.average += decreaseEMAAlpha * x
a.variance = (1 - decreaseEMAAlpha) * (a.variance + decreaseEMAAlpha*x*x)
a.variance = decreaseEMAAlpha*x*x + (1-decreaseEMAAlpha)*a.variance
a.stdDeviation = math.Sqrt(a.variance)
}
a.lastUpdate = time.Now()
}

func (a *exponentialMovingAverage) expired(now time.Time) bool {
return a.init && now.Sub(a.lastUpdate) > time.Minute
}

func newRateController(now now, initialTargetBitrate, minBitrate, maxBitrate int, dsw func(DelayStats)) *rateController {
Expand Down Expand Up @@ -104,7 +119,7 @@ func (c *rateController) onDelayStats(ds DelayStats) {
case stateHold:
// should never occur due to check above, but makes the linter happy
case stateIncrease:
c.target = clampInt(c.increase(now), c.minBitrate, c.maxBitrate)
c.target = clampInt(c.increase(now), c.target, c.maxBitrate)
next = DelayStats{
Measurement: c.delayStats.Measurement,
Estimate: c.delayStats.Estimate,
Expand Down Expand Up @@ -134,7 +149,16 @@ func (c *rateController) onDelayStats(ds DelayStats) {
}

func (c *rateController) increase(now time.Time) int {
if c.latestDecreaseRate.average > 0 && float64(c.latestReceivedRate) > c.latestDecreaseRate.average-3*c.latestDecreaseRate.stdDeviation &&
if c.latestDecreaseRate.init &&
float64(c.latestReceivedRate) > c.latestDecreaseRate.average+3*c.latestDecreaseRate.stdDeviation {
c.latestDecreaseRate.reset()
}

if c.latestDecreaseRate.expired(now) {
c.latestDecreaseRate.reset()
}

if c.latestDecreaseRate.init && float64(c.latestReceivedRate) > c.latestDecreaseRate.average-3*c.latestDecreaseRate.stdDeviation &&
float64(c.latestReceivedRate) < c.latestDecreaseRate.average+3*c.latestDecreaseRate.stdDeviation {
bitsPerFrame := float64(c.target) / 30.0
packetsPerFrame := math.Ceil(bitsPerFrame / (1200 * 8))
Expand Down
10 changes: 5 additions & 5 deletions pkg/gcc/slope_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
)

type estimator interface {
updateEstimate(measurement time.Duration) time.Duration
updateEstimate(measurement, lastReceiveDelta time.Duration) time.Duration
}

type estimatorFunc func(time.Duration) time.Duration
type estimatorFunc func(time.Duration, time.Duration) time.Duration

func (f estimatorFunc) updateEstimate(d time.Duration) time.Duration {
return f(d)
func (f estimatorFunc) updateEstimate(d, c time.Duration) time.Duration {
return f(d, c)
}

type slopeEstimator struct {
Expand Down Expand Up @@ -42,7 +42,7 @@ func (e *slopeEstimator) onArrivalGroup(ag arrivalGroup) {
e.group = ag
e.delayStatsWriter(DelayStats{
Measurement: measurement,
Estimate: e.updateEstimate(measurement),
Estimate: e.updateEstimate(measurement, delta),
Threshold: 0,
LastReceiveDelta: delta,
Usage: 0,
Expand Down
2 changes: 1 addition & 1 deletion pkg/gcc/slope_estimator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/assert"
)

func identity(d time.Duration) time.Duration {
func identity(d, _ time.Duration) time.Duration {
return d
}

Expand Down