diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index f3a521379e..095b872eba 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -2384,11 +2384,11 @@ mod tests { let updated_table = table_commit.apply(table).unwrap(); assert_eq!( - updated_table.metadata().properties.get("prop1").unwrap(), + updated_table.metadata().properties.other.get("prop1").unwrap(), "v1" ); assert_eq!( - updated_table.metadata().properties.get("prop2").unwrap(), + updated_table.metadata().properties.other.get("prop2").unwrap(), "v2" ); diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 585cb3e2bc..58a9217ad6 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -37,11 +37,10 @@ pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataB use super::{ DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef, SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType, - TableProperties, }; use crate::error::{Result, timestamp_ms_to_utc}; use crate::io::FileIO; -use crate::spec::EncryptedKey; +use crate::spec::{EncryptedKey, TableProperties}; use crate::{Error, ErrorKind}; static MAIN_BRANCH: &str = "main"; @@ -91,7 +90,7 @@ pub struct TableMetadata { ///A string to string map of table properties. This is used to control settings that /// affect reading and writing and is not intended to be used for arbitrary metadata. /// For example, commit.retry.num-retries is used to control the number of commit retries. - pub(crate) properties: HashMap, + pub(crate) properties: TableProperties, /// long ID of the current table snapshot; must be the same as the current /// ID of the main branch in refs. pub(crate) current_snapshot_id: Option, @@ -358,16 +357,13 @@ impl TableMetadata { /// Returns properties of table. #[inline] pub fn properties(&self) -> &HashMap { - &self.properties + &self.properties.other } /// Returns typed table properties parsed from the raw properties map with defaults. - pub fn table_properties(&self) -> Result { - TableProperties::try_from(&self.properties).map_err(|e| { - Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e) - }) + pub fn table_properties(&self) -> &TableProperties { + &self.properties } - /// Return location of statistics files. #[inline] pub fn statistics_iter(&self) -> impl ExactSizeIterator { @@ -969,7 +965,7 @@ pub(super) mod _serde { default_partition_type, default_spec, last_partition_id: value.last_partition_id, - properties: value.properties.unwrap_or_default(), + properties: crate::spec::TableProperties::new(value.properties.unwrap_or_default()), current_snapshot_id, snapshots: snapshots .map(|snapshots| { @@ -1082,7 +1078,7 @@ pub(super) mod _serde { default_partition_type, default_spec, last_partition_id: value.last_partition_id, - properties: value.properties.unwrap_or_default(), + properties: crate::spec::TableProperties::new(value.properties.unwrap_or_default()), current_snapshot_id, snapshots: snapshots .map(|snapshots| { @@ -1234,7 +1230,7 @@ pub(super) mod _serde { .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()), partition_specs, schemas, - properties: value.properties.unwrap_or_default(), + properties: crate::spec::TableProperties::new(value.properties.unwrap_or_default()), current_snapshot_id, snapshots: value .snapshots @@ -1361,10 +1357,10 @@ pub(super) mod _serde { .collect(), default_spec_id: v.default_spec.spec_id(), last_partition_id: v.last_partition_id, - properties: if v.properties.is_empty() { + properties: if v.properties.other.is_empty() { None } else { - Some(v.properties) + Some(v.properties.other.clone()) }, current_snapshot_id: v.current_snapshot_id, snapshot_log: if v.snapshot_log.is_empty() { @@ -1430,10 +1426,10 @@ pub(super) mod _serde { ), default_spec_id: Some(v.default_spec.spec_id()), last_partition_id: Some(v.last_partition_id), - properties: if v.properties.is_empty() { + properties: if v.properties.other.is_empty() { None } else { - Some(v.properties) + Some(v.properties.other.clone()) }, current_snapshot_id: v.current_snapshot_id, snapshots: if v.snapshots.is_empty() { @@ -1557,6 +1553,7 @@ impl SnapshotLog { #[cfg(test)] mod tests { + use super::*; use std::collections::HashMap; use std::fs; use std::io::Write as _; @@ -1577,7 +1574,7 @@ mod tests { SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile, Summary, Transform, Type, UnboundPartitionField, }; - use crate::{ErrorKind, TableCreation}; + use crate::TableCreation; fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) { let desered_type: TableMetadata = serde_json::from_str(json).unwrap(); @@ -1709,10 +1706,10 @@ mod tests { snapshots: HashMap::default(), current_snapshot_id: None, last_sequence_number: 1, - properties: HashMap::from_iter(vec![( - "commit.retry.num-retries".to_string(), - "1".to_string(), - )]), + properties: TableProperties::new(HashMap::from_iter(vec![( + "commit.retry.num-retries".to_string(), + "1".to_string(), + )])), snapshot_log: Vec::new(), metadata_log: vec![MetadataLog { metadata_file: "s3://bucket/.../v1.json".to_string(), @@ -1847,7 +1844,7 @@ mod tests { .with_encryption_key_id(Some("key1".to_string())) .with_summary(Summary { operation: Operation::Append, - additional_properties: HashMap::new(), + additional_properties: HashMap::new() }) .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string()) .with_schema_id(0) @@ -1885,10 +1882,10 @@ mod tests { snapshots: HashMap::from_iter(vec![(1, snapshot.into())]), current_snapshot_id: None, last_sequence_number: 1, - properties: HashMap::from_iter(vec![( + properties: TableProperties::new(HashMap::from_iter(vec![( "commit.retry.num-retries".to_string(), "1".to_string(), - )]), + )])), snapshot_log: Vec::new(), metadata_log: vec![MetadataLog { metadata_file: "s3://bucket/.../v1.json".to_string(), @@ -2044,8 +2041,13 @@ mod tests { .with_sequence_number(0) .with_schema_id(0) .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro") - .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) }) - .build(); + .with_summary(Summary { operation: Operation::Append, + additional_properties: HashMap::from_iter(vec![ + ("spark.app.id".to_string(), "local-1662532784305".to_string()), + ("added-data-files".to_string(), "4".to_string()), + ("added-records".to_string(), "4".to_string()), + ("added-files-size".to_string(), "6001".to_string()) ]) }) + .build(); let default_partition_type = partition_spec.partition_type(&schema).unwrap(); let expected = TableMetadata { @@ -2065,7 +2067,7 @@ mod tests { snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]), current_snapshot_id: Some(638933773299822130), last_sequence_number: 0, - properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]), + properties: TableProperties::new(HashMap::from_iter(vec![("owner".to_string(), "root".to_string())])), snapshot_log: vec![SnapshotLog { snapshot_id: 638933773299822130, timestamp_ms: 1662532818843, @@ -2164,7 +2166,7 @@ mod tests { snapshots: HashMap::default(), current_snapshot_id: None, last_sequence_number: 1, - properties: HashMap::new(), + properties: TableProperties::default(), snapshot_log: Vec::new(), metadata_log: vec![MetadataLog { metadata_file: "s3://bucket/.../v1.json".to_string(), @@ -2682,7 +2684,7 @@ mod tests { snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]), current_snapshot_id: Some(3055729675574597004), last_sequence_number: 34, - properties: HashMap::new(), + properties: TableProperties::default(), snapshot_log: Vec::new(), metadata_log: Vec::new(), statistics: HashMap::from_iter(vec![(3055729675574597004, StatisticsFile { @@ -2696,7 +2698,7 @@ mod tests { sequence_number: 1, fields: vec![1], r#type: "ndv".to_string(), - properties: HashMap::new(), + properties: HashMap::new(), }], })]), partition_statistics: HashMap::new(), @@ -2824,7 +2826,7 @@ mod tests { snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]), current_snapshot_id: Some(3055729675574597004), last_sequence_number: 34, - properties: HashMap::new(), + properties: TableProperties::default(), snapshot_log: Vec::new(), metadata_log: Vec::new(), statistics: HashMap::new(), @@ -2950,7 +2952,7 @@ mod tests { snapshots: HashMap::default(), current_snapshot_id: None, last_sequence_number: 34, - properties: HashMap::new(), + properties: TableProperties::default(), snapshot_log: Vec::new(), metadata_log: Vec::new(), refs: HashMap::new(), @@ -3074,7 +3076,7 @@ mod tests { ]), current_snapshot_id: Some(3055729675574597004), last_sequence_number: 34, - properties: HashMap::new(), + properties: TableProperties::default(), snapshot_log: vec![ SnapshotLog { snapshot_id: 3051729675574597004, @@ -3176,7 +3178,7 @@ mod tests { snapshots: HashMap::default(), current_snapshot_id: None, last_sequence_number: 34, - properties: HashMap::new(), + properties: TableProperties::default(), snapshot_log: vec![], metadata_log: Vec::new(), refs: HashMap::new(), @@ -3246,7 +3248,7 @@ mod tests { snapshots: HashMap::new(), current_snapshot_id: None, last_sequence_number: 0, - properties: HashMap::new(), + properties: TableProperties::default(), snapshot_log: vec![], metadata_log: Vec::new(), refs: HashMap::new(), @@ -3510,7 +3512,7 @@ mod tests { .len(), 0 ); - assert_eq!(table_metadata.properties.len(), 0); + assert_eq!(table_metadata.properties.other.len(), 0); assert_eq!( table_metadata.partition_specs, HashMap::from([( @@ -3894,7 +3896,7 @@ mod tests { .unwrap() .metadata; - let props = metadata.table_properties().unwrap(); + let props = metadata.table_properties(); assert_eq!( props.commit_num_retries, @@ -3941,41 +3943,10 @@ mod tests { .unwrap() .metadata; - let props = metadata.table_properties().unwrap(); - - assert_eq!(props.commit_num_retries, 10); + let props = metadata.table_properties(); + + assert_eq!(props.commit_num_retries, 10); assert_eq!(props.write_target_file_size_bytes, 1024); } - #[test] - fn test_table_properties_with_invalid_value() { - let schema = Schema::builder() - .with_fields(vec![ - NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), - ]) - .build() - .unwrap(); - - let properties = HashMap::from([( - "commit.retry.num-retries".to_string(), - "not_a_number".to_string(), - )]); - - let metadata = TableMetadataBuilder::new( - schema, - PartitionSpec::unpartition_spec().into_unbound(), - SortOrder::unsorted_order(), - "s3://test/location".to_string(), - FormatVersion::V2, - properties, - ) - .unwrap() - .build() - .unwrap() - .metadata; - - let err = metadata.table_properties().unwrap_err(); - assert_eq!(err.kind(), ErrorKind::DataInvalid); - assert!(err.message().contains("Invalid table properties")); - } } diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index 3db327d48a..2a43eaa032 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -110,7 +110,7 @@ impl TableMetadataBuilder { ), // Overwritten immediately by add_default_partition_spec default_partition_type: StructType::new(vec![]), last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID, - properties: HashMap::new(), + properties: TableProperties::default(), current_snapshot_id: None, snapshots: HashMap::new(), snapshot_log: vec![], @@ -272,7 +272,7 @@ impl TableMetadataBuilder { return Ok(self); } - self.metadata.properties.extend(properties.clone()); + self.metadata.properties.other.extend(properties.clone()); self.changes.push(TableUpdate::SetProperties { updates: properties, }); @@ -307,7 +307,7 @@ impl TableMetadataBuilder { } for property in &properties { - self.metadata.properties.remove(property); + self.metadata.properties.other.remove(property); } if !properties.is_empty() { @@ -1128,6 +1128,7 @@ impl TableMetadataBuilder { let max_size = self .metadata .properties + .other .get(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX) .and_then(|v| v.parse::().ok()) .unwrap_or(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT) @@ -1510,7 +1511,7 @@ mod tests { assert_eq!(metadata.snapshots.len(), 0); assert_eq!(metadata.current_snapshot_id, None); assert_eq!(metadata.refs.len(), 0); - assert_eq!(metadata.properties.len(), 0); + assert_eq!(metadata.properties.other.len(), 0); assert_eq!(metadata.metadata_log.len(), 0); assert_eq!(metadata.last_sequence_number, 0); assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID); @@ -1559,7 +1560,7 @@ mod tests { assert_eq!(metadata.snapshots.len(), 0); assert_eq!(metadata.current_snapshot_id, None); assert_eq!(metadata.refs.len(), 0); - assert_eq!(metadata.properties.len(), 0); + assert_eq!(metadata.properties.other.len(), 0); assert_eq!(metadata.metadata_log.len(), 0); assert_eq!(metadata.last_sequence_number, 0); } diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 413604f51c..a2cccd42f6 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -35,7 +35,7 @@ where } /// TableProperties that contains the properties of a table. -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)] pub struct TableProperties { /// The number of times to retry a commit. pub commit_num_retries: usize, @@ -51,6 +51,9 @@ pub struct TableProperties { pub write_target_file_size_bytes: usize, /// Whether to use `FanoutWriter` for partitioned tables. pub write_datafusion_fanout_enabled: bool, + /// Any other properties that are not explicitly captured in named fields. + #[serde(flatten)] + pub other: HashMap, } impl TableProperties { @@ -68,6 +71,11 @@ impl TableProperties { pub const PROPERTY_UUID: &str = "uuid"; /// Reserved table property for the total number of snapshots. pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count"; + /// Creates a new TableProperties from a HashMap of raw strings. + pub fn new(props: HashMap) -> Self { + Self::try_from(&props).unwrap_or_default() + } + /// Reserved table property for current snapshot summary. pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary"; /// Reserved table property for current snapshot id. @@ -187,6 +195,7 @@ impl TryFrom<&HashMap> for TableProperties { TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED, TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT, )?, + other: props.clone(), }) } } diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 074c7fefe4..d5c49b1438 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -163,9 +163,9 @@ impl Transaction { return Ok(self.table); } - let table_props = self.table.metadata().table_properties()?; + let table_props = self.table.metadata().table_properties(); - let backoff = Self::build_backoff(table_props)?; + let backoff = Self::build_backoff(table_props.clone())?; let tx = self; (|mut tx: Transaction| async { diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs index 0ad4d91ac6..e50b7f11cc 100644 --- a/crates/iceberg/src/writer/file_writer/location_generator.rs +++ b/crates/iceberg/src/writer/file_writer/location_generator.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use std::sync::atomic::AtomicU64; - use crate::Result; use crate::spec::{DataFileFormat, PartitionKey, TableMetadata}; @@ -146,7 +145,7 @@ pub(crate) mod test { use std::sync::Arc; use uuid::Uuid; - + use crate::spec::TableProperties; use super::LocationGenerator; use crate::spec::{ FormatVersion, Literal, NestedField, PartitionKey, PartitionSpec, PrimitiveType, Schema, @@ -157,6 +156,7 @@ pub(crate) mod test { WRITE_FOLDER_STORAGE_LOCATION, }; + #[test] fn test_default_location_generate() { let mut table_metadata = TableMetadata { @@ -176,7 +176,7 @@ pub(crate) mod test { snapshots: HashMap::default(), current_snapshot_id: None, last_sequence_number: 1, - properties: HashMap::new(), + properties: TableProperties::default(), snapshot_log: Vec::new(), metadata_log: vec![], refs: HashMap::new(), @@ -200,7 +200,7 @@ pub(crate) mod test { assert_eq!(location, "s3://data.db/table/data/part-00000-test.parquet"); // test custom data location - table_metadata.properties.insert( + table_metadata.properties.other.insert( WRITE_FOLDER_STORAGE_LOCATION.to_string(), "s3://data.db/table/data_1".to_string(), ); @@ -213,7 +213,7 @@ pub(crate) mod test { "s3://data.db/table/data_1/part-00001-test.parquet" ); - table_metadata.properties.insert( + table_metadata.properties.other.insert( WRITE_DATA_LOCATION.to_string(), "s3://data.db/table/data_2".to_string(), ); @@ -226,7 +226,7 @@ pub(crate) mod test { "s3://data.db/table/data_2/part-00002-test.parquet" ); - table_metadata.properties.insert( + table_metadata.properties.other.insert( WRITE_DATA_LOCATION.to_string(), // invalid table location "s3://data.db/data_3".to_string(), @@ -291,7 +291,7 @@ pub(crate) mod test { snapshots: HashMap::default(), current_snapshot_id: None, last_sequence_number: 1, - properties: HashMap::new(), + properties: TableProperties::default(), snapshot_log: Vec::new(), metadata_log: vec![], refs: HashMap::new(), diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 0dea150d31..67a4ad456e 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -209,12 +209,7 @@ impl ExecutionPlan for IcebergWriteExec { let format_version = self.table.metadata().format_version(); // Get typed table properties - let table_props = self - .table - .metadata() - .table_properties() - .map_err(to_datafusion_error)?; - + let table_props = self.table.metadata().table_properties(); // Check data file format let file_format = DataFileFormat::from_str(&table_props.write_format_default) .map_err(to_datafusion_error)?;