From 3239916be9039c9fb56aa341cf0d5fe2353c6eee Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Tue, 16 Sep 2025 22:16:00 -0500
Subject: [PATCH 1/5] CNDB-14802: Improve async byte allocation estimates in
 SAI SegmentBuilder

---
 .../index/sai/disk/v1/SSTableIndexWriter.java |  10 +-
 .../index/sai/disk/v1/SegmentBuilder.java     | 109 +++++++++++-------
 .../index/sai/disk/v1/SegmentFlushTest.java   |   2 +
 3 files changed, 69 insertions(+), 52 deletions(-)

diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
index 7989ecb9086c..332ba682ea86 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
@@ -274,8 +274,7 @@ else if (shouldFlush(sstableRowId))
         if (term.remaining() == 0 && TypeUtil.skipsEmptyValue(indexContext.getValidator()))
             return false;
 
-        long allocated = currentBuilder.analyzeAndAdd(term, type, key, sstableRowId);
-        limiter.increment(allocated);
+        currentBuilder.analyzeAndAdd(term, type, key, sstableRowId);
         return true;
     }
 
@@ -301,13 +300,6 @@ private boolean shouldFlush(long sstableRowId)
     private void flushSegment() throws IOException
     {
         currentBuilder.awaitAsyncAdditions();
-        if (currentBuilder.supportsAsyncAdd()
-            && currentBuilder.totalBytesAllocatedConcurrent.sum() > 1.1 * currentBuilder.totalBytesAllocated())
-        {
-            logger.warn("Concurrent memory usage is higher than estimated: {} vs {}",
-                        currentBuilder.totalBytesAllocatedConcurrent.sum(), currentBuilder.totalBytesAllocated());
-        }
-
         // throw exceptions that occurred during async addInternal()
         var ae = currentBuilder.getAsyncThrowable();
         if (ae != null)
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
index 320f59631c84..0a8ab719a5a2 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
@@ -60,7 +60,6 @@
 import org.apache.cassandra.index.sai.utils.NamedMemoryLimiter;
 import org.apache.cassandra.index.sai.utils.PrimaryKey;
 import org.apache.cassandra.index.sai.utils.TypeUtil;
-import org.apache.cassandra.metrics.QuickSlidingWindowReservoir;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
 import org.apache.lucene.util.BytesRef;
@@ -102,10 +101,8 @@ public abstract class SegmentBuilder
     final AbstractAnalyzer analyzer;
 
     // track memory usage for this segment so we can flush when it gets too big
-    private final NamedMemoryLimiter limiter;
+    protected final NamedMemoryLimiter limiter;
     long totalBytesAllocated;
-    // when we're adding terms asynchronously, totalBytesAllocated will be an approximation and this tracks the exact size
-    final LongAdder totalBytesAllocatedConcurrent = new LongAdder();
 
     private final long lastValidSegmentRowID;
@@ -127,7 +124,6 @@ public abstract class SegmentBuilder
     protected ByteBuffer maxTerm;
 
     protected final AtomicInteger updatesInFlight = new AtomicInteger(0);
-    protected final QuickSlidingWindowReservoir termSizeReservoir = new QuickSlidingWindowReservoir(100);
     protected AtomicReference<Throwable> asyncThrowable = new AtomicReference<>();
 
@@ -151,7 +147,7 @@ public static class KDTreeSegmentBuilder extends SegmentBuilder
             this.buffer = new byte[typeSize];
             this.indexWriterConfig = indexWriterConfig;
             totalBytesAllocated = kdTreeRamBuffer.ramBytesUsed();
-            totalBytesAllocatedConcurrent.add(totalBytesAllocated);}
+        }
 
         public boolean isEmpty()
         {
@@ -200,7 +196,6 @@ public static class RAMStringSegmentBuilder extends SegmentBuilder
             this.byteComparableVersion = components.byteComparableVersionFor(IndexComponentType.TERMS_DATA);
             ramIndexer = new RAMStringIndexer(writeFrequencies());
             totalBytesAllocated = ramIndexer.estimatedBytesUsed();
-            totalBytesAllocatedConcurrent.add(totalBytesAllocated);
         }
 
         private boolean writeFrequencies()
@@ -247,6 +242,8 @@ public boolean requiresFlush()
     public static class VectorOffHeapSegmentBuilder extends SegmentBuilder
     {
         private final CompactionGraph graphIndex;
+        protected final AtomicLong maxBytesAddedObserved = new AtomicLong(0);
+        protected final LongAdder reconciliationBytes = new LongAdder();
 
         public VectorOffHeapSegmentBuilder(IndexComponents.ForWrite components,
                                            long rowIdOffset,
@@ -266,7 +263,6 @@ public VectorOffHeapSegmentBuilder(IndexComponents.ForWrite components,
                 throw new UncheckedIOException(e);
             }
             totalBytesAllocated = graphIndex.ramBytesUsed();
-            totalBytesAllocatedConcurrent.add(totalBytesAllocated);
         }
 
         @Override
@@ -282,7 +278,7 @@ protected long addInternal(List<ByteBuffer> terms, int segmentRowId)
         }
 
         @Override
-        protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
+        protected void addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
         {
             assert terms.size() == 1;
 
@@ -293,21 +289,31 @@ protected void addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
             try
             {
                 result = graphIndex.maybeAddVector(terms.get(0), segmentRowId);
+                observeAllocatedBytes(result.bytesUsed);
             }
             catch (IOException e)
             {
                 throw new UncheckedIOException(e);
             }
             if (result.vector == null)
-                return result.bytesUsed;
+                return;
 
             updatesInFlight.incrementAndGet();
+
+            // Increment by double max bytes before dispatching the task to prevent a subsequent insertion from
+            // exceeding the limit. Upon completion, the limiter will be corrected with the actual size.
+            long estimatedBytes = maxBytesAddedObserved.get() * 2;
+            long reconciledBytes = reconciliationBytes.sumThenReset();
+            observeAllocatedBytes(estimatedBytes + reconciledBytes);
+
             compactionExecutor.submit(() -> {
                 try
                 {
-                    long bytesAdded = result.bytesUsed + graphIndex.addGraphNode(result);
-                    totalBytesAllocatedConcurrent.add(bytesAdded);
-                    termSizeReservoir.update(bytesAdded);
+                    long bytesAdded = graphIndex.addGraphNode(result);
+                    maxBytesAddedObserved.accumulateAndGet(bytesAdded, Math::max);
+                    // Store the difference between the estimated and actual bytes added for correction on the
+                    // next call to addInternalAsync.
+                    reconciliationBytes.add(bytesAdded - estimatedBytes);
                 }
                 catch (Throwable th)
                 {
@@ -318,15 +324,10 @@ protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
                     updatesInFlight.decrementAndGet();
                 }
             });
-            // bytes allocated will be approximated immediately as the average of recently added terms,
-            // rather than waiting until the async update completes to get the exact value. The latter could
-            // result in a dangerously large discrepancy between the amount of memory actually consumed
-            // and the amount the limiter knows about if the queue depth grows.
-            busyWaitWhile(() -> termSizeReservoir.size() == 0 && asyncThrowable.get() == null);
+
             if (asyncThrowable.get() != null)
             {
                 throw new RuntimeException("Error adding term asynchronously", asyncThrowable.get());
             }
-            return (long) termSizeReservoir.getMean();
         }
 
         @Override
@@ -344,6 +345,12 @@ public boolean supportsAsyncAdd()
             return true;
         }
 
+        @Override
+        public void reconcileAsyncByteAllocations()
+        {
+            observeAllocatedBytes(reconciliationBytes.sumThenReset());
+        }
+
         @Override
         public boolean requiresFlush()
         {
@@ -368,13 +375,14 @@ long release(IndexContext indexContext)
     public static class VectorOnHeapSegmentBuilder extends SegmentBuilder
     {
         private final CassandraOnHeapGraph<Integer> graphIndex;
+        protected final AtomicLong maxBytesAddedObserved = new AtomicLong(0);
+        protected final LongAdder reconciliationBytes = new LongAdder();
 
         public VectorOnHeapSegmentBuilder(IndexComponents.ForWrite components, long rowIdOffset, long keyCount, NamedMemoryLimiter limiter)
         {
             super(components, rowIdOffset, limiter);
             graphIndex = new CassandraOnHeapGraph<>(components.context(), false, null);
             totalBytesAllocated = graphIndex.ramBytesUsed();
-            totalBytesAllocatedConcurrent.add(totalBytesAllocated);
         }
 
         @Override
@@ -391,15 +399,20 @@ protected long addInternal(List<ByteBuffer> terms, int segmentRowId)
         }
 
         @Override
-        protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
+        protected void addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
         {
             updatesInFlight.incrementAndGet();
+            long estimatedBytes = maxBytesAddedObserved.get() * 2;
+            long reconciledBytes = reconciliationBytes.sumThenReset();
+            observeAllocatedBytes(estimatedBytes + reconciledBytes);
             compactionExecutor.submit(() -> {
                 try
                 {
                     long bytesAdded = addInternal(terms, segmentRowId);
-                    totalBytesAllocatedConcurrent.add(bytesAdded);
-                    termSizeReservoir.update(bytesAdded);
+                    maxBytesAddedObserved.accumulateAndGet(bytesAdded, Math::max);
+                    // Store the difference between the estimated and actual bytes added for correction on the
+                    // next call to addInternalAsync.
+                    reconciliationBytes.add(bytesAdded - estimatedBytes);
                 }
                 catch (Throwable th)
                 {
@@ -410,15 +423,9 @@ protected void addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
                     updatesInFlight.decrementAndGet();
                 }
             });
-            // bytes allocated will be approximated immediately as the average of recently added terms,
-            // rather than waiting until the async update completes to get the exact value. The latter could
-            // result in a dangerously large discrepancy between the amount of memory actually consumed
-            // and the amount the limiter knows about if the queue depth grows.
-            busyWaitWhile(() -> termSizeReservoir.size() == 0 && asyncThrowable.get() == null);
             if (asyncThrowable.get() != null)
             {
                 throw new RuntimeException("Error adding term asynchronously", asyncThrowable.get());
             }
-            return (long) termSizeReservoir.getMean();
         }
 
         @Override
@@ -437,6 +444,12 @@ public boolean supportsAsyncAdd()
         {
             return true;
         }
+
+        @Override
+        public void reconcileAsyncByteAllocations()
+        {
+            observeAllocatedBytes(reconciliationBytes.sumThenReset());
+        }
     }
 
     private SegmentBuilder(IndexComponents.ForWrite components, long rowIdOffset, NamedMemoryLimiter limiter)
@@ -452,6 +465,15 @@ private SegmentBuilder(IndexComponents.ForWrite components, long rowIdOffset, Na
         minimumFlushBytes = limiter.limitBytes() / ACTIVE_BUILDER_COUNT.getAndIncrement();
     }
 
+    protected void observeAllocatedBytes(long bytes)
+    {
+        if (bytes != 0)
+        {
+            totalBytesAllocated += bytes;
+            limiter.increment(bytes);
+        }
+    }
+
     public SegmentMetadata flush() throws IOException
     {
         assert !flushed;
@@ -474,27 +496,25 @@ public SegmentMetadata flush() throws IOException
         return metadataBuilder.build();
     }
 
-    public long analyzeAndAdd(ByteBuffer rawTerm, AbstractType<?> type, PrimaryKey key, long sstableRowId)
+    public void analyzeAndAdd(ByteBuffer rawTerm, AbstractType<?> type, PrimaryKey key, long sstableRowId)
     {
-        long totalSize = 0;
         if (TypeUtil.isLiteral(type))
         {
             var terms = ByteLimitedMaterializer.materializeTokens(analyzer, rawTerm, components.context(), key);
-            totalSize += add(terms, key, sstableRowId);
+            add(terms, key, sstableRowId);
             totalTermCount += terms.size();
         }
         else
         {
-            totalSize += add(List.of(rawTerm), key, sstableRowId);
+            add(List.of(rawTerm), key, sstableRowId);
            totalTermCount++;
         }
-        return totalSize;
     }
 
-    private long add(List<ByteBuffer> terms, PrimaryKey key, long sstableRowId)
+    private void add(List<ByteBuffer> terms, PrimaryKey key, long sstableRowId)
    {
         if (terms.isEmpty())
-            return 0;
+            return;
 
         Preconditions.checkState(!flushed, "Cannot add to flushed segment");
         Preconditions.checkArgument(sstableRowId >= maxSSTableRowId,
@@ -527,23 +547,20 @@ private long add(List<ByteBuffer> terms, PrimaryKey key, long sstableRowId)
 
         maxSegmentRowId = Math.max(maxSegmentRowId, segmentRowId);
 
-        long bytesAllocated;
         if (supportsAsyncAdd())
         {
             // only vector indexing is done async and there can only be one term
             assert terms.size() == 1;
-            bytesAllocated = addInternalAsync(terms, segmentRowId);
+            addInternalAsync(terms, segmentRowId);
         }
         else
         {
-            bytesAllocated = addInternal(terms, segmentRowId);
+            long bytesAllocated = addInternal(terms, segmentRowId);
+            observeAllocatedBytes(bytesAllocated);
         }
-
-        totalBytesAllocated += bytesAllocated;
-        return bytesAllocated;
     }
 
-    protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
+    protected void addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
     {
         throw new UnsupportedOperationException();
     }
@@ -552,6 +569,11 @@ public boolean supportsAsyncAdd()
     {
         return false;
     }
+    protected void reconcileAsyncByteAllocations()
+    {
+        throw new UnsupportedOperationException();
+    }
+
     public Throwable getAsyncThrowable()
     {
         return asyncThrowable.get();
@@ -562,6 +584,7 @@ public void awaitAsyncAdditions()
         // addTerm is only called by the compaction thread, serially, so we don't need to worry about new
         // terms being added while we're waiting -- updatesInFlight can only decrease
         busyWaitWhile(() -> updatesInFlight.get() > 0);
+        reconcileAsyncByteAllocations();
     }
 
     long totalBytesAllocated()
diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java
index 557f433df267..16cd43957ad7 100644
--- a/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java
@@ -155,6 +155,7 @@ private void testFlushBetweenRowIds(long sstableRowId1, long sstableRowId2, int
                                                                  MockSchema.newCFS("ks"));
         IndexComponents.ForWrite components = indexDescriptor.newPerIndexComponentsForWrite(indexContext);
 
+        long startingLimiterValue = V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER.currentBytesUsed();
         SSTableIndexWriter writer = new SSTableIndexWriter(components, V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER, () -> false, () -> false, 2);
 
         List<DecoratedKey> keys = Arrays.asList(dk("1"), dk("2"));
@@ -172,6 +173,7 @@ private void testFlushBetweenRowIds(long sstableRowId1, long sstableRowId2, int
         writer.addRow(SAITester.TEST_FACTORY.create(key2, Clustering.EMPTY), row2, sstableRowId2);
 
         writer.complete(Stopwatch.createStarted());
+        assertEquals(startingLimiterValue, V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER.currentBytesUsed());
 
         MetadataSource source = MetadataSource.loadMetadata(components);
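
Note on the accounting scheme in the patch above: instead of busy-waiting on a
sliding-window average of recent term sizes, the builder now makes an
optimistic reservation -- before dispatching an async insert it charges the
limiter twice the largest insertion observed so far, and each completed insert
records its estimate error in a LongAdder that the next dispatch (or
awaitAsyncAdditions) folds back in. A minimal, self-contained sketch of the
same idea, with illustrative names rather than the patch's real API (the real
types are NamedMemoryLimiter, CompactionGraph and SegmentBuilder):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.LongAdder;
    import java.util.function.LongSupplier;

    public class OptimisticByteAccounting
    {
        private final AtomicLong maxBytesObserved = new AtomicLong(0);
        private final LongAdder reconciliation = new LongAdder();
        private final AtomicLong limiterBytes = new AtomicLong(0); // stand-in for NamedMemoryLimiter
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        // Called serially by a single writer thread, mirroring addInternalAsync().
        // 'insertion' does the real work and returns the bytes it allocated, like
        // graphIndex.addGraphNode(result) in the patch.
        public void addAsync(LongSupplier insertion)
        {
            // Reserve twice the largest insertion seen so far, so an insert
            // dispatched before this one completes cannot blow past the limit.
            long estimated = maxBytesObserved.get() * 2;
            // Fold in corrections (usually negative) left by completed inserts.
            limiterBytes.addAndGet(estimated + reconciliation.sumThenReset());

            executor.submit(() -> {
                long actual = insertion.getAsLong();
                maxBytesObserved.accumulateAndGet(actual, Math::max);
                // Record the estimate error; the next addAsync() or reconcile()
                // call corrects the limiter by this amount.
                reconciliation.add(actual - estimated);
            });
        }

        // Called once all async inserts have completed, mirroring the
        // reconcileAsyncByteAllocations() hook invoked from awaitAsyncAdditions().
        public void reconcile()
        {
            limiterBytes.addAndGet(reconciliation.sumThenReset());
        }
    }

The very first dispatch reserves nothing (no maximum has been observed yet)
and is corrected by the reconciliation entry it leaves behind; in the off-heap
builder the bytes reported by maybeAddVector are still charged synchronously,
so the first insertion is not entirely unaccounted for.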
From 8edfac06f36039dc9897801f255c1303afb27cca Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Wed, 17 Sep 2025 17:10:18 -0500
Subject: [PATCH 2/5] Add timeout to prevent infinite loop

---
 .../apache/cassandra/index/sai/disk/v1/SegmentBuilder.java | 2 +-
 src/java/org/apache/cassandra/utils/FBUtilities.java       | 4 +++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
index 0a8ab719a5a2..d712e233573b 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
@@ -583,7 +583,7 @@ public void awaitAsyncAdditions()
     {
         // addTerm is only called by the compaction thread, serially, so we don't need to worry about new
         // terms being added while we're waiting -- updatesInFlight can only decrease
-        busyWaitWhile(() -> updatesInFlight.get() > 0);
+        busyWaitWhile(() -> updatesInFlight.get() > 0, 60_000);
         reconcileAsyncByteAllocations();
     }
 
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 1bb8677f5cb0..2caa3accf04c 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -1396,10 +1396,12 @@ public static String getStackTrace(ThreadInfo threadInfo)
         }
     }
 
-    public static void busyWaitWhile(Supplier<Boolean> condition)
+    public static void busyWaitWhile(Supplier<Boolean> condition, int timeoutMs)
     {
         while (condition.get())
         {
+            if (timeoutMs-- <= 0)
+                throw new RuntimeException("Timeout while waiting for condition");
             try
             {
                 Thread.sleep(1);
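
Note on the timeout above: timeoutMs is decremented once per loop iteration,
so it approximates milliseconds only because each iteration sleeps about 1 ms;
Thread.sleep can overshoot, so the actual wait may exceed the nominal 60
seconds. If wall-clock accuracy mattered, a deadline-based variant would look
like the following sketch (illustrative only, not part of the patch):

    import java.util.function.Supplier;

    public final class BusyWait
    {
        // Same contract as the patched FBUtilities.busyWaitWhile, but measured
        // against System.nanoTime() instead of counting sleep iterations.
        public static void busyWaitWhile(Supplier<Boolean> condition, long timeoutMs)
        {
            long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
            while (condition.get())
            {
                if (System.nanoTime() - deadline >= 0) // overflow-safe comparison
                    throw new RuntimeException("Timeout while waiting for condition");
                try
                {
                    Thread.sleep(1);
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }

        private BusyWait() {}
    }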
From fb296379716d5f3c591cfbe3a659f50a05848e24 Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Thu, 18 Sep 2025 14:08:57 -0500
Subject: [PATCH 3/5] Fix reconcileAsyncByteAllocations

---
 .../org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
index d712e233573b..1c744adbaeee 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
@@ -571,7 +571,6 @@ public boolean supportsAsyncAdd()
     protected void reconcileAsyncByteAllocations()
     {
-        throw new UnsupportedOperationException();
     }
 
     public Throwable getAsyncThrowable()
     {

From 72797597300f2769682f05fee4dafdb6880728e3 Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Fri, 19 Sep 2025 12:20:26 -0500
Subject: [PATCH 4/5] Improve comments describing reconciliation design

---
 .../apache/cassandra/index/sai/disk/v1/SegmentBuilder.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
index 1c744adbaeee..78d1bbda1439 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
@@ -301,7 +301,10 @@ protected void addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
             updatesInFlight.incrementAndGet();
 
             // Increment by double max bytes before dispatching the task to prevent a subsequent insertion from
-            // exceeding the limit. Upon completion, the limiter will be corrected with the actual size.
+            // exceeding the limit. Also add back any bytes that were reconciled by previous calls that have
+            // since completed. It is expected that the reconciliation process typically results in a net
+            // reduction because we overestimate the number of bytes required per insertion, and as such, we
+            // allow for a lazy reconciliation process.
             long estimatedBytes = maxBytesAddedObserved.get() * 2;
             long reconciledBytes = reconciliationBytes.sumThenReset();
             observeAllocatedBytes(estimatedBytes + reconciledBytes);
@@ -402,6 +405,7 @@ protected long addInternal(List<ByteBuffer> terms, int segmentRowId)
         protected void addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
         {
             updatesInFlight.incrementAndGet();
+            // See VectorOffHeapSegmentBuilder for comments on this logic.
             long estimatedBytes = maxBytesAddedObserved.get() * 2;
             long reconciledBytes = reconciliationBytes.sumThenReset();
             observeAllocatedBytes(estimatedBytes + reconciledBytes);

From 0df78bb4b9cb1e2d2da95241cd9a0b6018f84147 Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Mon, 22 Sep 2025 14:00:06 -0500
Subject: [PATCH 5/5] Fix concurrency bug in isEmpty

We no longer block until the first insertion completes, and as such, we
need to make sure that isEmpty correctly accounts for in-flight inserts.
This fixes a flaky test (only flaky for this branch) in the
FeaturesVersionSupportTest.
---
 .../apache/cassandra/index/sai/disk/v1/SegmentBuilder.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
index 78d1bbda1439..6bccfab2991b 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
@@ -268,6 +268,7 @@ public VectorOffHeapSegmentBuilder(IndexComponents.ForWrite components,
         @Override
         public boolean isEmpty()
         {
+            // Don't need to check updatesInFlight because we add the vector to the graphIndex before dispatching the task.
             return graphIndex.isEmpty();
         }
 
@@ -391,7 +392,8 @@ public VectorOnHeapSegmentBuilder(IndexComponents.ForWrite components, long rowI
         @Override
         public boolean isEmpty()
         {
-            return graphIndex.isEmpty();
+            // Must check updatesInFlight first to avoid a race between addInternalAsync and graphIndex.isEmpty().
+            return updatesInFlight.get() == 0 && graphIndex.isEmpty();
         }
 
         @Override
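
Note on the isEmpty fix in patch 5: it closes a check-then-act race. The
single writer thread increments updatesInFlight before dispatching the task,
and the worker decrements it only after the graph insert has been applied, so
reading the counter before the collection means no caller can observe "empty"
while an insert is still pending. A condensed sketch of the pattern with
illustrative names (the real members are updatesInFlight and graphIndex):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;

    public class AsyncBuffer<T>
    {
        private final Queue<T> items = new ConcurrentLinkedQueue<>();
        private final AtomicInteger inFlight = new AtomicInteger(0);
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        // Called by a single writer thread, like SegmentBuilder's compaction
        // thread. The counter is already non-zero when this method returns, so
        // a submitted-but-unapplied add can never be invisible to isEmpty().
        public void addAsync(T item)
        {
            inFlight.incrementAndGet();
            executor.submit(() -> {
                try
                {
                    items.add(item);
                }
                finally
                {
                    inFlight.decrementAndGet(); // only after the item is visible
                }
            });
        }

        // Read the counter first: if it is zero, every dispatched add has
        // landed in the queue, so the emptiness check cannot miss an insert.
        public boolean isEmpty()
        {
            return inFlight.get() == 0 && items.isEmpty();
        }
    }

This also explains why the off-heap builder needs no counter check: there,
maybeAddVector inserts into the graph synchronously before the task is
dispatched, so graphIndex.isEmpty() is already accurate on its own.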