From f4f0b15ab0f78985b58e0077febf4db554e2a2f7 Mon Sep 17 00:00:00 2001 From: BadMachine Date: Thu, 14 Aug 2025 03:20:34 +0200 Subject: [PATCH 1/2] Initial browser split added --- Cargo.toml | 22 +- browser.js | 2 +- index.d.ts | 1 - package.json | 2 +- src/bin/main.rs | 5 +- src/client/base.rs | 37 +++ src/client/browser/client.rs | 105 ++++++++ src/client/browser/mod.rs | 21 ++ src/client/mod.rs | 238 +----------------- src/client/napi_rs/client.rs | 164 ++++++++++++ src/client/napi_rs/mod.rs | 2 + src/client/native/client.rs | 139 ++++++++++ src/client/native/mod.rs | 2 + src/lib.rs | 1 + src/point/mod.rs | 1 - src/query/browser/mod.rs | 2 + src/query/browser/query_processor.rs | 32 +++ src/query/common/mod.rs | 1 + src/query/common/query_processor.rs | 28 +++ src/query/mod.rs | 7 +- src/query/query_processor.rs | 28 --- src/serializer/browser/mod.rs | 11 + src/serializer/browser/unsafe_serializer.rs | 12 + .../{ => common}/library_serializer.rs | 6 +- src/serializer/common/mod.rs | 37 +++ src/serializer/{ => common}/raw_serializer.rs | 6 +- .../{ => common}/unsafe_serializer.rs | 3 +- src/serializer/mod.rs | 34 +-- tests/e2e_write.rs | 6 +- 29 files changed, 637 insertions(+), 318 deletions(-) create mode 100644 src/client/base.rs create mode 100644 src/client/browser/client.rs create mode 100644 src/client/browser/mod.rs create mode 100644 src/client/napi_rs/client.rs create mode 100644 src/client/napi_rs/mod.rs create mode 100644 src/client/native/client.rs create mode 100644 src/client/native/mod.rs create mode 100644 src/query/browser/mod.rs create mode 100644 src/query/browser/query_processor.rs create mode 100644 src/query/common/mod.rs create mode 100644 src/query/common/query_processor.rs delete mode 100644 src/query/query_processor.rs create mode 100644 src/serializer/browser/mod.rs create mode 100644 src/serializer/browser/unsafe_serializer.rs rename src/serializer/{ => common}/library_serializer.rs (99%) create mode 100644 src/serializer/common/mod.rs rename src/serializer/{ => common}/raw_serializer.rs (85%) rename src/serializer/{ => common}/unsafe_serializer.rs (84%) diff --git a/Cargo.toml b/Cargo.toml index 0dc9f81..d064e99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,27 +28,19 @@ version = "3.2" git = "https://github.com/BadMachine/napi-rs" branch = "fix/readable-stream-byte-mode-lock" package = "napi" -features = ["full","experimental","tokio","web_stream","tokio-stream"] +#features = ["full","experimental","tokio","web_stream","tokio-stream"] +features = ["experimental", "web_stream", "serde-json", "napi10"] [dependencies.reqwest] version = "0.12" features = ["json","rustls-tls-native-roots-no-provider","gzip"] -[dependencies.tonic] -version = "0.13" -features = ["gzip","tls-native-roots","tls-webpki-roots", "tls-ring"] # since rustls update it requires to manually initialize cryptoprovider +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tonic = { version = "0.13", features = ["gzip","tls-native-roots","tls-webpki-roots","tls-ring"] } +arrow = { version = "56.0.0", features = ["arrow-json"] } +arrow-flight = { version = "56.0.0", features = ["flight-sql"] } +serde_arrow = { version = "0.13.5", features = ["arrow-56"] } -[dependencies.arrow] -version = "56.0.0" -features = ["arrow-json"] - -[dependencies.arrow-flight] -version = "56.0.0" -features = ["flight-sql"] - -[dependencies.serde_arrow] -version = "0.13.5" -features = ["arrow-56"] [build-dependencies] napi-build = "2.2.3" diff --git a/browser.js b/browser.js index a4e3521..10d33fd 100644 --- a/browser.js +++ b/browser.js @@ -1 +1 @@ -export * from 'influxdb3-napi-wasm32-wasi' +export * from '@badmachine/influxdb3-napi-wasm32-wasi' diff --git a/index.d.ts b/index.d.ts index 4f91ebc..de907fa 100644 --- a/index.d.ts +++ b/index.d.ts @@ -1,7 +1,6 @@ /* auto-generated by NAPI-RS */ /* eslint-disable */ export declare class InfluxDbClient { - constructor(addr: string, token?: string | undefined | null, serializer?: Serializer | undefined | null, options?: FlightOptions | undefined | null) query(queryPayload: QueryPayload): ReadableStream | ReadableStream> | ReadableStream /** * # Safety diff --git a/package.json b/package.json index 83d1785..0bc5788 100644 --- a/package.json +++ b/package.json @@ -58,7 +58,7 @@ "bench": "node --import @oxc-node/core/register benchmark/bench.ts", "build": "napi build --platform --release --no-const-enum", "build:wasm-unknown": "napi build --platform --release -t wasm32-unknown-unknown", - "build:wasip2": "napi build --platform --release -t wasm32-wasip2", + "build:wasip1-threads": "napi build --platform --release -t wasm32-wasip1-threads", "build:debug": "napi build --platform", "format": "run-p format:prettier format:rs format:toml", "format:prettier": "prettier . -w", diff --git a/src/bin/main.rs b/src/bin/main.rs index 90f32ce..99331e0 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -1,14 +1,13 @@ -use influxdb3_napi::client::options::{QueryPayload, QueryType}; -use napi::bindgen_prelude::Either3; use napi::tokio; -use napi::tokio_stream::StreamExt; #[tokio::main] #[cfg(feature = "native")] async fn main() { + println!("Not implemented yet for native binary"); } #[tokio::main] +#[cfg(not(target_arch = "wasm32"))] #[cfg(not(feature = "native"))] async fn main() { println!("Not implemented yet"); diff --git a/src/client/base.rs b/src/client/base.rs new file mode 100644 index 0000000..ea4fa61 --- /dev/null +++ b/src/client/base.rs @@ -0,0 +1,37 @@ +// CURRENTLY NAPI-RS DOES NOT SUPPORT TRAIT IMPLEMENTATIONS + +// use napi::bindgen_prelude::{Either3, ReadableStream}; +// use napi::Env; +// use crate::client::options::{FlightOptions, QueryPayload}; +// use crate::client::WriteOptions; +// use crate::serializer::library_serializer::LibraryReturnType; +// use crate::serializer::Serializer; +// +// pub trait InfluxClientTrait { +// fn new( +// addr: String, +// token: Option, +// serializer: Option, +// options: Option, +// ) -> Self; +// +// fn query( +// &mut self, +// query_payload: QueryPayload, +// env: &Env, +// ) -> napi::Result< +// Either3< +// ReadableStream<'_, LibraryReturnType>, +// ReadableStream<'_, serde_json::Map>, +// ReadableStream<'_, napi::bindgen_prelude::Buffer>, +// >, +// >; +// +// fn write( +// &mut self, +// lines: Vec, +// database: String, +// write_options: Option, +// org: Option, +// ) -> napi::Result<()>; +// } diff --git a/src/client/browser/client.rs b/src/client/browser/client.rs new file mode 100644 index 0000000..52726c1 --- /dev/null +++ b/src/client/browser/client.rs @@ -0,0 +1,105 @@ +use crate::client::http_client::get_http_client; +use crate::client::options::{FlightOptions, QueryPayload, WriteOptions}; +use crate::serializer::browser::{Serializer}; +use napi::bindgen_prelude::{Buffer, Either, ReadableStream}; +use napi::Env; +use napi::tokio_stream::wrappers::ReceiverStream; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use crate::client::browser::HttpQueryResponseV1; +use crate::query::browser::query_processor::into_stream; +use napi::bindgen_prelude::*; + +#[napi_derive::napi] +pub struct InfluxDBClient { + addr: String, + http_client: Client, + serializer: Serializer, + options: FlightOptions, +} + +// replace it with #[napi_derive::napi] in the future + // impl InfluxClientTrait for InfluxDBClient { +#[napi_derive::napi] +impl InfluxDBClient { + fn new( + addr: String, + token: Option, + serializer: Option, + options: Option, + ) -> Self { + Self { + addr, + http_client: get_http_client(token.unwrap_or(String::from(""))), + serializer: serializer.unwrap_or_default(), + options: options.unwrap_or_default(), + } + } + + #[napi_derive::napi] + pub fn query( + &mut self, + query_payload: QueryPayload, + env: &Env, + ) -> napi::Result< + Either< + ReadableStream<'_, Map>, + ReadableStream<'_, Buffer>, + >, + > { + let stream = self.query_inner(query_payload, env)?; + Ok(Either::A(stream)) + } + + pub fn query_inner( + &mut self, + query_payload: QueryPayload, + env: &Env, + ) -> Result>> { + use napi::bindgen_prelude::block_on; + + let stream: ReceiverStream>> = block_on(async { + let url = format!("{}/query", self.addr); + + let response = self + .http_client + .get(&url) + .query(&[ + ("db", query_payload.database.clone()), + ("q", query_payload.query.clone()), + ]) + .send() + .await + .map_err(|e| Error::from_reason(format!("HTTP request failed: {}", e)))?; + + let status = response.status(); + if !status.is_success() { + return Err(Error::from_reason(format!( + "InfluxDB returned non-success status: {}", + status + ))); + } + + let data: HttpQueryResponseV1 = response + .json() + .await + .map_err(|e| Error::from_reason(format!("Failed to parse JSON: {}", e)))?; + + Ok(into_stream(data)) + })?; + + ReadableStream::new(env, stream) + } + + #[napi_derive::napi] + pub fn write( + &mut self, + lines: Vec, + database: String, + write_options: Option, + org: Option, + ) -> Result<()> { + todo!() + } +} diff --git a/src/client/browser/mod.rs b/src/client/browser/mod.rs new file mode 100644 index 0000000..302b66e --- /dev/null +++ b/src/client/browser/mod.rs @@ -0,0 +1,21 @@ +use serde::{Deserialize, Serialize}; + +pub(crate) mod client; + +#[derive(Deserialize, Serialize)] +pub struct Series { + name: String, + pub(crate)columns: Vec, + pub(crate)values: Vec>, +} + +#[derive(Deserialize, Serialize)] +pub struct QueryDataV1 { + statement_id: u32, + pub(crate) series: Vec, +} + +#[derive(Deserialize, Serialize)] +pub(crate) struct HttpQueryResponseV1 { + pub(crate)results: Vec, +} \ No newline at end of file diff --git a/src/client/mod.rs b/src/client/mod.rs index d8f1fb5..c11708c 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,231 +1,15 @@ -mod channel; -mod http_client; -pub mod options; +#[cfg(not(target_arch = "wasm32"))] +pub mod channel; -use crate::client::channel::get_channel; -use crate::client::http_client::get_http_client; -pub use crate::client::options::{to_header_map, WriteOptions}; -use crate::client::options::{FlightOptions, QueryPayload}; -use crate::query::query_processor::into_stream; -use crate::serializer::library_serializer::{LibraryReturnType, LibrarySerializer}; -use crate::serializer::raw_serializer::RawSerializer; -use crate::serializer::unsafe_serializer::UnsafeSerializer; -use crate::serializer::Serializer; -use crate::serializer::SerializerTrait; -use crate::write::get_write_path; -use crate::Status; -use arrow_flight::{FlightClient, Ticket}; -use napi::bindgen_prelude::*; -use napi::Env; -use napi_derive::napi; +#[cfg(feature = "native")] +pub mod native; -use reqwest::Client; -use tonic::codegen::Bytes; +#[cfg(target_arch = "wasm32")] +pub mod browser; -#[cfg_attr(not(feature = "native"), napi_derive::napi)] -pub struct InfluxDBClient { - addr: String, - flight_client: FlightClient, - serializer: Serializer, - http_client: Client, -} +#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "native"))] +pub mod napi_rs; -#[cfg_attr(not(feature = "native"), napi_derive::napi)] -impl InfluxDBClient { - #[cfg_attr(not(feature = "native"), napi_derive::napi(constructor))] - pub fn new( - addr: String, - token: Option, - serializer: Option, - options: Option, - ) -> Self { - #[cfg(not(feature = "native"))] - use napi::bindgen_prelude::block_on; - #[cfg(not(feature = "native"))] - let channel = block_on(async { - get_channel(addr.clone(), options) - .connect() - .await - .expect("error connecting") - }); - - #[cfg(feature = "native")] - let channel = get_channel(addr.clone(), options).connect_lazy(); - - let http_client = get_http_client(token.clone().unwrap_or(String::from(""))); - - let mut flight_client = FlightClient::new(channel); - - if let Some(token) = token { - flight_client - .add_header("authorization", format!("Bearer {token}").as_str()) - .unwrap(); - } - - Self { - addr, - flight_client, - http_client, - serializer: serializer.unwrap_or(Serializer::Unsafe), - } - } - - #[cfg(feature = "native")] - pub async fn query_inner( - &mut self, - query_payload: QueryPayload, - ) -> napi::tokio_stream::wrappers::ReceiverStream::Output>> - { - let payload: String = query_payload.into(); - - let ticket = Ticket { - ticket: Bytes::from(payload), - }; - - let response = self.flight_client.do_get(ticket).await; - into_stream::(response.unwrap()) - } - - #[cfg(not(feature = "native"))] - pub fn query_inner( - &mut self, - query_payload: QueryPayload, - env: &Env, - ) -> napi::Result> { - let payload: String = query_payload.into(); - - let ticket = Ticket { - ticket: Bytes::from(payload), - }; - - use napi::bindgen_prelude::block_on; - - let stream = block_on(async { - let response = self.flight_client.do_get(ticket).await; - into_stream::(response.unwrap()) - }); - - ReadableStream::new(env, stream) - } - - #[allow(clippy::type_complexity)] - #[cfg(not(feature = "native"))] - #[napi_derive::napi] - pub fn query( - &mut self, - query_payload: QueryPayload, - env: &Env, - ) -> napi::Result< - Either3< - ReadableStream<'_, LibraryReturnType>, - ReadableStream<'_, serde_json::Map>, - ReadableStream<'_, napi::bindgen_prelude::Buffer>, - >, - > { - match self.serializer { - Serializer::Library => { - let stream = self.query_inner::(query_payload, env)?; - Ok(Either3::A(stream)) - } - Serializer::Unsafe => { - let stream = self.query_inner::(query_payload, env)?; - Ok(Either3::B(stream)) - } - Serializer::Raw => { - let stream = self.query_inner::(query_payload, env)?; - Ok(Either3::C(stream)) - } - } - } - - #[cfg(feature = "native")] - pub async fn query( - &mut self, - query_payload: QueryPayload, - ) -> napi::Result< - Either3< - napi::tokio_stream::wrappers::ReceiverStream>, - napi::tokio_stream::wrappers::ReceiverStream< - napi::Result>, - >, - napi::tokio_stream::wrappers::ReceiverStream>, - >, - > { - match self.serializer { - Serializer::Library => { - let stream = self.query_inner::(query_payload).await; - Ok(Either3::A(stream)) - } - Serializer::Unsafe => { - let stream = self.query_inner::(query_payload).await; - Ok(Either3::B(stream)) - } - Serializer::Raw => { - let stream = self.query_inner::(query_payload).await; - Ok(Either3::C(stream)) - } - } - } - - async fn write_inner( - &mut self, - lines: Vec, - database: String, - write_options: Option, - org: Option, - ) -> napi::Result<()> { - let (url, write_options) = get_write_path(&self.addr, database, org, write_options)?; - - let headers = to_header_map(&write_options.headers.unwrap_or_default()).unwrap(); - let response = &self - .http_client - .post(url) - .body(lines.join("\n")) - .headers(headers) - .send() - .await; - - if let Err(e) = response { - match e.status() { - Some(reqwest::StatusCode::UNAUTHORIZED) => Err(napi::Error::from_reason("Unauthorized")), - _ => Err(napi::Error::from_status(Status::Cancelled)), - } - } else { - match response { - Ok(response) => match response.status() { - reqwest::StatusCode::OK => Ok(()), - reqwest::StatusCode::NO_CONTENT => Ok(()), - reqwest::StatusCode::UNAUTHORIZED => Err(napi::Error::from_reason("Unauthorized")), - _ => Err(napi::Error::from_reason("Unknown")), - }, - Err(_error) => Err(napi::Error::from_status(Status::Cancelled)), - } - } - } - - #[cfg(feature = "native")] - pub async fn write( - &mut self, - lines: Vec, - database: String, - write_options: Option, - org: Option, - ) -> napi::Result<()> { - self.write_inner(lines, database, write_options, org).await - } - - #[cfg(not(feature = "native"))] - #[napi_derive::napi] - /// # Safety - /// - /// This function should not be called before the horsemen are ready. - pub async unsafe fn write( - &mut self, - lines: Vec, - database: String, - write_options: Option, - org: Option, - ) -> napi::Result<()> { - self.write_inner(lines, database, write_options, org).await - } -} +pub mod http_client; +pub mod options; \ No newline at end of file diff --git a/src/client/napi_rs/client.rs b/src/client/napi_rs/client.rs new file mode 100644 index 0000000..2254a81 --- /dev/null +++ b/src/client/napi_rs/client.rs @@ -0,0 +1,164 @@ +use crate::client::channel::get_channel; +use crate::client::http_client::get_http_client; +pub use crate::client::options::{to_header_map, WriteOptions}; +use crate::client::options::{FlightOptions, QueryPayload}; +use crate::query::common::query_processor::into_stream; +use crate::serializer::common::library_serializer::{LibraryReturnType, LibrarySerializer}; +use crate::serializer::common::raw_serializer::RawSerializer; +use crate::serializer::common::unsafe_serializer::UnsafeSerializer; +use crate::serializer::common::Serializer; +use crate::serializer::common::SerializerTrait; +use crate::write::get_write_path; +use crate::Status; +use arrow_flight::{FlightClient, Ticket}; +use napi::bindgen_prelude::*; +use napi::Env; +use napi::tokio_stream::wrappers::ReceiverStream; +use reqwest::Client; +use tonic::codegen::Bytes; + +#[cfg(not(target_arch = "wasm32"))] +#[napi_derive::napi] +pub struct InfluxDBClient { + addr: String, + flight_client: FlightClient, + serializer: Serializer, + http_client: Client, +} + +#[cfg(not(target_arch = "wasm32"))] +#[napi_derive::napi] +impl InfluxDBClient { + + pub fn new( + addr: String, + token: Option, + serializer: Option, + options: Option, + ) -> Self { + use napi::bindgen_prelude::block_on; + let channel = block_on(async { + get_channel(addr.clone(), options) + .connect() + .await + .expect("error connecting") + }); + + let http_client = get_http_client(token.clone().unwrap_or(String::from(""))); + + let mut flight_client = FlightClient::new(channel); + + if let Some(token) = token { + flight_client + .add_header("authorization", format!("Bearer {token}").as_str()) + .unwrap(); + } + + Self { + addr, + flight_client, + http_client, + serializer: serializer.unwrap_or(Serializer::Unsafe), + } + } + + #[allow(clippy::type_complexity)] + #[napi_derive::napi] + pub fn query( + &mut self, + query_payload: QueryPayload, + env: &Env, + ) -> napi::Result< + Either3< + ReadableStream<'_, LibraryReturnType>, + ReadableStream<'_, serde_json::Map>, + ReadableStream<'_, Buffer>, + >, + > { + match self.serializer { + Serializer::Library => { + let stream = self.query_inner::(query_payload, env)?; + Ok(Either3::A(stream)) + } + Serializer::Unsafe => { + let stream = self.query_inner::(query_payload, env)?; + Ok(Either3::B(stream)) + } + Serializer::Raw => { + let stream = self.query_inner::(query_payload, env)?; + Ok(Either3::C(stream)) + } + } + } + + pub fn query_inner( + &mut self, + query_payload: QueryPayload, + env: &Env, + ) -> napi::Result> { + let payload: String = query_payload.into(); + + let ticket = Ticket { + ticket: Bytes::from(payload), + }; + + use napi::bindgen_prelude::block_on; + + let stream: ReceiverStream::Output>> = block_on(async { + let response = self.flight_client.do_get(ticket).await; + into_stream::(response.unwrap()) + }); + + ReadableStream::new(env, stream) + } + + #[napi_derive::napi] + /// # Safety + /// + /// This function should not be called before the horsemen are ready. + pub async unsafe fn write( + &mut self, + lines: Vec, + database: String, + write_options: Option, + org: Option, + ) -> Result<()> { + self.write_inner(lines, database, write_options, org).await + } + + async fn write_inner( + &mut self, + lines: Vec, + database: String, + write_options: Option, + org: Option, + ) -> napi::Result<()> { + let (url, write_options) = get_write_path(&self.addr, database, org, write_options)?; + + let headers = to_header_map(&write_options.headers.unwrap_or_default()).unwrap(); + let response = &self + .http_client + .post(url) + .body(lines.join("\n")) + .headers(headers) + .send() + .await; + + if let Err(e) = response { + match e.status() { + Some(reqwest::StatusCode::UNAUTHORIZED) => Err(napi::Error::from_reason("Unauthorized")), + _ => Err(napi::Error::from_status(Status::Cancelled)), + } + } else { + match response { + Ok(response) => match response.status() { + reqwest::StatusCode::OK => Ok(()), + reqwest::StatusCode::NO_CONTENT => Ok(()), + reqwest::StatusCode::UNAUTHORIZED => Err(napi::Error::from_reason("Unauthorized")), + _ => Err(napi::Error::from_reason("Unknown")), + }, + Err(_error) => Err(napi::Error::from_status(Status::Cancelled)), + } + } + } +} diff --git a/src/client/napi_rs/mod.rs b/src/client/napi_rs/mod.rs new file mode 100644 index 0000000..a1cf985 --- /dev/null +++ b/src/client/napi_rs/mod.rs @@ -0,0 +1,2 @@ +#[cfg(not(feature = "native"))] +pub mod client; \ No newline at end of file diff --git a/src/client/native/client.rs b/src/client/native/client.rs new file mode 100644 index 0000000..83b3539 --- /dev/null +++ b/src/client/native/client.rs @@ -0,0 +1,139 @@ +use crate::client::channel::get_channel; +use crate::client::http_client::get_http_client; +pub use crate::client::options::{to_header_map, FlightOptions, QueryPayload, WriteOptions}; +use arrow_flight::{FlightClient, Ticket}; +use napi::bindgen_prelude::*; +use napi::Status; +use reqwest::Client; +use tonic::codegen::Bytes; + +use crate::query::common::query_processor::into_stream; +use crate::serializer::common::library_serializer::{LibraryReturnType, LibrarySerializer}; +use crate::serializer::common::raw_serializer::RawSerializer; +use crate::serializer::common::unsafe_serializer::UnsafeSerializer; +use crate::serializer::common::{Serializer, SerializerTrait}; +use crate::write::get_write_path; + +pub struct InfluxDBClient { + addr: String, + flight_client: FlightClient, + serializer: Serializer, + http_client: Client, +} + +impl InfluxDBClient { + pub fn new( + addr: String, + token: Option, + serializer: Option, + options: Option, + ) -> Self { + let channel = get_channel(addr.clone(), options).connect_lazy(); + + let http_client = get_http_client(token.clone().unwrap_or(String::from(""))); + + let mut flight_client = FlightClient::new(channel); + + if let Some(token) = token { + flight_client + .add_header("authorization", format!("Bearer {token}").as_str()) + .unwrap(); + } + + Self { + addr, + flight_client, + http_client, + serializer: serializer.unwrap_or(Serializer::Unsafe), + } + } + + pub async fn query( + &mut self, + query_payload: QueryPayload, + ) -> Result< + Either3< + napi::tokio_stream::wrappers::ReceiverStream>, + napi::tokio_stream::wrappers::ReceiverStream< + Result>, + >, + napi::tokio_stream::wrappers::ReceiverStream>, + >, + > { + match self.serializer { + Serializer::Library => { + let stream = self.query_inner::(query_payload).await; + Ok(Either3::A(stream)) + } + Serializer::Unsafe => { + let stream = self.query_inner::(query_payload).await; + Ok(Either3::B(stream)) + } + Serializer::Raw => { + let stream = self.query_inner::(query_payload).await; + Ok(Either3::C(stream)) + } + } + } + + pub async fn query_inner( + &mut self, + query_payload: QueryPayload, + ) -> napi::tokio_stream::wrappers::ReceiverStream::Output>> + { + let payload: String = query_payload.into(); + + let ticket = Ticket { + ticket: Bytes::from(payload), + }; + + let response = self.flight_client.do_get(ticket).await; + into_stream::(response.unwrap()) + } + + async fn write_inner( + &mut self, + lines: Vec, + database: String, + write_options: Option, + org: Option, + ) -> napi::Result<()> { + let (url, write_options) = get_write_path(&self.addr, database, org, write_options)?; + + let headers = to_header_map(&write_options.headers.unwrap_or_default()).unwrap(); + let response = &self + .http_client + .post(url) + .body(lines.join("\n")) + .headers(headers) + .send() + .await; + + if let Err(e) = response { + match e.status() { + Some(reqwest::StatusCode::UNAUTHORIZED) => Err(napi::Error::from_reason("Unauthorized")), + _ => Err(napi::Error::from_status(Status::Cancelled)), + } + } else { + match response { + Ok(response) => match response.status() { + reqwest::StatusCode::OK => Ok(()), + reqwest::StatusCode::NO_CONTENT => Ok(()), + reqwest::StatusCode::UNAUTHORIZED => Err(napi::Error::from_reason("Unauthorized")), + _ => Err(napi::Error::from_reason("Unknown")), + }, + Err(_error) => Err(napi::Error::from_status(Status::Cancelled)), + } + } + } + + pub async fn write( + &mut self, + lines: Vec, + database: String, + write_options: Option, + org: Option, + ) -> napi::Result<()> { + self.write_inner(lines, database, write_options, org).await + } +} diff --git a/src/client/native/mod.rs b/src/client/native/mod.rs new file mode 100644 index 0000000..eb5ab24 --- /dev/null +++ b/src/client/native/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "native")] +pub mod client; \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index bc28186..eb25704 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ pub mod point; pub mod query; pub mod serializer; pub mod write; + // #[global_allocator] // static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; diff --git a/src/point/mod.rs b/src/point/mod.rs index 627e883..5d85fbb 100644 --- a/src/point/mod.rs +++ b/src/point/mod.rs @@ -1,6 +1,5 @@ mod escape; pub mod point_values; - use crate::client::options::TimeUnitV2; use crate::point::escape::{escape, COMMA_EQ_SPACE, COMMA_SPACE}; use crate::point::point_values::{PointFieldType, PointValues}; diff --git a/src/query/browser/mod.rs b/src/query/browser/mod.rs new file mode 100644 index 0000000..10d062d --- /dev/null +++ b/src/query/browser/mod.rs @@ -0,0 +1,2 @@ +#[cfg(target_arch = "wasm32")] +pub(crate) mod query_processor; \ No newline at end of file diff --git a/src/query/browser/query_processor.rs b/src/query/browser/query_processor.rs new file mode 100644 index 0000000..b51b44d --- /dev/null +++ b/src/query/browser/query_processor.rs @@ -0,0 +1,32 @@ +use napi::tokio_stream::wrappers::ReceiverStream; +use napi::bindgen_prelude::*; +use crate::client::browser::{HttpQueryResponseV1, Series}; +use crate::serializer::browser::unsafe_serializer::UnsafeSerializer; + +pub(crate) fn into_stream( + mut response: HttpQueryResponseV1, +) -> ReceiverStream>> { + let (tx, rx) = tokio::sync::mpsc::channel::< + Result>, + >(100); + + tokio::spawn(async move { + let HttpQueryResponseV1 { results } = response; + + if let Some(data) = results.first() { + if let Some(series) = data.series.first() { + let Series { columns, values, .. } = series; + + for item in values { + if let Some(result) = UnsafeSerializer::serialize(columns, item).await { + if tx.send(Ok(result)).await.is_err() { + break; + } + } + } + } + } + }); + + ReceiverStream::new(rx) +} diff --git a/src/query/common/mod.rs b/src/query/common/mod.rs new file mode 100644 index 0000000..738d505 --- /dev/null +++ b/src/query/common/mod.rs @@ -0,0 +1 @@ +pub mod query_processor; \ No newline at end of file diff --git a/src/query/common/query_processor.rs b/src/query/common/query_processor.rs new file mode 100644 index 0000000..923acd8 --- /dev/null +++ b/src/query/common/query_processor.rs @@ -0,0 +1,28 @@ +use crate::serializer::common::SerializerTrait; + +use arrow_flight::decode::FlightRecordBatchStream; +use napi::bindgen_prelude::*; +use napi::tokio_stream::wrappers::ReceiverStream; +use tonic::codegen::tokio_stream::StreamExt; + +pub(crate) fn into_stream( + mut response: FlightRecordBatchStream, +) -> ReceiverStream::Output>> { + let (tx, rx) = tokio::sync::mpsc::channel::>(100); + + tokio::spawn(async move { + while let Some(batch) = response.next().await { + let serialized_result = S::serialize(batch).await; + + if let Some(data) = serialized_result { + for item in data { + if tx.send(Ok(item)).await.is_err() { + break; + } + } + } + } + }); + + ReceiverStream::new(rx) +} \ No newline at end of file diff --git a/src/query/mod.rs b/src/query/mod.rs index 2276020..8dcdb3b 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -1,2 +1,5 @@ -// pub mod by_batch; -pub mod query_processor; +#[cfg(target_arch = "wasm32")] +pub mod browser; + +#[cfg(not(target_arch = "wasm32"))] +pub mod common; \ No newline at end of file diff --git a/src/query/query_processor.rs b/src/query/query_processor.rs deleted file mode 100644 index c568108..0000000 --- a/src/query/query_processor.rs +++ /dev/null @@ -1,28 +0,0 @@ -use crate::serializer::SerializerTrait; - -use arrow_flight::decode::FlightRecordBatchStream; -use napi::bindgen_prelude::*; -use napi::tokio_stream::wrappers::ReceiverStream; -use tonic::codegen::tokio_stream::StreamExt; - -pub(crate) fn into_stream( - mut response: FlightRecordBatchStream, -) -> ReceiverStream::Output>> { - let (tx, rx) = tokio::sync::mpsc::channel::>(100); - - napi::tokio::spawn(async move { - while let Some(batch) = response.next().await { - let serialized_result = S::serialize(batch).await; - - if let Some(data) = serialized_result { - for item in data { - if tx.send(Ok(item)).await.is_err() { - break; - } - } - } - } - }); - - ReceiverStream::new(rx) -} diff --git a/src/serializer/browser/mod.rs b/src/serializer/browser/mod.rs new file mode 100644 index 0000000..4681dca --- /dev/null +++ b/src/serializer/browser/mod.rs @@ -0,0 +1,11 @@ +#[cfg(target_arch = "wasm32")] +pub(crate) mod unsafe_serializer; + +pub enum Serializer { + Unsafe +} +impl Default for Serializer { + fn default() -> Self { + Self::Unsafe + } +} diff --git a/src/serializer/browser/unsafe_serializer.rs b/src/serializer/browser/unsafe_serializer.rs new file mode 100644 index 0000000..d68f09b --- /dev/null +++ b/src/serializer/browser/unsafe_serializer.rs @@ -0,0 +1,12 @@ +pub struct UnsafeSerializer; + +impl UnsafeSerializer { + pub(crate) async fn serialize(columns: &Vec, values: &Vec) -> Option> { + let mut map: serde_json::Map = serde_json::Map::new(); + for (col, val) in columns.iter().zip(values.iter()) { + map.insert((*col).to_string(), val.clone()); + } + + Some(map) + } +} diff --git a/src/serializer/library_serializer.rs b/src/serializer/common/library_serializer.rs similarity index 99% rename from src/serializer/library_serializer.rs rename to src/serializer/common/library_serializer.rs index 951eff3..c4b827c 100644 --- a/src/serializer/library_serializer.rs +++ b/src/serializer/common/library_serializer.rs @@ -1,4 +1,5 @@ -use crate::serializer::{FlightResult, SerializerTrait}; + +use arrow_flight::error::Result as FlightResult; use crate::Value; use arrow::array::{ Array, ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, DurationMicrosecondArray, @@ -13,12 +14,13 @@ use arrow::datatypes::{DataType, TimeUnit}; use napi_derive::napi; use std::collections::HashMap; use std::sync::Arc; +use crate::serializer::common::SerializerTrait; +#[cfg(not(target_arch = "wasm32"))] pub struct LibrarySerializer; #[napi] pub type LibraryReturnType = HashMap>; - impl SerializerTrait for LibrarySerializer { type Output = LibraryReturnType; diff --git a/src/serializer/common/mod.rs b/src/serializer/common/mod.rs new file mode 100644 index 0000000..dedefe3 --- /dev/null +++ b/src/serializer/common/mod.rs @@ -0,0 +1,37 @@ +use arrow_flight::error::Result as FlightResult; +use std::future::Future; +use arrow::array::RecordBatch; +use napi::bindgen_prelude::ToNapiValue; +use napi_derive::napi; + +pub mod library_serializer; +pub mod raw_serializer; +pub mod unsafe_serializer; + +#[napi(string_enum)] +#[derive(Debug, Clone)] +pub enum Serializer { + #[napi(value = "unsafe")] + Unsafe, + + #[napi(value = "library")] + Library, + + #[napi(value = "raw")] + Raw, +} + +#[cfg(not(target_arch = "wasm32"))] +impl Default for Serializer { + fn default() -> Self { + Self::Library + } +} + +pub trait SerializerTrait { + type Output: ToNapiValue + Send + 'static; + + fn serialize( + batch: FlightResult, + ) -> impl Future>> + Send; +} \ No newline at end of file diff --git a/src/serializer/raw_serializer.rs b/src/serializer/common/raw_serializer.rs similarity index 85% rename from src/serializer/raw_serializer.rs rename to src/serializer/common/raw_serializer.rs index 066e773..cbca018 100644 --- a/src/serializer/raw_serializer.rs +++ b/src/serializer/common/raw_serializer.rs @@ -1,11 +1,11 @@ -use crate::serializer::FlightResult; -use crate::serializer::SerializerTrait; +use arrow_flight::error::Result as FlightResult; +use crate::serializer::common::SerializerTrait; use arrow::array::RecordBatch; use arrow::ipc::writer::StreamWriter; use napi::bindgen_prelude::Buffer; +#[cfg(not(target_arch = "wasm32"))] pub struct RawSerializer; - impl SerializerTrait for RawSerializer { type Output = Buffer; diff --git a/src/serializer/unsafe_serializer.rs b/src/serializer/common/unsafe_serializer.rs similarity index 84% rename from src/serializer/unsafe_serializer.rs rename to src/serializer/common/unsafe_serializer.rs index f894a7c..ab42d62 100644 --- a/src/serializer/unsafe_serializer.rs +++ b/src/serializer/common/unsafe_serializer.rs @@ -1,7 +1,8 @@ -use crate::serializer::SerializerTrait; +use crate::serializer::common::SerializerTrait; use arrow::array::RecordBatch; use arrow_flight::error::Result as FlightResult; +#[cfg(not(target_arch = "wasm32"))] pub struct UnsafeSerializer; impl SerializerTrait for UnsafeSerializer { type Output = serde_json::Map; diff --git a/src/serializer/mod.rs b/src/serializer/mod.rs index b24bf6b..8dcdb3b 100644 --- a/src/serializer/mod.rs +++ b/src/serializer/mod.rs @@ -1,31 +1,5 @@ -use arrow::array::RecordBatch; -use std::future::Future; -// NAPI DOES NOT SUPPORT GENERIC STRUCTURE DEPS -use napi_derive::napi; -pub mod library_serializer; -pub mod raw_serializer; -pub mod unsafe_serializer; +#[cfg(target_arch = "wasm32")] +pub mod browser; -use arrow_flight::error::Result as FlightResult; -use napi::bindgen_prelude::ToNapiValue; - -#[napi(string_enum)] -#[derive(Debug, Clone)] -pub enum Serializer { - #[napi(value = "unsafe")] - Unsafe, - - #[napi(value = "library")] - Library, - - #[napi(value = "raw")] - Raw, -} - -pub trait SerializerTrait { - type Output: ToNapiValue + Send + 'static; - - fn serialize( - batch: FlightResult, - ) -> impl Future>> + Send; -} +#[cfg(not(target_arch = "wasm32"))] +pub mod common; \ No newline at end of file diff --git a/tests/e2e_write.rs b/tests/e2e_write.rs index 8d636ab..614b4b4 100644 --- a/tests/e2e_write.rs +++ b/tests/e2e_write.rs @@ -1,7 +1,7 @@ -use influxdb3_napi::client::options::{Precision, QueryPayload, TimeUnitV2}; -use influxdb3_napi::client::{InfluxDBClient, WriteOptions}; +use influxdb3_napi::client::native::client::InfluxDBClient; +use influxdb3_napi::client::options::{Precision, QueryPayload, TimeUnitV2, WriteOptions}; use influxdb3_napi::point::Point; -use influxdb3_napi::serializer::Serializer; +use influxdb3_napi::serializer::common::Serializer; #[tokio::test] async fn test_write_points_cloud_serverless() { From 623f149b6b29936f02c1b73d84bcc6bf547abcb6 Mon Sep 17 00:00:00 2001 From: BadMachine Date: Thu, 14 Aug 2025 03:38:21 +0200 Subject: [PATCH 2/2] Initial browser split added --- __test__/index.spec.ts | 1 + index.d.ts | 1 + package.json | 3 +++ src/client/browser/client.rs | 3 ++- src/client/napi_rs/client.rs | 4 +--- src/serializer/browser/mod.rs | 1 + src/serializer/browser/unsafe_serializer.rs | 3 ++- yarn.lock | 12 ++++++++++++ 8 files changed, 23 insertions(+), 5 deletions(-) diff --git a/__test__/index.spec.ts b/__test__/index.spec.ts index c17a625..4805406 100644 --- a/__test__/index.spec.ts +++ b/__test__/index.spec.ts @@ -13,6 +13,7 @@ test('Test sql query from cloud serverless', async (t) => { const arr = [] for await (const item of result) { + console.log(item) arr.push(item) } diff --git a/index.d.ts b/index.d.ts index de907fa..4f91ebc 100644 --- a/index.d.ts +++ b/index.d.ts @@ -1,6 +1,7 @@ /* auto-generated by NAPI-RS */ /* eslint-disable */ export declare class InfluxDbClient { + constructor(addr: string, token?: string | undefined | null, serializer?: Serializer | undefined | null, options?: FlightOptions | undefined | null) query(queryPayload: QueryPayload): ReadableStream | ReadableStream> | ReadableStream /** * # Safety diff --git a/package.json b/package.json index 0bc5788..8245ce2 100644 --- a/package.json +++ b/package.json @@ -139,5 +139,8 @@ "@badmachine/influxdb3-napi-win32-arm64-msvc": "1.0.0", "@badmachine/influxdb3-napi-win32-ia32-msvc": "1.0.0", "@badmachine/influxdb3-napi-win32-x64-msvc": "1.0.0" + }, + "dependencies": { + "@napi-rs/wasm-runtime": "^1.0.3" } } diff --git a/src/client/browser/client.rs b/src/client/browser/client.rs index 52726c1..2dc69af 100644 --- a/src/client/browser/client.rs +++ b/src/client/browser/client.rs @@ -23,7 +23,8 @@ pub struct InfluxDBClient { // impl InfluxClientTrait for InfluxDBClient { #[napi_derive::napi] impl InfluxDBClient { - fn new( + #[napi(constructor)] + pub fn new( addr: String, token: Option, serializer: Option, diff --git a/src/client/napi_rs/client.rs b/src/client/napi_rs/client.rs index 2254a81..a1222e2 100644 --- a/src/client/napi_rs/client.rs +++ b/src/client/napi_rs/client.rs @@ -17,7 +17,6 @@ use napi::tokio_stream::wrappers::ReceiverStream; use reqwest::Client; use tonic::codegen::Bytes; -#[cfg(not(target_arch = "wasm32"))] #[napi_derive::napi] pub struct InfluxDBClient { addr: String, @@ -26,10 +25,9 @@ pub struct InfluxDBClient { http_client: Client, } -#[cfg(not(target_arch = "wasm32"))] #[napi_derive::napi] impl InfluxDBClient { - + #[napi(constructor)] pub fn new( addr: String, token: Option, diff --git a/src/serializer/browser/mod.rs b/src/serializer/browser/mod.rs index 4681dca..6ad3c0c 100644 --- a/src/serializer/browser/mod.rs +++ b/src/serializer/browser/mod.rs @@ -1,6 +1,7 @@ #[cfg(target_arch = "wasm32")] pub(crate) mod unsafe_serializer; +#[napi_derive::napi] pub enum Serializer { Unsafe } diff --git a/src/serializer/browser/unsafe_serializer.rs b/src/serializer/browser/unsafe_serializer.rs index d68f09b..1833d2f 100644 --- a/src/serializer/browser/unsafe_serializer.rs +++ b/src/serializer/browser/unsafe_serializer.rs @@ -4,7 +4,8 @@ impl UnsafeSerializer { pub(crate) async fn serialize(columns: &Vec, values: &Vec) -> Option> { let mut map: serde_json::Map = serde_json::Map::new(); for (col, val) in columns.iter().zip(values.iter()) { - map.insert((*col).to_string(), val.clone()); + // map.insert((*col).to_string(), val.clone()); + map.insert((*col).to_string(), serde_json::Value::String("TEST_STRING_GETS_ATTENTION".to_string())); } Some(map) diff --git a/yarn.lock b/yarn.lock index ba2e8f5..d37111f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -116,6 +116,7 @@ __metadata: "@emnapi/core": "npm:^1.4.5" "@emnapi/runtime": "npm:^1.4.5" "@napi-rs/cli": "npm:^3.0.3" + "@napi-rs/wasm-runtime": "npm:^1.0.3" "@oxc-node/core": "npm:^0.0.30" "@taplo/cli": "npm:^0.7.0" "@tybys/wasm-util": "npm:^0.10.0" @@ -887,6 +888,17 @@ __metadata: languageName: node linkType: hard +"@napi-rs/wasm-runtime@npm:^1.0.3": + version: 1.0.3 + resolution: "@napi-rs/wasm-runtime@npm:1.0.3" + dependencies: + "@emnapi/core": "npm:^1.4.5" + "@emnapi/runtime": "npm:^1.4.5" + "@tybys/wasm-util": "npm:^0.10.0" + checksum: 10c0/7918d82477e75931b6e35bb003464382eb93e526362f81a98bf8610407a67b10f4d041931015ad48072c89db547deb7e471dfb91f4ab11ac63a24d8580297f75 + languageName: node + linkType: hard + "@napi-rs/wasm-tools-android-arm-eabi@npm:1.0.0": version: 1.0.0 resolution: "@napi-rs/wasm-tools-android-arm-eabi@npm:1.0.0"