Skip to content

Conversation

@liudezhi2098
Copy link

Summary

  • Implement asyncTrimConsumedLedgersBefore(long ledgerId) method in ManagedLedger for precise ledger trimming at a specified ledger ID boundary
  • Add mutex-based concurrency control to ensure only one trim operation can execute at a time, with automatic retry for concurrent calls
  • Return list of deleted ledger IDs from the trim operation for auditing purposes
  • Add comprehensive tests including concurrent operation scenarios

Motivation

The existing trimConsumedLedgersSince() method deletes ledgers based on time, which is not precise enough for certain scenarios. This new method allows users to precisely trim ledgers up to a specific
ledger ID, giving finer control over ledger retention.

Key Features

  1. Precise Ledger Trimming: Delete all fully consumed ledgers with IDs less than the specified ledger ID
  2. Concurrency Control: Mutex-based locking ensures only one trim operation proceeds at a time
  3. Automatic Retry: Concurrent trim calls automatically retry with exponential backoff
  4. Audit Support: Returns list of deleted ledger IDs for tracking

Test plan

  • Unit tests for basic trim functionality
  • Concurrent trim operations test (5 threads)
  • High contention test (20 rapid operations)
  • Verify mutex and retry logic
  • Integration test with admin API

Changes

  • ManagedLedger.java: Add asyncTrimConsumedLedgersBefore(long) interface
  • ManagedLedgerImpl.java: Core implementation with mutex and retry logic
  • TopicsImpl.java: Admin API integration
  • ManagedLedgerImplExtTest.java: Comprehensive tests including concurrent scenarios

liudezhi2098 and others added 7 commits January 20, 2026 15:24
Add ability to trim consumed ledgers before a specified ledger ID.

Changes:
- Add `asyncTrimConsumedLedgersBefore(long ledgerId)` method to ManagedLedger interface
- Add protected factory method `createManagedLedgerInstance()` to ManagedLedgerFactoryImpl
  to allow subclass extension
- Make several methods/fields protected in ManagedLedgerImpl to enable extension access
  (metadataMutex, trimmerMutex, invalidateReadHandle, isOffloadedNeedsDelete, etc.)
- Create new managed-ledger-ext module with:
  - ManagedLedgerImplExt: Extended implementation with custom trim logic
  - ManagedLedgerFactoryImplExt: Factory that creates ManagedLedgerImplExt instances
  - ManagedLedgerClientFactoryExt: Storage implementation for Pulsar integration

This allows users to manually trigger ledger deletion before a specified
ledger ID, provided all ledgers before that ID have been fully consumed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
…rimming

Add a new method to trim consumed ledgers before a specific ledger ID,
allowing more precise control over ledger cleanup than the existing
retention-based trimming.

Key features:
- Trim all consumed ledgers before a specified ledger ID
- Handle edge cases: large IDs, gap IDs, small IDs
- Protection against unconsumed data via slowest reader check
- Automatic boundary adjustment when ID equals current ledger

Changes:
- Add ManagedLedgerImplExt class extending ManagedLedgerImpl
- Implement asyncTrimConsumedLedgersBefore method
- Add comprehensive test coverage (9 test cases)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Fix checkstyle violations:
- Fix line length issues (max 120 characters)
- Fix OperatorWrap for '+' concatenation
- Add package-info.java with Javadoc

Fix license headers:
- Add Apache license header to package-info.java
- Update pom.xml license header format

All 9 tests pass, checkstyle and license checks pass.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
… calls

Changes:
- Add asyncTrimConsumedLedgersBefore to ManagedLedger interface with
  default implementation that returns a failed future for unsupported
  operations
- Remove reflection-based invocation from PersistentTopicsBase,
  now calls the method directly on ManagedLedger interface
- Fix resource leak where future was not completed when
  calculateSlowestReaderLedgerId returns -1

This allows the trim operation to work with ManagedLedgerImplExt
while providing clear error messages for standard ManagedLedgerImpl.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
…mprehensive tests

- Refine trim semantics: delete boundary ledger for middle ledgers, keep for current
- Add detailed javadoc comments explaining all scenarios:
  * trimConsumedLedgersBefore(L4) where L4 is current: delete L1, L2, L3, keep L4
  * trimConsumedLedgersBefore(L3) where L3 is middle: delete L1, L2, L3, keep L4
  * trimConsumedLedgersBefore(L2) where L2 is middle: delete L1, L2, keep L3, L4
  * Handle gaps: trimConsumedLedgersBefore(L3) with L1, L2, L4 uses L2 as boundary
- Update all test cases with clear semantic documentation
- Add AdminApiTrimConsumedLedgersBeforeTest in managed-ledger-ext module
- Add basic AdminApiTrimConsumedLedgersBeforeTest in pulsar-broker module
- Add pulsar-broker test-jar dependency to managed-ledger-ext for testing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
…Before

Modify asyncTrimConsumedLedgersBefore to return CompletableFuture<List<Long>>
containing the IDs of all deleted ledgers. This provides better transparency,
debugging capability, and monitoring for trim operations.

Changes:
- Update ManagedLedger interface to return CompletableFuture<List<Long>>
- Update ManagedLedgerImplExt to collect and return deleted ledger IDs
- Return empty list when no ledgers are deleted (including when not fully consumed)
- Update Topics API (sync/async) to return List<Long>
- Update REST endpoint to return JSON array of deleted ledger IDs (HTTP 200)
- Update CLI tool to display deleted ledger IDs line by line
- Update all test cases to verify the new return type

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
… logic

Added two comprehensive test cases:
- testConcurrentTrimOperations: Verifies that multiple threads can safely call trimConsumedLedgersBefore concurrently, ensuring only one operation proceeds at a time while others retry
- testHighContentionTrimOperations: Tests rapid successive trim calls under high contention to verify retry mechanism works correctly

Also simplified response handling in TopicsImpl.trimConsumedLedgersBefore by removing unnecessary 204 NO_CONTENT handling.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants