Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -705,14 +705,14 @@ struct ScanRecord {
RowView row;
};

/// A view into a subset of scan results for a single bucket.
/// A bundle of scan records belonging to a single bucket.
///
/// BucketView is a value type — it shares ownership of the underlying scan data
/// BucketRecords is a value type — it shares ownership of the underlying scan data
/// via reference counting, so it can safely outlive the ScanRecords that produced it.
class BucketView {
class BucketRecords {
public:
BucketView(std::shared_ptr<const detail::ScanData> data, TableBucket bucket, size_t bucket_idx,
size_t count)
BucketRecords(std::shared_ptr<const detail::ScanData> data, TableBucket bucket,
size_t bucket_idx, size_t count)
: data_(std::move(data)),
bucket_(std::move(bucket)),
bucket_idx_(bucket_idx),
Expand All @@ -738,9 +738,9 @@ class BucketView {
bool operator!=(const Iterator& other) const { return idx_ != other.idx_; }

private:
friend class BucketView;
Iterator(const BucketView* owner, size_t idx) : owner_(owner), idx_(idx) {}
const BucketView* owner_;
friend class BucketRecords;
Iterator(const BucketRecords* owner, size_t idx) : owner_(owner), idx_(idx) {}
const BucketRecords* owner_;
size_t idx_;
};

Expand Down Expand Up @@ -774,16 +774,16 @@ class ScanRecords {
/// List of distinct buckets that have records.
std::vector<TableBucket> Buckets() const;

/// Get a view of records for a specific bucket.
/// Get records for a specific bucket.
///
/// Returns an empty BucketView if the bucket is not present (matches Rust/Java).
/// Returns an empty BucketRecords if the bucket is not present (matches Rust/Java).
/// Note: O(B) linear scan. For iteration over all buckets, prefer BucketAt(idx).
BucketView Records(const TableBucket& bucket) const;
BucketRecords Records(const TableBucket& bucket) const;

/// Get a view of records by bucket index (0-based). O(1).
/// Get records by bucket index (0-based). O(1).
///
/// Throws std::out_of_range if idx >= BucketCount().
BucketView BucketAt(size_t idx) const;
BucketRecords BucketAt(size_t idx) const;

/// Flat iterator over all records across all buckets (matches Java Iterable<ScanRecord>).
class Iterator {
Expand Down
2 changes: 1 addition & 1 deletion bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2061,7 +2061,7 @@ impl ScanResultInner {
self.columns.len()
}

// Field accessors — C++ validates bounds in BucketView/RecordAt, validate() checks field.
// Field accessors — C++ validates bounds in BucketRecords/RecordAt, validate() checks field.
fn sv_is_null(&self, bucket: usize, rec: usize, field: usize) -> Result<bool, String> {
row_reader::is_null(self.resolve(bucket, rec).row(), &self.columns, field)
}
Expand Down
20 changes: 10 additions & 10 deletions bindings/cpp/src/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,21 +334,21 @@ std::vector<TableBucket> ScanRecords::Buckets() const {
return result;
}

BucketView ScanRecords::Records(const TableBucket& bucket) const {
BucketRecords ScanRecords::Records(const TableBucket& bucket) const {
if (!data_) {
return BucketView({}, bucket, 0, 0);
return BucketRecords({}, bucket, 0, 0);
}
const auto& infos = data_->raw->sv_bucket_infos();
for (size_t i = 0; i < infos.size(); ++i) {
TableBucket tb = to_table_bucket(infos[i]);
if (tb == bucket) {
return BucketView(data_, std::move(tb), i, infos[i].record_count);
return BucketRecords(data_, std::move(tb), i, infos[i].record_count);
}
}
return BucketView({}, bucket, 0, 0);
return BucketRecords({}, bucket, 0, 0);
}

BucketView ScanRecords::BucketAt(size_t idx) const {
BucketRecords ScanRecords::BucketAt(size_t idx) const {
if (!data_) {
throw std::logic_error("ScanRecords: not available (moved-from or null)");
}
Expand All @@ -357,12 +357,12 @@ BucketView ScanRecords::BucketAt(size_t idx) const {
throw std::out_of_range("ScanRecords::BucketAt: index " + std::to_string(idx) +
" out of range (" + std::to_string(infos.size()) + " buckets)");
}
return BucketView(data_, to_table_bucket(infos[idx]), idx, infos[idx].record_count);
return BucketRecords(data_, to_table_bucket(infos[idx]), idx, infos[idx].record_count);
}

ScanRecord BucketView::operator[](size_t idx) const {
ScanRecord BucketRecords::operator[](size_t idx) const {
if (idx >= count_) {
throw std::out_of_range("BucketView: index " + std::to_string(idx) + " out of range (" +
throw std::out_of_range("BucketRecords: index " + std::to_string(idx) + " out of range (" +
std::to_string(count_) + " records)");
}
return ScanRecord{data_->raw->sv_offset(bucket_idx_, idx),
Expand All @@ -371,7 +371,7 @@ ScanRecord BucketView::operator[](size_t idx) const {
RowView(data_, bucket_idx_, idx)};
}

ScanRecord BucketView::Iterator::operator*() const { return owner_->operator[](idx_); }
ScanRecord BucketRecords::Iterator::operator*() const { return owner_->operator[](idx_); }

// ============================================================================
// LookupResult — backed by opaque Rust LookupResultInner
Expand Down Expand Up @@ -1146,7 +1146,7 @@ Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {

// Wrap raw pointer in ScanData immediately so it's never leaked on exception.
auto data = std::make_shared<detail::ScanData>(result_box.into_raw(), detail::ColumnMap{});
// Build column map eagerly — shared by all RowViews/BucketViews.
// Build column map eagerly — shared by all RowViews/BucketRecords.
auto col_count = data->raw->sv_column_count();
for (size_t i = 0; i < col_count; ++i) {
auto name = data->raw->sv_column_name(i);
Expand Down
8 changes: 4 additions & 4 deletions bindings/cpp/test/test_log_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ TEST_F(LogTableTest, AppendRecordBatchAndScan) {
{1, "a1"}, {2, "a2"}, {3, "a3"}, {4, "a4"}, {5, "a5"}, {6, "a6"}};
EXPECT_EQ(records, expected);

// Verify per-bucket iteration via BucketView
// Verify per-bucket iteration via BucketRecords
{
fluss::Table bucket_table;
ASSERT_OK(conn.GetTable(table_path, bucket_table));
Expand All @@ -140,11 +140,11 @@ TEST_F(LogTableTest, AppendRecordBatchAndScan) {

// Iterate by bucket
for (size_t b = 0; b < scan_records.BucketCount(); ++b) {
auto bucket_view = scan_records.BucketAt(b);
if (!bucket_view.Empty()) {
auto bkt_records = scan_records.BucketAt(b);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

otherwise shadowed

if (!bkt_records.Empty()) {
buckets_with_data++;
}
for (auto rec : bucket_view) {
for (auto rec : bkt_records) {
bucket_records.emplace_back(rec.row.GetInt32(0),
std::string(rec.row.GetString(1)));
}
Expand Down
8 changes: 4 additions & 4 deletions website/docs/user-guide/cpp/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,12 @@ for (const auto& rec : records) {
|-----------------------------------------------------------------|-----------------------------------------------------------------------|
| `BucketCount() -> size_t` | Number of distinct buckets |
| `Buckets() -> std::vector<TableBucket>` | List of distinct buckets |
| `Records(const TableBucket& bucket) -> BucketView` | Records for a specific bucket (empty view if bucket not present) |
| `BucketAt(size_t idx) -> BucketView` | Records by bucket index (0-based, O(1)) |
| `Records(const TableBucket& bucket) -> BucketRecords` | Records for a specific bucket (empty if bucket not present) |
| `BucketAt(size_t idx) -> BucketRecords` | Records by bucket index (0-based, O(1)) |

## `BucketView`
## `BucketRecords`

A view of records within a single bucket. Obtained from `ScanRecords::Records()` or `ScanRecords::BucketAt()`. `BucketView` is a value type — it shares ownership of the underlying scan data via reference counting, so it can safely outlive the `ScanRecords` that produced it.
A bundle of scan records belonging to a single bucket. Obtained from `ScanRecords::Records()` or `ScanRecords::BucketAt()`. `BucketRecords` is a value type — it shares ownership of the underlying scan data via reference counting, so it can safely outlive the `ScanRecords` that produced it.

| Method | Description |
|------------------------------------------------|--------------------------------------------|
Expand Down
Loading