From 6d2ced2da2a6d72148337fa136ea9e5ce0b28c29 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 7 May 2025 14:05:18 -0400 Subject: [PATCH 1/3] Improve error messages if schema hint mismatches with parquet schema --- arrow-array/src/record_batch.rs | 4 +- parquet/src/arrow/arrow_reader/filter.rs | 8 ++ parquet/src/arrow/arrow_reader/mod.rs | 148 ++++++++++++++++++----- 3 files changed, 129 insertions(+), 31 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 7bf7dc3a3944..cdbd62049637 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -1073,8 +1073,8 @@ mod tests { let a = Int64Array::from(vec![1, 2, 3, 4, 5]); - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]); - assert!(batch.is_err()); + let err = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap_err(); + assert_eq!(err.to_string(), "Invalid argument error: column types must match schema types, expected Int32 but found Int64 at column index 0"); } #[test] diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs index 2e22f7e01cf0..88a94fa375af 100644 --- a/parquet/src/arrow/arrow_reader/filter.rs +++ b/parquet/src/arrow/arrow_reader/filter.rs @@ -18,6 +18,7 @@ use crate::arrow::ProjectionMask; use arrow_array::{BooleanArray, RecordBatch}; use arrow_schema::ArrowError; +use std::fmt::{Debug, Formatter}; /// A predicate operating on [`RecordBatch`] /// @@ -108,11 +109,18 @@ where /// not contiguous. 
/// /// [`RowSelection`]: crate::arrow::arrow_reader::RowSelection + pub struct RowFilter { /// A list of [`ArrowPredicate`] pub(crate) predicates: Vec>, } +impl Debug for RowFilter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "RowFilter {{ {} predicates: }}", self.predicates.len()) + } +} + impl RowFilter { /// Create a new [`RowFilter`] from an array of [`ArrowPredicate`] pub fn new(predicates: Vec>) -> Self { diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 8bbe175dafb8..4f73e7ef2a68 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -25,6 +25,7 @@ use arrow_select::filter::prep_null_mask_filter; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; use std::collections::VecDeque; +use std::fmt::{Debug, Formatter}; use std::sync::Arc; pub use crate::arrow::array_reader::RowGroups; @@ -112,6 +113,24 @@ pub struct ArrowReaderBuilder { pub(crate) offset: Option, } +impl Debug for ArrowReaderBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ArrowReaderBuilder") + .field("input", &self.input) + .field("metadata", &self.metadata) + .field("schema", &self.schema) + .field("fields", &self.fields) + .field("batch_size", &self.batch_size) + .field("row_groups", &self.row_groups) + .field("projection", &self.projection) + .field("filter", &self.filter) + .field("selection", &self.selection) + .field("limit", &self.limit) + .field("offset", &self.offset) + .finish() + } +} + impl ArrowReaderBuilder { pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self { Self { @@ -506,37 +525,47 @@ impl ArrowReaderMetadata { // parquet_to_arrow_field_levels is expected to throw an error if the schemas have // different lengths, but we check here to be safe. 
if inferred_len != supplied_len { - Err(arrow_err!(format!( - "incompatible arrow schema, expected {} columns received {}", + return Err(arrow_err!(format!( + "Incompatible supplied Arrow schema: expected {} columns received {}", inferred_len, supplied_len - ))) - } else { - let diff_fields: Vec<_> = supplied_schema - .fields() - .iter() - .zip(fields.iter()) - .filter_map(|(field1, field2)| { - if field1 != field2 { - Some(field1.name().clone()) - } else { - None - } - }) - .collect(); + ))); + } - if !diff_fields.is_empty() { - Err(ParquetError::ArrowError(format!( - "incompatible arrow schema, the following fields could not be cast: [{}]", - diff_fields.join(", ") - ))) - } else { - Ok(Self { - metadata, - schema: supplied_schema, - fields: field_levels.levels.map(Arc::new), - }) + let mut errors = Vec::new(); + + let field_iter = supplied_schema.fields().iter().zip(fields.iter()); + + for (field1, field2) in field_iter { + if field1.data_type() != field2.data_type() { + errors.push(format!( + "data type mismatch for field {}: requested {:?} but found {:?}", + field1.name(), + field1.data_type(), + field2.data_type() + )); + } + if field1.is_nullable() != field2.is_nullable() { + errors.push(format!( + "nullability mismatch for field {}: expected {:?} but found {:?}", + field1.name(), + field1.is_nullable(), + field2.is_nullable() + )); } } + + if !errors.is_empty() { + let message = errors.join(", "); + return Err(ParquetError::ArrowError(format!( + "Incompatible supplied Arrow schema: {message}", + ))); + } + + Ok(Self { + metadata, + schema: supplied_schema, + fields: field_levels.levels.map(Arc::new), + }) } /// Returns a reference to the [`ParquetMetaData`] for this parquet file @@ -559,6 +588,12 @@ impl ArrowReaderMetadata { /// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async pub struct SyncReader(T); +impl Debug for SyncReader { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + 
f.debug_tuple("SyncReader").field(&self.0).finish() + } +} + /// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file /// /// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`] @@ -3462,7 +3497,7 @@ mod tests { Field::new("col2_valid", ArrowDataType::Int32, false), Field::new("col3_invalid", ArrowDataType::Int32, false), ])), - "Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]", + "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64", ); } @@ -3500,10 +3535,65 @@ mod tests { false, ), ])), - "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]", + "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \ + requested Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) \ + but found Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }])", ); } + /// Return parquet data with a single column of utf8 strings + fn utf8_parquet() -> Bytes { + let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]); + let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap(); + let props = None; + // write parquet file with non nullable strings + let mut parquet_data = vec![]; + let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + Bytes::from(parquet_data) 
+ } + + #[test] + fn test_schema_error_bad_types() { + // verify incompatible schemas error on read + let parquet_data = utf8_parquet(); + + // Ask to read it back with an incompatible schema (int vs string) + let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new( + "column1", + arrow::datatypes::DataType::Int32, + false, + )])); + + // read it back out + let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone()); + let err = + ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options) + .unwrap_err(); + assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8") + } + + #[test] + fn test_schema_error_bad_nullability() { + // verify incompatible schemas error on read + let parquet_data = utf8_parquet(); + + // Ask to read it back with an incompatible schema (nullability mismatch) + let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new( + "column1", + arrow::datatypes::DataType::Utf8, + true, + )])); + + // read it back out + let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone()); + let err = + ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options) + .unwrap_err(); + assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false") + } + #[test] fn test_read_binary_as_utf8() { let file = write_parquet_from_iter(vec![ From 7c21d5f22c2e02d7be3904319e8447d065c4eb98 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 7 May 2025 14:13:39 -0400 Subject: [PATCH 2/3] clippy --- parquet/src/arrow/arrow_reader/filter.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs index 88a94fa375af..21ff4a6c9f94 100644 --- a/parquet/src/arrow/arrow_reader/filter.rs +++ b/parquet/src/arrow/arrow_reader/filter.rs @@ 
-109,7 +109,6 @@ where /// not contiguous. /// /// [`RowSelection`]: crate::arrow::arrow_reader::RowSelection - pub struct RowFilter { /// A list of [`ArrowPredicate`] pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>, From db6cfe784521a8155e9ef8d7e662e37b9b4d52f7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 15 May 2025 16:30:52 -0400 Subject: [PATCH 3/3] Add vestigial metadata check --- parquet/src/arrow/arrow_reader/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 58a96d04a9c5..ad1d455ce599 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -565,6 +565,14 @@ impl ArrowReaderMetadata { field2.is_nullable() )); } + if field1.metadata() != field2.metadata() { + errors.push(format!( + "metadata mismatch for field {}: expected {:?} but found {:?}", + field1.name(), + field1.metadata(), + field2.metadata() + )); + } } if !errors.is_empty() {