From 7057164910298d077d4691ab1d6d477b483ae1e2 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 22 Feb 2026 15:29:06 +0000 Subject: [PATCH 1/5] Fix panic / crashes in Rust and C++ --- bindings/cpp/src/lib.rs | 76 ++++--- bindings/cpp/src/types.rs | 32 +-- bindings/python/src/table.rs | 51 +++-- crates/examples/src/example_kv_table.rs | 8 +- .../src/example_partitioned_kv_table.rs | 10 +- crates/examples/src/example_table.rs | 6 +- crates/fluss/src/client/table/append.rs | 17 ++ .../src/client/table/partition_getter.rs | 2 +- crates/fluss/src/client/table/scanner.rs | 10 +- crates/fluss/src/client/table/upsert.rs | 2 +- crates/fluss/src/record/arrow.rs | 2 +- crates/fluss/src/record/kv/kv_record_batch.rs | 2 +- .../src/record/kv/kv_record_batch_builder.rs | 8 +- crates/fluss/src/row/column.rs | 211 ++++++++++-------- .../fluss/src/row/compacted/compacted_row.rs | 83 +++---- .../src/row/encode/compacted_key_encoder.rs | 2 +- crates/fluss/src/row/field_getter.rs | 43 ++-- crates/fluss/src/row/mod.rs | 203 +++++++++++------ crates/fluss/src/row/row_decoder.rs | 8 +- crates/fluss/tests/integration/kv_table.rs | 188 ++++++++-------- crates/fluss/tests/integration/log_table.rs | 194 ++++++++++++---- .../tests/integration/table_remote_scan.rs | 14 +- 22 files changed, 736 insertions(+), 436 deletions(-) diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index fad98cf1..5ebeff2f 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -1812,7 +1812,7 @@ mod row_reader { validate(row, columns, field, "get_bool", |dt| { matches!(dt, fcore::metadata::DataType::Boolean(_)) })?; - Ok(row.get_boolean(field)) + row.get_boolean(field).map_err(|e| e.to_string()) } pub fn get_i32( @@ -1828,11 +1828,17 @@ mod row_reader { | fcore::metadata::DataType::Int(_) ) })?; - Ok(match dt { - fcore::metadata::DataType::TinyInt(_) => row.get_byte(field) as i32, - fcore::metadata::DataType::SmallInt(_) => row.get_short(field) as i32, - _ => row.get_int(field), - }) + match dt { + fcore::metadata::DataType::TinyInt(_) => row + .get_byte(field) + .map(|v| v as i32) + .map_err(|e| e.to_string()), + fcore::metadata::DataType::SmallInt(_) => row + .get_short(field) + .map(|v| v as i32) + .map_err(|e| e.to_string()), + _ => row.get_int(field).map_err(|e| e.to_string()), + } } pub fn get_i64( @@ -1843,7 +1849,7 @@ mod row_reader { validate(row, columns, field, "get_i64", |dt| { matches!(dt, fcore::metadata::DataType::BigInt(_)) })?; - Ok(row.get_long(field)) + row.get_long(field).map_err(|e| e.to_string()) } pub fn get_f32( @@ -1854,7 +1860,7 @@ mod row_reader { validate(row, columns, field, "get_f32", |dt| { matches!(dt, fcore::metadata::DataType::Float(_)) })?; - Ok(row.get_float(field)) + row.get_float(field).map_err(|e| e.to_string()) } pub fn get_f64( @@ -1865,7 +1871,7 @@ mod row_reader { validate(row, columns, field, "get_f64", |dt| { matches!(dt, fcore::metadata::DataType::Double(_)) })?; - Ok(row.get_double(field)) + row.get_double(field).map_err(|e| e.to_string()) } pub fn get_str<'a>( @@ -1879,10 +1885,12 @@ mod row_reader { fcore::metadata::DataType::Char(_) | fcore::metadata::DataType::String(_) ) })?; - Ok(match dt { - fcore::metadata::DataType::Char(ct) => row.get_char(field, ct.length() as usize), - _ => row.get_string(field), - }) + match dt { + fcore::metadata::DataType::Char(ct) => row + .get_char(field, ct.length() as usize) + .map_err(|e| e.to_string()), + _ => row.get_string(field).map_err(|e| e.to_string()), + } } pub fn get_bytes<'a>( @@ -1896,10 +1904,12 @@ mod row_reader { fcore::metadata::DataType::Binary(_) | fcore::metadata::DataType::Bytes(_) ) })?; - Ok(match dt { - fcore::metadata::DataType::Binary(bt) => row.get_binary(field, bt.length()), - _ => row.get_bytes(field), - }) + match dt { + fcore::metadata::DataType::Binary(bt) => row + .get_binary(field, bt.length()) + .map_err(|e| e.to_string()), + _ => row.get_bytes(field).map_err(|e| e.to_string()), + } } pub fn get_date_days( @@ -1910,7 +1920,9 @@ mod row_reader { validate(row, columns, field, "get_date_days", |dt| { matches!(dt, fcore::metadata::DataType::Date(_)) })?; - Ok(row.get_date(field).get_inner()) + row.get_date(field) + .map(|d| d.get_inner()) + .map_err(|e| e.to_string()) } pub fn get_time_millis( @@ -1921,7 +1933,9 @@ mod row_reader { validate(row, columns, field, "get_time_millis", |dt| { matches!(dt, fcore::metadata::DataType::Time(_)) })?; - Ok(row.get_time(field).get_inner()) + row.get_time(field) + .map(|t| t.get_inner()) + .map_err(|e| e.to_string()) } pub fn get_ts_millis( @@ -1937,12 +1951,14 @@ mod row_reader { ) })?; match dt { - fcore::metadata::DataType::TimestampLTz(ts) => Ok(row + fcore::metadata::DataType::TimestampLTz(ts) => row .get_timestamp_ltz(field, ts.precision()) - .get_epoch_millisecond()), - fcore::metadata::DataType::Timestamp(ts) => Ok(row + .map(|v| v.get_epoch_millisecond()) + .map_err(|e| e.to_string()), + fcore::metadata::DataType::Timestamp(ts) => row .get_timestamp_ntz(field, ts.precision()) - .get_millisecond()), + .map(|v| v.get_millisecond()) + .map_err(|e| e.to_string()), dt => Err(format!("get_ts_millis: unexpected type {dt}")), } } @@ -1960,12 +1976,14 @@ mod row_reader { ) })?; match dt { - fcore::metadata::DataType::TimestampLTz(ts) => Ok(row + fcore::metadata::DataType::TimestampLTz(ts) => row .get_timestamp_ltz(field, ts.precision()) - .get_nano_of_millisecond()), - fcore::metadata::DataType::Timestamp(ts) => Ok(row + .map(|v| v.get_nano_of_millisecond()) + .map_err(|e| e.to_string()), + fcore::metadata::DataType::Timestamp(ts) => row .get_timestamp_ntz(field, ts.precision()) - .get_nano_of_millisecond()), + .map(|v| v.get_nano_of_millisecond()) + .map_err(|e| e.to_string()), dt => Err(format!("get_ts_nanos: unexpected type {dt}")), } } @@ -1987,7 +2005,9 @@ mod row_reader { })?; match dt { fcore::metadata::DataType::Decimal(dd) => { - let decimal = row.get_decimal(field, dd.precision() as usize, dd.scale() as usize); + let decimal = row + .get_decimal(field, dd.precision() as usize, dd.scale() as usize) + .map_err(|e| e.to_string())?; Ok(decimal.to_big_decimal().to_string()) } dt => Err(format!("get_decimal_str: unexpected type {dt}")), diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index 073a1681..c296929b 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -377,36 +377,36 @@ pub fn compacted_row_to_owned( } let datum = match col.data_type() { - fcore::metadata::DataType::Boolean(_) => Datum::Bool(row.get_boolean(i)), - fcore::metadata::DataType::TinyInt(_) => Datum::Int8(row.get_byte(i)), - fcore::metadata::DataType::SmallInt(_) => Datum::Int16(row.get_short(i)), - fcore::metadata::DataType::Int(_) => Datum::Int32(row.get_int(i)), - fcore::metadata::DataType::BigInt(_) => Datum::Int64(row.get_long(i)), - fcore::metadata::DataType::Float(_) => Datum::Float32(row.get_float(i).into()), - fcore::metadata::DataType::Double(_) => Datum::Float64(row.get_double(i).into()), + fcore::metadata::DataType::Boolean(_) => Datum::Bool(row.get_boolean(i)?), + fcore::metadata::DataType::TinyInt(_) => Datum::Int8(row.get_byte(i)?), + fcore::metadata::DataType::SmallInt(_) => Datum::Int16(row.get_short(i)?), + fcore::metadata::DataType::Int(_) => Datum::Int32(row.get_int(i)?), + fcore::metadata::DataType::BigInt(_) => Datum::Int64(row.get_long(i)?), + fcore::metadata::DataType::Float(_) => Datum::Float32(row.get_float(i)?.into()), + fcore::metadata::DataType::Double(_) => Datum::Float64(row.get_double(i)?.into()), fcore::metadata::DataType::String(_) => { - Datum::String(Cow::Owned(row.get_string(i).to_string())) + Datum::String(Cow::Owned(row.get_string(i)?.to_string())) } fcore::metadata::DataType::Bytes(_) => { - Datum::Blob(Cow::Owned(row.get_bytes(i).to_vec())) + Datum::Blob(Cow::Owned(row.get_bytes(i)?.to_vec())) } - fcore::metadata::DataType::Date(_) => Datum::Date(row.get_date(i)), - fcore::metadata::DataType::Time(_) => Datum::Time(row.get_time(i)), + fcore::metadata::DataType::Date(_) => Datum::Date(row.get_date(i)?), + fcore::metadata::DataType::Time(_) => Datum::Time(row.get_time(i)?), fcore::metadata::DataType::Timestamp(dt) => { - Datum::TimestampNtz(row.get_timestamp_ntz(i, dt.precision())) + Datum::TimestampNtz(row.get_timestamp_ntz(i, dt.precision())?) } fcore::metadata::DataType::TimestampLTz(dt) => { - Datum::TimestampLtz(row.get_timestamp_ltz(i, dt.precision())) + Datum::TimestampLtz(row.get_timestamp_ltz(i, dt.precision())?) } fcore::metadata::DataType::Decimal(dt) => { - let decimal = row.get_decimal(i, dt.precision() as usize, dt.scale() as usize); + let decimal = row.get_decimal(i, dt.precision() as usize, dt.scale() as usize)?; Datum::Decimal(decimal) } fcore::metadata::DataType::Char(dt) => Datum::String(Cow::Owned( - row.get_char(i, dt.length() as usize).to_string(), + row.get_char(i, dt.length() as usize)?.to_string(), )), fcore::metadata::DataType::Binary(dt) => { - Datum::Blob(Cow::Owned(row.get_binary(i, dt.length()).to_vec())) + Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec())) } other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")), }; diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index bc2e956c..51f67df5 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -1263,84 +1263,109 @@ pub fn datum_to_python_value( match data_type { DataType::Boolean(_) => Ok(row .get_boolean(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::TinyInt(_) => Ok(row .get_byte(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::SmallInt(_) => Ok(row .get_short(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::Int(_) => Ok(row .get_int(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::BigInt(_) => Ok(row .get_long(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::Float(_) => Ok(row .get_float(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::Double(_) => Ok(row .get_double(pos) + .map_err(|e| FlussError::from_core_error(&e))? .into_pyobject(py)? .to_owned() .into_any() .unbind()), DataType::String(_) => { - let s = row.get_string(pos); + let s = row + .get_string(pos) + .map_err(|e| FlussError::from_core_error(&e))?; Ok(s.into_pyobject(py)?.into_any().unbind()) } DataType::Char(char_type) => { - let s = row.get_char(pos, char_type.length() as usize); + let s = row + .get_char(pos, char_type.length() as usize) + .map_err(|e| FlussError::from_core_error(&e))?; Ok(s.into_pyobject(py)?.into_any().unbind()) } DataType::Bytes(_) => { - let b = row.get_bytes(pos); + let b = row + .get_bytes(pos) + .map_err(|e| FlussError::from_core_error(&e))?; Ok(PyBytes::new(py, b).into_any().unbind()) } DataType::Binary(binary_type) => { - let b = row.get_binary(pos, binary_type.length()); + let b = row + .get_binary(pos, binary_type.length()) + .map_err(|e| FlussError::from_core_error(&e))?; Ok(PyBytes::new(py, b).into_any().unbind()) } DataType::Decimal(decimal_type) => { - let decimal = row.get_decimal( - pos, - decimal_type.precision() as usize, - decimal_type.scale() as usize, - ); + let decimal = row + .get_decimal( + pos, + decimal_type.precision() as usize, + decimal_type.scale() as usize, + ) + .map_err(|e| FlussError::from_core_error(&e))?; rust_decimal_to_python(py, &decimal) } DataType::Date(_) => { - let date = row.get_date(pos); + let date = row + .get_date(pos) + .map_err(|e| FlussError::from_core_error(&e))?; rust_date_to_python(py, date) } DataType::Time(_) => { - let time = row.get_time(pos); + let time = row + .get_time(pos) + .map_err(|e| FlussError::from_core_error(&e))?; rust_time_to_python(py, time) } DataType::Timestamp(ts_type) => { - let ts = row.get_timestamp_ntz(pos, ts_type.precision()); + let ts = row + .get_timestamp_ntz(pos, ts_type.precision()) + .map_err(|e| FlussError::from_core_error(&e))?; rust_timestamp_ntz_to_python(py, ts) } DataType::TimestampLTz(ts_type) => { - let ts = row.get_timestamp_ltz(pos, ts_type.precision()); + let ts = row + .get_timestamp_ltz(pos, ts_type.precision()) + .map_err(|e| FlussError::from_core_error(&e))?; rust_timestamp_ltz_to_python(py, ts) } _ => Err(FlussError::new_err(format!( diff --git a/crates/examples/src/example_kv_table.rs b/crates/examples/src/example_kv_table.rs index 90788b14..8fb60baa 100644 --- a/crates/examples/src/example_kv_table.rs +++ b/crates/examples/src/example_kv_table.rs @@ -75,8 +75,8 @@ pub async fn main() -> Result<()> { let row = result.get_single_row()?.unwrap(); println!( "Found id={id}: name={}, age={}", - row.get_string(1), - row.get_long(2) + row.get_string(1)?, + row.get_long(2)? ); } @@ -92,8 +92,8 @@ pub async fn main() -> Result<()> { let row = result.get_single_row()?.unwrap(); println!( "Verified update: name={}, age={}", - row.get_string(1), - row.get_long(2) + row.get_string(1)?, + row.get_long(2)? ); println!("\n=== Deleting ==="); diff --git a/crates/examples/src/example_partitioned_kv_table.rs b/crates/examples/src/example_partitioned_kv_table.rs index e0471785..9cd2e7df 100644 --- a/crates/examples/src/example_partitioned_kv_table.rs +++ b/crates/examples/src/example_partitioned_kv_table.rs @@ -90,9 +90,9 @@ pub async fn main() -> Result<()> { let row = result.get_single_row()?.unwrap(); println!( "Found id={id}: region={}, zone={}, score={}", - row.get_string(1), - row.get_long(2), - row.get_long(3) + row.get_string(1)?, + row.get_long(2)?, + row.get_long(3)? ); } @@ -109,8 +109,8 @@ pub async fn main() -> Result<()> { let row = result.get_single_row()?.unwrap(); println!( "Verified update: region={}, zone={}", - row.get_string(1), - row.get_long(2) + row.get_string(1)?, + row.get_long(2)? ); println!("\n=== Deleting ==="); diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs index cfe1627b..e4ad1fbd 100644 --- a/crates/examples/src/example_table.rs +++ b/crates/examples/src/example_table.rs @@ -83,9 +83,9 @@ pub async fn main() -> Result<()> { let row = record.row(); println!( "{{{}, {}, {}}}@{}", - row.get_int(0), - row.get_string(1), - row.get_long(2), + row.get_int(0)?, + row.get_string(1)?, + row.get_long(2)?, record.offset() ); } diff --git a/crates/fluss/src/client/table/append.rs b/crates/fluss/src/client/table/append.rs index 942253fa..a58433f3 100644 --- a/crates/fluss/src/client/table/append.rs +++ b/crates/fluss/src/client/table/append.rs @@ -17,6 +17,7 @@ use crate::client::table::partition_getter::{PartitionGetter, get_physical_path}; use crate::client::{WriteRecord, WriteResultFuture, WriterClient}; +use crate::error::Error::IllegalArgument; use crate::error::Result; use crate::metadata::{PhysicalTablePath, TableInfo, TablePath}; use crate::row::{ColumnarRow, InternalRow}; @@ -69,6 +70,21 @@ pub struct AppendWriter { } impl AppendWriter { + fn check_field_count(&self, row: &R) -> Result<()> { + let expected = self.table_info.get_row_type().fields().len(); + if row.get_field_count() != expected { + return Err(IllegalArgument { + message: format!( + "The field count of the row does not match the table schema. \ + Expected: {}, Actual: {}", + expected, + row.get_field_count() + ), + }); + } + Ok(()) + } + /// Appends a row to the table. /// /// This method returns a [`WriteResultFuture`] immediately after queueing the write, @@ -81,6 +97,7 @@ impl AppendWriter { /// A [`WriteResultFuture`] that can be awaited to wait for server acknowledgment, /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery). pub fn append(&self, row: &R) -> Result { + self.check_field_count(row)?; let physical_table_path = Arc::new(get_physical_path( &self.table_path, self.partition_getter.as_ref(), diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs index a1aad2d8..1115ded3 100644 --- a/crates/fluss/src/client/table/partition_getter.rs +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -87,7 +87,7 @@ impl PartitionGetter { let mut partition_values = Vec::with_capacity(self.partitions.len()); for (data_type, field_getter) in &self.partitions { - let value = field_getter.get_field(row); + let value = field_getter.get_field(row)?; if value.is_null() { return Err(IllegalArgument { message: "Partition value shouldn't be null.".to_string(), diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 4b6f8095..3ec9106d 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -114,9 +114,9 @@ impl<'a> TableScan<'a> { /// let row = record.row(); /// println!( /// "{{{}, {}, {}}}@{}", - /// row.get_int(0), - /// row.get_string(2), - /// row.get_string(3), + /// row.get_int(0)?, + /// row.get_string(2)?, + /// row.get_string(3)?, /// record.offset() /// ); /// } @@ -188,8 +188,8 @@ impl<'a> TableScan<'a> { /// let row = record.row(); /// println!( /// "{{{}, {}}}@{}", - /// row.get_int(0), - /// row.get_string(1), + /// row.get_int(0)?, + /// row.get_string(1)?, /// record.offset() /// ); /// } diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index 7057b901..52ec37b3 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -328,7 +328,7 @@ impl UpsertWriter { })?; encoder.start_new_row()?; for (pos, field_getter) in self.field_getters.iter().enumerate() { - let datum = field_getter.get_field(row); + let datum = field_getter.get_field(row)?; encoder.encode_field(pos, datum)?; } encoder.finish_row() diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 7fb9d34a..ea27836e 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -355,7 +355,7 @@ impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder { fn append(&mut self, row: &dyn InternalRow) -> Result { for (idx, getter) in self.field_getters.iter().enumerate() { - let datum = getter.get_field(row); + let datum = getter.get_field(row)?; let field_type = self.table_schema.field(idx).data_type(); let builder = self.arrow_column_builders diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs b/crates/fluss/src/record/kv/kv_record_batch.rs index eb89d69c..14ff2e91 100644 --- a/crates/fluss/src/record/kv/kv_record_batch.rs +++ b/crates/fluss/src/record/kv/kv_record_batch.rs @@ -445,7 +445,7 @@ mod tests { assert_eq!(record1.key().as_ref(), key1); assert!(!record1.is_deletion()); let row1 = record1.row(&*decoder).unwrap(); - assert_eq!(row1.get_bytes(0), &[1, 2, 3, 4, 5]); + assert_eq!(row1.get_bytes(0).unwrap(), &[1, 2, 3, 4, 5]); let record2 = iter.next().unwrap().unwrap(); assert_eq!(record2.key().as_ref(), key2); diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs index 83707648..0e806337 100644 --- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs +++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs @@ -555,14 +555,14 @@ mod tests { 1 => { assert_eq!(rec.key().as_ref(), key1); let row = rec.row(&*decoder).unwrap(); - assert_eq!(row.get_int(0), 42); - assert_eq!(row.get_string(1), "hello"); + assert_eq!(row.get_int(0)?, 42); + assert_eq!(row.get_string(1)?, "hello"); } 2 => { assert_eq!(rec.key().as_ref(), key2); let row = rec.row(&*decoder).unwrap(); - assert_eq!(row.get_int(0), 100); - assert_eq!(row.get_string(1), "world"); + assert_eq!(row.get_int(0)?, 100); + assert_eq!(row.get_string(1)?, "world"); } 3 => { assert_eq!(rec.key().as_ref(), key3); diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs index 50db32b1..fbd09296 100644 --- a/crates/fluss/src/row/column.rs +++ b/crates/fluss/src/row/column.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. +use crate::error::Error::IllegalArgument; +use crate::error::Result; use crate::row::InternalRow; +use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz}; use arrow::array::{ Array, AsArray, BinaryArray, Date32Array, Decimal128Array, FixedSizeBinaryArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringArray, @@ -187,102 +190,130 @@ impl InternalRow for ColumnarRow { self.record_batch.column(pos).is_null(self.row_id) } - fn get_boolean(&self, pos: usize) -> bool { - self.record_batch + fn get_boolean(&self, pos: usize) -> Result { + Ok(self + .record_batch .column(pos) .as_boolean() - .value(self.row_id) + .value(self.row_id)) } - fn get_byte(&self, pos: usize) -> i8 { - self.record_batch + fn get_byte(&self, pos: usize) -> Result { + Ok(self + .record_batch .column(pos) .as_any() .downcast_ref::() - .expect("Expect byte array") - .value(self.row_id) + .ok_or_else(|| IllegalArgument { + message: format!("expected byte array at position {pos}"), + })? + .value(self.row_id)) } - fn get_short(&self, pos: usize) -> i16 { - self.record_batch + fn get_short(&self, pos: usize) -> Result { + Ok(self + .record_batch .column(pos) .as_any() .downcast_ref::() - .expect("Expect short array") - .value(self.row_id) + .ok_or_else(|| IllegalArgument { + message: format!("expected short array at position {pos}"), + })? + .value(self.row_id)) } - fn get_int(&self, pos: usize) -> i32 { - self.record_batch + fn get_int(&self, pos: usize) -> Result { + Ok(self + .record_batch .column(pos) .as_any() .downcast_ref::() - .expect("Expect int array") - .value(self.row_id) + .ok_or_else(|| IllegalArgument { + message: format!("expected int array at position {pos}"), + })? + .value(self.row_id)) } - fn get_long(&self, pos: usize) -> i64 { - self.record_batch + fn get_long(&self, pos: usize) -> Result { + Ok(self + .record_batch .column(pos) .as_any() .downcast_ref::() - .expect("Expect long array") - .value(self.row_id) + .ok_or_else(|| IllegalArgument { + message: format!("expected long array at position {pos}"), + })? + .value(self.row_id)) } - fn get_float(&self, pos: usize) -> f32 { - self.record_batch + fn get_float(&self, pos: usize) -> Result { + Ok(self + .record_batch .column(pos) .as_any() .downcast_ref::() - .expect("Expect float32 array") - .value(self.row_id) + .ok_or_else(|| IllegalArgument { + message: format!("expected float32 array at position {pos}"), + })? + .value(self.row_id)) } - fn get_double(&self, pos: usize) -> f64 { - self.record_batch + fn get_double(&self, pos: usize) -> Result { + Ok(self + .record_batch .column(pos) .as_any() .downcast_ref::() - .expect("Expect float64 array") - .value(self.row_id) + .ok_or_else(|| IllegalArgument { + message: format!("expected float64 array at position {pos}"), + })? + .value(self.row_id)) } - fn get_char(&self, pos: usize, _length: usize) -> &str { - self.record_batch + fn get_char(&self, pos: usize, _length: usize) -> Result<&str> { + Ok(self + .record_batch .column(pos) .as_any() .downcast_ref::() - .expect("Expected String array for char type") - .value(self.row_id) + .ok_or_else(|| IllegalArgument { + message: format!("expected String array for char type at position {pos}"), + })? + .value(self.row_id)) } - fn get_string(&self, pos: usize) -> &str { - self.record_batch + fn get_string(&self, pos: usize) -> Result<&str> { + Ok(self + .record_batch .column(pos) .as_any() .downcast_ref::() - .expect("Expected String array.") - .value(self.row_id) + .ok_or_else(|| IllegalArgument { + message: format!("expected String array at position {pos}"), + })? + .value(self.row_id)) } - fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> crate::row::Decimal { + fn get_decimal( + &self, + pos: usize, + precision: usize, + scale: usize, + ) -> Result { use arrow::datatypes::DataType; let column = self.record_batch.column(pos); let array = column .as_any() .downcast_ref::() - .unwrap_or_else(|| { - panic!( - "Expected Decimal128Array at column {}, found: {:?}", - pos, + .ok_or_else(|| IllegalArgument { + message: format!( + "expected Decimal128Array at column {pos}, found: {:?}", column.data_type() - ) - }); + ), + })?; // Contract: caller must check is_null_at() before calling get_decimal. - // Calling on null value violates the contract and returns garbage data debug_assert!( !array.is_null(self.row_id), "get_decimal called on null value at pos {} row {}", @@ -295,7 +326,13 @@ impl InternalRow for ColumnarRow { let field = schema.field(pos); let arrow_scale = match field.data_type() { DataType::Decimal128(_p, s) => *s as i64, - dt => panic!("Expected Decimal128 data type at column {pos}, found: {dt:?}"), + dt => { + return Err(IllegalArgument { + message: format!( + "expected Decimal128 data type at column {pos}, found: {dt:?}" + ), + }); + } }; let i128_val = array.value(self.row_id); @@ -307,60 +344,56 @@ impl InternalRow for ColumnarRow { precision as u32, scale as u32, ) - .unwrap_or_else(|e| { - panic!( - "Failed to create Decimal at column {} row {}: {}", - pos, self.row_id, e - ) - }) } - fn get_date(&self, pos: usize) -> crate::row::datum::Date { - crate::row::datum::Date::new(self.read_date_from_arrow(pos)) + fn get_date(&self, pos: usize) -> Result { + Ok(Date::new(self.read_date_from_arrow(pos))) } - fn get_time(&self, pos: usize) -> crate::row::datum::Time { - crate::row::datum::Time::new(self.read_time_from_arrow(pos)) + fn get_time(&self, pos: usize) -> Result