diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/SessionWindowPipe.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/SessionWindowPipe.cs index 57d64ee8..7b6bcf8b 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/SessionWindowPipe.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/SessionWindowPipe.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Runtime.CompilerServices; using System.Runtime.Serialization; using Microsoft.StreamProcessing.Internal; using Microsoft.StreamProcessing.Internal.Collections; @@ -25,7 +26,7 @@ internal sealed class SessionWindowPipe : UnaryPipe output; - private LinkedList orderedKeys = new LinkedList(); + private Queue orderedKeys = new Queue(); [DataMember] private FastDictionary2 windowEndTimeDictionary = new FastDictionary2(); [DataMember] @@ -68,13 +69,21 @@ private void ReachTime(int pIndex, long timestamp) } } - var current = this.orderedKeys.First; - while (current != null) + while (this.orderedKeys.Count > 0) { - this.lastDataTimeDictionary.Lookup(current.Value, out int cIndex); + var current = this.orderedKeys.Peek(); + this.lastDataTimeDictionary.Lookup(current.Key, out int cIndex); var threshold = this.lastDataTimeDictionary.entries[cIndex].value == long.MinValue ? this.windowEndTimeDictionary.entries[cIndex].value - : Math.Min(this.lastDataTimeDictionary.entries[cIndex].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[cIndex].value); + : CalculateThreshold(this.lastDataTimeDictionary.entries[cIndex].value, cIndex); + + // Skip stale threshold entries (can happen when threshold is updated and new entry is enqueued) + if (current.Threshold != threshold) + { + this.orderedKeys.Dequeue(); + continue; + } + if (timestamp >= threshold) { var queue = this.stateDictionary.entries[cIndex].value; @@ -95,13 +104,12 @@ private void ReachTime(int pIndex, long timestamp) this.windowEndTimeDictionary.entries[cIndex].value = StreamEvent.MaxSyncTime; else { - this.windowEndTimeDictionary.Remove(current.Value); - this.lastDataTimeDictionary.Remove(current.Value); - this.stateDictionary.Remove(current.Value); + this.windowEndTimeDictionary.Remove(current.Key); + this.lastDataTimeDictionary.Remove(current.Key); + this.stateDictionary.Remove(current.Key); } - this.orderedKeys.RemoveFirst(); - current = this.orderedKeys.First; + this.orderedKeys.Dequeue(); } else break; } @@ -142,17 +150,18 @@ public override unsafe void OnNext(StreamMessage batch) if (!this.lastDataTimeDictionary.Lookup(batch.key.col[i], out keyIndex)) keyIndex = AllocatePartition(batch.key.col[i]); + var newThreshold = CalculateThreshold(vsync[i], keyIndex); + if (!this.stateDictionary.entries[keyIndex].value.Any()) - this.orderedKeys.AddLast(new LinkedListNode(batch.key.col[i])); + { + this.orderedKeys.Enqueue(new SessionThreshold { Key = batch.key.col[i], Threshold = newThreshold }); + } else { - var oldThreshold = Math.Min(this.lastDataTimeDictionary.entries[keyIndex].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[keyIndex].value); - var newThreshold = Math.Min(vsync[i] + this.sessionTimeout, this.windowEndTimeDictionary.entries[keyIndex].value); + var oldThreshold = CalculateThreshold(this.lastDataTimeDictionary.entries[keyIndex].value, keyIndex); if (newThreshold > oldThreshold) { - var node = this.orderedKeys.Find(batch.key.col[i]); - this.orderedKeys.Remove(node); - this.orderedKeys.AddLast(node); + this.orderedKeys.Enqueue(new SessionThreshold { Key = batch.key.col[i], Threshold = newThreshold }); } } @@ -227,10 +236,10 @@ protected override void UpdatePointers() { temp.Add(Tuple.Create( this.windowEndTimeDictionary.entries[iter].key, - Math.Min(this.lastDataTimeDictionary.entries[iter].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[iter].value))); + CalculateThreshold(this.lastDataTimeDictionary.entries[iter].value, iter))); } } - foreach (var item in temp.OrderBy(o => o.Item2)) this.orderedKeys.AddLast(new LinkedListNode(item.Item1)); + foreach (var item in temp.OrderBy(o => o.Item2)) this.orderedKeys.Enqueue(new SessionThreshold { Key = item.Item1, Threshold = item.Item2 }); base.UpdatePointers(); } @@ -242,6 +251,10 @@ protected override void DisposeState() this.stateDictionary.Clear(); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private long CalculateThreshold(long lastDataTime, int keyIndex) => + Math.Min(lastDataTime + this.sessionTimeout, this.windowEndTimeDictionary.entries[keyIndex].value); + [DataContract] private struct ActiveEvent { @@ -256,5 +269,15 @@ private struct ActiveEvent public override string ToString() => "Key='" + this.Key + "', Payload='" + this.Payload; } + + [DataContract] + private struct SessionThreshold + { + [DataMember] + public long Threshold; + + [DataMember] + public TKey Key; + } } } \ No newline at end of file