diff --git a/pkg/gcc/adaptive_threshold.go b/pkg/gcc/adaptive_threshold.go index 61d34d8f..7f25eb00 100644 --- a/pkg/gcc/adaptive_threshold.go +++ b/pkg/gcc/adaptive_threshold.go @@ -8,10 +8,6 @@ import ( "time" ) -const ( - maxDeltas = 60 -) - type adaptiveThresholdOption func(*adaptiveThreshold) func setInitialThreshold(t time.Duration) adaptiveThresholdOption { @@ -65,16 +61,16 @@ func (a *adaptiveThreshold) compare(estimate, _ time.Duration) (usage, time.Dura if a.numDeltas < 2 { return usageNormal, estimate, a.max } - t := time.Duration(minInt(a.numDeltas, maxDeltas)) * estimate + use := usageNormal - if t > a.thresh { + if estimate > a.thresh { use = usageOver - } else if t < -a.thresh { + } else if estimate < -a.thresh { use = usageUnder } thresh := a.thresh - a.update(t) - return use, t, thresh + a.update(estimate) + return use, estimate, thresh } func (a *adaptiveThreshold) update(estimate time.Duration) { diff --git a/pkg/gcc/adaptive_threshold_test.go b/pkg/gcc/adaptive_threshold_test.go index 968ae0a5..084ab141 100644 --- a/pkg/gcc/adaptive_threshold_test.go +++ b/pkg/gcc/adaptive_threshold_test.go @@ -104,7 +104,7 @@ func TestAdaptiveThreshold(t *testing.T) { }, expected: []usage{usageNormal, usageOver, usageNormal}, options: []adaptiveThresholdOption{ - setInitialThreshold(40 * time.Millisecond), + setInitialThreshold(20 * time.Millisecond), }, }, { diff --git a/pkg/gcc/kalman.go b/pkg/gcc/kalman.go index 1eb76a88..f385e3c4 100644 --- a/pkg/gcc/kalman.go +++ b/pkg/gcc/kalman.go @@ -9,18 +9,21 @@ import ( ) const ( - chi = 0.001 + chi = 0.01 + kCount = 25 ) type kalmanOption func(*kalman) type kalman struct { - gain float64 - estimate time.Duration - processUncertainty float64 // Q_i - estimateError float64 - measurementUncertainty float64 - + gain float64 + estimate time.Duration + processUncertainty float64 // Q_i + estimateError float64 + measurementUncertainty float64 + K [kCount]time.Duration + Kmin time.Duration + kIndex int disableMeasurementUncertaintyUpdates bool } @@ -69,23 +72,48 @@ func newKalman(opts ...kalmanOption) *kalman { return k } -func (k *kalman) updateEstimate(measurement time.Duration) time.Duration { +func (k *kalman) updateEstimate(measurement, lastReceiveDelta time.Duration) time.Duration { z := measurement - k.estimate zms := float64(z.Microseconds()) / 1000.0 if !k.disableMeasurementUncertaintyUpdates { - alpha := math.Pow((1 - chi), 30.0/(1000.0*5*float64(time.Millisecond))) + index := k.kIndex % kCount + + switch { + case k.kIndex == 0: + k.Kmin = lastReceiveDelta + case lastReceiveDelta < k.Kmin: + k.Kmin = lastReceiveDelta + case k.kIndex >= kCount && k.K[index] == k.Kmin: + k.Kmin = lastReceiveDelta + + for i := 0; i < k.kIndex && i < kCount; i++ { + if i != index && k.Kmin > k.K[i] { + k.Kmin = k.K[i] + } + } + default: + } + + k.K[index] = lastReceiveDelta + kMinms := float64(k.Kmin.Microseconds()) / 1000.0 + fmax := 1 / kMinms + + alpha := math.Pow((1 - chi), 30.0/(1000.0*fmax)) root := math.Sqrt(k.measurementUncertainty) root3 := 3 * root if zms > root3 { k.measurementUncertainty = math.Max(alpha*k.measurementUncertainty+(1-alpha)*root3*root3, 1) + } else { + k.measurementUncertainty = math.Max(alpha*k.measurementUncertainty+(1-alpha)*zms*zms, 1) } - k.measurementUncertainty = math.Max(alpha*k.measurementUncertainty+(1-alpha)*zms*zms, 1) + + k.kIndex++ } estimateUncertainty := k.estimateError + k.processUncertainty - k.gain = estimateUncertainty / (estimateUncertainty + k.measurementUncertainty) + k.gain = math.Max(estimateUncertainty/(estimateUncertainty+k.measurementUncertainty), 0.01) k.estimate += time.Duration(k.gain * zms * float64(time.Millisecond)) diff --git a/pkg/gcc/kalman_test.go b/pkg/gcc/kalman_test.go index f3b90056..312408ca 100644 --- a/pkg/gcc/kalman_test.go +++ b/pkg/gcc/kalman_test.go @@ -63,7 +63,7 @@ func TestKalman(t *testing.T) { k := newKalman(append(tc.opts, setDisableMeasurementUncertaintyUpdates(true))...) estimates := []time.Duration{} for _, m := range tc.measurements { - estimates = append(estimates, k.updateEstimate(m)) + estimates = append(estimates, k.updateEstimate(m, 5*time.Millisecond)) } assert.Equal(t, tc.expected, estimates, "%v != %v", tc.expected, estimates) }) diff --git a/pkg/gcc/rate_controller.go b/pkg/gcc/rate_controller.go index c19a617b..d19ba3a8 100644 --- a/pkg/gcc/rate_controller.go +++ b/pkg/gcc/rate_controller.go @@ -34,20 +34,35 @@ type rateController struct { } type exponentialMovingAverage struct { + init bool average float64 variance float64 stdDeviation float64 + lastUpdate time.Time +} + +func (a *exponentialMovingAverage) reset() { + a.init = false + a.average = 0 + a.variance = 0 + a.stdDeviation = 0 } func (a *exponentialMovingAverage) update(value float64) { - if a.average == 0.0 { + if !a.init { a.average = value + a.init = true } else { x := value - a.average a.average += decreaseEMAAlpha * x - a.variance = (1 - decreaseEMAAlpha) * (a.variance + decreaseEMAAlpha*x*x) + a.variance = decreaseEMAAlpha*x*x + (1-decreaseEMAAlpha)*a.variance a.stdDeviation = math.Sqrt(a.variance) } + a.lastUpdate = time.Now() +} + +func (a *exponentialMovingAverage) expired(now time.Time) bool { + return a.init && now.Sub(a.lastUpdate) > time.Minute } func newRateController(now now, initialTargetBitrate, minBitrate, maxBitrate int, dsw func(DelayStats)) *rateController { @@ -104,7 +119,7 @@ func (c *rateController) onDelayStats(ds DelayStats) { case stateHold: // should never occur due to check above, but makes the linter happy case stateIncrease: - c.target = clampInt(c.increase(now), c.minBitrate, c.maxBitrate) + c.target = clampInt(c.increase(now), c.target, c.maxBitrate) next = DelayStats{ Measurement: c.delayStats.Measurement, Estimate: c.delayStats.Estimate, @@ -134,7 +149,16 @@ func (c *rateController) onDelayStats(ds DelayStats) { } func (c *rateController) increase(now time.Time) int { - if c.latestDecreaseRate.average > 0 && float64(c.latestReceivedRate) > c.latestDecreaseRate.average-3*c.latestDecreaseRate.stdDeviation && + if c.latestDecreaseRate.init && + float64(c.latestReceivedRate) > c.latestDecreaseRate.average+3*c.latestDecreaseRate.stdDeviation { + c.latestDecreaseRate.reset() + } + + if c.latestDecreaseRate.expired(now) { + c.latestDecreaseRate.reset() + } + + if c.latestDecreaseRate.init && float64(c.latestReceivedRate) > c.latestDecreaseRate.average-3*c.latestDecreaseRate.stdDeviation && float64(c.latestReceivedRate) < c.latestDecreaseRate.average+3*c.latestDecreaseRate.stdDeviation { bitsPerFrame := float64(c.target) / 30.0 packetsPerFrame := math.Ceil(bitsPerFrame / (1200 * 8)) diff --git a/pkg/gcc/slope_estimator.go b/pkg/gcc/slope_estimator.go index 2b57f4da..47ee0eb2 100644 --- a/pkg/gcc/slope_estimator.go +++ b/pkg/gcc/slope_estimator.go @@ -8,13 +8,13 @@ import ( ) type estimator interface { - updateEstimate(measurement time.Duration) time.Duration + updateEstimate(measurement, lastReceiveDelta time.Duration) time.Duration } -type estimatorFunc func(time.Duration) time.Duration +type estimatorFunc func(time.Duration, time.Duration) time.Duration -func (f estimatorFunc) updateEstimate(d time.Duration) time.Duration { - return f(d) +func (f estimatorFunc) updateEstimate(d, c time.Duration) time.Duration { + return f(d, c) } type slopeEstimator struct { @@ -42,7 +42,7 @@ func (e *slopeEstimator) onArrivalGroup(ag arrivalGroup) { e.group = ag e.delayStatsWriter(DelayStats{ Measurement: measurement, - Estimate: e.updateEstimate(measurement), + Estimate: e.updateEstimate(measurement, delta), Threshold: 0, LastReceiveDelta: delta, Usage: 0, diff --git a/pkg/gcc/slope_estimator_test.go b/pkg/gcc/slope_estimator_test.go index ba7bfc59..564d5e87 100644 --- a/pkg/gcc/slope_estimator_test.go +++ b/pkg/gcc/slope_estimator_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/assert" ) -func identity(d time.Duration) time.Duration { +func identity(d, _ time.Duration) time.Duration { return d }