From 58496b04b2baec2df943d11ed1a794ba97f74efe Mon Sep 17 00:00:00 2001
From: Arun Kumar M
Date: Thu, 29 Oct 2020 02:46:06 -0700
Subject: [PATCH] Fix Partitioned SessionWindow bug, where restoring from checkpoint triggers an exception.

Cause of the issue:
The PartitionedSessionWindowPipe keeps several dictionaries as state. One of
these dictionaries does not have a [DataMember] attribute, because its value
type is a LinkedList, which does not support serialization. On checkpoint
followed by restore, this dictionary is re-created from the other data members
in the UpdatePointers callback. During this re-creation, the case of an empty
LinkedList value is missed and the entry is not restored. When the next data
event arrives for that partition, the dictionary is indexed by the partition
key, resulting in a KeyNotFoundException.

Regression:
No, this bug has existed for a long time. The customer hit it now because of a
workaround suggested (for another bug) to use the 'timestamp by .. over' clause.

Repro steps:
Added as a test case.

Fix:
Added code to restore the key with an empty LinkedList. Verified that this was
the exact state just before the checkpoint. Added a test case that triggers
this bug; with the fix, the test case passes.
--- .../PartitionedSessionWindowPipe.cs | 13 ++- .../PartitionedStreamCheckpointTests.cs | 100 ++++++++++++++++++ 2 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 Sources/Test/SimpleTesting/Partitioned/PartitionedStreamCheckpointTests.cs diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/PartitionedSessionWindowPipe.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/PartitionedSessionWindowPipe.cs index 1fdf85f03..5c9206a28 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/PartitionedSessionWindowPipe.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/PartitionedSessionWindowPipe.cs @@ -249,15 +249,26 @@ public override int CurrentlyBufferedInputCount protected override void UpdatePointers() { + // This method restores the member 'this.orderedKeysDictionary' + // The dictionary is not serializable because of the LinkedList value type, + // and hence it does not have [DataMember] attribute int iter = FastDictionary.IteratorStart; var temp = new List>(); while (this.lastDataTimeDictionary.Iterate(ref iter)) { + var partitionKey = this.getPartitionKey(this.lastDataTimeDictionary.entries[iter].key); + if (this.stateDictionary.entries[iter].value.Any()) { temp.Add(Tuple.Create( this.lastDataTimeDictionary.entries[iter].key, - Math.Min(this.lastDataTimeDictionary.entries[iter].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[iter].value), this.getPartitionKey(this.lastDataTimeDictionary.entries[iter].key))); + Math.Min(this.lastDataTimeDictionary.entries[iter].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[iter].value), + partitionKey)); + } + else if (!this.orderedKeysDictionary.ContainsKey(partitionKey)) + { + // We still need to restore the empty list - as that was the case just before checkpoint + this.orderedKeysDictionary.Add(partitionKey, new LinkedList()); } } foreach (var item in temp.OrderBy(o => o.Item2)) diff 
--git a/Sources/Test/SimpleTesting/Partitioned/PartitionedStreamCheckpointTests.cs b/Sources/Test/SimpleTesting/Partitioned/PartitionedStreamCheckpointTests.cs
new file mode 100644
index 000000000..74763b120
--- /dev/null
+++ b/Sources/Test/SimpleTesting/Partitioned/PartitionedStreamCheckpointTests.cs
@@ -0,0 +1,100 @@
+// *********************************************************************
+// Copyright (c) Microsoft Corporation.  All rights reserved.
+// Licensed under the MIT License
+// *********************************************************************
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Reactive.Linq;
+using System.Reactive.Subjects;
+using Microsoft.StreamProcessing;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+namespace SimpleTesting
+{
+    /* This test case verifies the fix for a bug where PartitionedSessionWindowPipe is not restored (from checkpoint) properly, causing an exception.
+     *
+     * Cause of the bug:
+     * The PartitionedSessionWindowPipe keeps several dictionaries as state.
+     * One of these dictionaries does not have a [DataMember] attribute, because its value type is a LinkedList, which does not support serialization.
+     * On checkpoint followed by restore, this dictionary is re-created from the other data members in the UpdatePointers callback.
+     * During this re-creation, the case of an empty LinkedList value is missed and the entry is not restored.
+     * When the next data event arrives for the partition, the dictionary is indexed by the partition key, resulting in a KeyNotFoundException.
+     */
+    [TestClass]
+    public class PartitionedStreamCheckpointTests : TestWithConfigSettingsWithoutMemoryLeakDetection
+    {
+        [TestMethod, TestCategory("Gated")]
+        public void CheckpointPartitionedSessionWindow()
+        {
+            Config.DataBatchSize = 1;
+
+            var data = new PartitionedStreamEvent<int, double>[]
+            {
+                PartitionedStreamEvent.CreatePoint(0, 5, 1.0),
+                PartitionedStreamEvent.CreatePunctuation<int, double>(0, 8),
+                PartitionedStreamEvent.CreatePunctuation<int, double>(0, 11),
+                PartitionedStreamEvent.CreatePunctuation<int, double>(0, 14),
+                PartitionedStreamEvent.CreatePunctuation<int, double>(0, 17),
+                PartitionedStreamEvent.CreatePunctuation<int, double>(0, 21),
+                PartitionedStreamEvent.CreatePoint(0, 24, 1.0),
+                PartitionedStreamEvent.CreatePunctuation<int, double>(0, 100),
+            };
+
+            var expected = new PartitionedStreamEvent<int, double>[]
+            {
+                PartitionedStreamEvent.CreateStart(0, 5, 1.0),
+                PartitionedStreamEvent.CreatePunctuation<int, double>(0, 8),
+                PartitionedStreamEvent.CreateEnd(0, 9, 5, 1.0),
+                PartitionedStreamEvent.CreatePunctuation<int, double>(0, 11),
+                PartitionedStreamEvent.CreatePunctuation<int, double>(0, 14),
+                PartitionedStreamEvent.CreatePunctuation<int, double>(0, 17),
+                PartitionedStreamEvent.CreatePunctuation<int, double>(0, 21),
+                PartitionedStreamEvent.CreateStart(0, 24, 1.0),
+                PartitionedStreamEvent.CreateEnd(0, 28, 24, 1.0),
+                PartitionedStreamEvent.CreatePunctuation<int, double>(0, 100),
+            };
+
+            // Index of the input event immediately before which the query is checkpointed and restored; restoring at this point triggers the bug.
+            const int checkpointIndex = 6;
+
+            var subject = new Subject<PartitionedStreamEvent<int, double>>();
+            var output = new List<PartitionedStreamEvent<int, double>>();
+            var process = CreateQueryContainerForPartitionedStream(subject, output);
+
+            for (int i = 0; i < data.Length; i++)
+            {
+                if (i == checkpointIndex)
+                {
+                    using (var ms = new MemoryStream())
+                    {
+                        process.Checkpoint(ms);
+                        ms.Seek(0, SeekOrigin.Begin);
+
+                        subject = new Subject<PartitionedStreamEvent<int, double>>();
+                        process = CreateQueryContainerForPartitionedStream(subject, output, ms);
+                    }
+                }
+
+                subject.OnNext(data[i]);
+            }
+
+            Assert.IsTrue(expected.SequenceEqual(output));
+        }
+
+        private Process CreateQueryContainerForPartitionedStream(
+            Subject<PartitionedStreamEvent<int, double>> subject,
+            List<PartitionedStreamEvent<int, double>> output,
+            Stream stream = null)
+        {
+            var qc = new QueryContainer();
+            var input = qc.RegisterInput(subject);
+            var streamableOutput = input.SessionTimeoutWindow(4, 5).Sum(o => o);
+            var egress = qc.RegisterOutput(streamableOutput).ForEachAsync(o => output.Add(o));
+            var process = qc.Restore(stream);
+
+            return process;
+        }
+    }
+}