Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 50 additions & 30 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1773,7 +1773,7 @@ mod row_reader {
allowed: impl FnOnce(&fcore::metadata::DataType) -> bool,
) -> Result<&'a fcore::metadata::DataType, String> {
let col = get_column(columns, field)?;
if row.is_null_at(field) {
if row.is_null_at(field).map_err(|e| e.to_string())? {
return Err(format!("field {field} is null"));
}
let dt = col.data_type();
Expand Down Expand Up @@ -1801,7 +1801,7 @@ mod row_reader {
field: usize,
) -> Result<bool, String> {
get_column(columns, field)?;
Ok(row.is_null_at(field))
row.is_null_at(field).map_err(|e| e.to_string())
}

pub fn get_bool(
Expand All @@ -1812,7 +1812,7 @@ mod row_reader {
validate(row, columns, field, "get_bool", |dt| {
matches!(dt, fcore::metadata::DataType::Boolean(_))
})?;
Ok(row.get_boolean(field))
row.get_boolean(field).map_err(|e| e.to_string())
}

pub fn get_i32(
Expand All @@ -1828,11 +1828,17 @@ mod row_reader {
| fcore::metadata::DataType::Int(_)
)
})?;
Ok(match dt {
fcore::metadata::DataType::TinyInt(_) => row.get_byte(field) as i32,
fcore::metadata::DataType::SmallInt(_) => row.get_short(field) as i32,
_ => row.get_int(field),
})
match dt {
fcore::metadata::DataType::TinyInt(_) => row
.get_byte(field)
.map(|v| v as i32)
.map_err(|e| e.to_string()),
fcore::metadata::DataType::SmallInt(_) => row
.get_short(field)
.map(|v| v as i32)
.map_err(|e| e.to_string()),
_ => row.get_int(field).map_err(|e| e.to_string()),
}
}

pub fn get_i64(
Expand All @@ -1843,7 +1849,7 @@ mod row_reader {
validate(row, columns, field, "get_i64", |dt| {
matches!(dt, fcore::metadata::DataType::BigInt(_))
})?;
Ok(row.get_long(field))
row.get_long(field).map_err(|e| e.to_string())
}

pub fn get_f32(
Expand All @@ -1854,7 +1860,7 @@ mod row_reader {
validate(row, columns, field, "get_f32", |dt| {
matches!(dt, fcore::metadata::DataType::Float(_))
})?;
Ok(row.get_float(field))
row.get_float(field).map_err(|e| e.to_string())
}

pub fn get_f64(
Expand All @@ -1865,7 +1871,7 @@ mod row_reader {
validate(row, columns, field, "get_f64", |dt| {
matches!(dt, fcore::metadata::DataType::Double(_))
})?;
Ok(row.get_double(field))
row.get_double(field).map_err(|e| e.to_string())
}

pub fn get_str<'a>(
Expand All @@ -1879,10 +1885,12 @@ mod row_reader {
fcore::metadata::DataType::Char(_) | fcore::metadata::DataType::String(_)
)
})?;
Ok(match dt {
fcore::metadata::DataType::Char(ct) => row.get_char(field, ct.length() as usize),
_ => row.get_string(field),
})
match dt {
fcore::metadata::DataType::Char(ct) => row
.get_char(field, ct.length() as usize)
.map_err(|e| e.to_string()),
_ => row.get_string(field).map_err(|e| e.to_string()),
}
}

pub fn get_bytes<'a>(
Expand All @@ -1896,10 +1904,12 @@ mod row_reader {
fcore::metadata::DataType::Binary(_) | fcore::metadata::DataType::Bytes(_)
)
})?;
Ok(match dt {
fcore::metadata::DataType::Binary(bt) => row.get_binary(field, bt.length()),
_ => row.get_bytes(field),
})
match dt {
fcore::metadata::DataType::Binary(bt) => row
.get_binary(field, bt.length())
.map_err(|e| e.to_string()),
_ => row.get_bytes(field).map_err(|e| e.to_string()),
}
}

pub fn get_date_days(
Expand All @@ -1910,7 +1920,9 @@ mod row_reader {
validate(row, columns, field, "get_date_days", |dt| {
matches!(dt, fcore::metadata::DataType::Date(_))
})?;
Ok(row.get_date(field).get_inner())
row.get_date(field)
.map(|d| d.get_inner())
.map_err(|e| e.to_string())
}

pub fn get_time_millis(
Expand All @@ -1921,7 +1933,9 @@ mod row_reader {
validate(row, columns, field, "get_time_millis", |dt| {
matches!(dt, fcore::metadata::DataType::Time(_))
})?;
Ok(row.get_time(field).get_inner())
row.get_time(field)
.map(|t| t.get_inner())
.map_err(|e| e.to_string())
}

pub fn get_ts_millis(
Expand All @@ -1937,12 +1951,14 @@ mod row_reader {
)
})?;
match dt {
fcore::metadata::DataType::TimestampLTz(ts) => Ok(row
fcore::metadata::DataType::TimestampLTz(ts) => row
.get_timestamp_ltz(field, ts.precision())
.get_epoch_millisecond()),
fcore::metadata::DataType::Timestamp(ts) => Ok(row
.map(|v| v.get_epoch_millisecond())
.map_err(|e| e.to_string()),
fcore::metadata::DataType::Timestamp(ts) => row
.get_timestamp_ntz(field, ts.precision())
.get_millisecond()),
.map(|v| v.get_millisecond())
.map_err(|e| e.to_string()),
dt => Err(format!("get_ts_millis: unexpected type {dt}")),
}
}
Expand All @@ -1960,12 +1976,14 @@ mod row_reader {
)
})?;
match dt {
fcore::metadata::DataType::TimestampLTz(ts) => Ok(row
fcore::metadata::DataType::TimestampLTz(ts) => row
.get_timestamp_ltz(field, ts.precision())
.get_nano_of_millisecond()),
fcore::metadata::DataType::Timestamp(ts) => Ok(row
.map(|v| v.get_nano_of_millisecond())
.map_err(|e| e.to_string()),
fcore::metadata::DataType::Timestamp(ts) => row
.get_timestamp_ntz(field, ts.precision())
.get_nano_of_millisecond()),
.map(|v| v.get_nano_of_millisecond())
.map_err(|e| e.to_string()),
dt => Err(format!("get_ts_nanos: unexpected type {dt}")),
}
}
Expand All @@ -1987,7 +2005,9 @@ mod row_reader {
})?;
match dt {
fcore::metadata::DataType::Decimal(dd) => {
let decimal = row.get_decimal(field, dd.precision() as usize, dd.scale() as usize);
let decimal = row
.get_decimal(field, dd.precision() as usize, dd.scale() as usize)
.map_err(|e| e.to_string())?;
Ok(decimal.to_big_decimal().to_string())
}
dt => Err(format!("get_decimal_str: unexpected type {dt}")),
Expand Down
34 changes: 17 additions & 17 deletions bindings/cpp/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,42 +371,42 @@ pub fn compacted_row_to_owned(
let mut out = fcore::row::GenericRow::new(columns.len());

for (i, col) in columns.iter().enumerate() {
if row.is_null_at(i) {
if row.is_null_at(i)? {
out.set_field(i, Datum::Null);
continue;
}

let datum = match col.data_type() {
fcore::metadata::DataType::Boolean(_) => Datum::Bool(row.get_boolean(i)),
fcore::metadata::DataType::TinyInt(_) => Datum::Int8(row.get_byte(i)),
fcore::metadata::DataType::SmallInt(_) => Datum::Int16(row.get_short(i)),
fcore::metadata::DataType::Int(_) => Datum::Int32(row.get_int(i)),
fcore::metadata::DataType::BigInt(_) => Datum::Int64(row.get_long(i)),
fcore::metadata::DataType::Float(_) => Datum::Float32(row.get_float(i).into()),
fcore::metadata::DataType::Double(_) => Datum::Float64(row.get_double(i).into()),
fcore::metadata::DataType::Boolean(_) => Datum::Bool(row.get_boolean(i)?),
fcore::metadata::DataType::TinyInt(_) => Datum::Int8(row.get_byte(i)?),
fcore::metadata::DataType::SmallInt(_) => Datum::Int16(row.get_short(i)?),
fcore::metadata::DataType::Int(_) => Datum::Int32(row.get_int(i)?),
fcore::metadata::DataType::BigInt(_) => Datum::Int64(row.get_long(i)?),
fcore::metadata::DataType::Float(_) => Datum::Float32(row.get_float(i)?.into()),
fcore::metadata::DataType::Double(_) => Datum::Float64(row.get_double(i)?.into()),
fcore::metadata::DataType::String(_) => {
Datum::String(Cow::Owned(row.get_string(i).to_string()))
Datum::String(Cow::Owned(row.get_string(i)?.to_string()))
}
fcore::metadata::DataType::Bytes(_) => {
Datum::Blob(Cow::Owned(row.get_bytes(i).to_vec()))
Datum::Blob(Cow::Owned(row.get_bytes(i)?.to_vec()))
}
fcore::metadata::DataType::Date(_) => Datum::Date(row.get_date(i)),
fcore::metadata::DataType::Time(_) => Datum::Time(row.get_time(i)),
fcore::metadata::DataType::Date(_) => Datum::Date(row.get_date(i)?),
fcore::metadata::DataType::Time(_) => Datum::Time(row.get_time(i)?),
fcore::metadata::DataType::Timestamp(dt) => {
Datum::TimestampNtz(row.get_timestamp_ntz(i, dt.precision()))
Datum::TimestampNtz(row.get_timestamp_ntz(i, dt.precision())?)
}
fcore::metadata::DataType::TimestampLTz(dt) => {
Datum::TimestampLtz(row.get_timestamp_ltz(i, dt.precision()))
Datum::TimestampLtz(row.get_timestamp_ltz(i, dt.precision())?)
}
fcore::metadata::DataType::Decimal(dt) => {
let decimal = row.get_decimal(i, dt.precision() as usize, dt.scale() as usize);
let decimal = row.get_decimal(i, dt.precision() as usize, dt.scale() as usize)?;
Datum::Decimal(decimal)
}
fcore::metadata::DataType::Char(dt) => Datum::String(Cow::Owned(
row.get_char(i, dt.length() as usize).to_string(),
row.get_char(i, dt.length() as usize)?.to_string(),
)),
fcore::metadata::DataType::Binary(dt) => {
Datum::Blob(Cow::Owned(row.get_binary(i, dt.length()).to_vec()))
Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec()))
}
other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")),
};
Expand Down
56 changes: 42 additions & 14 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1256,91 +1256,119 @@ pub fn datum_to_python_value(
use fcore::metadata::DataType;

// Check for null first
if row.is_null_at(pos) {
if row
.is_null_at(pos)
.map_err(|e| FlussError::from_core_error(&e))?
{
return Ok(py.None());
}

match data_type {
DataType::Boolean(_) => Ok(row
.get_boolean(pos)
.map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::TinyInt(_) => Ok(row
.get_byte(pos)
.map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::SmallInt(_) => Ok(row
.get_short(pos)
.map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::Int(_) => Ok(row
.get_int(pos)
.map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::BigInt(_) => Ok(row
.get_long(pos)
.map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::Float(_) => Ok(row
.get_float(pos)
.map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::Double(_) => Ok(row
.get_double(pos)
.map_err(|e| FlussError::from_core_error(&e))?
.into_pyobject(py)?
.to_owned()
.into_any()
.unbind()),
DataType::String(_) => {
let s = row.get_string(pos);
let s = row
.get_string(pos)
.map_err(|e| FlussError::from_core_error(&e))?;
Ok(s.into_pyobject(py)?.into_any().unbind())
}
DataType::Char(char_type) => {
let s = row.get_char(pos, char_type.length() as usize);
let s = row
.get_char(pos, char_type.length() as usize)
.map_err(|e| FlussError::from_core_error(&e))?;
Ok(s.into_pyobject(py)?.into_any().unbind())
}
DataType::Bytes(_) => {
let b = row.get_bytes(pos);
let b = row
.get_bytes(pos)
.map_err(|e| FlussError::from_core_error(&e))?;
Ok(PyBytes::new(py, b).into_any().unbind())
}
DataType::Binary(binary_type) => {
let b = row.get_binary(pos, binary_type.length());
let b = row
.get_binary(pos, binary_type.length())
.map_err(|e| FlussError::from_core_error(&e))?;
Ok(PyBytes::new(py, b).into_any().unbind())
}
DataType::Decimal(decimal_type) => {
let decimal = row.get_decimal(
pos,
decimal_type.precision() as usize,
decimal_type.scale() as usize,
);
let decimal = row
.get_decimal(
pos,
decimal_type.precision() as usize,
decimal_type.scale() as usize,
)
.map_err(|e| FlussError::from_core_error(&e))?;
rust_decimal_to_python(py, &decimal)
}
DataType::Date(_) => {
let date = row.get_date(pos);
let date = row
.get_date(pos)
.map_err(|e| FlussError::from_core_error(&e))?;
rust_date_to_python(py, date)
}
DataType::Time(_) => {
let time = row.get_time(pos);
let time = row
.get_time(pos)
.map_err(|e| FlussError::from_core_error(&e))?;
rust_time_to_python(py, time)
}
DataType::Timestamp(ts_type) => {
let ts = row.get_timestamp_ntz(pos, ts_type.precision());
let ts = row
.get_timestamp_ntz(pos, ts_type.precision())
.map_err(|e| FlussError::from_core_error(&e))?;
rust_timestamp_ntz_to_python(py, ts)
}
DataType::TimestampLTz(ts_type) => {
let ts = row.get_timestamp_ltz(pos, ts_type.precision());
let ts = row
.get_timestamp_ltz(pos, ts_type.precision())
.map_err(|e| FlussError::from_core_error(&e))?;
rust_timestamp_ltz_to_python(py, ts)
}
_ => Err(FlussError::new_err(format!(
Expand Down
8 changes: 4 additions & 4 deletions crates/examples/src/example_kv_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ pub async fn main() -> Result<()> {
let row = result.get_single_row()?.unwrap();
println!(
"Found id={id}: name={}, age={}",
row.get_string(1),
row.get_long(2)
row.get_string(1)?,
row.get_long(2)?
);
}

Expand All @@ -92,8 +92,8 @@ pub async fn main() -> Result<()> {
let row = result.get_single_row()?.unwrap();
println!(
"Verified update: name={}, age={}",
row.get_string(1),
row.get_long(2)
row.get_string(1)?,
row.get_long(2)?
);

println!("\n=== Deleting ===");
Expand Down
Loading
Loading