Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
438 changes: 433 additions & 5 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ conjure-runtime = "6"
nominal-api = "0.917.0"
prost = "0.13"
snap = "1"
tokio = { version = "1", features = ["full", "tracing"] }
futures = "0.3"
tokio = { version = "1", features = ["full", "tracing", "io-util"] }
url = "2.5.4"
parking_lot = "0.12"
tracing = "0.1"
Expand All @@ -28,6 +29,8 @@ serde_json = "1.0.140"
chrono = "0.4.41"
thiserror = "2"
crossbeam-channel = "0.5.15"
reqwest = "0.12.22"
async-channel = "2.5.0"

[profile.release]
debug = true
Expand Down
104 changes: 80 additions & 24 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io::Write;
use std::sync::LazyLock;

use conjure_error::Error;
use conjure_http::client::AsyncClient;
use conjure_http::client::AsyncRequestBody;
use conjure_http::client::AsyncService;
use conjure_http::private::header::CONTENT_ENCODING;
use conjure_http::private::header::CONTENT_TYPE;
use conjure_http::private::Request;
Expand All @@ -18,8 +18,10 @@ use conjure_runtime::Client;
use conjure_runtime::Idempotency;
use conjure_runtime::ResponseBody;
use conjure_runtime::UserAgent;
use derive_more::From;
use nominal_api::api::rids::NominalDataSourceOrDatasetRid;
use nominal_api::api::rids::WorkspaceRid;
use nominal_api::ingest::api::IngestServiceAsyncClient;
use nominal_api::upload::api::UploadServiceAsyncClient;
use snap::write::FrameEncoder;
use url::Url;

Expand All @@ -32,27 +34,86 @@ pub mod conjure {
pub use conjure_runtime as runtime;
}

const PRODUCTION_API_URL: &str = "https://api.gov.nominal.io/api";
const STAGING_API_URL: &str = "https://api-staging.gov.nominal.io/api";
const USER_AGENT: &str = "nominal-streaming";

impl AuthProvider for BearerToken {
fn token(&self) -> Option<BearerToken> {
Some(self.clone())
}
}

pub static PRODUCTION_STREAMING_CLIENT: LazyLock<StreamingClient> = LazyLock::new(|| {
async_conjure_streaming_client("https://api.gov.nominal.io/api".try_into().unwrap())
.expect("Failed to create client")
#[derive(Debug, Clone)]
pub struct TokenAndWorkspaceRid {
pub token: BearerToken,
pub workspace_rid: Option<WorkspaceRid>,
}

impl AuthProvider for TokenAndWorkspaceRid {
fn token(&self) -> Option<BearerToken> {
Some(self.token.clone())
}

fn workspace_rid(&self) -> Option<WorkspaceRid> {
self.workspace_rid.clone()
}
}

#[derive(Clone)]
pub struct NominalApiClients {
pub streaming: Client,
pub upload: UploadServiceAsyncClient<Client>,
pub ingest: IngestServiceAsyncClient<Client>,
}

impl Debug for NominalApiClients {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NominalApiClients")
.field("streaming", &"Client")
.field("upload", &"UploadServiceAsyncClient<Client>")
.field("ingest", &"IngestServiceAsyncClient<Client>")
.finish()
}
}

impl NominalApiClients {
pub async fn send(&self, req: WriteRequest<'_>) -> Result<Response<ResponseBody>, Error> {
self.streaming.send(req).await
}
}

pub static PRODUCTION_CLIENTS: LazyLock<NominalApiClients> = LazyLock::new(|| NominalApiClients {
streaming: async_conjure_streaming_client(PRODUCTION_API_URL.try_into().unwrap())
.expect("Failed to create streaming client"),
upload: UploadServiceAsyncClient::new(
async_conjure_client("upload", PRODUCTION_API_URL.try_into().unwrap())
.expect("Failed to create upload client"),
),
ingest: IngestServiceAsyncClient::new(
async_conjure_client("ingest", PRODUCTION_API_URL.try_into().unwrap())
.expect("Failed to create ingest client"),
),
});

pub static STAGING_STREAMING_CLIENT: LazyLock<StreamingClient> = LazyLock::new(|| {
async_conjure_streaming_client("https://api-staging.gov.nominal.io/api".try_into().unwrap())
.expect("Failed to create client")
pub static STAGING_CLIENTS: LazyLock<NominalApiClients> = LazyLock::new(|| NominalApiClients {
streaming: async_conjure_streaming_client(STAGING_API_URL.try_into().unwrap())
.expect("Failed to create streaming client"),
upload: UploadServiceAsyncClient::new(
async_conjure_client("upload", STAGING_API_URL.try_into().unwrap())
.expect("Failed to create upload client"),
),
ingest: IngestServiceAsyncClient::new(
async_conjure_client("ingest", STAGING_API_URL.try_into().unwrap())
.expect("Failed to create ingest client"),
),
});

fn async_conjure_streaming_client(uri: Url) -> Result<StreamingClient, Error> {
fn async_conjure_streaming_client(uri: Url) -> Result<Client, Error> {
Client::builder()
.service("core-streaming-rs")
.user_agent(UserAgent::new(Agent::new(
"core-streaming-rs",
USER_AGENT,
env!("CARGO_PKG_VERSION"),
)))
.uri(uri)
Expand All @@ -64,22 +125,17 @@ fn async_conjure_streaming_client(uri: Url) -> Result<StreamingClient, Error> {
// enables retries for POST endpoints like the streaming ingest one
.idempotency(Idempotency::Always)
.build()
.map(|client| client.into())
}

#[derive(From, Clone)]
pub struct StreamingClient(Client);

impl Debug for StreamingClient {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "StreamingClient")
}
}

impl StreamingClient {
pub async fn send(&self, req: WriteRequest<'_>) -> Result<Response<ResponseBody>, Error> {
self.0.send(req).await
}
fn async_conjure_client(service: &'static str, uri: Url) -> Result<Client, Error> {
Client::builder()
.service(service)
.user_agent(UserAgent::new(Agent::new(
USER_AGENT,
env!("CARGO_PKG_VERSION"),
)))
.uri(uri)
.build()
}

pub type WriteRequest<'a> = Request<AsyncRequestBody<'a, BodyWriter>>;
Expand Down
18 changes: 9 additions & 9 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use parking_lot::Mutex;
use prost::Message;
use tracing::warn;

use crate::client::StreamingClient;
use crate::client::NominalApiClients;
use crate::client::{self};
use crate::notifier::NominalStreamListener;
use crate::types::AuthProvider;
Expand All @@ -45,24 +45,24 @@ pub trait WriteRequestConsumer: Send + Sync + Debug {
}

#[derive(Clone)]
pub struct NominalCoreConsumer<T: AuthProvider> {
client: StreamingClient,
pub struct NominalCoreConsumer<A: AuthProvider> {
client: NominalApiClients,
handle: tokio::runtime::Handle,
token_provider: T,
auth_provider: A,
data_source_rid: ResourceIdentifier,
}

impl<T: AuthProvider> NominalCoreConsumer<T> {
impl<A: AuthProvider> NominalCoreConsumer<A> {
pub fn new(
client: StreamingClient,
client: NominalApiClients,
handle: tokio::runtime::Handle,
token_provider: T,
auth_provider: A,
data_source_rid: ResourceIdentifier,
) -> Self {
Self {
client,
handle,
token_provider,
auth_provider,
data_source_rid,
}
}
Expand All @@ -80,7 +80,7 @@ impl<T: AuthProvider> Debug for NominalCoreConsumer<T> {
impl<T: AuthProvider + 'static> WriteRequestConsumer for NominalCoreConsumer<T> {
fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
let token = self
.token_provider
.auth_provider
.token()
.ok_or(ConsumerError::MissingTokenError)?;
let write_request =
Expand Down
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
pub mod notifier;
pub mod stream;
mod types;
pub mod upload;

/// This includes the most common types in this crate, re-exported for your convenience.
pub mod prelude {
Expand All @@ -17,11 +18,9 @@
pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;

pub use crate::client::PRODUCTION_STREAMING_CLIENT;
pub use crate::client::STAGING_STREAMING_CLIENT;
pub use crate::consumer::NominalCoreConsumer;
pub use crate::stream::NominalDatasetStream;
pub use crate::stream::NominalDatasourceStream;

Check warning on line 23 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build

use of deprecated type alias `stream::NominalDatasourceStream`

Check warning on line 23 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build

use of deprecated type alias `stream::NominalDatasourceStream`
pub use crate::stream::NominalStreamOpts;
pub use crate::types::ChannelDescriptor;
}
Expand Down
4 changes: 2 additions & 2 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tracing::debug;
use tracing::error;
use tracing::info;

use crate::client::PRODUCTION_STREAMING_CLIENT;
use crate::client::PRODUCTION_CLIENTS;
use crate::consumer::AvroFileConsumer;
use crate::consumer::DualWriteRequestConsumer;
use crate::consumer::ListeningWriteRequestConsumer;
Expand Down Expand Up @@ -148,7 +148,7 @@ impl NominalDatasetStreamBuilder {
.as_ref()
.map(|(token, dataset, handle)| {
NominalCoreConsumer::new(
PRODUCTION_STREAMING_CLIENT.clone(),
PRODUCTION_CLIENTS.clone(),
handle.clone(),
token.clone(),
dataset.clone(),
Expand Down
5 changes: 5 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::BTreeMap;
use std::time::Duration;

use conjure_object::BearerToken;
use nominal_api::api::rids::WorkspaceRid;
use nominal_api::tonic::google::protobuf::Timestamp;
use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
Expand Down Expand Up @@ -53,6 +54,10 @@ impl ChannelDescriptor {

pub trait AuthProvider: Clone + Send + Sync {
fn token(&self) -> Option<BearerToken>;

fn workspace_rid(&self) -> Option<WorkspaceRid> {
None
}
}

pub trait IntoPoints {
Expand Down
Loading