Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using Microsoft.StreamProcessing.Internal;
using Microsoft.StreamProcessing.Internal.Collections;
Expand All @@ -25,7 +26,7 @@ internal sealed class SessionWindowPipe<TKey, TPayload> : UnaryPipe<TKey, TPaylo
[DataMember]
private StreamMessage<TKey, TPayload> output;

private LinkedList<TKey> orderedKeys = new LinkedList<TKey>();
private Queue<SessionThreshold> orderedKeys = new Queue<SessionThreshold>();
[DataMember]
private FastDictionary2<TKey, long> windowEndTimeDictionary = new FastDictionary2<TKey, long>();
[DataMember]
Expand Down Expand Up @@ -68,13 +69,21 @@ private void ReachTime(int pIndex, long timestamp)
}
}

var current = this.orderedKeys.First;
while (current != null)
while (this.orderedKeys.Count > 0)
{
this.lastDataTimeDictionary.Lookup(current.Value, out int cIndex);
var current = this.orderedKeys.Peek();
this.lastDataTimeDictionary.Lookup(current.Key, out int cIndex);
var threshold = this.lastDataTimeDictionary.entries[cIndex].value == long.MinValue
? this.windowEndTimeDictionary.entries[cIndex].value
: Math.Min(this.lastDataTimeDictionary.entries[cIndex].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[cIndex].value);
: CalculateThreshold(this.lastDataTimeDictionary.entries[cIndex].value, cIndex);

// Skip stale threshold entries (can happen when threshold is updated and new entry is enqueued)
if (current.Threshold != threshold)
{
this.orderedKeys.Dequeue();
continue;
}

if (timestamp >= threshold)
{
var queue = this.stateDictionary.entries[cIndex].value;
Expand All @@ -95,13 +104,12 @@ private void ReachTime(int pIndex, long timestamp)
this.windowEndTimeDictionary.entries[cIndex].value = StreamEvent.MaxSyncTime;
else
{
this.windowEndTimeDictionary.Remove(current.Value);
this.lastDataTimeDictionary.Remove(current.Value);
this.stateDictionary.Remove(current.Value);
this.windowEndTimeDictionary.Remove(current.Key);
this.lastDataTimeDictionary.Remove(current.Key);
this.stateDictionary.Remove(current.Key);
}

this.orderedKeys.RemoveFirst();
current = this.orderedKeys.First;
this.orderedKeys.Dequeue();
}
else break;
}
Expand Down Expand Up @@ -142,17 +150,18 @@ public override unsafe void OnNext(StreamMessage<TKey, TPayload> batch)
if (!this.lastDataTimeDictionary.Lookup(batch.key.col[i], out keyIndex))
keyIndex = AllocatePartition(batch.key.col[i]);

var newThreshold = CalculateThreshold(vsync[i], keyIndex);

if (!this.stateDictionary.entries[keyIndex].value.Any())
this.orderedKeys.AddLast(new LinkedListNode<TKey>(batch.key.col[i]));
{
this.orderedKeys.Enqueue(new SessionThreshold { Key = batch.key.col[i], Threshold = newThreshold });
}
else
{
var oldThreshold = Math.Min(this.lastDataTimeDictionary.entries[keyIndex].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[keyIndex].value);
var newThreshold = Math.Min(vsync[i] + this.sessionTimeout, this.windowEndTimeDictionary.entries[keyIndex].value);
var oldThreshold = CalculateThreshold(this.lastDataTimeDictionary.entries[keyIndex].value, keyIndex);
if (newThreshold > oldThreshold)
{
var node = this.orderedKeys.Find(batch.key.col[i]);
this.orderedKeys.Remove(node);
this.orderedKeys.AddLast(node);
this.orderedKeys.Enqueue(new SessionThreshold { Key = batch.key.col[i], Threshold = newThreshold });
}
}

Expand Down Expand Up @@ -227,10 +236,10 @@ protected override void UpdatePointers()
{
temp.Add(Tuple.Create(
this.windowEndTimeDictionary.entries[iter].key,
Math.Min(this.lastDataTimeDictionary.entries[iter].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[iter].value)));
CalculateThreshold(this.lastDataTimeDictionary.entries[iter].value, iter)));
}
}
foreach (var item in temp.OrderBy(o => o.Item2)) this.orderedKeys.AddLast(new LinkedListNode<TKey>(item.Item1));
foreach (var item in temp.OrderBy(o => o.Item2)) this.orderedKeys.Enqueue(new SessionThreshold { Key = item.Item1, Threshold = item.Item2 });
base.UpdatePointers();
}

Expand All @@ -242,6 +251,10 @@ protected override void DisposeState()
this.stateDictionary.Clear();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private long CalculateThreshold(long lastDataTime, int keyIndex) =>
Math.Min(lastDataTime + this.sessionTimeout, this.windowEndTimeDictionary.entries[keyIndex].value);

[DataContract]
private struct ActiveEvent
{
Expand All @@ -256,5 +269,15 @@ private struct ActiveEvent

public override string ToString() => "Key='" + this.Key + "', Payload='" + this.Payload;
}

[DataContract]
private struct SessionThreshold
{
[DataMember]
public long Threshold;

[DataMember]
public TKey Key;
}
}
}