diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..aa8fb47478 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -32,6 +32,7 @@ use crate::transaction::{ActionCommit, TransactionAction}; /// FastAppendAction is a transaction action for fast append data files to the table. pub struct FastAppendAction { check_duplicate: bool, + check_added_data_files: bool, // below are properties used to create SnapshotProducer when commit commit_uuid: Option, key_metadata: Option>, @@ -43,6 +44,7 @@ impl FastAppendAction { pub(crate) fn new() -> Self { Self { check_duplicate: true, + check_added_data_files: true, commit_uuid: None, key_metadata: None, snapshot_properties: HashMap::default(), @@ -56,6 +58,12 @@ impl FastAppendAction { self } + /// Set whether the snapshot producer validates the added data files (enabled by default). + pub fn with_check_added_data_files(mut self, v: bool) -> Self { + self.check_added_data_files = v; + self + } + /// Add data files to the snapshot. 
pub fn add_data_files(mut self, data_files: impl IntoIterator) -> Self { self.added_data_files.extend(data_files); @@ -92,8 +100,10 @@ impl TransactionAction for FastAppendAction { self.added_data_files.clone(), ); - // validate added files - snapshot_producer.validate_added_data_files()?; + // Validate the added data files unless the caller disabled the check + if self.check_added_data_files { + snapshot_producer.validate_added_data_files()?; + } // Checks duplicate files if self.check_duplicate { @@ -333,4 +343,154 @@ mod tests { ); assert_eq!(data_file, *manifest.entries()[0].data_file()); } + + #[tokio::test] + async fn test_fast_append_with_check_duplicate_false() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + let action = tx.fast_append().with_check_duplicate(false); // disable the duplicate-file check on the action + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/3.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(); + + let action = action.add_data_files(vec![data_file.clone()]); + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + let requirements = action_commit.take_requirements(); + + // the commit must add a snapshot and point the main branch at it, under UUID and ref snapshot id requirements + assert!( + matches!((&updates[0],&updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) + ); + assert_eq!( + vec![ + TableRequirement::UuidMatch { + uuid: table.metadata().uuid() + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: table.metadata().current_snapshot_id + } + ], + requirements + ); + + // the new snapshot's manifest list must hold one manifest carrying the snapshot's sequence number + let new_snapshot = if let 
TableUpdate::AddSnapshot { snapshot } = &updates[0] { + snapshot + } else { + unreachable!() + }; + let manifest_list = new_snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(1, manifest_list.entries().len()); + assert_eq!( + manifest_list.entries()[0].sequence_number, + new_snapshot.sequence_number() + ); + + // the manifest must hold one entry that inherits the snapshot's sequence number and id and wraps the appended data file + let manifest = manifest_list.entries()[0] + .load_manifest(table.file_io()) + .await + .unwrap(); + assert_eq!(1, manifest.entries().len()); + assert_eq!( + new_snapshot.sequence_number(), + manifest.entries()[0] + .sequence_number() + .expect("Inherit sequence number by load manifest") + ); + + assert_eq!( + new_snapshot.snapshot_id(), + manifest.entries()[0].snapshot_id().unwrap() + ); + assert_eq!(data_file, *manifest.entries()[0].data_file()); + } + + #[tokio::test] + async fn test_fast_append_with_check_added_data_files_false() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + let action = tx.fast_append().with_check_added_data_files(false); // disable added data file validation on the action; NOTE(review): the same data file is committed with validation enabled in test_fast_append_with_check_duplicate_false, so this test likely does not exercise the skipped validation path (confirm) + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/3.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(); + + let action = action.add_data_files(vec![data_file.clone()]); + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + let requirements = action_commit.take_requirements(); + + // the commit must add a snapshot and point the main branch at it, under UUID and ref snapshot id requirements + assert!( + matches!((&updates[0],&updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) + ); + 
assert_eq!( + vec![ + TableRequirement::UuidMatch { + uuid: table.metadata().uuid() + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: table.metadata().current_snapshot_id + } + ], + requirements + ); + + // the new snapshot's manifest list must hold one manifest carrying the snapshot's sequence number + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + snapshot + } else { + unreachable!() + }; + let manifest_list = new_snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(1, manifest_list.entries().len()); + assert_eq!( + manifest_list.entries()[0].sequence_number, + new_snapshot.sequence_number() + ); + + // the manifest must hold one entry that inherits the snapshot's sequence number and id and wraps the appended data file + let manifest = manifest_list.entries()[0] + .load_manifest(table.file_io()) + .await + .unwrap(); + assert_eq!(1, manifest.entries().len()); + assert_eq!( + new_snapshot.sequence_number(), + manifest.entries()[0] + .sequence_number() + .expect("Inherit sequence number by load manifest") + ); + + assert_eq!( + new_snapshot.snapshot_id(), + manifest.entries()[0].snapshot_id().unwrap() + ); + assert_eq!(data_file, *manifest.entries()[0].data_file()); + } }