Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ set(ICEBERG_SOURCES
transform_function.cc
type.cc
update/pending_update.cc
update/update_sort_order.cc
update/update_partition_spec.cc
update/update_properties.cc
update/update_sort_order.cc
util/bucket_util.cc
util/conversions.cc
util/decimal.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ iceberg_sources = files(
'transform_function.cc',
'type.cc',
'update/pending_update.cc',
'update/update_partition_spec.cc',
'update/update_properties.cc',
'update/update_sort_order.cc',
'util/bucket_util.cc',
Expand Down
31 changes: 31 additions & 0 deletions src/iceberg/partition_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "iceberg/partition_spec.h"

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <format>
#include <memory>
Expand Down Expand Up @@ -95,6 +96,27 @@ Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(
return std::make_unique<StructType>(std::move(partition_fields));
}

bool PartitionSpec::CompatibleWith(const PartitionSpec& other) const {
if (Equals(other)) {
return true;
}

if (fields_.size() != other.fields_.size()) {
return false;
}

for (const auto& [lhs, rhs] :
std::ranges::zip_view<std::span<const PartitionField>,
std::span<const PartitionField>>{fields_, other.fields_}) {
if (lhs.source_id() != rhs.source_id() || *lhs.transform() != *rhs.transform() ||
lhs.name() != rhs.name()) {
return false;
}
}

return true;
}

std::string PartitionSpec::ToString() const {
std::string repr = std::format("partition_spec[spec_id<{}>,\n", spec_id_);
for (const auto& field : fields_) {
Expand Down Expand Up @@ -191,4 +213,13 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpec::Make(
new PartitionSpec(spec_id, std::move(fields), last_assigned_field_id));
}

bool PartitionSpec::HasSequentialFieldIds(const PartitionSpec& spec) {
for (size_t i = 0; i < spec.fields().size(); i += 1) {
if (spec.fields()[i].field_id() != PartitionSpec::kLegacyPartitionDataIdStart + i) {
return false;
}
}
return true;
}

} // namespace iceberg
7 changes: 7 additions & 0 deletions src/iceberg/partition_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
/// \brief Get the partition type binding to the input schema.
Result<std::unique_ptr<StructType>> PartitionType(const Schema& schema) const;

/// \brief Returns true if this spec is equivalent to the other, with partition field
/// ids ignored. That is, if both specs have the same number of fields, field order,
/// field name, source columns, and transforms.
bool CompatibleWith(const PartitionSpec& other) const;

std::string ToString() const override;

int32_t last_assigned_field_id() const { return last_assigned_field_id_; }
Expand Down Expand Up @@ -111,6 +116,8 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
int32_t spec_id, std::vector<PartitionField> fields,
std::optional<int32_t> last_assigned_field_id = std::nullopt);

static bool HasSequentialFieldIds(const PartitionSpec& spec);

private:
/// \brief Create a new partition spec.
///
Expand Down
10 changes: 10 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include "iceberg/table.h"

#include <memory>

#include "iceberg/catalog.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
Expand All @@ -28,6 +30,7 @@
#include "iceberg/table_properties.h"
#include "iceberg/table_scan.h"
#include "iceberg/transaction.h"
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -147,6 +150,13 @@ Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
/*auto_commit=*/false);
}

Result<std::shared_ptr<UpdatePartitionSpec>> Table::NewUpdatePartitionSpec() {
ICEBERG_ASSIGN_OR_RAISE(
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
/*auto_commit=*/true));
return transaction->NewUpdatePartitionSpec();
}

Result<std::shared_ptr<UpdateProperties>> Table::NewUpdateProperties() {
ICEBERG_ASSIGN_OR_RAISE(
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// \brief Create a new Transaction to commit multiple table operations at once.
virtual Result<std::shared_ptr<Transaction>> NewTransaction();

/// \brief Create a new UpdatePartitionSpec to update the partition spec of this table
/// and commit the changes.
virtual Result<std::shared_ptr<UpdatePartitionSpec>> NewUpdatePartitionSpec();

/// \brief Create a new UpdateProperties to update table properties and commit the
/// changes.
virtual Result<std::shared_ptr<UpdateProperties>> NewUpdateProperties();
Expand Down
104 changes: 97 additions & 7 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
#include "iceberg/exception.h"
#include "iceberg/file_io.h"
#include "iceberg/json_internal.h"
#include "iceberg/partition_field.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/sort_order.h"
#include "iceberg/table_properties.h"
#include "iceberg/table_update.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/error_collector.h"
#include "iceberg/util/gzip_internal.h"
#include "iceberg/util/location_util.h"
Expand Down Expand Up @@ -428,7 +430,8 @@ class TableMetadataBuilder::Impl {
Result<int32_t> AddSortOrder(const SortOrder& order);
Status SetProperties(const std::unordered_map<std::string, std::string>& updated);
Status RemoveProperties(const std::unordered_set<std::string>& removed);

Status SetDefaultPartitionSpec(int32_t spec_id);
Result<int32_t> AddPartitionSpec(const PartitionSpec& spec);
std::unique_ptr<TableMetadata> Build();

private:
Expand All @@ -438,6 +441,12 @@ class TableMetadataBuilder::Impl {
/// \return The ID to use for this sort order (reused if exists, new otherwise)
int32_t ReuseOrCreateNewSortOrderId(const SortOrder& new_order);

/// \brief Internal method to check for existing partition spec and reuse its ID or
/// create a new one
/// \param new_spec The partition spec to check
/// \return The ID to use for this partition spec (reused if exists, new otherwise)
int32_t ReuseOrCreateNewPartitionSpecId(const PartitionSpec& new_spec);

private:
// Base metadata (nullptr for new tables)
const TableMetadata* base_;
Expand Down Expand Up @@ -540,9 +549,10 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order)
bool is_new_order =
last_added_order_id_.has_value() &&
std::ranges::find_if(changes_, [new_order_id](const auto& change) {
auto* add_sort_order = dynamic_cast<table::AddSortOrder*>(change.get());
return add_sort_order &&
add_sort_order->sort_order()->order_id() == new_order_id;
return change->kind() == TableUpdate::Kind::kAddSortOrder &&
internal::checked_cast<const table::AddSortOrder&>(*change)
.sort_order()
->order_id() == new_order_id;
}) != changes_.cend();
last_added_order_id_ = is_new_order ? std::make_optional(new_order_id) : std::nullopt;
return new_order_id;
Expand Down Expand Up @@ -572,6 +582,69 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order)
return new_order_id;
}

Status TableMetadataBuilder::Impl::SetDefaultPartitionSpec(int32_t spec_id) {
if (spec_id == -1) {
if (!last_added_spec_id_.has_value()) {
return ValidationFailed(
"Cannot set last added partition spec: no partition spec has been added");
}
return SetDefaultPartitionSpec(last_added_spec_id_.value());
}

if (spec_id == metadata_.default_spec_id) {
// the new spec is already current and no change is needed
return {};
}

metadata_.default_spec_id = spec_id;
if (last_added_spec_id_ == std::make_optional(spec_id)) {
changes_.push_back(std::make_unique<table::SetDefaultPartitionSpec>(kLastAdded));
} else {
changes_.push_back(std::make_unique<table::SetDefaultPartitionSpec>(spec_id));
}
return {};
}

Result<int32_t> TableMetadataBuilder::Impl::AddPartitionSpec(const PartitionSpec& spec) {
int32_t new_spec_id = ReuseOrCreateNewPartitionSpecId(spec);

if (specs_by_id_.contains(new_spec_id)) {
// update last_added_spec_id if the spec was added in this set of changes (since it
// is now the last)
bool is_new_spec =
last_added_spec_id_.has_value() &&
std::ranges::find_if(changes_, [new_spec_id](const auto& change) {
return change->kind() == TableUpdate::Kind::kAddPartitionSpec &&
internal::checked_cast<const table::AddPartitionSpec&>(*change)
.spec()
->spec_id() == new_spec_id;
}) != changes_.cend();
Comment on lines +616 to +621
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::ranges::find_if(changes_, [new_spec_id](const auto& change) {
return change->kind() == TableUpdate::Kind::kAddPartitionSpec &&
internal::checked_cast<const table::AddPartitionSpec&>(*change)
.spec()
->spec_id() == new_spec_id;
}) != changes_.cend();
std::ranges::any_of(changes_, [new_spec_id](const auto& change) {
return change->kind() == TableUpdate::Kind::kAddPartitionSpec &&
internal::checked_cast<const table::AddPartitionSpec&>(*change)
.spec()
->spec_id() == new_spec_id;
});

last_added_spec_id_ = is_new_spec ? std::make_optional(new_spec_id) : std::nullopt;
return new_spec_id;
}

// Get current schema and validate the partition spec against it
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata_.Schema());
ICEBERG_RETURN_UNEXPECTED(spec.Validate(*schema, /*allow_missing_fields=*/false));
ICEBERG_CHECK(
metadata_.format_version > 1 || PartitionSpec::HasSequentialFieldIds(spec),
"Spec does not use sequential IDs that are required in v1: {}", spec.ToString());

ICEBERG_ASSIGN_OR_RAISE(
std::shared_ptr<PartitionSpec> new_spec,
PartitionSpec::Make(new_spec_id, std::vector<PartitionField>(spec.fields().begin(),
spec.fields().end())));
metadata_.last_partition_id =
std::max(metadata_.last_partition_id, new_spec->last_assigned_field_id());
metadata_.partition_specs.push_back(new_spec);
specs_by_id_.emplace(new_spec_id, new_spec);

changes_.push_back(std::make_unique<table::AddPartitionSpec>(new_spec));
last_added_spec_id_ = new_spec_id;

return new_spec_id;
}

Status TableMetadataBuilder::Impl::SetProperties(
const std::unordered_map<std::string, std::string>& updated) {
// If updated is empty, return early (no-op)
Expand Down Expand Up @@ -653,6 +726,20 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSortOrderId(
return new_order_id;
}

int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId(
const PartitionSpec& new_spec) {
// if the spec already exists, use the same ID. otherwise, use the highest ID + 1.
int32_t new_spec_id = PartitionSpec::kInitialSpecId;
for (const auto& spec : metadata_.partition_specs) {
if (new_spec.CompatibleWith(*spec)) {
return spec->spec_id();
} else if (new_spec_id <= spec->spec_id()) {
new_spec_id = spec->spec_id() + 1;
}
}
return new_spec_id;
}

TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
: impl_(std::make_unique<Impl>(format_version)) {}

Expand Down Expand Up @@ -723,16 +810,19 @@ TableMetadataBuilder& TableMetadataBuilder::AddSchema(std::shared_ptr<Schema> sc

TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(
std::shared_ptr<PartitionSpec> spec) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec_id, impl_->AddPartitionSpec(*spec));
return SetDefaultPartitionSpec(spec_id);
}

TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(int32_t spec_id) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetDefaultPartitionSpec(spec_id));
return *this;
}

TableMetadataBuilder& TableMetadataBuilder::AddPartitionSpec(
std::shared_ptr<PartitionSpec> spec) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec_id, impl_->AddPartitionSpec(*spec));
return *this;
}

TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs(
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/table_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const {
// AddPartitionSpec

void AddPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
builder.AddPartitionSpec(spec_);
}

void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
Expand All @@ -82,7 +82,7 @@ void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
// SetDefaultPartitionSpec

void SetDefaultPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
builder.SetDefaultPartitionSpec(spec_id_);
}

void SetDefaultPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ if(ICEBERG_BUILD_BUNDLE)
USE_BUNDLE
SOURCES
transaction_test.cc
update_partition_spec_test.cc
update_properties_test.cc
update_sort_order_test.cc)

Expand Down
Loading
Loading