Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,10 @@ class FlussError(Exception):
message: str
error_code: int
def __init__(self, message: str, error_code: int = -2) -> None: ...

def is_retriable(self) -> bool:
...

def __str__(self) -> str: ...

class LakeSnapshot:
Expand Down
20 changes: 20 additions & 0 deletions bindings/python/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@ use pyo3::prelude::*;
/// collide with current or future API codes. Consistent with the CPP binding.
const CLIENT_ERROR_CODE: i32 = -2;

const RETRIABLE_ERROR_CODES: &[i32] = &[
1, // NetworkException
3, // CorruptMessage
10, // LogStorageException
11, // KvStorageException
12, // NotLeaderOrFollower
14, // CorruptRecordException
21, // UnknownTableOrBucketException
25, // RequestTimeOut
26, // StorageException
28, // NotEnoughReplicasAfterAppendException
29, // NotEnoughReplicasException
44, // LeaderNotAvailableException
51, // RetriableAuthenticateException
];

/// Fluss errors
#[pyclass(extends=PyException)]
#[derive(Debug, Clone)]
Expand All @@ -44,6 +60,10 @@ impl FlussError {
}
}

fn is_retriable(&self) -> bool {
RETRIABLE_ERROR_CODES.contains(&self.error_code)
}

fn __str__(&self) -> String {
if self.error_code != CLIENT_ERROR_CODE {
format!("FlussError(code={}): {}", self.error_code, self.message)
Expand Down
39 changes: 39 additions & 0 deletions bindings/python/test/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ async def test_fluss_error_response(admin):
await admin.get_table_info(table_path)

assert exc_info.value.error_code == fluss.ErrorCode.TABLE_NOT_EXIST
assert exc_info.value.is_retriable() is False


async def test_error_database_not_exist(admin):
Expand Down Expand Up @@ -299,3 +300,41 @@ async def test_error_table_not_partitioned(admin):

await admin.drop_table(table_path, ignore_if_not_exists=True)
await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)


def test_fluss_error_is_retriable():
client_err = fluss.FlussError("connection failed", fluss.ErrorCode.CLIENT_ERROR)
assert client_err.is_retriable() is False

table_not_exist = fluss.FlussError(
"table not found", fluss.ErrorCode.TABLE_NOT_EXIST
)
assert table_not_exist.is_retriable() is False

db_not_exist = fluss.FlussError(
"database not found", fluss.ErrorCode.DATABASE_NOT_EXIST
)
assert db_not_exist.is_retriable() is False

assert (
fluss.FlussError("network", fluss.ErrorCode.NETWORK_EXCEPTION).is_retriable()
is True
)
assert (
fluss.FlussError(
"timeout", fluss.ErrorCode.REQUEST_TIME_OUT
).is_retriable()
is True
)
assert (
fluss.FlussError(
"leader not available", fluss.ErrorCode.LEADER_NOT_AVAILABLE_EXCEPTION
).is_retriable()
is True
)
assert (
fluss.FlussError(
"retriable auth", fluss.ErrorCode.RETRIABLE_AUTHENTICATE_EXCEPTION
).is_retriable()
is True
)
Loading