crates/iceberg/src/transaction/append.rs: 164 changes (162 additions, 2 deletions)
@@ -32,6 +32,7 @@ use crate::transaction::{ActionCommit, TransactionAction};
/// FastAppendAction is a transaction action for fast append data files to the table.
pub struct FastAppendAction {
check_duplicate: bool,
check_added_data_files: bool,
// below are properties used to create SnapshotProducer when commit
commit_uuid: Option<Uuid>,
key_metadata: Option<Vec<u8>>,
@@ -43,6 +44,7 @@ impl FastAppendAction {
pub(crate) fn new() -> Self {
Self {
check_duplicate: true,
check_added_data_files: true,
commit_uuid: None,
key_metadata: None,
snapshot_properties: HashMap::default(),
@@ -56,6 +58,12 @@ impl FastAppendAction {
self
}

/// Set whether to check added data files
pub fn with_check_added_data_files(mut self, v: bool) -> Self {
self.check_added_data_files = v;
self
}

/// Add data files to the snapshot.
pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
self.added_data_files.extend(data_files);
@@ -92,8 +100,10 @@ impl TransactionAction for FastAppendAction {
self.added_data_files.clone(),
);

-// validate added files
-snapshot_producer.validate_added_data_files()?;
+// Checks added files
+if self.check_added_data_files {
+    snapshot_producer.validate_added_data_files()?;
+}

// Checks duplicate files
if self.check_duplicate {
@@ -333,4 +343,154 @@ mod tests {
);
assert_eq!(data_file, *manifest.entries()[0].data_file());
}

#[tokio::test]
async fn test_fast_append_with_check_duplicate_false() {
let table = make_v2_minimal_table();
let tx = Transaction::new(&table);
let action = tx.fast_append().with_check_duplicate(false); // set with_check_duplicate to false on the action

let data_file = DataFileBuilder::default()
.content(DataContentType::Data)
.file_path("test/3.parquet".to_string())
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition_spec_id(table.metadata().default_partition_spec_id())
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build()
.unwrap();

let action = action.add_data_files(vec![data_file.clone()]);
let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
let updates = action_commit.take_updates();
let requirements = action_commit.take_requirements();

// check updates and requirements
assert!(
matches!((&updates[0],&updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH)
);
assert_eq!(
vec![
TableRequirement::UuidMatch {
uuid: table.metadata().uuid()
},
TableRequirement::RefSnapshotIdMatch {
r#ref: MAIN_BRANCH.to_string(),
snapshot_id: table.metadata().current_snapshot_id
}
],
requirements
);

// check manifest list
let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
snapshot
} else {
unreachable!()
};
let manifest_list = new_snapshot
.load_manifest_list(table.file_io(), table.metadata())
.await
.unwrap();
assert_eq!(1, manifest_list.entries().len());
assert_eq!(
manifest_list.entries()[0].sequence_number,
new_snapshot.sequence_number()
);

// check manifest
let manifest = manifest_list.entries()[0]
.load_manifest(table.file_io())
.await
.unwrap();
assert_eq!(1, manifest.entries().len());
assert_eq!(
new_snapshot.sequence_number(),
manifest.entries()[0]
.sequence_number()
.expect("Inherit sequence number by load manifest")
);

assert_eq!(
new_snapshot.snapshot_id(),
manifest.entries()[0].snapshot_id().unwrap()
);
assert_eq!(data_file, *manifest.entries()[0].data_file());
}

#[tokio::test]
async fn test_fast_append_with_check_added_data_files_false() {
let table = make_v2_minimal_table();
let tx = Transaction::new(&table);
let action = tx.fast_append().with_check_added_data_files(false); // set with_check_added_data_files to false on the action

let data_file = DataFileBuilder::default()
.content(DataContentType::Data)
.file_path("test/3.parquet".to_string())
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition_spec_id(table.metadata().default_partition_spec_id())
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build()
.unwrap();

let action = action.add_data_files(vec![data_file.clone()]);
let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
let updates = action_commit.take_updates();
let requirements = action_commit.take_requirements();

// check updates and requirements
assert!(
matches!((&updates[0],&updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH)
);
assert_eq!(
vec![
TableRequirement::UuidMatch {
uuid: table.metadata().uuid()
},
TableRequirement::RefSnapshotIdMatch {
r#ref: MAIN_BRANCH.to_string(),
snapshot_id: table.metadata().current_snapshot_id
}
],
requirements
);

// check manifest list
let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
snapshot
} else {
unreachable!()
};
let manifest_list = new_snapshot
.load_manifest_list(table.file_io(), table.metadata())
.await
.unwrap();
assert_eq!(1, manifest_list.entries().len());
assert_eq!(
manifest_list.entries()[0].sequence_number,
new_snapshot.sequence_number()
);

// check manifest
let manifest = manifest_list.entries()[0]
.load_manifest(table.file_io())
.await
.unwrap();
assert_eq!(1, manifest.entries().len());
assert_eq!(
new_snapshot.sequence_number(),
manifest.entries()[0]
.sequence_number()
.expect("Inherit sequence number by load manifest")
);

assert_eq!(
new_snapshot.snapshot_id(),
manifest.entries()[0].snapshot_id().unwrap()
);
assert_eq!(data_file, *manifest.entries()[0].data_file());
}
}
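
For context, a minimal usage sketch (not part of this diff) of how a caller might disable both validations on a fast append before committing. It mirrors the tests above and assumes `table` is an already-loaded `Table` and `data_file` a `DataFile` built as in the tests:

```rust
// Build a fast-append action with both checks disabled, then commit it.
// `table` and `data_file` are assumed to exist as in the tests above.
let tx = Transaction::new(&table);
let action = tx
    .fast_append()
    .with_check_duplicate(false)          // skip the duplicate-file check
    .with_check_added_data_files(false)   // skip added-data-file validation
    .add_data_files(vec![data_file]);

// Committing the action yields the table updates and requirements,
// which would then be sent to the catalog as part of the transaction.
let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
let updates = action_commit.take_updates();
let requirements = action_commit.take_requirements();
```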