Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 0 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,6 @@ High-performance backend server for the Nvisy document processing platform.
- **Real-Time Updates** - AI streaming via SSE and job processing via NATS
- **Interactive Docs** - Auto-generated OpenAPI with Scalar UI

## Architecture

```
server/
├── crates/
│ ├── nvisy-cli/ # Server binary with CLI and configuration
│ ├── nvisy-core/ # Shared types, errors, and utilities
│ ├── nvisy-nats/ # NATS client (streams, KV, object storage, jobs)
│ ├── nvisy-postgres/ # PostgreSQL database layer with Diesel ORM
│ ├── nvisy-rig/ # AI services (chat, RAG, embeddings)
│ ├── nvisy-server/ # HTTP handlers, middleware, pipeline, and OpenAPI
│ └── nvisy-webhook/ # Webhook delivery with HTTP client
├── migrations/ # PostgreSQL database migrations
└── Cargo.toml # Workspace configuration
```

## Quick Start

```bash
Expand Down
1 change: 1 addition & 0 deletions crates/nvisy-dal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ serde_json = { workspace = true, features = [] }
# Derive macros & utilities
async-trait = { workspace = true, features = [] }
derive_more = { workspace = true, features = ["from"] }
strum = { workspace = true, features = [] }
thiserror = { workspace = true, features = [] }

# Data types
Expand Down
24 changes: 24 additions & 0 deletions crates/nvisy-dal/src/core/contexts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,28 @@ impl AnyContext {
}
self
}

/// Returns a reference to the object context if this is an object context.
pub fn as_object(&self) -> Option<&ObjectContext> {
match self {
Self::Object(ctx) => Some(ctx),
_ => None,
}
}

/// Returns a reference to the relational context if this is a relational context.
pub fn as_relational(&self) -> Option<&RelationalContext> {
match self {
Self::Relational(ctx) => Some(ctx),
_ => None,
}
}

/// Returns a reference to the vector context if this is a vector context.
pub fn as_vector(&self) -> Option<&VectorContext> {
match self {
Self::Vector(ctx) => Some(ctx),
_ => None,
}
}
}
17 changes: 5 additions & 12 deletions crates/nvisy-dal/src/core/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
//! Core types and traits for data operations.

mod contexts;
mod datatypes;
mod params;
mod streams;
pub mod contexts;
pub mod datatypes;
pub mod params;
pub mod streams;

pub use contexts::{AnyContext, ObjectContext, RelationalContext, VectorContext};
pub use datatypes::{
AnyDataValue, DataType, Document, Edge, Embedding, Graph, Message, Metadata, Node, Object,
Record,
};
pub use nvisy_core::Provider;
pub use params::{DistanceMetric, ObjectParams, RelationalParams, VectorParams};
pub use streams::{InputStream, ItemSink, ItemStream, OutputStream};
use streams::InputStream;

use crate::Result;

Expand Down
8 changes: 4 additions & 4 deletions crates/nvisy-dal/src/core/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use serde::{Deserialize, Serialize};

/// Common parameters for relational database operations.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct RelationalParams {
/// Target table name.
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -23,7 +23,7 @@ pub struct RelationalParams {
}

/// Common parameters for object storage operations.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct ObjectParams {
/// Bucket name (S3 bucket, GCS bucket, Azure container).
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -37,7 +37,7 @@ pub struct ObjectParams {
}

/// Common parameters for vector database operations.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct VectorParams {
/// Collection or index name (Pinecone index, Qdrant collection).
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -54,7 +54,7 @@ pub struct VectorParams {
}

/// Distance metric for vector similarity search.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DistanceMetric {
/// Cosine similarity (default).
Expand Down
32 changes: 24 additions & 8 deletions crates/nvisy-dal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,34 @@
#![forbid(unsafe_code)]
#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod core;
mod core;
mod error;
mod runtime;

pub mod provider;

mod python;
pub mod contexts {
//! Context types for pagination and filtering.
pub use crate::core::contexts::*;
}

mod error;
pub mod datatypes {
//! Data types for storage operations.
pub use crate::core::datatypes::*;
}

pub mod params {
//! Parameter types for provider configuration.
pub use crate::core::params::*;
}

pub mod streams {
//! Stream types for data input/output.
pub use crate::core::streams::*;
}

pub use core::{
AnyContext, AnyDataValue, DataInput, DataOutput, DataType, Document, Edge, Embedding, Graph,
InputStream, ItemSink, ItemStream, Message, Metadata, Node, Object, ObjectContext,
OutputStream, Provider, Record, RelationalContext, VectorContext,
};
pub use core::{DataInput, DataOutput};

pub use error::{BoxError, Error, ErrorKind, Result};
pub use nvisy_core::Provider;
pub use provider::{AnyCredentials, AnyParams, AnyProvider};
131 changes: 129 additions & 2 deletions crates/nvisy-dal/src/provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use derive_more::From;
use serde::{Deserialize, Serialize};
use strum::AsRefStr;

mod pinecone;
mod postgres;
Expand All @@ -30,8 +31,9 @@ pub use self::postgres::{PostgresCredentials, PostgresParams, PostgresProvider};
pub use self::s3::{S3Credentials, S3Params, S3Provider};

/// Type-erased credentials for any provider.
#[derive(Debug, Clone, From, Serialize, Deserialize)]
#[derive(Debug, Clone, From, AsRefStr, Serialize, Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum AnyCredentials {
/// PostgreSQL credentials.
Postgres(PostgresCredentials),
Expand All @@ -42,8 +44,9 @@ pub enum AnyCredentials {
}

/// Type-erased parameters for any provider.
#[derive(Debug, Clone, From, Serialize, Deserialize)]
#[derive(Debug, Clone, From, AsRefStr, Serialize, Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum AnyParams {
/// PostgreSQL parameters.
Postgres(PostgresParams),
Expand All @@ -63,3 +66,127 @@ pub enum AnyProvider {
/// Pinecone provider.
Pinecone(PineconeProvider),
}

use futures::StreamExt;

use crate::contexts::AnyContext;
use crate::datatypes::AnyDataValue;
use crate::streams::InputStream;
use crate::{DataInput, DataOutput, Error, Result};

#[async_trait::async_trait]
impl crate::Provider for AnyProvider {
type Credentials = AnyCredentials;
type Params = AnyParams;

async fn connect(
params: Self::Params,
credentials: Self::Credentials,
) -> nvisy_core::Result<Self> {
match (params, credentials) {
(AnyParams::Postgres(params), AnyCredentials::Postgres(credentials)) => {
let provider = PostgresProvider::connect(params, credentials).await?;
Ok(Self::Postgres(provider))
}
(AnyParams::S3(params), AnyCredentials::S3(credentials)) => {
let provider = S3Provider::connect(params, credentials).await?;
Ok(Self::S3(provider))
}
(AnyParams::Pinecone(params), AnyCredentials::Pinecone(credentials)) => {
let provider = PineconeProvider::connect(params, credentials).await?;
Ok(Self::Pinecone(provider))
}
(params, credentials) => Err(nvisy_core::Error::new(
nvisy_core::ErrorKind::InvalidInput,
)
.with_message(format!(
"mismatched provider types: params={}, credentials={}",
params.as_ref(),
credentials.as_ref()
))),
}
}

async fn disconnect(self) -> nvisy_core::Result<()> {
match self {
Self::Postgres(provider) => provider.disconnect().await,
Self::S3(provider) => provider.disconnect().await,
Self::Pinecone(provider) => provider.disconnect().await,
}
}
}

#[async_trait::async_trait]
impl DataInput for AnyProvider {
type Context = AnyContext;
type Item = AnyDataValue;

async fn read(&self, ctx: &Self::Context) -> Result<InputStream<Self::Item>> {
match self {
Self::Postgres(provider) => {
let ctx = ctx.as_relational().cloned().unwrap_or_default();
let stream = provider.read(&ctx).await?;
let mapped = stream.map(|r| r.map(AnyDataValue::from));
Ok(InputStream::new(Box::pin(mapped)))
}
Self::S3(provider) => {
let ctx = ctx.as_object().cloned().unwrap_or_default();
let stream = provider.read(&ctx).await?;
let mapped = stream.map(|r| r.map(AnyDataValue::from));
Ok(InputStream::new(Box::pin(mapped)))
}
Self::Pinecone(_) => Err(Error::invalid_input(
"Pinecone provider does not support reading",
)),
}
}
}

#[async_trait::async_trait]
impl DataOutput for AnyProvider {
type Item = AnyDataValue;

async fn write(&self, items: Vec<Self::Item>) -> Result<()> {
match self {
Self::Postgres(provider) => {
let records: Result<Vec<_>> = items
.into_iter()
.map(|item| match item {
AnyDataValue::Record(r) => Ok(r),
other => Err(Error::invalid_input(format!(
"expected Record, got {:?}",
std::mem::discriminant(&other)
))),
})
.collect();
provider.write(records?).await
}
Self::S3(provider) => {
let objects: Result<Vec<_>> = items
.into_iter()
.map(|item| match item {
AnyDataValue::Object(o) => Ok(o),
other => Err(Error::invalid_input(format!(
"expected Object, got {:?}",
std::mem::discriminant(&other)
))),
})
.collect();
provider.write(objects?).await
}
Self::Pinecone(provider) => {
let embeddings: Result<Vec<_>> = items
.into_iter()
.map(|item| match item {
AnyDataValue::Embedding(e) => Ok(e),
other => Err(Error::invalid_input(format!(
"expected Embedding, got {:?}",
std::mem::discriminant(&other)
))),
})
.collect();
provider.write(embeddings?).await
}
}
}
}
10 changes: 5 additions & 5 deletions crates/nvisy-dal/src/provider/pinecone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

use serde::{Deserialize, Serialize};

use crate::Result;
use crate::core::{DataOutput, Embedding, Provider};
use crate::python::{self, PyDataOutput, PyProvider};
use crate::datatypes::Embedding;
use crate::runtime::{self, PyDataOutput, PyProvider};
use crate::{DataOutput, Provider, Result};

/// Credentials for Pinecone connection.
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -16,7 +16,7 @@ pub struct PineconeCredentials {
}

/// Parameters for Pinecone operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PineconeParams {
/// Index name.
pub index_name: String,
Expand All @@ -39,7 +39,7 @@ impl Provider for PineconeProvider {
params: Self::Params,
credentials: Self::Credentials,
) -> nvisy_core::Result<Self> {
let inner = python::connect("pinecone", credentials, params).await?;
let inner = runtime::connect("pinecone", credentials, params).await?;
Ok(Self {
output: inner.as_data_output(),
inner,
Expand Down
15 changes: 8 additions & 7 deletions crates/nvisy-dal/src/provider/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

use serde::{Deserialize, Serialize};

use crate::Result;
use crate::core::{
DataInput, DataOutput, InputStream, Provider, Record, RelationalContext, RelationalParams,
};
use crate::python::{self, PyDataInput, PyDataOutput, PyProvider};
use crate::contexts::RelationalContext;
use crate::datatypes::Record;
use crate::params::RelationalParams;
use crate::runtime::{self, PyDataInput, PyDataOutput, PyProvider};
use crate::streams::InputStream;
use crate::{DataInput, DataOutput, Provider, Result};

/// Credentials for PostgreSQL connection.
///
Expand All @@ -20,7 +21,7 @@ pub struct PostgresCredentials {
}

/// Parameters for PostgreSQL operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PostgresParams {
/// Schema name (defaults to "public").
#[serde(default = "default_schema")]
Expand Down Expand Up @@ -50,7 +51,7 @@ impl Provider for PostgresProvider {
params: Self::Params,
credentials: Self::Credentials,
) -> nvisy_core::Result<Self> {
let inner = python::connect("postgres", credentials, params).await?;
let inner = runtime::connect("postgres", credentials, params).await?;
Ok(Self {
input: inner.as_data_input(),
output: inner.as_data_output(),
Expand Down
Loading