Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ protected SortedMultisetAggregateBase(IComparerExpression<T> comparer, QueryCont
Expression<Func<Func<SortedDictionary<T, long>>, SortedMultiSet<T>>> template
= (g) => new SortedMultiSet<T>(g);
var replaced = template.ReplaceParametersInBody(generator);
initialState = Expression.Lambda<Func<SortedMultiSet<T>>>(replaced);
this.initialState = Expression.Lambda<Func<SortedMultiSet<T>>>(replaced);
}

private readonly Expression<Func<SortedMultiSet<T>>> initialState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,37 @@
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq.Expressions;
using Microsoft.StreamProcessing.Internal;

namespace Microsoft.StreamProcessing.Aggregates
{
internal sealed class TopKAggregate<T> : SortedMultisetAggregateBase<T, List<RankedEvent<T>>>
internal sealed class TopKAggregate<T> : IAggregate<T, ITopKState<T>, List<RankedEvent<T>>>
{
private readonly Comparison<T> compiledRankComparer;
private readonly int k;

public TopKAggregate(int k, QueryContainer container) : this(k, ComparerExpression<T>.Default, container) { }
public TopKAggregate(int k, IComparerExpression<T> rankComparer, QueryContainer container, long hoppingWindowSize)
: this(k, rankComparer, ComparerExpression<T>.Default, container, hoppingWindowSize) { }

public TopKAggregate(int k, IComparerExpression<T> rankComparer, QueryContainer container)
: this(k, rankComparer, ComparerExpression<T>.Default, container) { }

public TopKAggregate(int k, IComparerExpression<T> rankComparer, IComparerExpression<T> overallComparer, QueryContainer container)
: base(ThenOrderBy(Reverse(rankComparer), overallComparer), container)
public TopKAggregate(int k, IComparerExpression<T> rankComparer, IComparerExpression<T> overallComparer,
QueryContainer container, long hoppingWindowSize)
{
Contract.Requires(rankComparer != null);
Contract.Requires(overallComparer != null);
Contract.Requires(k > 0);
this.compiledRankComparer = Reverse(rankComparer).GetCompareExpr().Compile();
this.k = k;

Expression<Func<Func<SortedDictionary<T, long>>, ITopKState<T>>> template;
if (hoppingWindowSize > 0 && hoppingWindowSize < 1000000)
template = (g) => new HoppingTopKState<T>(k, compiledRankComparer, (int)hoppingWindowSize, g);
else
template = (g) => new SimpleTopKState<T>(g);

var combinedComparer = ThenOrderBy(Reverse(rankComparer), overallComparer);
var generator = combinedComparer.CreateSortedDictionaryGenerator<T, long>(container);
var replaced = template.ReplaceParametersInBody(generator);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document what is going on here...

this.initialState = Expression.Lambda<Func<ITopKState<T>>>(replaced);
}

private static IComparerExpression<T> Reverse(IComparerExpression<T> comparer)
Expand All @@ -53,10 +63,11 @@ private static IComparerExpression<T> ThenOrderBy(IComparerExpression<T> compare
return new ComparerExpression<T>(newExpression);
}

public override Expression<Func<SortedMultiSet<T>, List<RankedEvent<T>>>> ComputeResult() => set => GetTopK(set);
public Expression<Func<ITopKState<T>, List<RankedEvent<T>>>> ComputeResult() => set => GetTopK(set);

private List<RankedEvent<T>> GetTopK(SortedMultiSet<T> set)
private List<RankedEvent<T>> GetTopK(ITopKState<T> state)
{
var set = state.GetSortedValues();
int count = (int)Math.Min(this.k, set.TotalCount);
var result = new List<RankedEvent<T>>(count);
int nextRank = 1;
Expand All @@ -82,5 +93,20 @@ private List<RankedEvent<T>> GetTopK(SortedMultiSet<T> set)

return result;
}

private readonly Expression<Func<ITopKState<T>>> initialState;
public Expression<Func<ITopKState<T>>> InitialState() => initialState;

private static readonly Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> acc
= (state, timestamp, input) => state.Add(input, timestamp);
public Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> Accumulate() => acc;

private static readonly Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> dec
= (state, timestamp, input) => state.Remove(input, timestamp);
public Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> Deaccumulate() => dec;

private static readonly Expression<Func<ITopKState<T>, ITopKState<T>, ITopKState<T>>> diff
= (leftState, rightState) => leftState.RemoveAll(rightState);
public Expression<Func<ITopKState<T>, ITopKState<T>, ITopKState<T>>> Difference() => diff;
}
}
230 changes: 230 additions & 0 deletions Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
using System;
using System.Collections.Generic;
using System.Linq;

namespace Microsoft.StreamProcessing.Aggregates
{
/// <summary>
/// State used by TopK Aggregate
/// </summary>
/// <typeparam name="T"></typeparam>
public interface ITopKState<T>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ITopKState [](start = 21, length = 10)

This will wrap every aggregation operation a virtual function call. I wonder if it would be better to consolidate them with a single class and differentiate behavior at runtime like the Min/Max state, especially if we optimize the simple top k state as well so it will also have a comparer, etc.

{
/// <summary>
/// Add a single entry
/// </summary>
/// <param name="input"></param>
/// <param name="timestamp"></param>
ITopKState<T> Add(T input, long timestamp);

/// <summary>
/// Removes the specified entry
/// </summary>
/// <param name="input"></param>
/// <param name="timestamp"></param>
ITopKState<T> Remove(T input, long timestamp);

/// <summary>
/// Removes entries from other
/// </summary>
/// <param name="other"></param>
ITopKState<T> RemoveAll(ITopKState<T> other);

/// <summary>
/// Gets the values as sorted set
/// </summary>
/// <returns></returns>
SortedMultiSet<T> GetSortedValues();

/// <summary>
/// Returns total number of values in the set
/// </summary>
long Count { get; }
}

internal class SimpleTopKState<T> : ITopKState<T>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SimpleTopKState [](start = 19, length = 15)

Shouldn't this also be capped t the top K ranks rather than storing everything?

{
private SortedMultiSet<T> values;

public SimpleTopKState(Func<SortedDictionary<T, long>> generator)
{
this.values = new SortedMultiSet<T>(generator);
}

public long Count => this.values.TotalCount;

public ITopKState<T> Add(T input, long timestamp)
{
this.values.Add(input);
return this;
}

public SortedMultiSet<T> GetSortedValues() => this.values;

public ITopKState<T> Remove(T input, long timestamp)
{
this.values.Remove(input);
return this;
}

public ITopKState<T> RemoveAll(ITopKState<T> other)
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use expression bodies for small one-liners

this.values.RemoveAll(other.GetSortedValues());
return this;
}
}

internal class HoppingTopKState<T> : ITopKState<T>
{
public long currentTimestamp;

public CircularBuffer<ValueTuple<long, SortedMultiSet<T>>> previousValues;
public SortedMultiSet<T> currentValues;

public int k;

public Comparison<T> rankComparer;
private Func<SortedDictionary<T, long>> generator;
private ItemAndCount<T> minValue; // The minimum threshold value in TopK

public HoppingTopKState(int k, Comparison<T> rankComparer, int hoppingWindowSize, Func<SortedDictionary<T, long>> generator)
{
this.k = k;
this.rankComparer = rankComparer;
this.currentValues = new SortedMultiSet<T>(generator);
this.previousValues = new CircularBuffer<ValueTuple<long, SortedMultiSet<T>>>(hoppingWindowSize);
this.generator = generator;
}

public ITopKState<T> Add(T input, long timestamp)
{
// Verify that input is added in non-decreasing order
if (timestamp < this.currentTimestamp)
{
throw new ArgumentException("Invalid timestamp");
}

// First entry in new hop window, just add the value
if (timestamp > this.currentTimestamp)
{
MergeCurrentToPrevious();
this.currentTimestamp = timestamp;
this.currentValues.Add(input);
this.minValue = new ItemAndCount<T>(input, 1);
return this;
}

// these are subsequent entries
int compare = rankComparer(input, this.minValue.Item);

if (this.currentValues.TotalCount < this.k) // if we have not reached k yet, add it
{
if (compare > 0)
this.minValue = new ItemAndCount<T>(input, 1);
else if (compare == 0)
this.minValue.Count++;

this.currentValues.Add(input);
return this;
}
else if (compare > 0) // We have reached k and new input is smaller than minimum
{
return this;
}
else if (compare == 0) // We have reached k and new input is equal to the minimum
{
this.currentValues.Add(input); // add to get ties
this.minValue.Count++;
return this;
}
else // The new item is less than minValue, so we need to remove some entries to make place for the new entry
{
this.currentValues.Add(input);
var toRemove = this.currentValues.TotalCount - this.k;
if (toRemove >= minValue.Count)
{
this.currentValues.RemoveAll(this.minValue.Item);
this.minValue = this.currentValues.GetMinItem();
}
return this;
}
}

public ITopKState<T> Remove(T input, long timestamp)
{
throw new NotImplementedException("Cannot remove single elements from this state");
}

public ITopKState<T> RemoveAll(ITopKState<T> other)
{
if (other.Count != 0)
{
if (other is HoppingTopKState<T> otherTopK)
{
if (otherTopK.currentTimestamp > this.currentTimestamp)
{
throw new ArgumentException("Cannot remove entries with current or future timestamp");
}
else if (otherTopK.currentTimestamp == this.currentTimestamp)
{
if (this.currentValues.TotalCount != otherTopK.currentValues.TotalCount)
throw new InvalidOperationException("Invalid removal");

this.currentValues.Clear();
this.previousValues.Clear();
}
else
{
while (this.previousValues.Count > 0)
{
var first = this.previousValues.PeekFirst();

if (first.Item1 > otherTopK.currentTimestamp)
{
break;
}

if (first.Item1 == otherTopK.currentTimestamp &&
first.Item2.TotalCount != otherTopK.currentValues.TotalCount)
throw new InvalidOperationException("Invalid removal");

this.previousValues.Dequeue();
}
}
}
else
{
throw new InvalidOperationException("Cannot remove non-HoppingTopKState from HoppingTopKState");
}
}
return this;
}

// This function merges the current values to previous and is expensive
// Currently it is only called by ComputeResult
public SortedMultiSet<T> GetSortedValues()
{
var sortedMultiset = new SortedMultiSet<T>(generator);

foreach (var dictItem in this.previousValues.Iterate())
{
sortedMultiset.AddAll(dictItem.Item2);
}
sortedMultiset.AddAll(this.currentValues);

return sortedMultiset;
}

private void MergeCurrentToPrevious()
{
if (!this.currentValues.IsEmpty)
{
var newEntry = ValueTuple.Create(this.currentTimestamp, this.currentValues);
this.previousValues.Enqueue(ref newEntry);
this.currentValues = new SortedMultiSet<T>(generator);
}
}

public long Count => this.currentValues.TotalCount + this.previousValues.Iterate().Sum(e => e.Item2.TotalCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ public T Dequeue()
[EditorBrowsable(EditorBrowsableState.Never)]
public bool IsEmpty() => this.head == this.tail;

/// <summary>
/// Removes alll elements from the list - do not use directly.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
[EditorBrowsable(EditorBrowsableState.Never)]
public void Clear() => this.head = this.tail = 0;

/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
Expand Down Expand Up @@ -260,6 +267,17 @@ public IEnumerator<T> GetEnumerator()
}
}

/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public void Clear()
{
this.tail = this.head = this.buffers.First;
this.head.Value.Clear();
this.Count = 0;
}

System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
}
}
Loading