Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,14 @@ struct PartitionInfo {
std::string partition_name;
};

/// Information about a single alive server node in the Fluss cluster,
/// as returned by Admin::GetServerNodes.
struct ServerNode {
    // Numeric node id assigned by the cluster.
    int32_t id;
    // Hostname or address the server is reachable at.
    std::string host;
    // Port the server listens on.
    uint32_t port;
    // Server role; observed values are "CoordinatorServer" and "TabletServer".
    std::string server_type;
    // Unique identifier of the server (e.g. "cs-0", "ts-1").
    std::string uid;
};

/// Descriptor for create_database (optional). Leave comment and properties empty for default.
struct DatabaseDescriptor {
std::string comment;
Expand Down Expand Up @@ -1068,6 +1076,8 @@ class Admin {

Result TableExists(const TablePath& table_path, bool& out);

Result GetServerNodes(std::vector<ServerNode>& out);

private:
Result DoListOffsets(const TablePath& table_path, const std::vector<int32_t>& bucket_ids,
const OffsetSpec& offset_spec, std::unordered_map<int32_t, int64_t>& out,
Expand Down
18 changes: 18 additions & 0 deletions bindings/cpp/src/admin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,4 +346,22 @@ Result Admin::TableExists(const TablePath& table_path, bool& out) {
return result;
}

// Fetches all alive server nodes (coordinator and tablet servers) from the
// cluster via the FFI layer and stores them in `out`.
// On failure, `out` is left untouched and the error result is returned.
Result Admin::GetServerNodes(std::vector<ServerNode>& out) {
    if (!Available()) {
        return utils::make_client_error("Admin not available");
    }

    auto ffi_result = admin_->get_server_nodes();
    auto result = utils::from_ffi_result(ffi_result.result);
    if (!result.Ok()) {
        return result;
    }

    // Build the converted list locally, then move-assign into `out`.
    std::vector<ServerNode> nodes;
    nodes.reserve(ffi_result.server_nodes.size());
    for (const auto& n : ffi_result.server_nodes) {
        ServerNode converted;
        converted.id = n.node_id;
        converted.host = std::string(n.host);
        converted.port = n.port;
        converted.server_type = std::string(n.server_type);
        converted.uid = std::string(n.uid);
        nodes.push_back(std::move(converted));
    }
    out = std::move(nodes);
    return result;
}

} // namespace fluss
41 changes: 41 additions & 0 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,19 @@ mod ffi {
value: bool,
}

struct FfiServerNode {
node_id: i32,
host: String,
port: u32,
server_type: String,
uid: String,
}

    /// Result wrapper for `get_server_nodes`: an FFI status plus the list of
    /// nodes (empty when the status carries an error).
    struct FfiServerNodesResult {
        result: FfiResult,
        server_nodes: Vec<FfiServerNode>,
    }

extern "Rust" {
type Connection;
type Admin;
Expand Down Expand Up @@ -316,6 +329,7 @@ mod ffi {
fn get_database_info(self: &Admin, database_name: &str) -> FfiDatabaseInfoResult;
fn list_tables(self: &Admin, database_name: &str) -> FfiListTablesResult;
fn table_exists(self: &Admin, table_path: &FfiTablePath) -> FfiBoolResult;
fn get_server_nodes(self: &Admin) -> FfiServerNodesResult;

// Table
unsafe fn delete_table(table: *mut Table);
Expand Down Expand Up @@ -1089,6 +1103,33 @@ impl Admin {
},
}
}

fn get_server_nodes(&self) -> ffi::FfiServerNodesResult {
let result = RUNTIME.block_on(async { self.inner.get_server_nodes().await });

match result {
Ok(nodes) => {
let server_nodes: Vec<ffi::FfiServerNode> = nodes
.into_iter()
.map(|node| ffi::FfiServerNode {
node_id: node.id(),
host: node.host().to_string(),
port: node.port(),
server_type: node.server_type().to_string(),
uid: node.uid().to_string(),
})
.collect();
ffi::FfiServerNodesResult {
result: ok_result(),
server_nodes,
}
}
Err(e) => ffi::FfiServerNodesResult {
result: err_from_core_error(&e),
server_nodes: vec![],
},
}
}
}

// Table implementation
Expand Down
25 changes: 25 additions & 0 deletions bindings/cpp/test/test_admin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,31 @@ TEST_F(AdminTest, ErrorTableAlreadyExist) {
ASSERT_OK(adm.DropDatabase(db_name, true, true));
}

// Verifies GetServerNodes returns a non-empty list containing both a
// coordinator and at least one tablet server, with sane per-node fields.
TEST_F(AdminTest, GetServerNodes) {
    auto& adm = admin();

    std::vector<fluss::ServerNode> nodes;
    ASSERT_OK(adm.GetServerNodes(nodes));
    ASSERT_GT(nodes.size(), 0u) << "Expected at least one server node";

    size_t coordinator_count = 0;
    size_t tablet_count = 0;
    for (const auto& node : nodes) {
        EXPECT_FALSE(node.host.empty()) << "Server node host should not be empty";
        EXPECT_GT(node.port, 0u) << "Server node port should be > 0";
        EXPECT_FALSE(node.uid.empty()) << "Server node uid should not be empty";

        if (node.server_type == "CoordinatorServer") {
            ++coordinator_count;
        }
        if (node.server_type == "TabletServer") {
            ++tablet_count;
        }
    }
    EXPECT_TRUE(coordinator_count > 0) << "Expected a coordinator server node";
    EXPECT_TRUE(tablet_count > 0) << "Expected at least one tablet server node";
}

TEST_F(AdminTest, ErrorTableNotExist) {
auto& adm = admin();

Expand Down
32 changes: 32 additions & 0 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,31 @@ class FlussConnection:
) -> bool: ...
def __repr__(self) -> str: ...

class ServerNode:
    """Information about a single server node (coordinator or tablet server) in the Fluss cluster."""

    @property
    def id(self) -> int:
        """The numeric ID of the server node."""
        ...
    @property
    def host(self) -> str:
        """The hostname or address of the server."""
        ...
    @property
    def port(self) -> int:
        """The port number the server listens on."""
        ...
    @property
    def server_type(self) -> str:
        """The type of server: 'CoordinatorServer' or 'TabletServer'."""
        ...
    @property
    def uid(self) -> str:
        """The unique identifier of the server (e.g. 'cs-0', 'ts-1')."""
        ...
    def __repr__(self) -> str: ...

class FlussAdmin:
async def create_database(
self,
Expand Down Expand Up @@ -303,6 +328,13 @@ class FlussAdmin:
List of PartitionInfo objects
"""
...
    async def get_server_nodes(self) -> List[ServerNode]:
        """Get all alive server nodes in the cluster.

        Returns:
            List of ServerNode objects, including the coordinator and all
            tablet servers.
        """
        ...
def __repr__(self) -> str: ...


Expand Down
81 changes: 81 additions & 0 deletions bindings/python/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,30 @@ impl FlussAdmin {
})
}

    /// Get all alive server nodes in the cluster.
    ///
    /// Returns:
    ///     List[ServerNode]: List of server nodes (coordinator and tablet servers)
    pub fn get_server_nodes<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        // Clone the shared admin handle so the async block can own it.
        let admin = self.__admin.clone();

        future_into_py(py, async move {
            let nodes = admin
                .get_server_nodes()
                .await
                .map_err(|e| FlussError::from_core_error(&e))?;

            // Re-enter the interpreter to build the Python list of ServerNode
            // objects from the core node values.
            Python::attach(|py| {
                let py_list = pyo3::types::PyList::empty(py);
                for node in nodes {
                    let py_node = ServerNode::from_core(node);
                    py_list.append(Py::new(py, py_node)?)?;
                }
                Ok(py_list.unbind())
            })
        })
    }

fn __repr__(&self) -> String {
"FlussAdmin()".to_string()
}
Expand Down Expand Up @@ -552,3 +576,60 @@ impl PartitionInfo {
}
}
}

/// Information about a server node in the Fluss cluster
#[pyclass]
pub struct ServerNode {
    // Numeric node id.
    id: i32,
    // Hostname or address of the server.
    host: String,
    // Listening port.
    port: u32,
    // Server role string ("CoordinatorServer" or "TabletServer").
    server_type: String,
    // Unique server identifier (e.g. "cs-0", "ts-1").
    uid: String,
}

#[pymethods]
impl ServerNode {
    /// The numeric server node ID.
    #[getter]
    fn id(&self) -> i32 {
        self.id
    }

    /// The hostname or address of the server.
    #[getter]
    fn host(&self) -> &str {
        &self.host
    }

    /// The port number the server listens on.
    #[getter]
    fn port(&self) -> u32 {
        self.port
    }

    /// The type of server ("CoordinatorServer" or "TabletServer").
    #[getter]
    fn server_type(&self) -> &str {
        &self.server_type
    }

    /// The unique identifier of the server (e.g. "cs-0", "ts-1").
    #[getter]
    fn uid(&self) -> &str {
        &self.uid
    }

    // Note: uid is deliberately left out of the repr string.
    fn __repr__(&self) -> String {
        format!(
            "ServerNode(id={}, host='{}', port={}, server_type='{}')",
            self.id, self.host, self.port, self.server_type
        )
    }
}

impl ServerNode {
    /// Build a Python-facing `ServerNode` from the core client's node type,
    /// copying each field into owned values.
    pub fn from_core(node: fcore::ServerNode) -> Self {
        Self {
            id: node.id(),
            host: node.host().to_string(),
            port: node.port(),
            server_type: node.server_type().to_string(),
            uid: node.uid().to_string(),
        }
    }
}
1 change: 1 addition & 0 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<ScanRecords>()?;
m.add_class::<RecordBatch>()?;
m.add_class::<PartitionInfo>()?;
m.add_class::<ServerNode>()?;
m.add_class::<OffsetSpec>()?;
m.add_class::<WriteResultHandle>()?;
m.add_class::<DatabaseDescriptor>()?;
Expand Down
17 changes: 17 additions & 0 deletions bindings/python/test/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,23 @@ async def test_error_table_not_exist(admin):
await admin.drop_table(table_path, ignore_if_not_exists=True)


async def test_get_server_nodes(admin):
    """Test get_server_nodes returns coordinator and tablet servers."""
    nodes = await admin.get_server_nodes()

    assert len(nodes) > 0, "Expected at least one server node"

    # Collect distinct roles once instead of scanning a list per assertion.
    seen_types = {node.server_type for node in nodes}
    assert "CoordinatorServer" in seen_types, "Expected a coordinator server"
    assert "TabletServer" in seen_types, "Expected at least one tablet server"

    for node in nodes:
        assert node.host, "Server node host should not be empty"
        assert node.port > 0, "Server node port should be > 0"
        assert node.uid, "Server node uid should not be empty"
        assert repr(node).startswith("ServerNode(")


async def test_error_table_not_partitioned(admin):
"""Test error when calling partition operations on non-partitioned table."""
db_name = "py_test_error_not_partitioned_db"
Expand Down
8 changes: 8 additions & 0 deletions crates/fluss/src/client/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::client::metadata::Metadata;
use crate::cluster::ServerNode;
use crate::metadata::{
DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo, PartitionSpec,
PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath,
Expand Down Expand Up @@ -267,6 +268,13 @@ impl FlussAdmin {
))
}

/// Get all alive server nodes in the cluster, including the coordinator
/// and all tablet servers. Refreshes cluster metadata before returning.
pub async fn get_server_nodes(&self) -> Result<Vec<ServerNode>> {
self.metadata.reinit_cluster().await?;
Ok(self.metadata.get_cluster().get_server_nodes())
}

/// Get the latest lake snapshot for a table
pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> Result<LakeSnapshot> {
let response = self
Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/src/client/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl Metadata {
Cluster::from_metadata_response(response, None)
}

async fn reinit_cluster(&self) -> Result<()> {
pub(crate) async fn reinit_cluster(&self) -> Result<()> {
let cluster = Self::init_cluster(&self.bootstrap, self.connections.clone()).await?;
*self.cluster.write() = cluster.into();
Ok(())
Expand Down
Loading
Loading