Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 7 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,19 @@ version = "3.2"
git = "https://github.com/BadMachine/napi-rs"
branch = "fix/readable-stream-byte-mode-lock"
package = "napi"
features = ["full","experimental","tokio","web_stream","tokio-stream"]
#features = ["full","experimental","tokio","web_stream","tokio-stream"]
features = ["experimental", "web_stream", "serde-json", "napi10"]

[dependencies.reqwest]
version = "0.12"
features = ["json","rustls-tls-native-roots-no-provider","gzip"]

[dependencies.tonic]
version = "0.13"
features = ["gzip","tls-native-roots","tls-webpki-roots", "tls-ring"] # since rustls update it requires to manually initialize cryptoprovider
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tonic = { version = "0.13", features = ["gzip","tls-native-roots","tls-webpki-roots","tls-ring"] }
arrow = { version = "56.0.0", features = ["arrow-json"] }
arrow-flight = { version = "56.0.0", features = ["flight-sql"] }
serde_arrow = { version = "0.13.5", features = ["arrow-56"] }

[dependencies.arrow]
version = "56.0.0"
features = ["arrow-json"]

[dependencies.arrow-flight]
version = "56.0.0"
features = ["flight-sql"]

[dependencies.serde_arrow]
version = "0.13.5"
features = ["arrow-56"]

[build-dependencies]
napi-build = "2.2.3"
Expand Down
1 change: 1 addition & 0 deletions __test__/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ test('Test sql query from cloud serverless', async (t) => {

const arr = []
for await (const item of result) {
console.log(item)
arr.push(item)
}

Expand Down
2 changes: 1 addition & 1 deletion browser.js
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from 'influxdb3-napi-wasm32-wasi'
export * from '@badmachine/influxdb3-napi-wasm32-wasi'
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"bench": "node --import @oxc-node/core/register benchmark/bench.ts",
"build": "napi build --platform --release --no-const-enum",
"build:wasm-unknown": "napi build --platform --release -t wasm32-unknown-unknown",
"build:wasip2": "napi build --platform --release -t wasm32-wasip2",
"build:wasip1-threads": "napi build --platform --release -t wasm32-wasip1-threads",
"build:debug": "napi build --platform",
"format": "run-p format:prettier format:rs format:toml",
"format:prettier": "prettier . -w",
Expand Down Expand Up @@ -139,5 +139,8 @@
"@badmachine/influxdb3-napi-win32-arm64-msvc": "1.0.0",
"@badmachine/influxdb3-napi-win32-ia32-msvc": "1.0.0",
"@badmachine/influxdb3-napi-win32-x64-msvc": "1.0.0"
},
"dependencies": {
"@napi-rs/wasm-runtime": "^1.0.3"
}
}
5 changes: 2 additions & 3 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use influxdb3_napi::client::options::{QueryPayload, QueryType};
use napi::bindgen_prelude::Either3;
use napi::tokio;
use napi::tokio_stream::StreamExt;

#[tokio::main]
#[cfg(feature = "native")]
async fn main() {
println!("Not implemented yet for native binary");
}

#[tokio::main]
#[cfg(not(target_arch = "wasm32"))]
#[cfg(not(feature = "native"))]
async fn main() {
println!("Not implemented yet");
Expand Down
37 changes: 37 additions & 0 deletions src/client/base.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// CURRENTLY NAPI-RS DOES NOT SUPPORT TRAIT IMPLEMENTATIONS

// use napi::bindgen_prelude::{Either3, ReadableStream};
// use napi::Env;
// use crate::client::options::{FlightOptions, QueryPayload};
// use crate::client::WriteOptions;
// use crate::serializer::library_serializer::LibraryReturnType;
// use crate::serializer::Serializer;
//
// pub trait InfluxClientTrait {
// fn new(
// addr: String,
// token: Option<String>,
// serializer: Option<Serializer>,
// options: Option<FlightOptions>,
// ) -> Self;
//
// fn query(
// &mut self,
// query_payload: QueryPayload,
// env: &Env,
// ) -> napi::Result<
// Either3<
// ReadableStream<'_, LibraryReturnType>,
// ReadableStream<'_, serde_json::Map<String, serde_json::Value>>,
// ReadableStream<'_, napi::bindgen_prelude::Buffer>,
// >,
// >;
//
// fn write(
// &mut self,
// lines: Vec<String>,
// database: String,
// write_options: Option<WriteOptions>,
// org: Option<String>,
// ) -> napi::Result<()>;
// }
106 changes: 106 additions & 0 deletions src/client/browser/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use crate::client::http_client::get_http_client;
use crate::client::options::{FlightOptions, QueryPayload, WriteOptions};
use crate::serializer::browser::{Serializer};
use napi::bindgen_prelude::{Buffer, Either, ReadableStream};
use napi::Env;
use napi::tokio_stream::wrappers::ReceiverStream;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use crate::client::browser::HttpQueryResponseV1;
use crate::query::browser::query_processor::into_stream;
use napi::bindgen_prelude::*;

#[napi_derive::napi]
pub struct InfluxDBClient {
addr: String,
http_client: Client,
serializer: Serializer,
options: FlightOptions,
}

// replace it with #[napi_derive::napi] in the future
// impl InfluxClientTrait for InfluxDBClient {
#[napi_derive::napi]
impl InfluxDBClient {
#[napi(constructor)]
pub fn new(
addr: String,
token: Option<String>,
serializer: Option<Serializer>,
options: Option<FlightOptions>,
) -> Self {
Self {
addr,
http_client: get_http_client(token.unwrap_or(String::from(""))),
serializer: serializer.unwrap_or_default(),
options: options.unwrap_or_default(),
}
}

#[napi_derive::napi]
pub fn query(
&mut self,
query_payload: QueryPayload,
env: &Env,
) -> napi::Result<
Either<
ReadableStream<'_, Map<String, Value>>,
ReadableStream<'_, Buffer>,
>,
> {
let stream = self.query_inner(query_payload, env)?;
Ok(Either::A(stream))
}

pub fn query_inner(
&mut self,
query_payload: QueryPayload,
env: &Env,
) -> Result<ReadableStream<'_, serde_json::Map<String, serde_json::Value>>> {
use napi::bindgen_prelude::block_on;

let stream: ReceiverStream<Result<serde_json::Map<String, serde_json::Value>>> = block_on(async {
let url = format!("{}/query", self.addr);

let response = self
.http_client
.get(&url)
.query(&[
("db", query_payload.database.clone()),
("q", query_payload.query.clone()),
])
.send()
.await
.map_err(|e| Error::from_reason(format!("HTTP request failed: {}", e)))?;

let status = response.status();
if !status.is_success() {
return Err(Error::from_reason(format!(
"InfluxDB returned non-success status: {}",
status
)));
}

let data: HttpQueryResponseV1 = response
.json()
.await
.map_err(|e| Error::from_reason(format!("Failed to parse JSON: {}", e)))?;

Ok(into_stream(data))
})?;

ReadableStream::new(env, stream)
}

#[napi_derive::napi]
pub fn write(
&mut self,
lines: Vec<String>,
database: String,
write_options: Option<WriteOptions>,
org: Option<String>,
) -> Result<()> {
todo!()
}
}
21 changes: 21 additions & 0 deletions src/client/browser/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use serde::{Deserialize, Serialize};

pub(crate) mod client;

#[derive(Deserialize, Serialize)]
pub struct Series {
name: String,
pub(crate)columns: Vec<String>,
pub(crate)values: Vec<Vec<serde_json::Value>>,
}

#[derive(Deserialize, Serialize)]
pub struct QueryDataV1 {
statement_id: u32,
pub(crate) series: Vec<Series>,
}

#[derive(Deserialize, Serialize)]
pub(crate) struct HttpQueryResponseV1 {
pub(crate)results: Vec<QueryDataV1>,
}
Loading
Loading