diff --git a/.github/dependabot.yml b/.github/dependabot.yml
index 189df3d..7471159 100644
--- a/.github/dependabot.yml
+++ b/.github/dependabot.yml
@@ -42,8 +42,8 @@ updates:
         patterns:
           - "*"
 
-  # Version updates for Python packages
-  - package-ecosystem: "pip"
+  # Version updates for Python packages (uv)
+  - package-ecosystem: "uv"
     directory: "/packages/nvisy-dal"
     schedule:
       interval: "weekly"
@@ -64,7 +64,7 @@ updates:
           - "minor"
           - "patch"
 
-  - package-ecosystem: "pip"
+  - package-ecosystem: "uv"
    directory: "/packages/nvisy-rig"
     schedule:
       interval: "weekly"
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 25c98ae..d1c2985 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -7,6 +7,7 @@ on:
       - "release"
     paths:
       - "crates/**"
+      - "packages/**"
       - "migrations/**"
       - "scripts/**"
       - "Cargo.toml"
@@ -18,6 +19,7 @@ on:
     branches: [main, release]
     paths:
       - "crates/**"
+      - "packages/**"
       - "migrations/**"
       - "scripts/**"
       - "Cargo.toml"
@@ -36,8 +38,8 @@ concurrency:
   cancel-in-progress: true
 
 jobs:
-  fmt:
-    name: Format
+  fmt-rs:
+    name: Format (Rust)
     runs-on: ubuntu-latest
     steps:
       - name: Checkout
@@ -51,8 +53,8 @@ jobs:
       - name: Check formatting
         run: cargo +nightly fmt --all -- --check
 
-  check:
-    name: Check & Clippy
+  check-rs:
+    name: Check & Clippy (Rust)
     runs-on: ubuntu-latest
     steps:
       - name: Checkout
@@ -72,10 +74,10 @@ jobs:
       - name: Clippy
         run: cargo clippy --all-targets --all-features --workspace -- -D warnings
 
-  test:
-    name: Test
+  test-rs:
+    name: Test (Rust)
     runs-on: ubuntu-latest
-    needs: check
+    needs: check-rs
     env:
       POSTGRES_URL: postgresql://postgres:postgres@localhost:5432/postgres
       NATS_URL: nats://localhost:4222
@@ -110,10 +112,10 @@ jobs:
         if: always()
         run: docker compose -f docker/docker-compose.dev.yml down -v
 
-  docs:
-    name: Docs
+  docs-rs:
+    name: Docs (Rust)
     runs-on: ubuntu-latest
-    needs: check
+    needs: check-rs
     steps:
       - name: Checkout
         uses: actions/checkout@v6
@@ -128,3 +130,55 @@ jobs:
         run: cargo doc --no-deps --all-features --workspace
         env:
           RUSTDOCFLAGS: "-D warnings"
+
+  lint-py:
+    name: Lint (Python)
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        package: [nvisy-dal]
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v6
+
+      - name: Install uv
+        uses: astral-sh/setup-uv@v5
+
+      - name: Set up Python
+        run: uv python install 3.13
+
+      - name: Install dependencies
+        working-directory: packages/${{ matrix.package }}
+        run: uv sync
+
+      - name: Ruff check
+        working-directory: packages/${{ matrix.package }}
+        run: uv run ruff check src/
+
+      - name: Ruff format
+        working-directory: packages/${{ matrix.package }}
+        run: uv run ruff format --check src/
+
+  typecheck-py:
+    name: Typecheck (Python)
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        package: [nvisy-dal]
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v6
+
+      - name: Install uv
+        uses: astral-sh/setup-uv@v5
+
+      - name: Set up Python
+        run: uv python install 3.13
+
+      - name: Install dependencies
+        working-directory: packages/${{ matrix.package }}
+        run: uv sync --all-extras
+
+      - name: Pyright
+        working-directory: packages/${{ matrix.package }}
+        run: uv run pyright src/
diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml
index 835a818..254aa65 100644
--- a/.github/workflows/security.yml
+++ b/.github/workflows/security.yml
@@ -5,6 +5,7 @@ on:
     branches: [main, release]
     paths:
       - "crates/**"
+      - "packages/**"
       - "Cargo.toml"
       - "Cargo.lock"
       - "deny.toml"
@@ -13,6 +14,7 @@ on:
     branches: [main, release]
     paths:
       - "crates/**"
+      - "packages/**"
       - "Cargo.toml"
       - "Cargo.lock"
       - "deny.toml"
@@ -29,8 +31,8 @@ concurrency:
   cancel-in-progress: true
 
 jobs:
-  deny:
-    name: Deny
+  deny-rs:
+    name: Deny (Rust)
     runs-on: ubuntu-latest
     steps:
       - name: Checkout
@@ -44,3 +46,27 @@ jobs:
 
       - name: Run deny
         run: cargo deny check all
+
+  audit-py:
+    name: Audit (Python)
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        package: [nvisy-dal]
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v6
+
+      - name: Install uv
+        uses: astral-sh/setup-uv@v5
+
+      - name: Set up Python
+        run: uv python install 3.13
+
+      - name: Install dependencies
+        working-directory: packages/${{ matrix.package }}
+        run: uv sync --all-extras
+
+      - name: Audit dependencies
+        working-directory: packages/${{ matrix.package }}
+        run: uv run pip-audit
diff --git a/Cargo.lock b/Cargo.lock
index 5009ad4..39392a0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2771,6 +2771,7 @@ dependencies = [
  "pyo3-async-runtimes",
  "serde",
  "serde_json",
+ "strum",
  "thiserror 2.0.18",
  "tokio",
  "uuid",
diff --git a/README.md b/README.md
index 2405be6..d4e2d34 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,6 @@
 # Nvisy Server
 
 [![Build](https://img.shields.io/github/actions/workflow/status/nvisycom/server/build.yml?branch=main&label=build%20%26%20test&style=flat-square)](https://github.com/nvisycom/server/actions/workflows/build.yml)
-[![Crates.io](https://img.shields.io/crates/v/nvisy-server?style=flat-square)](https://crates.io/crates/nvisy-server)
-[![Docs](https://img.shields.io/docsrs/nvisy-server?style=flat-square&label=docs)](https://docs.rs/nvisy-server)
 
 High-performance backend server for the Nvisy document processing platform.
 
@@ -14,22 +12,6 @@ High-performance backend server for the Nvisy document processing platform.
 - **Real-Time Updates** - AI streaming via SSE and job processing via NATS
 - **Interactive Docs** - Auto-generated OpenAPI with Scalar UI
 
-## Architecture
-
-```
-server/
-├── crates/
-│   ├── nvisy-cli/       # Server binary with CLI and configuration
-│   ├── nvisy-core/      # Shared types, errors, and utilities
-│   ├── nvisy-nats/      # NATS client (streams, KV, object storage, jobs)
-│   ├── nvisy-postgres/  # PostgreSQL database layer with Diesel ORM
-│   ├── nvisy-rig/       # AI services (chat, RAG, embeddings)
-│   ├── nvisy-server/    # HTTP handlers, middleware, pipeline, and OpenAPI
-│   └── nvisy-webhook/   # Webhook delivery with HTTP client
-├── migrations/          # PostgreSQL database migrations
-└── Cargo.toml           # Workspace configuration
-```
-
 ## Quick Start
 
 ```bash
diff --git a/crates/README.md b/crates/README.md
index d092687..2c2e25b 100644
--- a/crates/README.md
+++ b/crates/README.md
@@ -1,6 +1,8 @@
 # Crates
 
 [![Build](https://img.shields.io/github/actions/workflow/status/nvisycom/server/build.yml?branch=main&label=build%20%26%20test&style=flat-square)](https://github.com/nvisycom/server/actions/workflows/build.yml)
+[![Crates.io](https://img.shields.io/crates/v/nvisy-server?style=flat-square)](https://crates.io/crates/nvisy-server)
+[![Docs](https://img.shields.io/docsrs/nvisy-server?style=flat-square&label=docs)](https://docs.rs/nvisy-server)
 
 This directory contains the workspace crates for Nvisy Server.
diff --git a/crates/nvisy-dal/Cargo.toml b/crates/nvisy-dal/Cargo.toml index 2331349..c8a1b20 100644 --- a/crates/nvisy-dal/Cargo.toml +++ b/crates/nvisy-dal/Cargo.toml @@ -37,6 +37,7 @@ serde_json = { workspace = true, features = [] } # Derive macros & utilities async-trait = { workspace = true, features = [] } derive_more = { workspace = true, features = ["from"] } +strum = { workspace = true, features = [] } thiserror = { workspace = true, features = [] } # Data types diff --git a/crates/nvisy-dal/src/core/contexts.rs b/crates/nvisy-dal/src/core/contexts.rs index cf31741..db6f36c 100644 --- a/crates/nvisy-dal/src/core/contexts.rs +++ b/crates/nvisy-dal/src/core/contexts.rs @@ -1,43 +1,44 @@ //! Context types for data operations. //! -//! Contexts carry state from previous runs to enable pagination and resumption. +//! Contexts carry state needed to resume reading from a specific position. +//! They only track *where* to resume, not *how much* to read (that's in Params). use derive_more::From; use serde::{Deserialize, Serialize}; /// Context for object storage operations (S3, GCS, Azure Blob). -#[derive(Debug, Clone, Default, Serialize, Deserialize)] +/// +/// Uses marker-based pagination (last seen key) which is portable across +/// S3, GCS, Azure Blob, and MinIO. +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct ObjectContext { /// Path prefix for listing objects. pub prefix: Option, - /// Continuation token for pagination. + /// Last seen object key (used as StartAfter/marker for resumption). pub token: Option, - /// Maximum number of items to read. - pub limit: Option, } /// Context for relational database operations (Postgres, MySQL). -#[derive(Debug, Clone, Default, Serialize, Deserialize)] +/// +/// Uses keyset pagination which is more efficient than offset-based +/// pagination for large datasets and provides stable results. +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct RelationalContext { /// Last seen cursor value (for keyset pagination). pub cursor: Option, /// Tiebreaker value for resolving cursor conflicts. pub tiebreaker: Option, - /// Maximum number of items to read. - pub limit: Option, } /// Context for vector database operations (Qdrant, Pinecone, pgvector). -#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct VectorContext { /// Continuation token or offset for pagination. pub token: Option, - /// Maximum number of items to read. - pub limit: Option, } /// Type-erased context for runtime dispatch. -#[derive(Debug, Clone, Default, From, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, PartialEq, Eq, From, Serialize, Deserialize)] #[serde(tag = "type", content = "data", rename_all = "snake_case")] pub enum AnyContext { /// No context / empty state. @@ -52,24 +53,27 @@ pub enum AnyContext { } impl AnyContext { - /// Returns the limit if set in any context type. - pub fn limit(&self) -> Option { + /// Returns a reference to the object context if this is an object context. + pub fn as_object(&self) -> Option<&ObjectContext> { match self { - Self::None => None, - Self::Object(ctx) => ctx.limit, - Self::Relational(ctx) => ctx.limit, - Self::Vector(ctx) => ctx.limit, + Self::Object(ctx) => Some(ctx), + _ => None, } } - /// Sets the limit on the inner context. 
- pub fn with_limit(mut self, limit: usize) -> Self { - match &mut self { - Self::None => {} - Self::Object(ctx) => ctx.limit = Some(limit), - Self::Relational(ctx) => ctx.limit = Some(limit), - Self::Vector(ctx) => ctx.limit = Some(limit), + /// Returns a reference to the relational context if this is a relational context. + pub fn as_relational(&self) -> Option<&RelationalContext> { + match self { + Self::Relational(ctx) => Some(ctx), + _ => None, + } + } + + /// Returns a reference to the vector context if this is a vector context. + pub fn as_vector(&self) -> Option<&VectorContext> { + match self { + Self::Vector(ctx) => Some(ctx), + _ => None, } - self } } diff --git a/crates/nvisy-dal/src/core/mod.rs b/crates/nvisy-dal/src/core/mod.rs index 986279d..55d49a9 100644 --- a/crates/nvisy-dal/src/core/mod.rs +++ b/crates/nvisy-dal/src/core/mod.rs @@ -1,35 +1,54 @@ //! Core types and traits for data operations. -mod contexts; -mod datatypes; -mod params; -mod streams; - -pub use contexts::{AnyContext, ObjectContext, RelationalContext, VectorContext}; -pub use datatypes::{ - AnyDataValue, DataType, Document, Edge, Embedding, Graph, Message, Metadata, Node, Object, - Record, -}; -pub use nvisy_core::Provider; -pub use params::{DistanceMetric, ObjectParams, RelationalParams, VectorParams}; -pub use streams::{InputStream, ItemSink, ItemStream, OutputStream}; +pub mod contexts; +pub mod datatypes; +pub mod params; +pub mod streams; + +use streams::InputStream; use crate::Result; +/// Data paired with context for resumable streaming. +/// +/// When reading from a data source, each item is paired with a context +/// that represents the state needed to resume reading after that item. +/// This allows for efficient recovery if streaming is interrupted. +#[derive(Debug, Clone, PartialEq)] +pub struct Resumable { + /// The data item. + pub data: T, + /// The context for resuming after this item. + pub context: C, +} + +impl Resumable { + /// Creates a new resumable item. + pub fn new(data: T, context: C) -> Self { + Self { data, context } + } +} + /// Trait for reading data from a source. /// /// Implementations provide streaming access to data with optional pagination. +/// Each item is paired with a context that can be used to resume reading +/// from that point if the stream is interrupted. #[async_trait::async_trait] pub trait DataInput: Send + Sync { /// The item type produced by this provider. - type Item; + type Datatype; /// The context type for read operations. type Context; /// Reads items from the source. /// - /// Returns an input stream containing items. - async fn read(&self, ctx: &Self::Context) -> Result>; + /// Returns an input stream of [`Resumable`] items. + /// The context represents the state needed to resume reading after that item. + async fn read( + &self, + ctx: &Self::Context, + ) -> Result>>; } /// Trait for writing data to a sink. @@ -38,8 +57,8 @@ pub trait DataInput: Send + Sync { #[async_trait::async_trait] pub trait DataOutput: Send + Sync { /// The item type accepted by this provider. - type Item; + type Datatype; /// Writes a batch of items to the sink. - async fn write(&self, items: Vec) -> Result<()>; + async fn write(&self, items: Vec) -> Result<()>; } diff --git a/crates/nvisy-dal/src/core/params.rs b/crates/nvisy-dal/src/core/params.rs index 2f34efe..e401a9f 100644 --- a/crates/nvisy-dal/src/core/params.rs +++ b/crates/nvisy-dal/src/core/params.rs @@ -6,11 +6,13 @@ use serde::{Deserialize, Serialize}; /// Common parameters for relational database operations. 
-#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct RelationalParams { /// Target table name. + pub table: String, + /// Columns to select. If None, selects all columns. #[serde(skip_serializing_if = "Option::is_none")] - pub table: Option, + pub columns: Option>, /// Column to use for cursor-based pagination (e.g., "id", "created_at"). #[serde(skip_serializing_if = "Option::is_none")] pub cursor_column: Option, @@ -23,11 +25,10 @@ pub struct RelationalParams { } /// Common parameters for object storage operations. -#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ObjectParams { /// Bucket name (S3 bucket, GCS bucket, Azure container). - #[serde(skip_serializing_if = "Option::is_none")] - pub bucket: Option, + pub bucket: String, /// Default prefix for object keys. #[serde(skip_serializing_if = "Option::is_none")] pub prefix: Option, @@ -37,11 +38,10 @@ pub struct ObjectParams { } /// Common parameters for vector database operations. -#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct VectorParams { /// Collection or index name (Pinecone index, Qdrant collection). - #[serde(skip_serializing_if = "Option::is_none")] - pub collection: Option, + pub collection: String, /// Dimension of vectors (required for some providers). #[serde(skip_serializing_if = "Option::is_none")] pub dimension: Option, @@ -54,7 +54,7 @@ pub struct VectorParams { } /// Distance metric for vector similarity search. -#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Default, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum DistanceMetric { /// Cosine similarity (default). diff --git a/crates/nvisy-dal/src/lib.rs b/crates/nvisy-dal/src/lib.rs index 4949cd5..13035b4 100644 --- a/crates/nvisy-dal/src/lib.rs +++ b/crates/nvisy-dal/src/lib.rs @@ -12,18 +12,34 @@ #![forbid(unsafe_code)] #![cfg_attr(docsrs, feature(doc_cfg))] -pub mod core; +mod core; +mod error; +mod runtime; + pub mod provider; -mod python; +pub mod contexts { + //! Context types for pagination and filtering. + pub use crate::core::contexts::*; +} -mod error; +pub mod datatypes { + //! Data types for storage operations. + pub use crate::core::datatypes::*; +} + +pub mod params { + //! Parameter types for provider configuration. + pub use crate::core::params::*; +} + +pub mod streams { + //! Stream types for data input/output. 
+ pub use crate::core::streams::*; +} -pub use core::{ - AnyContext, AnyDataValue, DataInput, DataOutput, DataType, Document, Edge, Embedding, Graph, - InputStream, ItemSink, ItemStream, Message, Metadata, Node, Object, ObjectContext, - OutputStream, Provider, Record, RelationalContext, VectorContext, -}; +pub use core::{DataInput, DataOutput, Resumable}; pub use error::{BoxError, Error, ErrorKind, Result}; +pub use nvisy_core::Provider; pub use provider::{AnyCredentials, AnyParams, AnyProvider}; diff --git a/crates/nvisy-dal/src/provider/mod.rs b/crates/nvisy-dal/src/provider/mod.rs index 9db8751..3d2e3ee 100644 --- a/crates/nvisy-dal/src/provider/mod.rs +++ b/crates/nvisy-dal/src/provider/mod.rs @@ -20,6 +20,7 @@ use derive_more::From; use serde::{Deserialize, Serialize}; +use strum::AsRefStr; mod pinecone; mod postgres; @@ -30,8 +31,9 @@ pub use self::postgres::{PostgresCredentials, PostgresParams, PostgresProvider}; pub use self::s3::{S3Credentials, S3Params, S3Provider}; /// Type-erased credentials for any provider. -#[derive(Debug, Clone, From, Serialize, Deserialize)] +#[derive(Debug, Clone, From, AsRefStr, Serialize, Deserialize)] #[serde(tag = "type", content = "data", rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] pub enum AnyCredentials { /// PostgreSQL credentials. Postgres(PostgresCredentials), @@ -42,8 +44,9 @@ pub enum AnyCredentials { } /// Type-erased parameters for any provider. -#[derive(Debug, Clone, From, Serialize, Deserialize)] +#[derive(Debug, Clone, From, AsRefStr, Serialize, Deserialize)] #[serde(tag = "type", content = "data", rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] pub enum AnyParams { /// PostgreSQL parameters. Postgres(PostgresParams), @@ -63,3 +66,144 @@ pub enum AnyProvider { /// Pinecone provider. 
Pinecone(PineconeProvider), } + +use futures::StreamExt; + +use crate::contexts::AnyContext; +use crate::datatypes::AnyDataValue; +use crate::streams::InputStream; +use crate::{DataInput, DataOutput, Error, Result, Resumable}; + +#[async_trait::async_trait] +impl crate::Provider for AnyProvider { + type Credentials = AnyCredentials; + type Params = AnyParams; + + async fn connect( + params: Self::Params, + credentials: Self::Credentials, + ) -> nvisy_core::Result { + match (params, credentials) { + (AnyParams::Postgres(params), AnyCredentials::Postgres(credentials)) => { + let provider = PostgresProvider::connect(params, credentials).await?; + Ok(Self::Postgres(provider)) + } + (AnyParams::S3(params), AnyCredentials::S3(credentials)) => { + let provider = S3Provider::connect(params, credentials).await?; + Ok(Self::S3(provider)) + } + (AnyParams::Pinecone(params), AnyCredentials::Pinecone(credentials)) => { + let provider = PineconeProvider::connect(params, credentials).await?; + Ok(Self::Pinecone(provider)) + } + (params, credentials) => Err(nvisy_core::Error::new( + nvisy_core::ErrorKind::InvalidInput, + ) + .with_message(format!( + "mismatched provider types: params={}, credentials={}", + params.as_ref(), + credentials.as_ref() + ))), + } + } + + async fn disconnect(self) -> nvisy_core::Result<()> { + match self { + Self::Postgres(provider) => provider.disconnect().await, + Self::S3(provider) => provider.disconnect().await, + Self::Pinecone(provider) => provider.disconnect().await, + } + } +} + +#[async_trait::async_trait] +impl DataInput for AnyProvider { + type Context = AnyContext; + type Datatype = AnyDataValue; + + async fn read( + &self, + ctx: &Self::Context, + ) -> Result>> { + match self { + Self::Postgres(provider) => { + let ctx = ctx.as_relational().cloned().unwrap_or_default(); + let stream = provider.read(&ctx).await?; + let mapped = stream.map(|r| { + r.map(|item| { + Resumable::new( + AnyDataValue::from(item.data), + AnyContext::Relational(item.context), + ) + }) + }); + Ok(InputStream::new(Box::pin(mapped))) + } + Self::S3(provider) => { + let ctx = ctx.as_object().cloned().unwrap_or_default(); + let stream = provider.read(&ctx).await?; + let mapped = stream.map(|r| { + r.map(|item| { + Resumable::new( + AnyDataValue::from(item.data), + AnyContext::Object(item.context), + ) + }) + }); + Ok(InputStream::new(Box::pin(mapped))) + } + Self::Pinecone(_) => Err(Error::invalid_input( + "Pinecone provider does not support reading", + )), + } + } +} + +#[async_trait::async_trait] +impl DataOutput for AnyProvider { + type Datatype = AnyDataValue; + + async fn write(&self, items: Vec) -> Result<()> { + match self { + Self::Postgres(provider) => { + let records: Result> = items + .into_iter() + .map(|item| match item { + AnyDataValue::Record(r) => Ok(r), + other => Err(Error::invalid_input(format!( + "expected Record, got {:?}", + std::mem::discriminant(&other) + ))), + }) + .collect(); + provider.write(records?).await + } + Self::S3(provider) => { + let objects: Result> = items + .into_iter() + .map(|item| match item { + AnyDataValue::Object(o) => Ok(o), + other => Err(Error::invalid_input(format!( + "expected Object, got {:?}", + std::mem::discriminant(&other) + ))), + }) + .collect(); + provider.write(objects?).await + } + Self::Pinecone(provider) => { + let embeddings: Result> = items + .into_iter() + .map(|item| match item { + AnyDataValue::Embedding(e) => Ok(e), + other => Err(Error::invalid_input(format!( + "expected Embedding, got {:?}", + std::mem::discriminant(&other) + 
))), + }) + .collect(); + provider.write(embeddings?).await + } + } + } +} diff --git a/crates/nvisy-dal/src/provider/pinecone.rs b/crates/nvisy-dal/src/provider/pinecone.rs index e78ba1d..5a8f14d 100644 --- a/crates/nvisy-dal/src/provider/pinecone.rs +++ b/crates/nvisy-dal/src/provider/pinecone.rs @@ -4,9 +4,9 @@ use serde::{Deserialize, Serialize}; -use crate::Result; -use crate::core::{DataOutput, Embedding, Provider}; -use crate::python::{self, PyDataOutput, PyProvider}; +use crate::datatypes::Embedding; +use crate::runtime::{self, PyDataOutput, PyProvider}; +use crate::{DataOutput, Provider, Result}; /// Credentials for Pinecone connection. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -16,7 +16,7 @@ pub struct PineconeCredentials { } /// Parameters for Pinecone operations. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct PineconeParams { /// Index name. pub index_name: String, @@ -39,7 +39,7 @@ impl Provider for PineconeProvider { params: Self::Params, credentials: Self::Credentials, ) -> nvisy_core::Result { - let inner = python::connect("pinecone", credentials, params).await?; + let inner = runtime::connect("pinecone", credentials, params).await?; Ok(Self { output: inner.as_data_output(), inner, @@ -53,9 +53,9 @@ impl Provider for PineconeProvider { #[async_trait::async_trait] impl DataOutput for PineconeProvider { - type Item = Embedding; + type Datatype = Embedding; - async fn write(&self, items: Vec) -> Result<()> { + async fn write(&self, items: Vec) -> Result<()> { self.output.write(items).await } } diff --git a/crates/nvisy-dal/src/provider/postgres.rs b/crates/nvisy-dal/src/provider/postgres.rs index d7bcc82..aa26e08 100644 --- a/crates/nvisy-dal/src/provider/postgres.rs +++ b/crates/nvisy-dal/src/provider/postgres.rs @@ -4,11 +4,12 @@ use serde::{Deserialize, Serialize}; -use crate::Result; -use crate::core::{ - DataInput, DataOutput, InputStream, Provider, Record, RelationalContext, RelationalParams, -}; -use crate::python::{self, PyDataInput, PyDataOutput, PyProvider}; +use crate::contexts::RelationalContext; +use crate::datatypes::Record; +use crate::params::RelationalParams; +use crate::runtime::{self, PyDataInput, PyDataOutput, PyProvider}; +use crate::streams::InputStream; +use crate::{DataInput, DataOutput, Provider, Result, Resumable}; /// Credentials for PostgreSQL connection. /// @@ -20,7 +21,7 @@ pub struct PostgresCredentials { } /// Parameters for PostgreSQL operations. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct PostgresParams { /// Schema name (defaults to "public"). 
#[serde(default = "default_schema")] @@ -50,7 +51,7 @@ impl Provider for PostgresProvider { params: Self::Params, credentials: Self::Credentials, ) -> nvisy_core::Result { - let inner = python::connect("postgres", credentials, params).await?; + let inner = runtime::connect("postgres", credentials, params).await?; Ok(Self { input: inner.as_data_input(), output: inner.as_data_output(), @@ -66,18 +67,21 @@ impl Provider for PostgresProvider { #[async_trait::async_trait] impl DataInput for PostgresProvider { type Context = RelationalContext; - type Item = Record; + type Datatype = Record; - async fn read(&self, ctx: &Self::Context) -> Result> { + async fn read( + &self, + ctx: &Self::Context, + ) -> Result>> { self.input.read(ctx).await } } #[async_trait::async_trait] impl DataOutput for PostgresProvider { - type Item = Record; + type Datatype = Record; - async fn write(&self, items: Vec) -> Result<()> { + async fn write(&self, items: Vec) -> Result<()> { self.output.write(items).await } } diff --git a/crates/nvisy-dal/src/provider/s3.rs b/crates/nvisy-dal/src/provider/s3.rs index 75ded4d..22b1eb0 100644 --- a/crates/nvisy-dal/src/provider/s3.rs +++ b/crates/nvisy-dal/src/provider/s3.rs @@ -4,9 +4,12 @@ use serde::{Deserialize, Serialize}; -use crate::Result; -use crate::core::{DataInput, DataOutput, InputStream, Object, ObjectContext, Provider}; -use crate::python::{self, PyDataInput, PyDataOutput, PyProvider}; +use crate::contexts::ObjectContext; +use crate::datatypes::Object; +use crate::params::ObjectParams; +use crate::runtime::{self, PyDataInput, PyDataOutput, PyProvider}; +use crate::streams::InputStream; +use crate::{DataInput, DataOutput, Provider, Result, Resumable}; /// Credentials for S3 connection. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -23,12 +26,18 @@ pub struct S3Credentials { } /// Parameters for S3 operations. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct S3Params { - /// Target bucket name. - pub bucket: String, - /// Key prefix for all operations. - pub prefix: String, + /// Default content type for uploaded objects. + #[serde(default = "default_content_type")] + pub content_type: String, + /// Object storage parameters (bucket, prefix, batch_size). + #[serde(flatten)] + pub object: ObjectParams, +} + +fn default_content_type() -> String { + "application/octet-stream".to_string() } /// S3 provider for object storage operations. 
@@ -47,7 +56,7 @@ impl Provider for S3Provider { params: Self::Params, credentials: Self::Credentials, ) -> nvisy_core::Result { - let inner = python::connect("s3", credentials, params).await?; + let inner = runtime::connect("s3", credentials, params).await?; Ok(Self { input: inner.as_data_input(), output: inner.as_data_output(), @@ -63,18 +72,21 @@ impl Provider for S3Provider { #[async_trait::async_trait] impl DataInput for S3Provider { type Context = ObjectContext; - type Item = Object; + type Datatype = Object; - async fn read(&self, ctx: &Self::Context) -> Result> { + async fn read( + &self, + ctx: &Self::Context, + ) -> Result>> { self.input.read(ctx).await } } #[async_trait::async_trait] impl DataOutput for S3Provider { - type Item = Object; + type Datatype = Object; - async fn write(&self, items: Vec) -> Result<()> { + async fn write(&self, items: Vec) -> Result<()> { self.output.write(items).await } } diff --git a/crates/nvisy-dal/src/python/error.rs b/crates/nvisy-dal/src/runtime/error.rs similarity index 100% rename from crates/nvisy-dal/src/python/error.rs rename to crates/nvisy-dal/src/runtime/error.rs diff --git a/crates/nvisy-dal/src/python/loader.rs b/crates/nvisy-dal/src/runtime/loader.rs similarity index 100% rename from crates/nvisy-dal/src/python/loader.rs rename to crates/nvisy-dal/src/runtime/loader.rs diff --git a/crates/nvisy-dal/src/python/mod.rs b/crates/nvisy-dal/src/runtime/mod.rs similarity index 100% rename from crates/nvisy-dal/src/python/mod.rs rename to crates/nvisy-dal/src/runtime/mod.rs diff --git a/crates/nvisy-dal/src/python/provider.rs b/crates/nvisy-dal/src/runtime/provider.rs similarity index 79% rename from crates/nvisy-dal/src/python/provider.rs rename to crates/nvisy-dal/src/runtime/provider.rs index 0a6f67e..a3d5b26 100644 --- a/crates/nvisy-dal/src/python/provider.rs +++ b/crates/nvisy-dal/src/runtime/provider.rs @@ -4,12 +4,14 @@ use std::marker::PhantomData; use async_stream::try_stream; use futures::Stream; -use pyo3::prelude::*; +use pyo3::exceptions::PyStopAsyncIteration; +use pyo3::types::PyAnyMethods; +use pyo3::{Py, PyAny, Python}; use super::PyError; -use super::loader::pyobject_to_json; -use crate::Result; -use crate::core::{DataInput, DataOutput, InputStream}; +use super::loader::{json_to_pydict, json_to_pyobject, pyobject_to_json}; +use crate::streams::InputStream; +use crate::{DataInput, DataOutput, Result, Resumable}; /// A wrapper around a Python provider instance. 
/// @@ -85,19 +87,22 @@ impl PyDataInput { impl DataInput for PyDataInput where T: for<'de> serde::Deserialize<'de> + Send + Sync + 'static, - Ctx: serde::Serialize + Send + Sync, + Ctx: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + Sync + 'static, { type Context = Ctx; - type Item = T; + type Datatype = T; - async fn read(&self, ctx: &Self::Context) -> Result> { + async fn read( + &self, + ctx: &Self::Context, + ) -> Result>> { let ctx_json = serde_json::to_value(ctx) .map_err(|e| PyError::conversion(format!("Failed to serialize context: {}", e)))?; // Call Python read method which returns an async iterator let coro = Python::attach(|py| { let bound = self.provider.instance.bind(py); - let ctx_dict = super::loader::json_to_pydict(py, &ctx_json)?; + let ctx_dict = json_to_pydict(py, &ctx_json)?; let coro = bound .call_method1("read", (ctx_dict,)) .map_err(|e| PyError::call_failed(format!("Failed to call read: {}", e)))?; @@ -110,7 +115,8 @@ where .map_err(|e| PyError::call_failed(format!("Failed to call read: {}", e)))?; // Create a stream that pulls from the Python async iterator - let stream = py_async_iterator_to_stream::(py_iterator); + // Python yields (item, context) tuples, we convert to Resumable + let stream = py_async_iterator_to_stream::(py_iterator); Ok(InputStream::new(Box::pin(stream))) } } @@ -136,15 +142,15 @@ impl DataOutput for PyDataOutput where T: serde::Serialize + Send + Sync, { - type Item = T; + type Datatype = T; - async fn write(&self, items: Vec) -> Result<()> { + async fn write(&self, items: Vec) -> Result<()> { let items_json = serde_json::to_value(&items) .map_err(|e| PyError::conversion(format!("Failed to serialize items: {}", e)))?; let coro = Python::attach(|py| { let bound = self.provider.instance.bind(py); - let items_list = super::loader::json_to_pyobject(py, &items_json)?; + let items_list = json_to_pyobject(py, &items_json)?; let coro = bound .call_method1("write", (items_list,)) .map_err(|e| PyError::call_failed(format!("Failed to call write: {}", e)))?; @@ -159,10 +165,15 @@ where } } -/// Converts a Python async iterator to a Rust Stream. -fn py_async_iterator_to_stream(iterator: Py) -> impl Stream> +/// Converts a Python async iterator to a Rust Stream of Resumable items. +/// +/// Python yields `(data, context)` tuples which are converted to `Resumable`. +fn py_async_iterator_to_stream( + iterator: Py, +) -> impl Stream>> where T: for<'de> serde::Deserialize<'de> + Send + 'static, + C: for<'de> serde::Deserialize<'de> + Send + 'static, { try_stream! { loop { @@ -175,7 +186,7 @@ where Ok(Some(future)) } Err(e) => { - if e.is_instance_of::(py) { + if e.is_instance_of::(py) { Ok(None) } else { Err(PyError::from(e)) @@ -191,12 +202,12 @@ where // Await the coroutine let result = coro.await.map_err(PyError::from)?; - // Convert result to Rust type + // Convert Python (data, context) tuple to Resumable let json_value = Python::attach(|py| pyobject_to_json(result.bind(py)))?; - let item: T = serde_json::from_value(json_value) + let (data, context): (T, C) = serde_json::from_value(json_value) .map_err(|e| PyError::conversion(format!("Failed to deserialize item: {}", e)))?; - yield item; + yield Resumable::new(data, context); } } } diff --git a/crates/nvisy-runtime/src/definition/edge.rs b/crates/nvisy-runtime/src/definition/edge.rs index aaca631..6d1fe77 100644 --- a/crates/nvisy-runtime/src/definition/edge.rs +++ b/crates/nvisy-runtime/src/definition/edge.rs @@ -1,18 +1,11 @@ //! Edge types for connecting nodes in a workflow graph. 
-use derive_builder::Builder; use serde::{Deserialize, Serialize}; use super::NodeId; /// An edge connecting two nodes in the workflow graph. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Builder)] -#[builder( - name = "EdgeBuilder", - pattern = "owned", - setter(into, strip_option, prefix = "with"), - build_fn(validate = "Self::validate") -)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Edge { /// Source node ID. pub from: NodeId, @@ -20,39 +13,8 @@ pub struct Edge { pub to: NodeId, /// Optional port/slot name on the source node. #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default)] pub from_port: Option, /// Optional port/slot name on the target node. #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default)] pub to_port: Option, } - -impl EdgeBuilder { - fn validate(&self) -> Result<(), String> { - if self.from.is_none() { - return Err("from is required".into()); - } - if self.to.is_none() { - return Err("to is required".into()); - } - Ok(()) - } -} - -impl Edge { - /// Creates a new edge between two nodes. - pub fn new(from: NodeId, to: NodeId) -> Self { - Self { - from, - to, - from_port: None, - to_port: None, - } - } - - /// Returns a builder for creating an edge. - pub fn builder() -> EdgeBuilder { - EdgeBuilder::default() - } -} diff --git a/crates/nvisy-runtime/src/definition/input.rs b/crates/nvisy-runtime/src/definition/input.rs index b8126fa..aa66a4e 100644 --- a/crates/nvisy-runtime/src/definition/input.rs +++ b/crates/nvisy-runtime/src/definition/input.rs @@ -1,34 +1,48 @@ //! Input node definition types. +use nvisy_dal::provider::{AnyParams, PostgresParams, S3Params}; use serde::{Deserialize, Serialize}; +use uuid::Uuid; use super::route::CacheSlot; /// Input node definition - source of data for the workflow. -/// -/// Storage provider inputs (S3, Postgres, etc.) are handled externally via Python. -/// This enum only supports cache slots for internal workflow data flow. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(tag = "source", rename_all = "snake_case")] pub enum Input { + /// Read from a storage provider. + Provider(ProviderInput), /// Read from named cache slot (resolved at compile time). CacheSlot(CacheSlot), } -impl Input { - /// Creates a new input from a cache slot. - pub fn from_cache(slot: impl Into) -> Self { - Self::CacheSlot(CacheSlot { - slot: slot.into(), - priority: None, - }) - } +/// Provider-based input configuration. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ProviderInput { + /// Credentials ID for the provider. + pub credentials_id: Uuid, + /// Provider-specific parameters. + #[serde(flatten)] + pub params: InputParams, +} + +/// Type-erased parameters for input providers. +/// +/// Only includes providers that support reading data (DataInput). +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "provider", content = "params", rename_all = "snake_case")] +pub enum InputParams { + /// PostgreSQL parameters. + Postgres(PostgresParams), + /// S3 parameters. + S3(S3Params), +} - /// Creates a new input from a cache slot with priority. 
- pub fn from_cache_with_priority(slot: impl Into, priority: u32) -> Self { - Self::CacheSlot(CacheSlot { - slot: slot.into(), - priority: Some(priority), - }) +impl From for AnyParams { + fn from(params: InputParams) -> Self { + match params { + InputParams::Postgres(p) => AnyParams::Postgres(p), + InputParams::S3(p) => AnyParams::S3(p), + } } } diff --git a/crates/nvisy-runtime/src/definition/metadata.rs b/crates/nvisy-runtime/src/definition/metadata.rs index 7e705cb..a7ebf2c 100644 --- a/crates/nvisy-runtime/src/definition/metadata.rs +++ b/crates/nvisy-runtime/src/definition/metadata.rs @@ -1,60 +1,28 @@ //! Workflow metadata. -use derive_builder::Builder; use jiff::Timestamp; use semver::Version; use serde::{Deserialize, Serialize}; /// Workflow metadata. -#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Builder)] -#[builder( - name = "WorkflowMetadataBuilder", - pattern = "owned", - setter(into, strip_option, prefix = "with"), - build_fn(validate = "Self::validate") -)] +#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)] pub struct WorkflowMetadata { /// Workflow name (optional). #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default)] pub name: Option, /// Workflow description. #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default)] pub description: Option, /// Workflow version (semver, optional). #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default)] pub version: Option, /// Tags for organization. #[serde(default, skip_serializing_if = "Vec::is_empty")] - #[builder(default)] pub tags: Vec, /// Creation timestamp. #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default)] pub created_at: Option, /// Last update timestamp. #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default)] pub updated_at: Option, } - -impl WorkflowMetadataBuilder { - fn validate(&self) -> Result<(), String> { - // All fields are optional, so validation always succeeds - Ok(()) - } -} - -impl WorkflowMetadata { - /// Creates a new empty metadata. - pub fn new() -> Self { - Self::default() - } - - /// Returns a builder for creating workflow metadata. - pub fn builder() -> WorkflowMetadataBuilder { - WorkflowMetadataBuilder::default() - } -} diff --git a/crates/nvisy-runtime/src/definition/mod.rs b/crates/nvisy-runtime/src/definition/mod.rs index cbc9206..853f1a5 100644 --- a/crates/nvisy-runtime/src/definition/mod.rs +++ b/crates/nvisy-runtime/src/definition/mod.rs @@ -23,10 +23,10 @@ mod transform; mod util; pub use edge::Edge; -pub use input::Input; +pub use input::{Input, InputParams, ProviderInput}; pub use metadata::WorkflowMetadata; pub use node::{Node, NodeId, NodeKind}; -pub use output::Output; +pub use output::{Output, OutputParams, ProviderOutput}; pub use route::{ CacheSlot, FileCategory, FileCategoryCondition, LanguageCondition, SwitchCondition, SwitchDef, }; @@ -41,7 +41,7 @@ pub use util::Position; /// /// This is the JSON-friendly representation of a workflow graph. /// It contains all the information needed to compile and execute a workflow. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] pub struct Workflow { /// Nodes in the workflow, keyed by their ID. pub nodes: HashMap, @@ -52,76 +52,6 @@ pub struct Workflow { pub metadata: WorkflowMetadata, } -impl Workflow { - /// Creates a new empty workflow definition. 
- pub fn new() -> Self { - Self { - nodes: HashMap::new(), - edges: Vec::new(), - metadata: WorkflowMetadata::default(), - } - } - - /// Creates a workflow definition with metadata. - pub fn with_metadata(metadata: WorkflowMetadata) -> Self { - Self { - nodes: HashMap::new(), - edges: Vec::new(), - metadata, - } - } - - /// Adds a node to the workflow. - pub fn add_node(&mut self, id: NodeId, node: Node) -> &mut Self { - self.nodes.insert(id, node); - self - } - - /// Adds a node definition with default metadata. - pub fn add_node_def(&mut self, id: NodeId, def: NodeKind) -> &mut Self { - self.nodes.insert(id, Node::new(def)); - self - } - - /// Adds an edge to the workflow. - pub fn add_edge(&mut self, edge: Edge) -> &mut Self { - self.edges.push(edge); - self - } - - /// Adds a simple edge between two nodes. - pub fn connect(&mut self, from: NodeId, to: NodeId) -> &mut Self { - self.edges.push(Edge::new(from, to)); - self - } - - /// Returns an iterator over input nodes. - pub fn input_nodes(&self) -> impl Iterator { - self.nodes.iter().filter(|(_, node)| node.is_input()) - } - - /// Returns an iterator over output nodes. - pub fn output_nodes(&self) -> impl Iterator { - self.nodes.iter().filter(|(_, node)| node.is_output()) - } - - /// Returns an iterator over transform nodes. - pub fn transform_nodes(&self) -> impl Iterator { - self.nodes.iter().filter(|(_, node)| node.is_transform()) - } - - /// Returns an iterator over switch nodes. - pub fn switch_nodes(&self) -> impl Iterator { - self.nodes.iter().filter(|(_, node)| node.is_switch()) - } -} - -impl Default for Workflow { - fn default() -> Self { - Self::new() - } -} - #[cfg(test)] mod tests { use uuid::Uuid; @@ -130,55 +60,75 @@ mod tests { /// Creates a deterministic NodeId for testing. fn test_node_id(n: u128) -> NodeId { - NodeId::from_uuid(Uuid::from_u128(n)) + Uuid::from_u128(n).into() } fn input_node_cache(slot: &str) -> Node { - Node::new(NodeKind::Input(Input::CacheSlot(CacheSlot { - slot: slot.to_string(), - priority: None, - }))) + Node { + name: None, + description: None, + position: None, + kind: NodeKind::Input(Input::CacheSlot(CacheSlot { + slot: slot.to_string(), + priority: None, + })), + } } fn output_node_cache(slot: &str) -> Node { - Node::new(NodeKind::Output(Output::Cache(CacheSlot { - slot: slot.to_string(), - priority: None, - }))) + Node { + name: None, + description: None, + position: None, + kind: NodeKind::Output(Output::CacheSlot(CacheSlot { + slot: slot.to_string(), + priority: None, + })), + } } fn transform_node_partition() -> Node { - Node::new(NodeKind::Transform(Transformer::Partition(Partition { - strategy: Default::default(), - include_page_breaks: false, - discard_unsupported: false, - }))) + Node { + name: None, + description: None, + position: None, + kind: NodeKind::Transform(Transformer::Partition(Partition { + strategy: Default::default(), + include_page_breaks: false, + discard_unsupported: false, + })), + } } #[test] fn test_workflow_definition_new() { - let def = Workflow::new(); + let def = Workflow::default(); assert!(def.nodes.is_empty()); assert!(def.edges.is_empty()); } #[test] fn test_workflow_definition_add_node() { - let mut def = Workflow::new(); + let mut def = Workflow::default(); let id = test_node_id(1); - def.add_node(id, input_node_cache("test")); + def.nodes.insert(id, input_node_cache("test")); assert_eq!(def.nodes.len(), 1); assert!(def.nodes.contains_key(&id)); } #[test] fn test_workflow_definition_connect() { - let mut def = Workflow::new(); + let mut def = 
Workflow::default(); let id1 = test_node_id(1); let id2 = test_node_id(2); - def.add_node(id1, input_node_cache("in")) - .add_node(id2, output_node_cache("out")) - .connect(id1, id2); + def.nodes.insert(id1, input_node_cache("in")); + def.nodes.insert(id2, output_node_cache("out")); + def.edges.push(Edge { + from: id1, + to: id2, + from_port: None, + to_port: None, + }); assert_eq!(def.edges.len(), 1); assert_eq!(def.edges[0].from, id1); @@ -187,24 +137,30 @@ mod tests { #[test] fn test_workflow_definition_node_iterators() { - let mut def = Workflow::new(); - def.add_node(test_node_id(1), input_node_cache("in")) - .add_node(test_node_id(2), transform_node_partition()) - .add_node(test_node_id(3), output_node_cache("out")); - - assert_eq!(def.input_nodes().count(), 1); - assert_eq!(def.transform_nodes().count(), 1); - assert_eq!(def.output_nodes().count(), 1); + let mut def = Workflow::default(); + def.nodes.insert(test_node_id(1), input_node_cache("in")); + def.nodes + .insert(test_node_id(2), transform_node_partition()); + def.nodes.insert(test_node_id(3), output_node_cache("out")); + + assert_eq!(def.nodes.values().filter(|n| n.is_input()).count(), 1); + assert_eq!(def.nodes.values().filter(|n| n.is_transform()).count(), 1); + assert_eq!(def.nodes.values().filter(|n| n.is_output()).count(), 1); } #[test] fn test_workflow_definition_serialization() { - let mut def = Workflow::new(); + let mut def = Workflow::default(); let id1 = test_node_id(1); let id2 = test_node_id(2); - def.add_node(id1, input_node_cache("in")) - .add_node(id2, output_node_cache("out")) - .connect(id1, id2); + def.nodes.insert(id1, input_node_cache("in")); + def.nodes.insert(id2, output_node_cache("out")); + def.edges.push(Edge { + from: id1, + to: id2, + from_port: None, + to_port: None, + }); // Serialize to JSON let json = serde_json::to_string(&def).expect("serialization failed"); diff --git a/crates/nvisy-runtime/src/definition/node.rs b/crates/nvisy-runtime/src/definition/node.rs index a2e86a7..a25eb5b 100644 --- a/crates/nvisy-runtime/src/definition/node.rs +++ b/crates/nvisy-runtime/src/definition/node.rs @@ -2,7 +2,6 @@ use std::str::FromStr; -use derive_builder::Builder; use derive_more::{Debug, Display, From, Into}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -27,24 +26,6 @@ impl NodeId { pub fn new() -> Self { Self(Uuid::now_v7()) } - - /// Creates a node ID from an existing UUID. - #[inline] - pub const fn from_uuid(uuid: Uuid) -> Self { - Self(uuid) - } - - /// Returns the underlying UUID. - #[inline] - pub const fn as_uuid(&self) -> Uuid { - self.0 - } - - /// Returns the UUID as bytes. - #[inline] - pub const fn as_bytes(&self) -> &[u8; 16] { - self.0.as_bytes() - } } impl Default for NodeId { @@ -61,37 +42,17 @@ impl FromStr for NodeId { } } -impl AsRef for NodeId { - fn as_ref(&self) -> &Uuid { - &self.0 - } -} - /// A workflow node definition with metadata and kind. -/// -/// Nodes are categorized by their role in data flow: -/// - **Input**: Reads/produces data (entry points) -/// - **Transform**: Processes/transforms data (intermediate) -/// - **Output**: Writes/consumes data (exit points) -/// - **Switch**: Routes data based on conditions -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)] -#[builder( - name = "NodeBuilder", - pattern = "owned", - setter(into, strip_option, prefix = "with") -)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Node { /// Display name of the node. 
#[serde(skip_serializing_if = "Option::is_none")] - #[builder(default)] pub name: Option, /// Description of what this node does. #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default)] pub description: Option, /// Position in the visual editor. #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default)] pub position: Option, /// The node kind/type. #[serde(flatten)] @@ -99,21 +60,6 @@ pub struct Node { } impl Node { - /// Creates a new node with the given kind. - pub fn new(kind: impl Into) -> Self { - Self { - name: None, - description: None, - position: None, - kind: kind.into(), - } - } - - /// Returns a builder for creating a node. - pub fn builder() -> NodeBuilder { - NodeBuilder::default() - } - /// Returns whether this is an input node. pub const fn is_input(&self) -> bool { self.kind.is_input() diff --git a/crates/nvisy-runtime/src/definition/output.rs b/crates/nvisy-runtime/src/definition/output.rs index 0e8e346..5a77c17 100644 --- a/crates/nvisy-runtime/src/definition/output.rs +++ b/crates/nvisy-runtime/src/definition/output.rs @@ -1,34 +1,41 @@ //! Output node definition types. +use nvisy_dal::provider::{PineconeParams, PostgresParams, S3Params}; use serde::{Deserialize, Serialize}; +use uuid::Uuid; use super::route::CacheSlot; /// Output node definition - destination for workflow data. -/// -/// Storage provider outputs (S3, Qdrant, etc.) are handled externally via Python. -/// This enum only supports cache slots for internal workflow data flow. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(tag = "target", rename_all = "snake_case")] pub enum Output { + /// Write to a storage provider. + Provider(ProviderOutput), /// Write to named cache slot (resolved at compile time). - Cache(CacheSlot), + CacheSlot(CacheSlot), } -impl Output { - /// Creates a new output from a cache slot. - pub fn from_cache(slot: impl Into) -> Self { - Self::Cache(CacheSlot { - slot: slot.into(), - priority: None, - }) - } +/// Provider-based output configuration. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ProviderOutput { + /// Credentials ID for the provider. + pub credentials_id: Uuid, + /// Provider-specific parameters. + #[serde(flatten)] + pub params: OutputParams, +} - /// Creates a new output from a cache slot with priority. - pub fn from_cache_with_priority(slot: impl Into, priority: u32) -> Self { - Self::Cache(CacheSlot { - slot: slot.into(), - priority: Some(priority), - }) - } +/// Type-erased parameters for output providers. +/// +/// Includes all providers that support writing data (DataOutput). +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "provider", content = "params", rename_all = "snake_case")] +pub enum OutputParams { + /// PostgreSQL parameters. + Postgres(PostgresParams), + /// S3 parameters. + S3(S3Params), + /// Pinecone parameters. 
+ Pinecone(PineconeParams), } diff --git a/crates/nvisy-runtime/src/engine/compiler.rs b/crates/nvisy-runtime/src/engine/compiler.rs index 0cb4a4a..59099f9 100644 --- a/crates/nvisy-runtime/src/engine/compiler.rs +++ b/crates/nvisy-runtime/src/engine/compiler.rs @@ -12,34 +12,32 @@ use std::collections::HashMap; +use nvisy_dal::provider::{AnyParams, AnyProvider}; +use nvisy_dal::{DataInput, Provider}; use nvisy_rig::agent::{ StructuredOutputAgent, TableAgent, TextAnalysisAgent, TextGenerationAgent, VisionAgent, }; use petgraph::graph::{DiGraph, NodeIndex}; -use super::context::Context; use super::credentials::CredentialsRegistry; use crate::definition::{Input, NodeId, NodeKind, Output, Workflow}; use crate::error::{Error, Result}; use crate::graph::{ - ChunkProcessor, CompiledGraph, CompiledInput, CompiledNode, CompiledOutput, CompiledSwitch, - CompiledTransform, DeriveProcessor, EdgeData, EmbeddingProcessor, EnrichProcessor, - ExtractProcessor, InputStream, OutputStream, PartitionProcessor, + ChunkProcessor, CompiledGraph, CompiledNode, CompiledSwitch, CompiledTransform, + DeriveProcessor, EdgeData, EmbeddingProcessor, EnrichProcessor, ExtractProcessor, InputStream, + OutputStream, PartitionProcessor, }; /// Workflow compiler that transforms definitions into executable graphs. pub struct WorkflowCompiler<'a> { /// Credentials registry for resolving provider credentials. registry: &'a CredentialsRegistry, - /// Execution context for provider initialization. - #[allow(dead_code)] - ctx: Context, } impl<'a> WorkflowCompiler<'a> { /// Creates a new workflow compiler. - pub fn new(registry: &'a CredentialsRegistry, ctx: Context) -> Self { - Self { registry, ctx } + pub fn new(registry: &'a CredentialsRegistry) -> Self { + Self { registry } } /// Compiles a workflow definition into an executable graph. @@ -107,7 +105,7 @@ impl<'a> WorkflowCompiler<'a> { fn is_cache_only_node(&self, def: &NodeKind) -> bool { match def { NodeKind::Input(input) => matches!(input, Input::CacheSlot(_)), - NodeKind::Output(output) => matches!(output, Output::Cache(_)), + NodeKind::Output(output) => matches!(output, Output::CacheSlot(_)), _ => false, } } @@ -117,7 +115,7 @@ impl<'a> WorkflowCompiler<'a> { // Collect cache slot outputs (nodes that write to cache slots) let mut cache_outputs: HashMap> = HashMap::new(); for (id, node) in &def.nodes { - if let NodeKind::Output(Output::Cache(slot)) = &node.kind { + if let NodeKind::Output(Output::CacheSlot(slot)) = &node.kind { cache_outputs .entry(slot.slot.clone()) .or_default() @@ -209,12 +207,12 @@ impl<'a> WorkflowCompiler<'a> { async fn compile_node(&self, def: &NodeKind) -> Result { match def { NodeKind::Input(input) => { - let stream = self.create_input_stream(input)?; - Ok(CompiledNode::Input(CompiledInput::new(stream))) + let stream = self.create_input_stream(input).await?; + Ok(CompiledNode::Input(stream)) } NodeKind::Output(output) => { - let stream = self.create_output_stream(output)?; - Ok(CompiledNode::Output(CompiledOutput::new(stream))) + let stream = self.create_output_stream(output).await?; + Ok(CompiledNode::Output(stream)) } NodeKind::Transform(transformer) => { let processor = self.create_processor(transformer).await?; @@ -227,8 +225,27 @@ impl<'a> WorkflowCompiler<'a> { } /// Creates an input stream from an input definition. 
- fn create_input_stream(&self, input: &Input) -> Result { + async fn create_input_stream(&self, input: &Input) -> Result { match input { + Input::Provider(provider_input) => { + let params = AnyParams::from(provider_input.params.clone()); + let (creds, ctx) = self + .registry + .get(provider_input.credentials_id)? + .clone() + .into_dal_credentials()?; + + let provider = AnyProvider::connect(params, creds) + .await + .map_err(|e| Error::Internal(e.to_string()))?; + + let stream = provider + .read(&ctx) + .await + .map_err(|e| Error::Internal(e.to_string()))?; + + Ok(stream) + } Input::CacheSlot(_) => { // Cache inputs are resolved during cache slot resolution // This shouldn't be called for cache inputs @@ -240,9 +257,18 @@ impl<'a> WorkflowCompiler<'a> { } /// Creates an output stream from an output definition. - fn create_output_stream(&self, output: &Output) -> Result { + async fn create_output_stream(&self, output: &Output) -> Result { match output { - Output::Cache(_) => { + Output::Provider(_provider_output) => { + // Output streams require a different approach - we need to batch writes + // For now, return an error as this requires more architecture work + // TODO: Implement output stream batching with DataOutput::write + Err(Error::Internal( + "provider output nodes are not yet implemented - requires batching support" + .into(), + )) + } + Output::CacheSlot(_) => { // Cache outputs are resolved during cache slot resolution Err(Error::Internal( "cache output nodes should be resolved before compilation".into(), diff --git a/crates/nvisy-runtime/src/engine/context.rs b/crates/nvisy-runtime/src/engine/context.rs index 39510ce..a182f9c 100644 --- a/crates/nvisy-runtime/src/engine/context.rs +++ b/crates/nvisy-runtime/src/engine/context.rs @@ -1,90 +1,10 @@ //! Context types for workflow execution. use derive_builder::Builder; -use nvisy_dal::AnyDataValue; +use nvisy_dal::datatypes::AnyDataValue; use super::CredentialsRegistry; -/// Context for provider operations during compilation and execution. -/// -/// Provides configuration for read/write operations including target, -/// pagination cursor, and limits. -#[derive(Debug, Clone, Default)] -pub struct Context { - /// Target collection, table, bucket, topic, etc. - pub target: Option, - /// Cursor for pagination (provider-specific format). - pub cursor: Option, - /// Tiebreaker for pagination conflicts. - pub tiebreaker: Option, - /// Maximum number of items to read. - pub limit: Option, -} - -impl Context { - /// Creates a new empty context. - pub fn new() -> Self { - Self::default() - } - - /// Sets the target. - pub fn with_target(mut self, target: impl Into) -> Self { - self.target = Some(target.into()); - self - } - - /// Sets the cursor for pagination. - pub fn with_cursor(mut self, cursor: impl Into) -> Self { - self.cursor = Some(cursor.into()); - self - } - - /// Sets the tiebreaker for pagination. - pub fn with_tiebreaker(mut self, tiebreaker: impl Into) -> Self { - self.tiebreaker = Some(tiebreaker.into()); - self - } - - /// Sets the limit. - pub fn with_limit(mut self, limit: usize) -> Self { - self.limit = Some(limit); - self - } - - /// Returns the target, if set. - pub fn target(&self) -> Option<&str> { - self.target.as_deref() - } - - /// Returns the cursor, if set. - pub fn cursor(&self) -> Option<&str> { - self.cursor.as_deref() - } - - /// Returns the limit, if set. - pub fn limit(&self) -> Option { - self.limit - } - - /// Converts to an ObjectContext for object storage providers. 
- pub fn to_object_context(&self) -> nvisy_dal::ObjectContext { - nvisy_dal::ObjectContext { - prefix: self.target.clone(), - token: self.cursor.clone(), - limit: self.limit, - } - } - - /// Converts to a RelationalContext for relational database providers. - pub fn to_relational_context(&self) -> nvisy_dal::RelationalContext { - nvisy_dal::RelationalContext { - cursor: self.cursor.clone(), - tiebreaker: self.tiebreaker.clone(), - limit: self.limit, - } - } -} - /// Execution context for a workflow run. /// /// Manages the current data items flowing through the pipeline and holds diff --git a/crates/nvisy-runtime/src/engine/credentials.rs b/crates/nvisy-runtime/src/engine/credentials.rs index e10fafb..b580e73 100644 --- a/crates/nvisy-runtime/src/engine/credentials.rs +++ b/crates/nvisy-runtime/src/engine/credentials.rs @@ -1,59 +1,152 @@ -//! Credentials management for AI providers. +//! Credentials management for workflow providers. //! //! This module provides a registry for storing and retrieving credentials -//! used by AI providers (completion, embedding) during workflow execution. +//! used by AI providers (completion, embedding) and DAL providers (postgres, s3, pinecone) +//! during workflow execution. use std::collections::HashMap; use derive_more::From; +use nvisy_dal::contexts::{AnyContext, ObjectContext, RelationalContext, VectorContext}; +use nvisy_dal::provider::{ + AnyCredentials, PineconeCredentials, PostgresCredentials, S3Credentials, +}; use nvisy_rig::provider::{CompletionCredentials, EmbeddingCredentials}; use serde::{Deserialize, Serialize}; -use strum::IntoStaticStr; +use strum::AsRefStr; use uuid::Uuid; use crate::error::{Error, Result}; /// AI provider credentials. -#[derive(Debug, Clone, From, Serialize, Deserialize, IntoStaticStr)] -#[serde(tag = "provider", rename_all = "snake_case")] +#[derive(Debug, Clone, From, AsRefStr, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] #[strum(serialize_all = "snake_case")] -pub enum ProviderCredentials { +pub enum AiCredentials { /// Completion provider credentials. Completion(CompletionCredentials), /// Embedding provider credentials. Embedding(EmbeddingCredentials), } -impl ProviderCredentials { - /// Returns the provider kind as a string. - pub fn kind(&self) -> &'static str { - self.into() - } - +impl AiCredentials { /// Converts to completion credentials if applicable. - pub fn into_completion_credentials(self) -> Result { + pub fn into_completion(self) -> Result { match self { Self::Completion(c) => Ok(c), other => Err(Error::Internal(format!( "expected completion credentials, got '{}'", - other.kind() + other.as_ref() ))), } } /// Converts to embedding credentials if applicable. - pub fn into_embedding_credentials(self) -> Result { + pub fn into_embedding(self) -> Result { match self { Self::Embedding(c) => Ok(c), other => Err(Error::Internal(format!( "expected embedding credentials, got '{}'", - other.kind() + other.as_ref() + ))), + } + } +} + +/// DAL provider credentials with context for data input/output. +#[derive(Debug, Clone, AsRefStr, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum DalCredentials { + /// PostgreSQL credentials with relational context. + Postgres { + credentials: PostgresCredentials, + context: RelationalContext, + }, + /// S3 credentials with object context. + S3 { + credentials: S3Credentials, + context: ObjectContext, + }, + /// Pinecone credentials with vector context. 
+    Pinecone {
+        credentials: PineconeCredentials,
+        context: VectorContext,
+    },
+}
+
+impl DalCredentials {
+    /// Returns the credentials portion as AnyCredentials.
+    pub fn credentials(&self) -> AnyCredentials {
+        match self {
+            Self::Postgres { credentials, .. } => AnyCredentials::Postgres(credentials.clone()),
+            Self::S3 { credentials, .. } => AnyCredentials::S3(credentials.clone()),
+            Self::Pinecone { credentials, .. } => AnyCredentials::Pinecone(credentials.clone()),
+        }
+    }
+
+    /// Returns the context portion as AnyContext.
+    pub fn context(&self) -> AnyContext {
+        match self {
+            Self::Postgres { context, .. } => AnyContext::Relational(context.clone()),
+            Self::S3 { context, .. } => AnyContext::Object(context.clone()),
+            Self::Pinecone { context, .. } => AnyContext::Vector(context.clone()),
+        }
+    }
+}
+
+/// All provider credentials (AI and DAL).
+#[derive(Debug, Clone, AsRefStr, Serialize, Deserialize)]
+#[serde(tag = "category", rename_all = "snake_case")]
+#[strum(serialize_all = "snake_case")]
+pub enum ProviderCredentials {
+    /// AI provider credentials.
+    Ai(AiCredentials),
+    /// DAL provider credentials.
+    Dal(DalCredentials),
+}
+
+impl ProviderCredentials {
+    /// Converts to AI credentials if applicable.
+    pub fn into_ai(self) -> Result<AiCredentials> {
+        match self {
+            Self::Ai(c) => Ok(c),
+            other => Err(Error::Internal(format!(
+                "expected AI credentials, got '{}'",
+                other.as_ref()
+            ))),
+        }
+    }
+
+    /// Converts to DAL credentials if applicable.
+    pub fn into_dal(self) -> Result<DalCredentials> {
+        match self {
+            Self::Dal(c) => Ok(c),
+            other => Err(Error::Internal(format!(
+                "expected DAL credentials, got '{}'",
+                other.as_ref()
+            ))),
+        }
+    }
+
+    /// Converts to completion credentials if applicable.
+    pub fn into_completion_credentials(self) -> Result<CompletionCredentials> {
+        self.into_ai()?.into_completion()
+    }
+
+    /// Converts to embedding credentials if applicable.
+    pub fn into_embedding_credentials(self) -> Result<EmbeddingCredentials> {
+        self.into_ai()?.into_embedding()
+    }
+
+    /// Converts to DAL credentials and context if applicable.
+    pub fn into_dal_credentials(self) -> Result<(AnyCredentials, AnyContext)> {
+        let dal = self.into_dal()?;
+        Ok((dal.credentials(), dal.context()))
+    }
 }
 
-/// In-memory registry for AI provider credentials.
+/// In-memory registry for provider credentials.
 ///
 /// Credentials are stored by UUID and can be retrieved during workflow compilation.
 #[derive(Debug, Clone, Default)]
@@ -68,8 +161,8 @@ impl CredentialsRegistry {
     }
 
     /// Registers credentials with a UUID.
-    pub fn register(&mut self, id: Uuid, creds: ProviderCredentials) {
-        self.credentials.insert(id, creds);
+    pub fn register(&mut self, id: Uuid, creds: impl Into<ProviderCredentials>) {
+        self.credentials.insert(id, creds.into());
     }
 
     /// Retrieves credentials by UUID.
@@ -99,3 +192,28 @@ impl CredentialsRegistry {
         self.credentials.clear();
     }
 }
+
+// Convenience From implementations for registering credentials directly
+impl From<AiCredentials> for ProviderCredentials {
+    fn from(creds: AiCredentials) -> Self {
+        Self::Ai(creds)
+    }
+}
+
+impl From<DalCredentials> for ProviderCredentials {
+    fn from(creds: DalCredentials) -> Self {
+        Self::Dal(creds)
+    }
+}
+
+impl From<CompletionCredentials> for ProviderCredentials {
+    fn from(creds: CompletionCredentials) -> Self {
+        Self::Ai(AiCredentials::Completion(creds))
+    }
+}
+
+impl From<EmbeddingCredentials> for ProviderCredentials {
+    fn from(creds: EmbeddingCredentials) -> Self {
+        Self::Ai(AiCredentials::Embedding(creds))
+    }
+}
diff --git a/crates/nvisy-runtime/src/engine/executor.rs b/crates/nvisy-runtime/src/engine/executor.rs
index f192649..be0f2fc 100644
--- a/crates/nvisy-runtime/src/engine/executor.rs
+++ b/crates/nvisy-runtime/src/engine/executor.rs
@@ -3,11 +3,12 @@
 use std::sync::Arc;
 
 use futures::{SinkExt, StreamExt};
+use nvisy_dal::Error as DalError;
 use tokio::sync::Semaphore;
 
 use super::EngineConfig;
 use super::compiler::WorkflowCompiler;
-use super::context::{Context, ExecutionContext};
+use super::context::ExecutionContext;
 use super::credentials::CredentialsRegistry;
 use crate::definition::{NodeId, Workflow};
 use crate::error::{Error, Result};
@@ -61,10 +62,9 @@ impl Engine {
         &self,
         definition: Workflow,
         credentials: CredentialsRegistry,
-        ctx: Context,
     ) -> Result {
         // Compile the definition into an executable graph
-        let compiler = WorkflowCompiler::new(&credentials, ctx);
+        let compiler = WorkflowCompiler::new(&credentials);
         let graph = compiler.compile(definition).await?;
 
         self.execute_graph(graph, credentials).await
@@ -139,11 +139,11 @@ impl Engine {
         let mut input_streams: Vec<(NodeId, InputStream)> = Vec::new();
         for id in &input_ids {
             if let Some(node) = graph.node_mut(id)
-                && let CompiledNode::Input(compiled_input) = node
+                && let CompiledNode::Input(input_stream) = node
             {
                 // Create a placeholder stream and swap with the real one
                 let placeholder = InputStream::new(Box::pin(futures::stream::empty()));
-                let stream = std::mem::replace(compiled_input.stream_mut(), placeholder);
+                let stream = std::mem::replace(input_stream, placeholder);
                 input_streams.push((*id, stream));
             }
         }
@@ -151,12 +151,12 @@
         // Take ownership of output streams
         let mut output_streams: Vec<(NodeId, OutputStream)> = Vec::new();
         for id in &output_ids {
-            if let Some(CompiledNode::Output(compiled_output)) = graph.node_mut(id) {
-                // Create a placeholder sink
+            if let Some(CompiledNode::Output(output_stream)) = graph.node_mut(id) {
+                // Create a placeholder sink (must use DalError to match OutputStream type)
                 let placeholder = OutputStream::new(Box::pin(futures::sink::drain().sink_map_err(
-                    |_: std::convert::Infallible| Error::Internal("drain sink error".into()),
+                    |_: std::convert::Infallible| DalError::provider("drain sink error"),
                 )));
-                let stream = std::mem::replace(compiled_output.stream_mut(), placeholder);
+                let stream = std::mem::replace(output_stream, placeholder);
                 output_streams.push((*id, stream));
             }
         }
@@ -170,10 +170,11 @@
         );
 
         while let Some(result) = input_stream.next().await {
-            let item = result?;
+            let resumable = result?;
 
             // Start with single input item
-            ctx.set_current_single(item);
+            // TODO: Store resumable.context for resumption on failure
+            ctx.set_current_single(resumable.data);
 
             // Execute transforms in order
             for transform_id in &transform_ids {
diff --git a/crates/nvisy-runtime/src/engine/mod.rs b/crates/nvisy-runtime/src/engine/mod.rs
index 031ef29..f4898f0 100644 --- a/crates/nvisy-runtime/src/engine/mod.rs +++ b/crates/nvisy-runtime/src/engine/mod.rs @@ -13,6 +13,7 @@ mod credentials; mod executor; pub use config::EngineConfig; -pub use context::{Context, ExecutionContext}; +pub use context::ExecutionContext; pub use credentials::{CredentialsRegistry, ProviderCredentials}; pub use executor::Engine; +pub use nvisy_dal::contexts::AnyContext; diff --git a/crates/nvisy-runtime/src/graph/edge.rs b/crates/nvisy-runtime/src/graph/edge.rs index c2f82ba..d4b5ea4 100644 --- a/crates/nvisy-runtime/src/graph/edge.rs +++ b/crates/nvisy-runtime/src/graph/edge.rs @@ -1,30 +1,14 @@ //! Edge data for compiled graphs. -use derive_builder::Builder; use serde::{Deserialize, Serialize}; /// Edge data stored in the compiled graph. -#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)] -#[derive(Serialize, Deserialize, Builder)] -#[builder( - name = "EdgeDataBuilder", - pattern = "owned", - setter(into, strip_option, prefix = "with") -)] +#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct EdgeData { /// Optional port/slot name on the source node. #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default)] pub from_port: Option, /// Optional port/slot name on the target node. #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default)] pub to_port: Option, } - -impl EdgeData { - /// Returns a builder for creating edge data. - pub fn builder() -> EdgeDataBuilder { - EdgeDataBuilder::default() - } -} diff --git a/crates/nvisy-runtime/src/graph/input/mod.rs b/crates/nvisy-runtime/src/graph/input/mod.rs deleted file mode 100644 index dba1e79..0000000 --- a/crates/nvisy-runtime/src/graph/input/mod.rs +++ /dev/null @@ -1,38 +0,0 @@ -//! Compiled input node types. - -mod stream; - -pub use stream::{DataStream, InputStream}; - -/// Compiled input node - ready to stream data. -/// -/// This is the runtime representation of an input node after compilation. -/// Cache slots are resolved during compilation, so compiled inputs always -/// wrap concrete input streams. -#[derive(Debug)] -pub struct CompiledInput { - /// The input stream for reading data. - stream: InputStream, -} - -impl CompiledInput { - /// Creates a new compiled input from an input stream. - pub fn new(stream: InputStream) -> Self { - Self { stream } - } - - /// Returns a reference to the input stream. - pub fn stream(&self) -> &InputStream { - &self.stream - } - - /// Returns a mutable reference to the input stream. - pub fn stream_mut(&mut self) -> &mut InputStream { - &mut self.stream - } - - /// Consumes this compiled input and returns the underlying stream. - pub fn into_stream(self) -> InputStream { - self.stream - } -} diff --git a/crates/nvisy-runtime/src/graph/input/stream.rs b/crates/nvisy-runtime/src/graph/input/stream.rs deleted file mode 100644 index 5a0d667..0000000 --- a/crates/nvisy-runtime/src/graph/input/stream.rs +++ /dev/null @@ -1,124 +0,0 @@ -//! Input stream types for compiled workflow data flow. - -use std::pin::Pin; -use std::task::{Context, Poll}; - -use futures::stream::BoxStream; -use futures::{Stream, StreamExt}; -use nvisy_dal::AnyDataValue; - -use crate::error::Result; - -/// A boxed stream of workflow data values. -pub type DataStream = BoxStream<'static, Result>; - -/// Input stream for reading data in a workflow. -/// -/// Wraps a boxed stream and provides metadata about the source. -pub struct InputStream { - /// The underlying data stream. 
- stream: DataStream, - /// Optional cursor for pagination. - cursor: Option, - /// Optional limit on items to read. - limit: Option, - /// Number of items read so far. - items_read: usize, -} - -impl InputStream { - /// Creates a new input stream. - pub fn new(stream: DataStream) -> Self { - Self { - stream, - cursor: None, - limit: None, - items_read: 0, - } - } - - /// Creates an input stream with a cursor for pagination. - pub fn with_cursor(stream: DataStream, cursor: impl Into) -> Self { - Self { - stream, - cursor: Some(cursor.into()), - limit: None, - items_read: 0, - } - } - - /// Creates an input stream with a limit on items to read. - pub fn with_limit(stream: DataStream, limit: usize) -> Self { - Self { - stream: Box::pin(stream.take(limit)), - cursor: None, - limit: Some(limit), - items_read: 0, - } - } - - /// Creates an input stream with both cursor and limit. - pub fn with_cursor_and_limit( - stream: DataStream, - cursor: impl Into, - limit: usize, - ) -> Self { - Self { - stream: Box::pin(stream.take(limit)), - cursor: Some(cursor.into()), - limit: Some(limit), - items_read: 0, - } - } - - /// Returns the cursor for the next page, if any. - pub fn cursor(&self) -> Option<&str> { - self.cursor.as_deref() - } - - /// Returns the limit on items to read, if set. - pub fn limit(&self) -> Option { - self.limit - } - - /// Returns the number of items read so far. - pub fn items_read(&self) -> usize { - self.items_read - } - - /// Consumes the stream and returns the inner boxed stream. - pub fn into_inner(self) -> DataStream { - self.stream - } - - /// Consumes the stream and returns all parts. - pub fn into_parts(self) -> (DataStream, Option, Option) { - (self.stream, self.cursor, self.limit) - } -} - -impl Stream for InputStream { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let result = Pin::new(&mut self.stream).poll_next(cx); - if let Poll::Ready(Some(Ok(_))) = &result { - self.items_read += 1; - } - result - } - - fn size_hint(&self) -> (usize, Option) { - self.stream.size_hint() - } -} - -impl std::fmt::Debug for InputStream { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("InputStream") - .field("cursor", &self.cursor) - .field("limit", &self.limit) - .field("items_read", &self.items_read) - .finish_non_exhaustive() - } -} diff --git a/crates/nvisy-runtime/src/graph/mod.rs b/crates/nvisy-runtime/src/graph/mod.rs index 81cc5eb..442eedf 100644 --- a/crates/nvisy-runtime/src/graph/mod.rs +++ b/crates/nvisy-runtime/src/graph/mod.rs @@ -15,16 +15,12 @@ use petgraph::Direction; use petgraph::graph::{DiGraph, NodeIndex}; mod edge; -mod input; mod node; -mod output; mod route; mod transform; pub use edge::EdgeData; -pub use input::{CompiledInput, DataStream, InputStream}; -pub use node::CompiledNode; -pub use output::{CompiledOutput, DataSink, OutputStream}; +pub use node::{CompiledNode, InputStream, OutputStream}; pub use route::{CompiledSwitch, FileCategoryEvaluator, LanguageEvaluator, SwitchEvaluator}; pub use transform::{ ChunkProcessor, CompiledTransform, DeriveProcessor, EmbeddingProcessor, EnrichProcessor, @@ -117,18 +113,6 @@ impl CompiledGraph { .filter_map(|(id, &idx)| self.graph.node_weight(idx).map(|node| (id, node))) } - /// Returns an iterator over input nodes. - pub fn input_nodes(&self) -> impl Iterator { - self.nodes() - .filter_map(|(id, node)| node.as_input().map(|input| (id, input))) - } - - /// Returns an iterator over output nodes. 
-    pub fn output_nodes(&self) -> impl Iterator<Item = (&NodeId, &CompiledOutput)> {
-        self.nodes()
-            .filter_map(|(id, node)| node.as_output().map(|output| (id, output)))
-    }
-
     /// Returns the predecessors (incoming nodes) of a node.
     pub fn predecessors(&self, id: &NodeId) -> impl Iterator {
         self.node_indices.get(id).into_iter().flat_map(|&idx| {
diff --git a/crates/nvisy-runtime/src/graph/node.rs b/crates/nvisy-runtime/src/graph/node.rs
index d848613..a22abee 100644
--- a/crates/nvisy-runtime/src/graph/node.rs
+++ b/crates/nvisy-runtime/src/graph/node.rs
@@ -1,21 +1,31 @@
 //! Compiled node types.
 
-use super::input::CompiledInput;
-use super::output::CompiledOutput;
+use derive_more::From;
+use nvisy_dal::contexts::AnyContext;
+use nvisy_dal::datatypes::AnyDataValue;
+use nvisy_dal::{Resumable, streams};
+
 use super::route::CompiledSwitch;
 use super::transform::CompiledTransform;
 
+/// Type alias for input streams in the runtime.
+/// Each item is paired with a context for resumption.
+pub type InputStream = streams::InputStream<Resumable<AnyDataValue, AnyContext>>;
+
+/// Type alias for output streams in the runtime.
+pub type OutputStream = streams::OutputStream<AnyDataValue>;
+
 /// Compiled node enum for workflow execution.
 ///
 /// This is the runtime representation of a node after compilation.
 /// Cache slots are resolved during compilation, so compiled nodes
 /// only contain concrete processing types.
-#[derive(Debug)]
+#[derive(Debug, From)]
 pub enum CompiledNode {
     /// Data input node - ready to stream data.
-    Input(CompiledInput),
+    Input(InputStream),
     /// Data output node - ready to receive data.
-    Output(CompiledOutput),
+    Output(OutputStream),
     /// Data transform node - ready to process data.
     /// Boxed to reduce enum size variance (transform processors are large).
     Transform(Box<CompiledTransform>),
@@ -44,22 +54,6 @@ impl CompiledNode {
         matches!(self, CompiledNode::Switch(_))
     }
 
-    /// Returns this node as an input, if it is one.
-    pub fn as_input(&self) -> Option<&CompiledInput> {
-        match self {
-            CompiledNode::Input(input) => Some(input),
-            _ => None,
-        }
-    }
-
-    /// Returns this node as an output, if it is one.
-    pub fn as_output(&self) -> Option<&CompiledOutput> {
-        match self {
-            CompiledNode::Output(output) => Some(output),
-            _ => None,
-        }
-    }
-
     /// Returns this node as a transform, if it is one.
     pub fn as_transform(&self) -> Option<&CompiledTransform> {
         match self {
@@ -75,50 +69,6 @@ impl CompiledNode {
             _ => None,
         }
     }
-
-    /// Consumes this node and returns the input, if it is one.
-    pub fn into_input(self) -> Option<CompiledInput> {
-        match self {
-            CompiledNode::Input(input) => Some(input),
-            _ => None,
-        }
-    }
-
-    /// Consumes this node and returns the output, if it is one.
-    pub fn into_output(self) -> Option<CompiledOutput> {
-        match self {
-            CompiledNode::Output(output) => Some(output),
-            _ => None,
-        }
-    }
-
-    /// Consumes this node and returns the transform, if it is one.
-    pub fn into_transform(self) -> Option<Box<CompiledTransform>> {
-        match self {
-            CompiledNode::Transform(transform) => Some(transform),
-            _ => None,
-        }
-    }
-
-    /// Consumes this node and returns the switch, if it is one.
- pub fn into_switch(self) -> Option { - match self { - CompiledNode::Switch(switch) => Some(switch), - _ => None, - } - } -} - -impl From for CompiledNode { - fn from(input: CompiledInput) -> Self { - CompiledNode::Input(input) - } -} - -impl From for CompiledNode { - fn from(output: CompiledOutput) -> Self { - CompiledNode::Output(output) - } } impl From for CompiledNode { @@ -126,15 +76,3 @@ impl From for CompiledNode { CompiledNode::Transform(Box::new(transform)) } } - -impl From> for CompiledNode { - fn from(transform: Box) -> Self { - CompiledNode::Transform(transform) - } -} - -impl From for CompiledNode { - fn from(switch: CompiledSwitch) -> Self { - CompiledNode::Switch(switch) - } -} diff --git a/crates/nvisy-runtime/src/graph/output/mod.rs b/crates/nvisy-runtime/src/graph/output/mod.rs deleted file mode 100644 index 3282bc4..0000000 --- a/crates/nvisy-runtime/src/graph/output/mod.rs +++ /dev/null @@ -1,38 +0,0 @@ -//! Compiled output node types. - -mod stream; - -pub use stream::{DataSink, OutputStream}; - -/// Compiled output node - ready to receive data. -/// -/// This is the runtime representation of an output node after compilation. -/// Cache slots are resolved during compilation, so compiled outputs always -/// wrap concrete output streams. -#[derive(Debug)] -pub struct CompiledOutput { - /// The output stream for writing data. - stream: OutputStream, -} - -impl CompiledOutput { - /// Creates a new compiled output from an output stream. - pub fn new(stream: OutputStream) -> Self { - Self { stream } - } - - /// Returns a reference to the output stream. - pub fn stream(&self) -> &OutputStream { - &self.stream - } - - /// Returns a mutable reference to the output stream. - pub fn stream_mut(&mut self) -> &mut OutputStream { - &mut self.stream - } - - /// Consumes this compiled output and returns the underlying stream. - pub fn into_stream(self) -> OutputStream { - self.stream - } -} diff --git a/crates/nvisy-runtime/src/graph/output/stream.rs b/crates/nvisy-runtime/src/graph/output/stream.rs deleted file mode 100644 index 50873da..0000000 --- a/crates/nvisy-runtime/src/graph/output/stream.rs +++ /dev/null @@ -1,101 +0,0 @@ -//! Output stream types for compiled workflow data flow. - -use std::pin::Pin; -use std::task::{Context, Poll}; - -use futures::{Sink, SinkExt}; -use nvisy_dal::AnyDataValue; - -use crate::error::Error; - -/// A boxed sink for workflow data values. -pub type DataSink = Pin + Send + 'static>>; - -/// Output stream for writing data in a workflow. -/// -/// Wraps a boxed sink and tracks write statistics. -pub struct OutputStream { - /// The underlying data sink. - sink: DataSink, - /// Optional buffer size for batching. - buffer_size: Option, - /// Number of items written so far. - items_written: usize, -} - -impl OutputStream { - /// Creates a new output stream. - pub fn new(sink: DataSink) -> Self { - Self { - sink, - buffer_size: None, - items_written: 0, - } - } - - /// Creates an output stream with buffering for batched writes. - pub fn with_buffer(sink: DataSink, buffer_size: usize) -> Self { - Self { - sink: Box::pin(sink.buffer(buffer_size)), - buffer_size: Some(buffer_size), - items_written: 0, - } - } - - /// Returns the buffer size, if set. - pub fn buffer_size(&self) -> Option { - self.buffer_size - } - - /// Returns the number of items written so far. - pub fn items_written(&self) -> usize { - self.items_written - } - - /// Consumes the stream and returns the inner boxed sink. 
- pub fn into_inner(self) -> DataSink { - self.sink - } -} - -impl Sink for OutputStream { - type Error = Error; - - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.sink.as_mut().poll_ready(cx) - } - - fn start_send( - mut self: Pin<&mut Self>, - item: AnyDataValue, - ) -> std::result::Result<(), Self::Error> { - self.items_written += 1; - self.sink.as_mut().start_send(item) - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.sink.as_mut().poll_flush(cx) - } - - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.sink.as_mut().poll_close(cx) - } -} - -impl std::fmt::Debug for OutputStream { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("OutputStream") - .field("buffer_size", &self.buffer_size) - .field("items_written", &self.items_written) - .finish_non_exhaustive() - } -} diff --git a/crates/nvisy-runtime/src/graph/route/file_category.rs b/crates/nvisy-runtime/src/graph/route/file_category.rs index e319e1d..d430185 100644 --- a/crates/nvisy-runtime/src/graph/route/file_category.rs +++ b/crates/nvisy-runtime/src/graph/route/file_category.rs @@ -1,6 +1,6 @@ //! File category evaluator for routing by file extension. -use nvisy_dal::AnyDataValue; +use nvisy_dal::datatypes::AnyDataValue; use crate::definition::FileCategory; diff --git a/crates/nvisy-runtime/src/graph/route/language.rs b/crates/nvisy-runtime/src/graph/route/language.rs index c1bed33..64df5a0 100644 --- a/crates/nvisy-runtime/src/graph/route/language.rs +++ b/crates/nvisy-runtime/src/graph/route/language.rs @@ -1,6 +1,6 @@ //! Language evaluator for routing by detected content language. -use nvisy_dal::AnyDataValue; +use nvisy_dal::datatypes::AnyDataValue; /// Evaluates language based on metadata. #[derive(Debug, Clone)] diff --git a/crates/nvisy-runtime/src/graph/route/mod.rs b/crates/nvisy-runtime/src/graph/route/mod.rs index ade7107..41df5ee 100644 --- a/crates/nvisy-runtime/src/graph/route/mod.rs +++ b/crates/nvisy-runtime/src/graph/route/mod.rs @@ -5,7 +5,7 @@ mod language; pub use file_category::FileCategoryEvaluator; pub use language::LanguageEvaluator; -use nvisy_dal::AnyDataValue; +use nvisy_dal::datatypes::AnyDataValue; use crate::definition::SwitchDef; diff --git a/crates/nvisy-runtime/src/graph/transform/chunk.rs b/crates/nvisy-runtime/src/graph/transform/chunk.rs index 0c632b7..841470a 100644 --- a/crates/nvisy-runtime/src/graph/transform/chunk.rs +++ b/crates/nvisy-runtime/src/graph/transform/chunk.rs @@ -1,6 +1,6 @@ //! Chunk processor. -use nvisy_dal::AnyDataValue; +use nvisy_dal::datatypes::AnyDataValue; use nvisy_rig::agent::TextGenerationAgent; use super::Process; diff --git a/crates/nvisy-runtime/src/graph/transform/derive.rs b/crates/nvisy-runtime/src/graph/transform/derive.rs index 4de85fa..eed6577 100644 --- a/crates/nvisy-runtime/src/graph/transform/derive.rs +++ b/crates/nvisy-runtime/src/graph/transform/derive.rs @@ -1,6 +1,6 @@ //! Derive processor. -use nvisy_dal::AnyDataValue; +use nvisy_dal::datatypes::AnyDataValue; use nvisy_rig::agent::TextGenerationAgent; use super::Process; diff --git a/crates/nvisy-runtime/src/graph/transform/embedding.rs b/crates/nvisy-runtime/src/graph/transform/embedding.rs index 078e7e4..d6b0394 100644 --- a/crates/nvisy-runtime/src/graph/transform/embedding.rs +++ b/crates/nvisy-runtime/src/graph/transform/embedding.rs @@ -1,6 +1,6 @@ //! Embedding processor. 
-use nvisy_dal::AnyDataValue;
+use nvisy_dal::datatypes::AnyDataValue;
 use nvisy_rig::provider::EmbeddingProvider;
 
 use super::Process;
diff --git a/crates/nvisy-runtime/src/graph/transform/enrich.rs b/crates/nvisy-runtime/src/graph/transform/enrich.rs
index 4fe6fa9..41f2094 100644
--- a/crates/nvisy-runtime/src/graph/transform/enrich.rs
+++ b/crates/nvisy-runtime/src/graph/transform/enrich.rs
@@ -1,6 +1,6 @@
 //! Enrich processor.
 
-use nvisy_dal::AnyDataValue;
+use nvisy_dal::datatypes::AnyDataValue;
 use nvisy_rig::agent::{TableAgent, VisionAgent};
 
 use super::Process;
diff --git a/crates/nvisy-runtime/src/graph/transform/extract.rs b/crates/nvisy-runtime/src/graph/transform/extract.rs
index ee2864a..11e7149 100644
--- a/crates/nvisy-runtime/src/graph/transform/extract.rs
+++ b/crates/nvisy-runtime/src/graph/transform/extract.rs
@@ -1,6 +1,6 @@
 //! Extract processor.
 
-use nvisy_dal::AnyDataValue;
+use nvisy_dal::datatypes::AnyDataValue;
 use nvisy_rig::agent::{StructuredOutputAgent, TableAgent, TextAnalysisAgent};
 
 use super::Process;
diff --git a/crates/nvisy-runtime/src/graph/transform/mod.rs b/crates/nvisy-runtime/src/graph/transform/mod.rs
index 93818d6..7d46b0c 100644
--- a/crates/nvisy-runtime/src/graph/transform/mod.rs
+++ b/crates/nvisy-runtime/src/graph/transform/mod.rs
@@ -17,7 +17,7 @@ pub use derive::DeriveProcessor;
 pub use embedding::EmbeddingProcessor;
 pub use enrich::EnrichProcessor;
 pub use extract::ExtractProcessor;
-use nvisy_dal::AnyDataValue;
+use nvisy_dal::datatypes::AnyDataValue;
 pub use partition::PartitionProcessor;
 
 use crate::error::Result;
diff --git a/crates/nvisy-runtime/src/graph/transform/partition.rs b/crates/nvisy-runtime/src/graph/transform/partition.rs
index 042858a..e387c85 100644
--- a/crates/nvisy-runtime/src/graph/transform/partition.rs
+++ b/crates/nvisy-runtime/src/graph/transform/partition.rs
@@ -1,6 +1,6 @@
 //! Partition processor.
 
-use nvisy_dal::AnyDataValue;
+use nvisy_dal::datatypes::AnyDataValue;
 
 use super::Process;
 use crate::definition::PartitionStrategy;
diff --git a/packages/README.md b/packages/README.md
new file mode 100644
index 0000000..3c42b74
--- /dev/null
+++ b/packages/README.md
@@ -0,0 +1,37 @@
+# Packages
+
+[![Build](https://img.shields.io/github/actions/workflow/status/nvisycom/server/build.yml?branch=main&label=build%20%26%20test&style=flat-square)](https://github.com/nvisycom/server/actions/workflows/build.yml)
+
+This directory contains Python packages that provide provider implementations for the Rust crates.
+
+## nvisy-dal
+
+Data abstraction layer for external integrations. Provides unified async interfaces for storage, databases, and vector stores. The Rust `nvisy-dal` crate loads this package via PyO3 to delegate provider calls.
+
+**Supported providers:** PostgreSQL, MySQL, S3, GCS, Azure Blob, Qdrant, Pinecone
+
+## nvisy-rig
+
+AI/LLM orchestration layer. Provides unified interfaces for LLM providers and agent workflows. Used by the Rust `nvisy-rig` crate for Python-based AI integrations.
+
+**Supported providers:** OpenAI, Anthropic, Cohere
+
+## Development
+
+Each package uses [uv](https://docs.astral.sh/uv/) for dependency management:
+
+```bash
+cd packages/nvisy-dal
+
+# Install dependencies
+uv sync --extra dev
+
+# Run tests
+uv run pytest
+
+# Type check
+uv run pyright
+
+# Lint
+uv run ruff check .
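+
+# Audit dependencies for known vulnerabilities
+# (pip-audit ships in the dev dependency group declared in pyproject.toml)
+uv run pip-audit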
+``` diff --git a/packages/nvisy-dal/pyproject.toml b/packages/nvisy-dal/pyproject.toml index 9440352..7d7b8eb 100644 --- a/packages/nvisy-dal/pyproject.toml +++ b/packages/nvisy-dal/pyproject.toml @@ -46,6 +46,7 @@ asyncio_default_fixture_loop_scope = "function" [dependency-groups] dev = [ + "pip-audit>=2.9", "pyright>=1.1.408", - "ruff>=0.14.14", + "ruff>=0.14", ] diff --git a/packages/nvisy-dal/src/nvisy_dal/generated/__init__.py b/packages/nvisy-dal/src/nvisy_dal/generated/__init__.py index 92878c8..2a3113c 100644 --- a/packages/nvisy-dal/src/nvisy_dal/generated/__init__.py +++ b/packages/nvisy-dal/src/nvisy_dal/generated/__init__.py @@ -25,16 +25,7 @@ ) __all__ = [ - # Contexts (runtime state) - "ObjectContext", - "RelationalContext", - "VectorContext", - # Params (configuration) "DistanceMetric", - "ObjectParams", - "RelationalParams", - "VectorParams", - # Data types "Document", "Edge", "Embedding", @@ -44,5 +35,11 @@ "Metadata", "Node", "Object", + "ObjectContext", + "ObjectParams", "Record", + "RelationalContext", + "RelationalParams", + "VectorContext", + "VectorParams", ] diff --git a/packages/nvisy-dal/src/nvisy_dal/generated/contexts.py b/packages/nvisy-dal/src/nvisy_dal/generated/contexts.py index 34af2b6..76d06ad 100644 --- a/packages/nvisy-dal/src/nvisy_dal/generated/contexts.py +++ b/packages/nvisy-dal/src/nvisy_dal/generated/contexts.py @@ -1,4 +1,7 @@ -"""Context types for provider operations. +"""Context types for data operations. + +Contexts carry state needed to resume reading from a specific position. +They only track *where* to resume, not *how much* to read (that's in Params). Generated from Rust schemas. Do not edit manually. """ @@ -7,23 +10,35 @@ class ObjectContext(BaseModel, frozen=True): - """Context for object storage operations (S3, GCS, Azure Blob).""" + """Context for object storage operations (S3, GCS, Azure Blob). + + Uses marker-based pagination (last seen key) which is portable across + S3, GCS, Azure Blob, and MinIO. + """ prefix: str | None = None + """Path prefix for listing objects.""" + token: str | None = None - limit: int | None = None + """Last seen object key (used as StartAfter/marker for resumption).""" class RelationalContext(BaseModel, frozen=True): - """Context for relational database operations (Postgres, MySQL).""" + """Context for relational database operations (Postgres, MySQL). + + Uses keyset pagination which is more efficient than offset-based + pagination for large datasets and provides stable results. + """ cursor: str | None = None + """Last seen cursor value (for keyset pagination).""" + tiebreaker: str | None = None - limit: int | None = None + """Tiebreaker value for resolving cursor conflicts.""" class VectorContext(BaseModel, frozen=True): """Context for vector database operations (Qdrant, Pinecone, pgvector).""" token: str | None = None - limit: int | None = None + """Continuation token or offset for pagination.""" diff --git a/packages/nvisy-dal/src/nvisy_dal/generated/params.py b/packages/nvisy-dal/src/nvisy_dal/generated/params.py index db7b784..61bf6d8 100644 --- a/packages/nvisy-dal/src/nvisy_dal/generated/params.py +++ b/packages/nvisy-dal/src/nvisy_dal/generated/params.py @@ -1,5 +1,8 @@ """Parameter types for provider configuration. +Params define how providers operate (columns, batch sizes, etc.), +while contexts carry runtime state (cursors, tokens). + Generated from Rust schemas. Do not edit manually. 
""" @@ -11,32 +14,59 @@ class RelationalParams(BaseModel, frozen=True): """Common parameters for relational database operations.""" - table: str | None = None + table: str + """Target table name.""" + + columns: list[str] | None = None + """Columns to select. If None, selects all columns.""" + cursor_column: str | None = None + """Column to use for cursor-based pagination (e.g., "id", "created_at").""" + tiebreaker_column: str | None = None + """Column to use as tiebreaker when cursor values are not unique (e.g., "id").""" + batch_size: int = Field(default=1000) + """Default batch size for bulk operations.""" class ObjectParams(BaseModel, frozen=True): """Common parameters for object storage operations.""" - bucket: str | None = None + bucket: str + """Bucket name (S3 bucket, GCS bucket, Azure container).""" + prefix: str | None = None + """Default prefix for object keys.""" + batch_size: int = Field(default=1000) + """Default batch size for bulk operations.""" class DistanceMetric(str, Enum): """Distance metric for vector similarity search.""" COSINE = "cosine" + """Cosine similarity (default).""" + EUCLIDEAN = "euclidean" + """Euclidean distance (L2).""" + DOT_PRODUCT = "dot_product" + """Dot product.""" class VectorParams(BaseModel, frozen=True): """Common parameters for vector database operations.""" - collection: str | None = None + collection: str + """Collection or index name (Pinecone index, Qdrant collection).""" + dimension: int | None = None + """Dimension of vectors (required for some providers).""" + metric: DistanceMetric = DistanceMetric.COSINE + """Distance metric for similarity search.""" + batch_size: int = Field(default=1000) + """Default batch size for bulk operations.""" diff --git a/packages/nvisy-dal/src/nvisy_dal/protocols.py b/packages/nvisy-dal/src/nvisy_dal/protocols.py index c8a9a42..bcce5d2 100644 --- a/packages/nvisy-dal/src/nvisy_dal/protocols.py +++ b/packages/nvisy-dal/src/nvisy_dal/protocols.py @@ -5,17 +5,22 @@ T_co = TypeVar("T_co", covariant=True) T_contra = TypeVar("T_contra", contravariant=True) +Ctx = TypeVar("Ctx") # Invariant: used in both parameter and return positions Ctx_contra = TypeVar("Ctx_contra", contravariant=True) Cred_contra = TypeVar("Cred_contra", contravariant=True) Params_contra = TypeVar("Params_contra", contravariant=True) @runtime_checkable -class DataInput(Protocol[T_co, Ctx_contra]): +class DataInput(Protocol[T_co, Ctx]): """Protocol for reading data from external sources.""" - async def read(self, ctx: Ctx_contra) -> AsyncIterator[T_co]: - """Yield items from the source based on context.""" + async def read(self, ctx: Ctx) -> AsyncIterator[tuple[T_co, Ctx]]: + """Yield (item, context) tuples from the source. + + Each yielded context can be used to resume reading from + the next item if the stream is interrupted. + """ ... 
diff --git a/packages/nvisy-dal/src/nvisy_dal/providers/pinecone.py b/packages/nvisy-dal/src/nvisy_dal/providers/pinecone.py index 223359e..d0bba80 100644 --- a/packages/nvisy-dal/src/nvisy_dal/providers/pinecone.py +++ b/packages/nvisy-dal/src/nvisy_dal/providers/pinecone.py @@ -87,7 +87,7 @@ async def upsert(self, vectors: Sequence[PineconeVector]) -> int: for i in range(0, len(records), batch_size): batch = list(records[i : i + batch_size]) response = cast( - UpsertResponse, + "UpsertResponse", self._index.upsert( # pyright: ignore[reportUnknownMemberType] vectors=batch, namespace=self._params.namespace, diff --git a/packages/nvisy-dal/src/nvisy_dal/providers/postgres.py b/packages/nvisy-dal/src/nvisy_dal/providers/postgres.py index ce7b09b..e9a2f8b 100644 --- a/packages/nvisy-dal/src/nvisy_dal/providers/postgres.py +++ b/packages/nvisy-dal/src/nvisy_dal/providers/postgres.py @@ -6,6 +6,8 @@ from pydantic import BaseModel from nvisy_dal.errors import DalError, ErrorKind +from nvisy_dal.generated.contexts import RelationalContext +from nvisy_dal.generated.params import RelationalParams if TYPE_CHECKING: from asyncpg import Pool @@ -26,22 +28,20 @@ class PostgresCredentials(BaseModel): dsn: str -class PostgresParams(BaseModel): - """Parameters for PostgreSQL operations.""" +class PostgresParams(RelationalParams, frozen=True): + """Parameters for PostgreSQL operations. - table: str - schema_name: str = "public" - batch_size: int = 1000 + Inherits `table` and `batch_size` from RelationalParams. + """ + cursor_column: str = "id" + """Column to use for keyset pagination cursor.""" -class PostgresContext(BaseModel): - """Context for read/write operations.""" + schema_name: str = "public" + """Schema name (defaults to "public").""" - columns: list[str] | None = None where: dict[str, object] | None = None - order_by: str | None = None - limit: int | None = None - offset: int | None = None + """WHERE clause conditions as key-value pairs.""" class PostgresProvider: @@ -79,43 +79,89 @@ async def disconnect(self) -> None: """Close the connection pool.""" await self._pool.close() - async def read(self, ctx: PostgresContext) -> AsyncIterator[dict[str, object]]: - """Read records from the database using parameterized queries.""" + def _build_where_conditions( + self, + params: list[object], + conditions: list[str], + ) -> None: + """Add WHERE conditions from params.where to conditions list.""" + if not self._params.where: + return + for key, value in self._params.where.items(): + if value is None: + conditions.append(f'"{key}" IS NULL') + else: + params.append(value) + conditions.append(f'"{key}" = ${len(params)}') + + def _build_keyset_condition( + self, + ctx: RelationalContext, + params: list[object], + conditions: list[str], + cursor_col: str, + ) -> None: + """Add keyset pagination condition if cursor exists.""" + if ctx.cursor is None: + return + params.append(ctx.cursor) + if self._params.tiebreaker_column and ctx.tiebreaker is not None: + tiebreaker_col = f'"{self._params.tiebreaker_column}"' + params.append(ctx.tiebreaker) + p1, p2 = len(params) - 1, len(params) + conditions.append(f"({cursor_col}, {tiebreaker_col}) > (${p1}, ${p2})") + else: + conditions.append(f"{cursor_col} > ${len(params)}") + + def _extract_context(self, record_dict: dict[str, object]) -> RelationalContext: + """Extract resumption context from a record.""" + cursor_val = record_dict.get(self._params.cursor_column, "") + cursor_value = str(cursor_val) if cursor_val is not None else "" + tiebreaker_value: str | None = None 
+ if self._params.tiebreaker_column: + tb_val = record_dict.get(self._params.tiebreaker_column, "") + tiebreaker_value = str(tb_val) if tb_val is not None else "" + return RelationalContext(cursor=cursor_value, tiebreaker=tiebreaker_value) + + async def read( + self, ctx: RelationalContext + ) -> AsyncIterator[tuple[dict[str, object], RelationalContext]]: + """Read records from the database using keyset pagination. + + Yields tuples of (record, context) where context can be used to resume + reading from the next record if the stream is interrupted. + """ try: async with self._pool.acquire() as conn: - # Build query with proper parameter binding - columns = ", ".join(f'"{c}"' for c in ctx.columns) if ctx.columns else "*" + columns = ( + ", ".join(f'"{c}"' for c in self._params.columns) + if self._params.columns + else "*" + ) table = f'"{self._params.schema_name}"."{self._params.table}"' + cursor_col = f'"{self._params.cursor_column}"' query_parts: list[str] = [f"SELECT {columns} FROM {table}"] # noqa: S608 params: list[object] = [] + conditions: list[str] = [] + + self._build_where_conditions(params, conditions) + self._build_keyset_condition(ctx, params, conditions, cursor_col) + + if conditions: + query_parts.append("WHERE " + " AND ".join(conditions)) - if ctx.where: - conditions: list[str] = [] - for key, value in ctx.where.items(): - if value is None: - conditions.append(f'"{key}" IS NULL') - else: - params.append(value) - conditions.append(f'"{key}" = ${len(params)}') - if conditions: - query_parts.append("WHERE " + " AND ".join(conditions)) - - if ctx.order_by: - # Order by should be validated/sanitized by caller - query_parts.append(f"ORDER BY {ctx.order_by}") - - if ctx.limit is not None: - params.append(ctx.limit) - query_parts.append(f"LIMIT ${len(params)}") - - if ctx.offset is not None: - params.append(ctx.offset) - query_parts.append(f"OFFSET ${len(params)}") + # Order by cursor column(s) for keyset pagination + if self._params.tiebreaker_column: + tiebreaker_col = f'"{self._params.tiebreaker_column}"' + query_parts.append(f"ORDER BY {cursor_col}, {tiebreaker_col}") + else: + query_parts.append(f"ORDER BY {cursor_col}") query = " ".join(query_parts) async for record in conn.cursor(query, *params): - yield dict(record) + record_dict: dict[str, object] = dict(record) + yield (record_dict, self._extract_context(record_dict)) except Exception as e: msg = f"Failed to read from PostgreSQL: {e}" raise DalError(msg, source=e) from e diff --git a/packages/nvisy-dal/src/nvisy_dal/providers/s3.py b/packages/nvisy-dal/src/nvisy_dal/providers/s3.py index ed1bd62..8008ac0 100644 --- a/packages/nvisy-dal/src/nvisy_dal/providers/s3.py +++ b/packages/nvisy-dal/src/nvisy_dal/providers/s3.py @@ -6,6 +6,8 @@ from pydantic import BaseModel from nvisy_dal.errors import DalError, ErrorKind +from nvisy_dal.generated.contexts import ObjectContext +from nvisy_dal.generated.params import ObjectParams if TYPE_CHECKING: from mypy_boto3_s3 import S3Client @@ -27,20 +29,17 @@ class S3Credentials(BaseModel): endpoint_url: str | None = None -class S3Params(BaseModel): - """Parameters for S3 operations.""" - - bucket: str - prefix: str = "" +class S3Params(ObjectParams, frozen=True): + """Parameters for S3 operations. + Inherits `bucket` and `batch_size` from ObjectParams. 
+ """ -class S3Context(BaseModel): - """Context for read/write operations.""" + prefix: str = "" + """Key prefix for all operations.""" - key: str | None = None - prefix: str | None = None - max_keys: int = 1000 content_type: str = "application/octet-stream" + """Default content type for uploaded objects.""" class S3Object(BaseModel): @@ -94,25 +93,32 @@ async def connect(cls, credentials: S3Credentials, params: S3Params) -> Self: async def disconnect(self) -> None: """Close the S3 client (no-op for boto3).""" - async def read(self, ctx: S3Context) -> AsyncIterator[S3Object]: - """List and optionally read objects from S3.""" + async def read(self, ctx: ObjectContext) -> AsyncIterator[tuple[S3Object, ObjectContext]]: + """List objects from S3. + + Yields tuples of (object, context) where context can be used to resume + reading from the next object if the stream is interrupted. + + The token field is used as StartAfter for marker-based pagination. + """ prefix = ctx.prefix or self._params.prefix - continuation_token: str | None = None + last_key: str | None = ctx.token try: while True: - if continuation_token: + if last_key: + # Use last seen key as StartAfter for resumption response = self._client.list_objects_v2( Bucket=self._params.bucket, Prefix=prefix, - MaxKeys=ctx.max_keys, - ContinuationToken=continuation_token, + MaxKeys=self._params.batch_size, + StartAfter=last_key, ) else: response = self._client.list_objects_v2( Bucket=self._params.bucket, Prefix=prefix, - MaxKeys=ctx.max_keys, + MaxKeys=self._params.batch_size, ) for obj in response.get("Contents", []): @@ -124,32 +130,30 @@ async def read(self, ctx: S3Context) -> AsyncIterator[S3Object]: if not obj_key or obj_size is None or not obj_modified or not obj_etag: continue - content = None - if ctx.key and obj_key == ctx.key: - get_response = self._client.get_object( - Bucket=self._params.bucket, - Key=obj_key, - ) - content = get_response["Body"].read() - - yield S3Object( + last_key = obj_key + s3_obj = S3Object( key=obj_key, size=obj_size, last_modified=obj_modified.isoformat(), etag=obj_etag.strip('"'), - content=content, + content=None, + ) + + # Create context for resumption after this object + resume_ctx = ObjectContext( + prefix=prefix, + token=obj_key, ) + yield (s3_obj, resume_ctx) if not response.get("IsTruncated"): break - continuation_token = response.get("NextContinuationToken") - except ClientError as e: msg = f"Failed to read from S3: {e}" raise DalError(msg, source=e) from e - async def write(self, ctx: S3Context, items: Sequence[S3Object]) -> None: + async def write(self, items: Sequence[S3Object]) -> None: """Write objects to S3.""" try: for item in items: @@ -161,7 +165,7 @@ async def write(self, ctx: S3Context, items: Sequence[S3Object]) -> None: Bucket=self._params.bucket, Key=key, Body=item.content, - ContentType=ctx.content_type, + ContentType=self._params.content_type, ) except ClientError as e: msg = f"Failed to write to S3: {e}"