7 changes: 4 additions & 3 deletions Cargo.toml
@@ -8,8 +8,8 @@ name = "server"
path = "src/bin/server.rs"

[dependencies]
datafusion = "51.0"
datafusion-tracing = "51.0.0"
datafusion = { version = "52.1", default-features = false, features = ["parquet", "nested_expressions", "crypto_expressions", "regex_expressions", "unicode_expressions", "datetime_expressions", "encoding_expressions", "string_expressions", "math_expressions", "sql", "recursive_protection"] }
datafusion-tracing = "52.0.0"
instrumented-object-store = "52.0.0"
duckdb = { version = "1.4.4", features = ["bundled"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "sqlite", "mysql", "chrono", "tls-rustls", "bigdecimal"] }
@@ -57,7 +57,8 @@ arrow-array-54 = { package = "arrow-array", version = "54" }
arrow-ipc-54 = { package = "arrow-ipc", version = "54" }
# Arrow 56 for duckdb compatibility (duckdb uses arrow 56, datafusion uses arrow 57)
arrow-ipc-56 = { package = "arrow-ipc", version = "56" }
liquid-cache-client = { git = "https://github.com/XiangpengHao/liquid-cache", rev = "5e63d811132230a0e15fb6d4311be2eb5551cb4d" }
# Temporarily disabled - waiting for DataFusion 52 support
# liquid-cache-client = { git = "https://github.com/XiangpengHao/liquid-cache", rev = "5e63d811132230a0e15fb6d4311be2eb5551cb4d" }
redis = { version = "0.27", features = ["tokio-comp", "connection-manager"] }

[build-dependencies]
15 changes: 15 additions & 0 deletions migrations/postgres/v5.sql
@@ -0,0 +1,15 @@
-- Add indexes table for storing projection/sorted index metadata
CREATE TABLE indexes (
id SERIAL PRIMARY KEY,
connection_id TEXT NOT NULL,
schema_name TEXT NOT NULL,
table_name TEXT NOT NULL,
index_name TEXT NOT NULL,
sort_columns TEXT NOT NULL, -- JSON array of column names
parquet_path TEXT, -- Path to sorted projection parquet
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (connection_id) REFERENCES connections(id) ON DELETE CASCADE,
UNIQUE (connection_id, schema_name, table_name, index_name)
);

CREATE INDEX idx_indexes_table ON indexes(connection_id, schema_name, table_name);
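
The sort_columns column holds a plain JSON array of column names, serialized with serde_json on the Rust side (see create_index in src/catalog/postgres_manager.rs below). A minimal sketch of the stored value, using hypothetical column names:

let sort_columns = vec!["region".to_string(), "event_date".to_string()];
// Serializes to exactly the string the sort_columns column stores.
let json = serde_json::to_string(&sort_columns).unwrap();
assert_eq!(json, r#"["region","event_date"]"#);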
15 changes: 15 additions & 0 deletions migrations/sqlite/v5.sql
@@ -0,0 +1,15 @@
-- Add indexes table for storing projection/sorted index metadata
CREATE TABLE indexes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
connection_id TEXT NOT NULL,
schema_name TEXT NOT NULL,
table_name TEXT NOT NULL,
index_name TEXT NOT NULL,
sort_columns TEXT NOT NULL, -- JSON array of column names
parquet_path TEXT, -- Path to sorted projection parquet
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (connection_id) REFERENCES connections(id) ON DELETE CASCADE,
UNIQUE (connection_id, schema_name, table_name, index_name)
);

CREATE INDEX idx_indexes_table ON indexes(connection_id, schema_name, table_name);
62 changes: 60 additions & 2 deletions src/catalog/caching_manager.rs
@@ -5,8 +5,8 @@
//! to the inner catalog directly.

use super::{
CatalogManager, ConnectionInfo, DatasetInfo, OptimisticLock, PendingDeletion, QueryResult,
ResultStatus, ResultUpdate, TableInfo, UploadInfo,
CatalogManager, ConnectionInfo, DatasetInfo, IndexInfo, OptimisticLock, PendingDeletion,
QueryResult, ResultStatus, ResultUpdate, TableInfo, UploadInfo,
};
use crate::config::CacheConfig;
use crate::secrets::{SecretMetadata, SecretStatus};
@@ -988,4 +988,62 @@ impl CatalogManager for CachingCatalogManager {
self.cached_read(&key, || self.inner().list_dataset_table_names(&schema))
.await
}

// Index methods - pass through to inner catalog (no caching for now)

async fn create_index(
&self,
connection_id: &str,
schema_name: &str,
table_name: &str,
index_name: &str,
sort_columns: &[String],
parquet_path: Option<&str>,
) -> Result<i32> {
self.inner()
.create_index(
connection_id,
schema_name,
table_name,
index_name,
sort_columns,
parquet_path,
)
.await
}

async fn get_index(
&self,
connection_id: &str,
schema_name: &str,
table_name: &str,
index_name: &str,
) -> Result<Option<IndexInfo>> {
self.inner()
.get_index(connection_id, schema_name, table_name, index_name)
.await
}

async fn list_indexes(
&self,
connection_id: &str,
schema_name: &str,
table_name: &str,
) -> Result<Vec<IndexInfo>> {
self.inner()
.list_indexes(connection_id, schema_name, table_name)
.await
}

async fn delete_index(
&self,
connection_id: &str,
schema_name: &str,
table_name: &str,
index_name: &str,
) -> Result<Option<IndexInfo>> {
self.inner()
.delete_index(connection_id, schema_name, table_name, index_name)
.await
}
}
78 changes: 78 additions & 0 deletions src/catalog/manager.rs
@@ -172,6 +172,45 @@ pub struct DatasetInfo {
pub updated_at: DateTime<Utc>,
}

/// A sorted index (projection) on a table.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct IndexInfo {
pub id: i32,
pub connection_id: String,
pub schema_name: String,
pub table_name: String,
pub index_name: String,
pub sort_columns: String, // JSON array of column names
pub parquet_path: Option<String>,
pub created_at: DateTime<Utc>,
}

impl IndexInfo {
/// Parse sort_columns JSON into a Vec<String>.
pub fn sort_columns_vec(&self) -> Vec<String> {
serde_json::from_str(&self.sort_columns).unwrap_or_default()
}

/// Parse sort_columns JSON into (column_name, is_descending) pairs.
/// The is_descending flag is always false (ASC); legacy " ASC"/" DESC" suffixes
/// are stripped for backward compatibility.
pub fn sort_columns_with_direction(&self) -> Vec<(String, bool)> {
let cols: Vec<String> = serde_json::from_str(&self.sort_columns).unwrap_or_default();
cols.into_iter()
.map(|c| {
let c_upper = c.to_uppercase();
let name = if c_upper.ends_with(" DESC") {
c[..c.len() - 5].trim().to_string()
} else if c_upper.ends_with(" ASC") {
c[..c.len() - 4].trim().to_string()
} else {
c
};
(name, false)
})
.collect()
}
}

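A minimal sketch of the suffix handling above, with hypothetical field values; both legacy suffixed entries and bare names parse to ascending columns:

use chrono::Utc;

let info = IndexInfo {
    id: 1,
    connection_id: "conn1".to_string(),
    schema_name: "main".to_string(),
    table_name: "events".to_string(),
    index_name: "by_date".to_string(),
    // Legacy entries may carry " ASC"/" DESC" suffixes; new entries are bare names.
    sort_columns: r#"["region DESC", "event_date ASC", "amount"]"#.to_string(),
    parquet_path: None,
    created_at: Utc::now(),
};
assert_eq!(
    info.sort_columns_with_direction(),
    vec![
        ("region".to_string(), false),
        ("event_date".to_string(), false),
        ("amount".to_string(), false),
    ]
);
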
/// Async interface for catalog operations.
#[async_trait]
pub trait CatalogManager: Debug + Send + Sync {
@@ -385,4 +424,43 @@ pub trait CatalogManager: Debug + Send + Sync {

/// Delete a dataset by ID. Returns the deleted dataset if it existed.
async fn delete_dataset(&self, id: &str) -> Result<Option<DatasetInfo>>;

// Index management methods

/// Create a new index with its parquet path. Returns the index ID.
async fn create_index(
&self,
connection_id: &str,
schema_name: &str,
table_name: &str,
index_name: &str,
sort_columns: &[String],
parquet_path: Option<&str>,
) -> Result<i32>;

/// Get an index by name.
async fn get_index(
&self,
connection_id: &str,
schema_name: &str,
table_name: &str,
index_name: &str,
) -> Result<Option<IndexInfo>>;

/// List all indexes for a table.
async fn list_indexes(
&self,
connection_id: &str,
schema_name: &str,
table_name: &str,
) -> Result<Vec<IndexInfo>>;

/// Delete an index. Returns the deleted index if it existed.
async fn delete_index(
&self,
connection_id: &str,
schema_name: &str,
table_name: &str,
index_name: &str,
) -> Result<Option<IndexInfo>>;
}
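
A hedged usage sketch of the new trait methods; catalog stands for any CatalogManager implementation, and the connection, schema, table, and path names are hypothetical:

let cols = vec!["region".to_string(), "event_date".to_string()];
let _id = catalog
    .create_index("conn1", "main", "events", "by_date", &cols, Some("indexes/by_date.parquet"))
    .await?;
for idx in catalog.list_indexes("conn1", "main", "events").await? {
    println!("{} -> {:?}", idx.index_name, idx.sort_columns_vec());
}
// delete_index returns the removed row, or None if the index did not exist.
let removed = catalog.delete_index("conn1", "main", "events", "by_date").await?;
assert!(removed.is_some());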
47 changes: 45 additions & 2 deletions src/catalog/mock_catalog.rs
@@ -4,8 +4,8 @@
//! in tests to avoid needing a real database.

use super::{
CatalogManager, ConnectionInfo, DatasetInfo, OptimisticLock, PendingDeletion, QueryResult,
ResultStatus, ResultUpdate, TableInfo, UploadInfo,
CatalogManager, ConnectionInfo, DatasetInfo, IndexInfo, OptimisticLock, PendingDeletion,
QueryResult, ResultStatus, ResultUpdate, TableInfo, UploadInfo,
};
use crate::secrets::{SecretMetadata, SecretStatus};
use anyhow::Result;
@@ -357,4 +357,47 @@ impl CatalogManager for MockCatalog {
async fn delete_dataset(&self, _id: &str) -> Result<Option<DatasetInfo>> {
Ok(None)
}

// Index methods - stub implementations for the mock (no state is kept)

async fn create_index(
&self,
_connection_id: &str,
_schema_name: &str,
_table_name: &str,
_index_name: &str,
_sort_columns: &[String],
_parquet_path: Option<&str>,
) -> Result<i32> {
Ok(1)
}

async fn get_index(
&self,
_connection_id: &str,
_schema_name: &str,
_table_name: &str,
_index_name: &str,
) -> Result<Option<IndexInfo>> {
Ok(None)
}

async fn list_indexes(
&self,
_connection_id: &str,
_schema_name: &str,
_table_name: &str,
) -> Result<Vec<IndexInfo>> {
Ok(vec![])
}

async fn delete_index(
&self,
_connection_id: &str,
_schema_name: &str,
_table_name: &str,
_index_name: &str,
) -> Result<Option<IndexInfo>> {
Ok(None)
}
}
4 changes: 2 additions & 2 deletions src/catalog/mod.rs
@@ -14,8 +14,8 @@ mod mock_catalog
pub use mock_catalog::MockCatalog;

pub use manager::{
CatalogManager, ConnectionInfo, DatasetInfo, OptimisticLock, PendingDeletion, QueryResult,
QueryResultRow, ResultStatus, ResultUpdate, TableInfo, UploadInfo,
CatalogManager, ConnectionInfo, DatasetInfo, IndexInfo, OptimisticLock, PendingDeletion,
QueryResult, QueryResultRow, ResultStatus, ResultUpdate, TableInfo, UploadInfo,
};
pub use postgres_manager::PostgresCatalogManager;
pub use sqlite_manager::SqliteCatalogManager;
99 changes: 97 additions & 2 deletions src/catalog/postgres_manager.rs
@@ -1,7 +1,7 @@
use crate::catalog::backend::CatalogBackend;
use crate::catalog::manager::{
CatalogManager, ConnectionInfo, DatasetInfo, OptimisticLock, PendingDeletion, QueryResult,
QueryResultRow, ResultStatus, ResultUpdate, TableInfo, UploadInfo,
CatalogManager, ConnectionInfo, DatasetInfo, IndexInfo, OptimisticLock, PendingDeletion,
QueryResult, QueryResultRow, ResultStatus, ResultUpdate, TableInfo, UploadInfo,
};
use crate::catalog::migrations::{
run_migrations, wrap_migration_sql, CatalogMigrations, Migration, POSTGRES_MIGRATIONS,
@@ -895,6 +895,101 @@ impl CatalogManager for PostgresCatalogManager {

Ok(dataset)
}

// Index management methods

async fn create_index(
&self,
connection_id: &str,
schema_name: &str,
table_name: &str,
index_name: &str,
sort_columns: &[String],
parquet_path: Option<&str>,
) -> Result<i32> {
let sort_columns_json = serde_json::to_string(sort_columns)?;

let row: (i32,) = sqlx::query_as(
"INSERT INTO indexes (connection_id, schema_name, table_name, index_name, sort_columns, parquet_path)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id",
)
.bind(connection_id)
.bind(schema_name)
.bind(table_name)
.bind(index_name)
.bind(&sort_columns_json)
.bind(parquet_path)
.fetch_one(self.backend.pool())
.await?;

Ok(row.0)
}

async fn get_index(
&self,
connection_id: &str,
schema_name: &str,
table_name: &str,
index_name: &str,
) -> Result<Option<IndexInfo>> {
let row: Option<IndexInfo> = sqlx::query_as(
"SELECT id, connection_id, schema_name, table_name, index_name, sort_columns, parquet_path, created_at
FROM indexes
WHERE connection_id = $1 AND schema_name = $2 AND table_name = $3 AND index_name = $4",
)
.bind(connection_id)
.bind(schema_name)
.bind(table_name)
.bind(index_name)
.fetch_optional(self.backend.pool())
.await?;

Ok(row)
}

async fn list_indexes(
&self,
connection_id: &str,
schema_name: &str,
table_name: &str,
) -> Result<Vec<IndexInfo>> {
let rows: Vec<IndexInfo> = sqlx::query_as(
"SELECT id, connection_id, schema_name, table_name, index_name, sort_columns, parquet_path, created_at
FROM indexes
WHERE connection_id = $1 AND schema_name = $2 AND table_name = $3
ORDER BY index_name",
)
.bind(connection_id)
.bind(schema_name)
.bind(table_name)
.fetch_all(self.backend.pool())
.await?;

Ok(rows)
}

async fn delete_index(
&self,
connection_id: &str,
schema_name: &str,
table_name: &str,
index_name: &str,
) -> Result<Option<IndexInfo>> {
let row: Option<IndexInfo> = sqlx::query_as(
"DELETE FROM indexes
WHERE connection_id = $1 AND schema_name = $2 AND table_name = $3 AND index_name = $4
RETURNING id, connection_id, schema_name, table_name, index_name, sort_columns, parquet_path, created_at",
)
.bind(connection_id)
.bind(schema_name)
.bind(table_name)
.bind(index_name)
.fetch_optional(self.backend.pool())
.await?;

Ok(row)
}
}

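One behavior worth noting, following from the UNIQUE (connection_id, schema_name, table_name, index_name) constraint in migrations/postgres/v5.sql: a second create_index with the same key fails with a unique-violation error rather than upserting. A sketch, with hypothetical names:

let cols = vec!["region".to_string()];
catalog.create_index("conn1", "main", "events", "by_region", &cols, None).await?;
// The duplicate insert violates the unique constraint; the sqlx error is
// propagated through anyhow rather than silently replacing the existing row.
let dup = catalog.create_index("conn1", "main", "events", "by_region", &cols, None).await;
assert!(dup.is_err());
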
impl Debug for PostgresCatalogManager {