diff --git a/.vscode/settings.json b/.vscode/settings.json index 1d680c1..e69fb38 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,6 +8,7 @@ }, "deno.enablePaths": [ "scripts", - "src/clients/deno" + "src/clients/deno", + "tests" ], } diff --git a/src/bin/proxy_json.rs b/src/bin/proxy_json.rs new file mode 100644 index 0000000..d1c6985 --- /dev/null +++ b/src/bin/proxy_json.rs @@ -0,0 +1,175 @@ +use actson::feeder::BufReaderJsonFeeder; +use actson::options::JsonParserOptionsBuilder; +use actson::{JsonEvent, JsonParser}; +use std::env; +use std::fs::File; +use std::io::{self, BufReader, BufWriter, Read, Write}; +use std::process::{Command, Stdio}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{self, Sender}; +use std::sync::Arc; +use std::thread::{self, JoinHandle}; + +fn should_add_comma(json_string: &str) -> bool { + !json_string.ends_with('{') && !json_string.ends_with('[') && !json_string.ends_with(':') +} + +fn append_value(json_string: &mut String, value: &str) { + if should_add_comma(json_string) { + json_string.push(','); + } + json_string.push_str(value); +} + +fn process_and_forward_json( + reader: BufReader, + output_file: Option, + forward_to: Sender, +) -> JoinHandle<()> { + thread::spawn(move || { + let feeder = BufReaderJsonFeeder::new(reader); + let mut parser = JsonParser::new_with_options( + feeder, + JsonParserOptionsBuilder::default() + .with_streaming(true) + .build(), + ); + + let mut json_string = String::new(); + let mut depth = 0; + + while let Some(event) = parser.next_event().unwrap_or(None) { + match event { + JsonEvent::NeedMoreInput => { + if parser.feeder.fill_buf().is_err() { + break; + } + } + JsonEvent::StartObject => { + depth += 1; + json_string.push('{'); + } + JsonEvent::EndObject => { + depth -= 1; + json_string.push('}'); + + if depth == 0 { + if forward_to.send(json_string.clone()).is_err() { + break; + } + + if let Some(file_path) = &output_file { + if let Ok(mut file) = + File::options().create(true).append(true).open(file_path) + { + let _ = writeln!(file, "{}", json_string); + } + } + + json_string.clear(); + } + } + JsonEvent::StartArray => { + depth += 1; + json_string.push('['); + } + JsonEvent::EndArray => { + depth -= 1; + json_string.push(']'); + } + JsonEvent::FieldName => { + if json_string.ends_with('{') { + json_string.push('"'); + } else { + json_string.push_str(",\""); + } + json_string.push_str(parser.current_str().unwrap_or_default()); + json_string.push_str("\":"); + } + JsonEvent::ValueString => { + append_value( + &mut json_string, + &format!("\"{}\"", parser.current_str().unwrap_or_default()), + ); + } + JsonEvent::ValueInt => { + append_value( + &mut json_string, + &parser.current_int::().unwrap_or_default().to_string(), + ); + } + JsonEvent::ValueFloat => { + append_value( + &mut json_string, + &parser.current_float().unwrap_or_default().to_string(), + ); + } + JsonEvent::ValueTrue => append_value(&mut json_string, "true"), + JsonEvent::ValueFalse => append_value(&mut json_string, "false"), + JsonEvent::ValueNull => append_value(&mut json_string, "null"), + } + } + }) +} + +fn forward_to_writer( + writer: W, + receiver: mpsc::Receiver, + done: Arc, +) -> JoinHandle<()> { + thread::spawn(move || { + let mut writer = BufWriter::new(writer); + while !done.load(Ordering::SeqCst) { + if let Ok(json) = receiver.recv() { + if writeln!(writer, "{}", json).is_err() || writer.flush().is_err() { + break; + } + } else { + break; + } + } + }) +} + +fn main() -> io::Result<()> { + let proxy_to = env::var("PROXY_TO").expect("PROXY_TO environment variable must be set"); + let output_file = + env::var("OUTPUT_FILE").expect("OUTPUT_FILE environment variable must be set"); + + File::create(&output_file)?; + + let shell = if cfg!(target_os = "windows") { + ("cmd", "/C") + } else { + ("sh", "-c") + }; + + let mut child = Command::new(shell.0) + .arg(shell.1) + .arg(&proxy_to) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn()?; + + let done_stdin = Arc::new(AtomicBool::new(false)); + let done_stdout = Arc::new(AtomicBool::new(false)); + + let (tx_stdin, rx_stdin) = mpsc::channel(); + let child_stdin = child.stdin.take().unwrap(); + let _stdin_thread = forward_to_writer(child_stdin, rx_stdin, done_stdin.clone()); + + let (tx_stdout, rx_stdout) = mpsc::channel(); + let stdout = io::stdout(); + let _stdout_thread = forward_to_writer(stdout, rx_stdout, done_stdout.clone()); + + let stdin_reader = BufReader::new(io::stdin()); + let _stdin_process = process_and_forward_json(stdin_reader, Some(output_file), tx_stdin); + + let child_stdout = child.stdout.take().unwrap(); + let stdout_reader = BufReader::new(child_stdout); + let _stdout_process = process_and_forward_json(stdout_reader, None, tx_stdout); + + child.wait()?; + + Ok(()) +} diff --git a/tests/proxy_json.test.ts b/tests/proxy_json.test.ts new file mode 100644 index 0000000..6dcffea --- /dev/null +++ b/tests/proxy_json.test.ts @@ -0,0 +1,128 @@ +import { assertEquals } from "jsr:@std/assert"; + +async function withTempFile(fn: (path: string) => Promise) { + const tempFile = await Deno.makeTempFile(); + try { + await fn(tempFile); + } finally { + try { + await Deno.remove(tempFile); + } catch { + // Ignore cleanup errors + } + } +} + +Deno.test("process single JSON object", async () => { + await withTempFile(async (outputFile) => { + // Start the proxy process + const proxy = new Deno.Command("cargo", { + args: ["run", "--bin", "proxy_json"], + stdin: "piped", + stdout: "piped", + stderr: "piped", + env: { + PROXY_TO: "cat", + OUTPUT_FILE: outputFile, + }, + }); + + const proxyProcess = proxy.spawn(); + const testJson = { test: "value", number: 42 }; + + // Send JSON to proxy + const writer = proxyProcess.stdin.getWriter(); + await writer.write( + new TextEncoder().encode(JSON.stringify(testJson) + "\n"), + ); + await writer.close(); + + // Wait for completion + const { success, stdout, stderr } = await proxyProcess.output(); + assertEquals(success, true); + + // Read and verify output file + const content = await Deno.readTextFile(outputFile); + assertEquals(JSON.parse(content.trim()), testJson); + }); +}); + +Deno.test("process multiple JSON objects", async () => { + await withTempFile(async (outputFile) => { + const proxy = new Deno.Command("cargo", { + args: ["run", "--bin", "proxy_json"], + stdin: "piped", + stdout: "piped", + stderr: "piped", + env: { + PROXY_TO: "cat", + OUTPUT_FILE: outputFile, + }, + }); + + const proxyProcess = proxy.spawn(); + const testJsons = [ + { first: "object" }, + { second: 42 }, + { third: true }, + ]; + + // Send each JSON object + const writer = proxyProcess.stdin.getWriter(); + for (const obj of testJsons) { + await writer.write(new TextEncoder().encode(JSON.stringify(obj) + "\n")); + } + await writer.close(); + + // Wait for completion + const { success } = await proxyProcess.output(); + assertEquals(success, true); + + // Read and verify output file + const content = await Deno.readTextFile(outputFile); + const lines = content.trim().split("\n"); + assertEquals( + lines.map((line) => JSON.parse(line)), + testJsons, + ); + }); +}); + +Deno.test("process nested JSON", async () => { + await withTempFile(async (outputFile) => { + const proxy = new Deno.Command("cargo", { + args: ["run", "--bin", "proxy_json"], + stdin: "piped", + stdout: "piped", + stderr: "piped", + env: { + PROXY_TO: "cat", + OUTPUT_FILE: outputFile, + }, + }); + + const proxyProcess = proxy.spawn(); + const testJson = { + nested: { + object: { deep: true }, + array: [1, 2, 3], + }, + }; + + // Send JSON to proxy + const writer = proxyProcess.stdin.getWriter(); + await writer.write( + new TextEncoder().encode(JSON.stringify(testJson) + "\n"), + ); + await writer.close(); + + // Wait for completion + const { success } = await proxyProcess.output(); + assertEquals(success, true); + + // Read and verify output file + const content = await Deno.readTextFile(outputFile); + const parsed = JSON.parse(content.trim()); + assertEquals(parsed, testJson); + }); +});