diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index fad98cf1..4742327d 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -1773,7 +1773,7 @@ mod row_reader { allowed: impl FnOnce(&fcore::metadata::DataType) -> bool, ) -> Result<&'a fcore::metadata::DataType, String> { let col = get_column(columns, field)?; - if row.is_null_at(field) { + if row.is_null_at(field).map_err(|e| e.to_string())? { return Err(format!("field {field} is null")); } let dt = col.data_type(); @@ -1801,7 +1801,7 @@ mod row_reader { field: usize, ) -> Result { get_column(columns, field)?; - Ok(row.is_null_at(field)) + row.is_null_at(field).map_err(|e| e.to_string()) } pub fn get_bool( @@ -1812,7 +1812,7 @@ mod row_reader { validate(row, columns, field, "get_bool", |dt| { matches!(dt, fcore::metadata::DataType::Boolean(_)) })?; - Ok(row.get_boolean(field)) + row.get_boolean(field).map_err(|e| e.to_string()) } pub fn get_i32( @@ -1828,11 +1828,17 @@ mod row_reader { | fcore::metadata::DataType::Int(_) ) })?; - Ok(match dt { - fcore::metadata::DataType::TinyInt(_) => row.get_byte(field) as i32, - fcore::metadata::DataType::SmallInt(_) => row.get_short(field) as i32, - _ => row.get_int(field), - }) + match dt { + fcore::metadata::DataType::TinyInt(_) => row + .get_byte(field) + .map(|v| v as i32) + .map_err(|e| e.to_string()), + fcore::metadata::DataType::SmallInt(_) => row + .get_short(field) + .map(|v| v as i32) + .map_err(|e| e.to_string()), + _ => row.get_int(field).map_err(|e| e.to_string()), + } } pub fn get_i64( @@ -1843,7 +1849,7 @@ mod row_reader { validate(row, columns, field, "get_i64", |dt| { matches!(dt, fcore::metadata::DataType::BigInt(_)) })?; - Ok(row.get_long(field)) + row.get_long(field).map_err(|e| e.to_string()) } pub fn get_f32( @@ -1854,7 +1860,7 @@ mod row_reader { validate(row, columns, field, "get_f32", |dt| { matches!(dt, fcore::metadata::DataType::Float(_)) })?; - Ok(row.get_float(field)) + row.get_float(field).map_err(|e| e.to_string()) } pub fn get_f64( @@ -1865,7 +1871,7 @@ mod row_reader { validate(row, columns, field, "get_f64", |dt| { matches!(dt, fcore::metadata::DataType::Double(_)) })?; - Ok(row.get_double(field)) + row.get_double(field).map_err(|e| e.to_string()) } pub fn get_str<'a>( @@ -1879,10 +1885,12 @@ mod row_reader { fcore::metadata::DataType::Char(_) | fcore::metadata::DataType::String(_) ) })?; - Ok(match dt { - fcore::metadata::DataType::Char(ct) => row.get_char(field, ct.length() as usize), - _ => row.get_string(field), - }) + match dt { + fcore::metadata::DataType::Char(ct) => row + .get_char(field, ct.length() as usize) + .map_err(|e| e.to_string()), + _ => row.get_string(field).map_err(|e| e.to_string()), + } } pub fn get_bytes<'a>( @@ -1896,10 +1904,12 @@ mod row_reader { fcore::metadata::DataType::Binary(_) | fcore::metadata::DataType::Bytes(_) ) })?; - Ok(match dt { - fcore::metadata::DataType::Binary(bt) => row.get_binary(field, bt.length()), - _ => row.get_bytes(field), - }) + match dt { + fcore::metadata::DataType::Binary(bt) => row + .get_binary(field, bt.length()) + .map_err(|e| e.to_string()), + _ => row.get_bytes(field).map_err(|e| e.to_string()), + } } pub fn get_date_days( @@ -1910,7 +1920,9 @@ mod row_reader { validate(row, columns, field, "get_date_days", |dt| { matches!(dt, fcore::metadata::DataType::Date(_)) })?; - Ok(row.get_date(field).get_inner()) + row.get_date(field) + .map(|d| d.get_inner()) + .map_err(|e| e.to_string()) } pub fn get_time_millis( @@ -1921,7 +1933,9 @@ mod row_reader { validate(row, columns, field, "get_time_millis", |dt| { matches!(dt, fcore::metadata::DataType::Time(_)) })?; - Ok(row.get_time(field).get_inner()) + row.get_time(field) + .map(|t| t.get_inner()) + .map_err(|e| e.to_string()) } pub fn get_ts_millis( @@ -1937,12 +1951,14 @@ mod row_reader { ) })?; match dt { - fcore::metadata::DataType::TimestampLTz(ts) => Ok(row + fcore::metadata::DataType::TimestampLTz(ts) => row .get_timestamp_ltz(field, ts.precision()) - .get_epoch_millisecond()), - fcore::metadata::DataType::Timestamp(ts) => Ok(row + .map(|v| v.get_epoch_millisecond()) + .map_err(|e| e.to_string()), + fcore::metadata::DataType::Timestamp(ts) => row .get_timestamp_ntz(field, ts.precision()) - .get_millisecond()), + .map(|v| v.get_millisecond()) + .map_err(|e| e.to_string()), dt => Err(format!("get_ts_millis: unexpected type {dt}")), } } @@ -1960,12 +1976,14 @@ mod row_reader { ) })?; match dt { - fcore::metadata::DataType::TimestampLTz(ts) => Ok(row + fcore::metadata::DataType::TimestampLTz(ts) => row .get_timestamp_ltz(field, ts.precision()) - .get_nano_of_millisecond()), - fcore::metadata::DataType::Timestamp(ts) => Ok(row + .map(|v| v.get_nano_of_millisecond()) + .map_err(|e| e.to_string()), + fcore::metadata::DataType::Timestamp(ts) => row .get_timestamp_ntz(field, ts.precision()) - .get_nano_of_millisecond()), + .map(|v| v.get_nano_of_millisecond()) + .map_err(|e| e.to_string()), dt => Err(format!("get_ts_nanos: unexpected type {dt}")), } } @@ -1987,7 +2005,9 @@ mod row_reader { })?; match dt { fcore::metadata::DataType::Decimal(dd) => { - let decimal = row.get_decimal(field, dd.precision() as usize, dd.scale() as usize); + let decimal = row + .get_decimal(field, dd.precision() as usize, dd.scale() as usize) + .map_err(|e| e.to_string())?; Ok(decimal.to_big_decimal().to_string()) } dt => Err(format!("get_decimal_str: unexpected type {dt}")), diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index 073a1681..f8efe677 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -371,42 +371,42 @@ pub fn compacted_row_to_owned( let mut out = fcore::row::GenericRow::new(columns.len()); for (i, col) in columns.iter().enumerate() { - if row.is_null_at(i) { + if row.is_null_at(i)? { out.set_field(i, Datum::Null); continue; } let datum = match col.data_type() { - fcore::metadata::DataType::Boolean(_) => Datum::Bool(row.get_boolean(i)), - fcore::metadata::DataType::TinyInt(_) => Datum::Int8(row.get_byte(i)), - fcore::metadata::DataType::SmallInt(_) => Datum::Int16(row.get_short(i)), - fcore::metadata::DataType::Int(_) => Datum::Int32(row.get_int(i)), - fcore::metadata::DataType::BigInt(_) => Datum::Int64(row.get_long(i)), - fcore::metadata::DataType::Float(_) => Datum::Float32(row.get_float(i).into()), - fcore::metadata::DataType::Double(_) => Datum::Float64(row.get_double(i).into()), + fcore::metadata::DataType::Boolean(_) => Datum::Bool(row.get_boolean(i)?), + fcore::metadata::DataType::TinyInt(_) => Datum::Int8(row.get_byte(i)?), + fcore::metadata::DataType::SmallInt(_) => Datum::Int16(row.get_short(i)?), + fcore::metadata::DataType::Int(_) => Datum::Int32(row.get_int(i)?), + fcore::metadata::DataType::BigInt(_) => Datum::Int64(row.get_long(i)?), + fcore::metadata::DataType::Float(_) => Datum::Float32(row.get_float(i)?.into()), + fcore::metadata::DataType::Double(_) => Datum::Float64(row.get_double(i)?.into()), fcore::metadata::DataType::String(_) => { - Datum::String(Cow::Owned(row.get_string(i).to_string())) + Datum::String(Cow::Owned(row.get_string(i)?.to_string())) } fcore::metadata::DataType::Bytes(_) => { - Datum::Blob(Cow::Owned(row.get_bytes(i).to_vec())) + Datum::Blob(Cow::Owned(row.get_bytes(i)?.to_vec())) } - fcore::metadata::DataType::Date(_) => Datum::Date(row.get_date(i)), - fcore::metadata::DataType::Time(_) => Datum::Time(row.get_time(i)), + fcore::metadata::DataType::Date(_) => Datum::Date(row.get_date(i)?), + fcore::metadata::DataType::Time(_) => Datum::Time(row.get_time(i)?), fcore::metadata::DataType::Timestamp(dt) => { - Datum::TimestampNtz(row.get_timestamp_ntz(i, dt.precision())) + Datum::TimestampNtz(row.get_timestamp_ntz(i, dt.precision())?) } fcore::metadata::DataType::TimestampLTz(dt) => { - Datum::TimestampLtz(row.get_timestamp_ltz(i, dt.precision())) + Datum::TimestampLtz(row.get_timestamp_ltz(i, dt.precision())?) } fcore::metadata::DataType::Decimal(dt) => { - let decimal = row.get_decimal(i, dt.precision() as usize, dt.scale() as usize); + let decimal = row.get_decimal(i, dt.precision() as usize, dt.scale() as usize)?; Datum::Decimal(decimal) } fcore::metadata::DataType::Char(dt) => Datum::String(Cow::Owned( - row.get_char(i, dt.length() as usize).to_string(), + row.get_char(i, dt.length() as usize)?.to_string(), )), fcore::metadata::DataType::Binary(dt) => { - Datum::Blob(Cow::Owned(row.get_binary(i, dt.length()).to_vec())) + Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec())) } other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")), }; diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index bc2e956c..660cd6be 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -1256,91 +1256,119 @@ pub fn datum_to_python_value( use fcore::metadata::DataType; // Check for null first - if row.is_null_at(pos) { + if row + .is_null_at(pos) + .map_err(|e| FlussError::from_core_error(&e))? + { return Ok(py.None()); } match data_type { DataType::Boolean(_) => Ok(row .get_boolean(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::TinyInt(_) => Ok(row .get_byte(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::SmallInt(_) => Ok(row .get_short(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::Int(_) => Ok(row .get_int(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::BigInt(_) => Ok(row .get_long(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::Float(_) => Ok(row .get_float(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::Double(_) => Ok(row .get_double(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::String(_) => { - let s = row.get_string(pos); + let s = row + .get_string(pos) + .map_err(|e| FlussError::from_core_error(&e))?; Ok(s.into_pyobject(py)?.into_any().unbind()) } DataType::Char(char_type) => { - let s = row.get_char(pos, char_type.length() as usize); + let s = row + .get_char(pos, char_type.length() as usize) + .map_err(|e| FlussError::from_core_error(&e))?; Ok(s.into_pyobject(py)?.into_any().unbind()) } DataType::Bytes(_) => { - let b = row.get_bytes(pos); + let b = row + .get_bytes(pos) + .map_err(|e| FlussError::from_core_error(&e))?; Ok(PyBytes::new(py, b).into_any().unbind()) } DataType::Binary(binary_type) => { - let b = row.get_binary(pos, binary_type.length()); + let b = row + .get_binary(pos, binary_type.length()) + .map_err(|e| FlussError::from_core_error(&e))?; Ok(PyBytes::new(py, b).into_any().unbind()) } DataType::Decimal(decimal_type) => { - let decimal = row.get_decimal( - pos, - decimal_type.precision() as usize, - decimal_type.scale() as usize, - ); + let decimal = row + .get_decimal( + pos, + decimal_type.precision() as usize, + decimal_type.scale() as usize, + ) + .map_err(|e| FlussError::from_core_error(&e))?; rust_decimal_to_python(py, &decimal) } DataType::Date(_) => { - let date = row.get_date(pos); + let date = row + .get_date(pos) + .map_err(|e| FlussError::from_core_error(&e))?; rust_date_to_python(py, date) } DataType::Time(_) => { - let time = row.get_time(pos); + let time = row + .get_time(pos) + .map_err(|e| FlussError::from_core_error(&e))?; rust_time_to_python(py, time) } DataType::Timestamp(ts_type) => { - let ts = row.get_timestamp_ntz(pos, ts_type.precision()); + let ts = row + .get_timestamp_ntz(pos, ts_type.precision()) + .map_err(|e| FlussError::from_core_error(&e))?; rust_timestamp_ntz_to_python(py, ts) } DataType::TimestampLTz(ts_type) => { - let ts = row.get_timestamp_ltz(pos, ts_type.precision()); + let ts = row + .get_timestamp_ltz(pos, ts_type.precision()) + .map_err(|e| FlussError::from_core_error(&e))?; rust_timestamp_ltz_to_python(py, ts) } _ => Err(FlussError::new_err(format!( diff --git a/crates/examples/src/example_kv_table.rs b/crates/examples/src/example_kv_table.rs index 90788b14..8fb60baa 100644 --- a/crates/examples/src/example_kv_table.rs +++ b/crates/examples/src/example_kv_table.rs @@ -75,8 +75,8 @@ pub async fn main() -> Result<()> { let row = result.get_single_row()?.unwrap(); println!( "Found id={id}: name={}, age={}", - row.get_string(1), - row.get_long(2) + row.get_string(1)?, + row.get_long(2)? ); } @@ -92,8 +92,8 @@ pub async fn main() -> Result<()> { let row = result.get_single_row()?.unwrap(); println!( "Verified update: name={}, age={}", - row.get_string(1), - row.get_long(2) + row.get_string(1)?, + row.get_long(2)? ); println!("\n=== Deleting ==="); diff --git a/crates/examples/src/example_partitioned_kv_table.rs b/crates/examples/src/example_partitioned_kv_table.rs index e0471785..9cd2e7df 100644 --- a/crates/examples/src/example_partitioned_kv_table.rs +++ b/crates/examples/src/example_partitioned_kv_table.rs @@ -90,9 +90,9 @@ pub async fn main() -> Result<()> { let row = result.get_single_row()?.unwrap(); println!( "Found id={id}: region={}, zone={}, score={}", - row.get_string(1), - row.get_long(2), - row.get_long(3) + row.get_string(1)?, + row.get_long(2)?, + row.get_long(3)? ); } @@ -109,8 +109,8 @@ pub async fn main() -> Result<()> { let row = result.get_single_row()?.unwrap(); println!( "Verified update: region={}, zone={}", - row.get_string(1), - row.get_long(2) + row.get_string(1)?, + row.get_long(2)? ); println!("\n=== Deleting ==="); diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs index cfe1627b..e4ad1fbd 100644 --- a/crates/examples/src/example_table.rs +++ b/crates/examples/src/example_table.rs @@ -83,9 +83,9 @@ pub async fn main() -> Result<()> { let row = record.row(); println!( "{{{}, {}, {}}}@{}", - row.get_int(0), - row.get_string(1), - row.get_long(2), + row.get_int(0)?, + row.get_string(1)?, + row.get_long(2)?, record.offset() ); } diff --git a/crates/fluss/src/client/table/append.rs b/crates/fluss/src/client/table/append.rs index 942253fa..a58433f3 100644 --- a/crates/fluss/src/client/table/append.rs +++ b/crates/fluss/src/client/table/append.rs @@ -17,6 +17,7 @@ use crate::client::table::partition_getter::{PartitionGetter, get_physical_path}; use crate::client::{WriteRecord, WriteResultFuture, WriterClient}; +use crate::error::Error::IllegalArgument; use crate::error::Result; use crate::metadata::{PhysicalTablePath, TableInfo, TablePath}; use crate::row::{ColumnarRow, InternalRow}; @@ -69,6 +70,21 @@ pub struct AppendWriter { } impl AppendWriter { + fn check_field_count(&self, row: &R) -> Result<()> { + let expected = self.table_info.get_row_type().fields().len(); + if row.get_field_count() != expected { + return Err(IllegalArgument { + message: format!( + "The field count of the row does not match the table schema. \ + Expected: {}, Actual: {}", + expected, + row.get_field_count() + ), + }); + } + Ok(()) + } + /// Appends a row to the table. /// /// This method returns a [`WriteResultFuture`] immediately after queueing the write, @@ -81,6 +97,7 @@ impl AppendWriter { /// A [`WriteResultFuture`] that can be awaited to wait for server acknowledgment, /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery). pub fn append(&self, row: &R) -> Result { + self.check_field_count(row)?; let physical_table_path = Arc::new(get_physical_path( &self.table_path, self.partition_getter.as_ref(), diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs index a1aad2d8..1115ded3 100644 --- a/crates/fluss/src/client/table/partition_getter.rs +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -87,7 +87,7 @@ impl PartitionGetter { let mut partition_values = Vec::with_capacity(self.partitions.len()); for (data_type, field_getter) in &self.partitions { - let value = field_getter.get_field(row); + let value = field_getter.get_field(row)?; if value.is_null() { return Err(IllegalArgument { message: "Partition value shouldn't be null.".to_string(), diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 4b6f8095..3ec9106d 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -114,9 +114,9 @@ impl<'a> TableScan<'a> { /// let row = record.row(); /// println!( /// "{{{}, {}, {}}}@{}", - /// row.get_int(0), - /// row.get_string(2), - /// row.get_string(3), + /// row.get_int(0)?, + /// row.get_string(2)?, + /// row.get_string(3)?, /// record.offset() /// ); /// } @@ -188,8 +188,8 @@ impl<'a> TableScan<'a> { /// let row = record.row(); /// println!( /// "{{{}, {}}}@{}", - /// row.get_int(0), - /// row.get_string(1), + /// row.get_int(0)?, + /// row.get_string(1)?, /// record.offset() /// ); /// } diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index 7057b901..52ec37b3 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -328,7 +328,7 @@ impl UpsertWriter { })?; encoder.start_new_row()?; for (pos, field_getter) in self.field_getters.iter().enumerate() { - let datum = field_getter.get_field(row); + let datum = field_getter.get_field(row)?; encoder.encode_field(pos, datum)?; } encoder.finish_row() diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 7fb9d34a..ea27836e 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -355,7 +355,7 @@ impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder { fn append(&mut self, row: &dyn InternalRow) -> Result { for (idx, getter) in self.field_getters.iter().enumerate() { - let datum = getter.get_field(row); + let datum = getter.get_field(row)?; let field_type = self.table_schema.field(idx).data_type(); let builder = self.arrow_column_builders diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs b/crates/fluss/src/record/kv/kv_record_batch.rs index eb89d69c..14ff2e91 100644 --- a/crates/fluss/src/record/kv/kv_record_batch.rs +++ b/crates/fluss/src/record/kv/kv_record_batch.rs @@ -445,7 +445,7 @@ mod tests { assert_eq!(record1.key().as_ref(), key1); assert!(!record1.is_deletion()); let row1 = record1.row(&*decoder).unwrap(); - assert_eq!(row1.get_bytes(0), &[1, 2, 3, 4, 5]); + assert_eq!(row1.get_bytes(0).unwrap(), &[1, 2, 3, 4, 5]); let record2 = iter.next().unwrap().unwrap(); assert_eq!(record2.key().as_ref(), key2); diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs index 83707648..0e806337 100644 --- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs +++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs @@ -555,14 +555,14 @@ mod tests { 1 => { assert_eq!(rec.key().as_ref(), key1); let row = rec.row(&*decoder).unwrap(); - assert_eq!(row.get_int(0), 42); - assert_eq!(row.get_string(1), "hello"); + assert_eq!(row.get_int(0)?, 42); + assert_eq!(row.get_string(1)?, "hello"); } 2 => { assert_eq!(rec.key().as_ref(), key2); let row = rec.row(&*decoder).unwrap(); - assert_eq!(row.get_int(0), 100); - assert_eq!(row.get_string(1), "world"); + assert_eq!(row.get_int(0)?, 100); + assert_eq!(row.get_string(1)?, "world"); } 3 => { assert_eq!(rec.key().as_ref(), key3); diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs index 50db32b1..c07fe97c 100644 --- a/crates/fluss/src/row/column.rs +++ b/crates/fluss/src/row/column.rs @@ -15,15 +15,17 @@ // specific language governing permissions and limitations // under the License. +use crate::error::Error::IllegalArgument; +use crate::error::Result; use crate::row::InternalRow; -use arrow::array::{ - Array, AsArray, BinaryArray, Date32Array, Decimal128Array, FixedSizeBinaryArray, Float32Array, - Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringArray, - Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, - TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, - TimestampSecondArray, +use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz}; +use arrow::array::{Array, AsArray, BinaryArray, RecordBatch, StringArray}; +use arrow::datatypes::{ + DataType as ArrowDataType, Date32Type, Decimal128Type, Float32Type, Float64Type, Int8Type, + Int16Type, Int32Type, Int64Type, Time32MillisecondType, Time32SecondType, + Time64MicrosecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, }; -use arrow::datatypes::{DataType as ArrowDataType, TimeUnit}; use std::sync::Arc; #[derive(Clone)] @@ -59,6 +61,18 @@ impl ColumnarRow { &self.record_batch } + fn column(&self, pos: usize) -> Result<&Arc> { + self.record_batch + .columns() + .get(pos) + .ok_or_else(|| IllegalArgument { + message: format!( + "column index {pos} out of bounds (batch has {} columns)", + self.record_batch.num_columns() + ), + }) + } + /// Generic helper to read timestamp from Arrow, handling all TimeUnit conversions. /// Like Java, the precision parameter is ignored - conversion is determined by Arrow TimeUnit. fn read_timestamp_from_arrow( @@ -66,114 +80,133 @@ impl ColumnarRow { pos: usize, _precision: u32, construct_compact: impl FnOnce(i64) -> T, - construct_with_nanos: impl FnOnce(i64, i32) -> crate::error::Result, - ) -> T { - let schema = self.record_batch.schema(); - let arrow_field = schema.field(pos); - let column = self.record_batch.column(pos); - - // Read value based on the actual Arrow timestamp type - let value = match arrow_field.data_type() { - ArrowDataType::Timestamp(TimeUnit::Second, _) => column - .as_any() - .downcast_ref::() - .expect("Expected TimestampSecondArray") - .value(self.row_id), - ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => column - .as_any() - .downcast_ref::() - .expect("Expected TimestampMillisecondArray") - .value(self.row_id), - ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => column - .as_any() - .downcast_ref::() - .expect("Expected TimestampMicrosecondArray") - .value(self.row_id), - ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => column - .as_any() - .downcast_ref::() - .expect("Expected TimestampNanosecondArray") - .value(self.row_id), - other => panic!("Expected Timestamp column at position {pos}, got {other:?}"), + construct_with_nanos: impl FnOnce(i64, i32) -> Result, + ) -> Result { + let column = self.column(pos)?; + + // Read value and time unit based on the actual Arrow timestamp type + let (value, time_unit) = match column.data_type() { + ArrowDataType::Timestamp(TimeUnit::Second, _) => ( + column + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected TimestampSecondArray at position {pos}"), + })? + .value(self.row_id), + TimeUnit::Second, + ), + ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => ( + column + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected TimestampMillisecondArray at position {pos}"), + })? + .value(self.row_id), + TimeUnit::Millisecond, + ), + ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => ( + column + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected TimestampMicrosecondArray at position {pos}"), + })? + .value(self.row_id), + TimeUnit::Microsecond, + ), + ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => ( + column + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected TimestampNanosecondArray at position {pos}"), + })? + .value(self.row_id), + TimeUnit::Nanosecond, + ), + other => { + return Err(IllegalArgument { + message: format!("expected Timestamp column at position {pos}, got {other:?}"), + }); + } }; // Convert based on Arrow TimeUnit - let (millis, nanos) = match arrow_field.data_type() { - ArrowDataType::Timestamp(time_unit, _) => match time_unit { - TimeUnit::Second => (value * 1000, 0), - TimeUnit::Millisecond => (value, 0), - TimeUnit::Microsecond => { - // Use Euclidean division so that nanos is always non-negative, - // even for timestamps before the Unix epoch. - let millis = value.div_euclid(1000); - let nanos = (value.rem_euclid(1000) * 1000) as i32; - (millis, nanos) - } - TimeUnit::Nanosecond => { - // Use Euclidean division so that nanos is always in [0, 999_999]. - let millis = value.div_euclid(1_000_000); - let nanos = value.rem_euclid(1_000_000) as i32; - (millis, nanos) - } - }, - _ => unreachable!(), + let (millis, nanos) = match time_unit { + TimeUnit::Second => (value * 1000, 0), + TimeUnit::Millisecond => (value, 0), + TimeUnit::Microsecond => { + // Use Euclidean division so that nanos is always non-negative, + // even for timestamps before the Unix epoch. + let millis = value.div_euclid(1000); + let nanos = (value.rem_euclid(1000) * 1000) as i32; + (millis, nanos) + } + TimeUnit::Nanosecond => { + // Use Euclidean division so that nanos is always in [0, 999_999]. + let millis = value.div_euclid(1_000_000); + let nanos = value.rem_euclid(1_000_000) as i32; + (millis, nanos) + } }; if nanos == 0 { - construct_compact(millis) + Ok(construct_compact(millis)) } else { - // nanos is guaranteed to be in valid range [0, 999_999] by arithmetic - construct_with_nanos(millis, nanos).expect("nanos in valid range by construction") + construct_with_nanos(millis, nanos) } } /// Read date value from Arrow Date32Array - fn read_date_from_arrow(&self, pos: usize) -> i32 { - self.record_batch - .column(pos) - .as_any() - .downcast_ref::() - .expect("Expected Date32Array") - .value(self.row_id) + fn read_date_from_arrow(&self, pos: usize) -> Result { + Ok(self + .column(pos)? + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected Date32Array at position {pos}"), + })? + .value(self.row_id)) } /// Read time value from Arrow Time32/Time64 arrays, converting to milliseconds - fn read_time_from_arrow(&self, pos: usize) -> i32 { - let schema = self.record_batch.schema(); - let arrow_field = schema.field(pos); - let column = self.record_batch.column(pos); + fn read_time_from_arrow(&self, pos: usize) -> Result { + let column = self.column(pos)?; - match arrow_field.data_type() { + match column.data_type() { ArrowDataType::Time32(TimeUnit::Second) => { let value = column - .as_any() - .downcast_ref::() - .expect("Expected Time32SecondArray") + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected Time32SecondArray at position {pos}"), + })? .value(self.row_id); - value * 1000 // Convert seconds to milliseconds + Ok(value * 1000) // Convert seconds to milliseconds } - ArrowDataType::Time32(TimeUnit::Millisecond) => column - .as_any() - .downcast_ref::() - .expect("Expected Time32MillisecondArray") - .value(self.row_id), + ArrowDataType::Time32(TimeUnit::Millisecond) => Ok(column + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected Time32MillisecondArray at position {pos}"), + })? + .value(self.row_id)), ArrowDataType::Time64(TimeUnit::Microsecond) => { let value = column - .as_any() - .downcast_ref::() - .expect("Expected Time64MicrosecondArray") + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected Time64MicrosecondArray at position {pos}"), + })? .value(self.row_id); - (value / 1000) as i32 // Convert microseconds to milliseconds + Ok((value / 1000) as i32) // Convert microseconds to milliseconds } ArrowDataType::Time64(TimeUnit::Nanosecond) => { let value = column - .as_any() - .downcast_ref::() - .expect("Expected Time64NanosecondArray") + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected Time64NanosecondArray at position {pos}"), + })? .value(self.row_id); - (value / 1_000_000) as i32 // Convert nanoseconds to milliseconds + Ok((value / 1_000_000) as i32) // Convert nanoseconds to milliseconds } - other => panic!("Expected Time column at position {pos}, got {other:?}"), + other => Err(IllegalArgument { + message: format!("expected Time column at position {pos}, got {other:?}"), + }), } } } @@ -183,106 +216,121 @@ impl InternalRow for ColumnarRow { self.record_batch.num_columns() } - fn is_null_at(&self, pos: usize) -> bool { - self.record_batch.column(pos).is_null(self.row_id) + fn is_null_at(&self, pos: usize) -> Result { + Ok(self.column(pos)?.is_null(self.row_id)) } - fn get_boolean(&self, pos: usize) -> bool { - self.record_batch - .column(pos) - .as_boolean() - .value(self.row_id) + fn get_boolean(&self, pos: usize) -> Result { + Ok(self + .column(pos)? + .as_boolean_opt() + .ok_or_else(|| IllegalArgument { + message: format!("expected boolean array at position {pos}"), + })? + .value(self.row_id)) } - fn get_byte(&self, pos: usize) -> i8 { - self.record_batch - .column(pos) - .as_any() - .downcast_ref::() - .expect("Expect byte array") - .value(self.row_id) + fn get_byte(&self, pos: usize) -> Result { + Ok(self + .column(pos)? + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected byte array at position {pos}"), + })? + .value(self.row_id)) } - fn get_short(&self, pos: usize) -> i16 { - self.record_batch - .column(pos) - .as_any() - .downcast_ref::() - .expect("Expect short array") - .value(self.row_id) + fn get_short(&self, pos: usize) -> Result { + Ok(self + .column(pos)? + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected short array at position {pos}"), + })? + .value(self.row_id)) } - fn get_int(&self, pos: usize) -> i32 { - self.record_batch - .column(pos) - .as_any() - .downcast_ref::() - .expect("Expect int array") - .value(self.row_id) + fn get_int(&self, pos: usize) -> Result { + Ok(self + .column(pos)? + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected int array at position {pos}"), + })? + .value(self.row_id)) } - fn get_long(&self, pos: usize) -> i64 { - self.record_batch - .column(pos) - .as_any() - .downcast_ref::() - .expect("Expect long array") - .value(self.row_id) + fn get_long(&self, pos: usize) -> Result { + Ok(self + .column(pos)? + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected long array at position {pos}"), + })? + .value(self.row_id)) } - fn get_float(&self, pos: usize) -> f32 { - self.record_batch - .column(pos) - .as_any() - .downcast_ref::() - .expect("Expect float32 array") - .value(self.row_id) + fn get_float(&self, pos: usize) -> Result { + Ok(self + .column(pos)? + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected float32 array at position {pos}"), + })? + .value(self.row_id)) } - fn get_double(&self, pos: usize) -> f64 { - self.record_batch - .column(pos) - .as_any() - .downcast_ref::() - .expect("Expect float64 array") - .value(self.row_id) + fn get_double(&self, pos: usize) -> Result { + Ok(self + .column(pos)? + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!("expected float64 array at position {pos}"), + })? + .value(self.row_id)) } - fn get_char(&self, pos: usize, _length: usize) -> &str { - self.record_batch - .column(pos) + fn get_char(&self, pos: usize, _length: usize) -> Result<&str> { + Ok(self + .column(pos)? .as_any() .downcast_ref::() - .expect("Expected String array for char type") - .value(self.row_id) + .ok_or_else(|| IllegalArgument { + message: format!("expected String array for char type at position {pos}"), + })? + .value(self.row_id)) } - fn get_string(&self, pos: usize) -> &str { - self.record_batch - .column(pos) + fn get_string(&self, pos: usize) -> Result<&str> { + Ok(self + .column(pos)? .as_any() .downcast_ref::() - .expect("Expected String array.") - .value(self.row_id) + .ok_or_else(|| IllegalArgument { + message: format!("expected String array at position {pos}"), + })? + .value(self.row_id)) } - fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> crate::row::Decimal { + fn get_decimal( + &self, + pos: usize, + precision: usize, + scale: usize, + ) -> Result { use arrow::datatypes::DataType; - let column = self.record_batch.column(pos); + let column = self.column(pos)?; let array = column - .as_any() - .downcast_ref::() - .unwrap_or_else(|| { - panic!( - "Expected Decimal128Array at column {}, found: {:?}", - pos, + .as_primitive_opt::() + .ok_or_else(|| IllegalArgument { + message: format!( + "expected Decimal128Array at column {pos}, found: {:?}", column.data_type() - ) - }); + ), + })?; // Contract: caller must check is_null_at() before calling get_decimal. - // Calling on null value violates the contract and returns garbage data debug_assert!( !array.is_null(self.row_id), "get_decimal called on null value at pos {} row {}", @@ -290,12 +338,16 @@ impl InternalRow for ColumnarRow { self.row_id ); - // Read scale from Arrow schema field metadata - let schema = self.record_batch.schema(); - let field = schema.field(pos); - let arrow_scale = match field.data_type() { + // Read scale from Arrow column data type + let arrow_scale = match column.data_type() { DataType::Decimal128(_p, s) => *s as i64, - dt => panic!("Expected Decimal128 data type at column {pos}, found: {dt:?}"), + dt => { + return Err(IllegalArgument { + message: format!( + "expected Decimal128 data type at column {pos}, found: {dt:?}" + ), + }); + } }; let i128_val = array.value(self.row_id); @@ -307,60 +359,53 @@ impl InternalRow for ColumnarRow { precision as u32, scale as u32, ) - .unwrap_or_else(|e| { - panic!( - "Failed to create Decimal at column {} row {}: {}", - pos, self.row_id, e - ) - }) } - fn get_date(&self, pos: usize) -> crate::row::datum::Date { - crate::row::datum::Date::new(self.read_date_from_arrow(pos)) + fn get_date(&self, pos: usize) -> Result { + Ok(Date::new(self.read_date_from_arrow(pos)?)) } - fn get_time(&self, pos: usize) -> crate::row::datum::Time { - crate::row::datum::Time::new(self.read_time_from_arrow(pos)) + fn get_time(&self, pos: usize) -> Result