From 35cc64a46188663cc6aef3834ccda3f9019bfb68 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 16 Nov 2025 03:12:18 +0000 Subject: [PATCH 1/4] Initial plan From ab4622c7fe79d8edbfce0d10c022932826c21354 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 16 Nov 2025 03:15:30 +0000 Subject: [PATCH 2/4] Initial plan for refactoring duplicated code Co-authored-by: wangzuo <1039026+wangzuo@users.noreply.github.com> --- bun.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/bun.lock b/bun.lock index 4539f24..9745ed2 100644 --- a/bun.lock +++ b/bun.lock @@ -1,5 +1,6 @@ { "lockfileVersion": 1, + "configVersion": 0, "workspaces": { "": { "name": "ai", From 04f75b54bef26c52c06163903f456e4b864c75e0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 16 Nov 2025 03:17:48 +0000 Subject: [PATCH 3/4] Refactor: Extract duplicated code into shared utilities Co-authored-by: wangzuo <1039026+wangzuo@users.noreply.github.com> --- packages/fluent-ai/src/job/fal.ts | 3 +- packages/fluent-ai/src/job/openai.ts | 50 +++++-------------- packages/fluent-ai/src/job/openrouter.ts | 48 +++++------------- packages/fluent-ai/src/job/utils.ts | 62 ++++++++++++++++++++++++ packages/fluent-ai/src/job/voyage.ts | 3 +- 5 files changed, 89 insertions(+), 77 deletions(-) create mode 100644 packages/fluent-ai/src/job/utils.ts diff --git a/packages/fluent-ai/src/job/fal.ts b/packages/fluent-ai/src/job/fal.ts index db5e3b7..02273c9 100644 --- a/packages/fluent-ai/src/job/fal.ts +++ b/packages/fluent-ai/src/job/fal.ts @@ -1,12 +1,13 @@ import type { ImageJob } from "~/src/job/schema"; import { createHTTPJob, downloadImages } from "~/src/job/http"; +import { getApiKey } from "~/src/job/utils"; // TODO: switch to fal queue api const BASE_URL = "https://fal.run"; export const runner = { image: async (input: ImageJob["input"], options?: ImageJob["options"]) => { - const apiKey = options?.apiKey || process.env.FAL_API_KEY; + const apiKey = getApiKey(options, "FAL_API_KEY"); const request = new Request(`${BASE_URL}/${input.model}`, { method: "POST", diff --git a/packages/fluent-ai/src/job/openai.ts b/packages/fluent-ai/src/job/openai.ts index 32699f0..5e618f7 100644 --- a/packages/fluent-ai/src/job/openai.ts +++ b/packages/fluent-ai/src/job/openai.ts @@ -1,21 +1,19 @@ -import { EventSourceParserStream } from "eventsource-parser/stream"; -import type { ChatJob, ChatTool, ModelsJob } from "~/src/job/schema"; +import type { ChatJob, ModelsJob } from "~/src/job/schema"; import { createHTTPJob } from "~/src/job/http"; +import { + getApiKey, + transformToolsToFunctions, + transformUsageData, + createStreamingGenerator, +} from "~/src/job/utils"; const BASE_URL = "https://api.openai.com/v1"; export const runner = { chat: async (input: ChatJob["input"], options?: ChatJob["options"]) => { - const apiKey = options?.apiKey || process.env.OPENAI_API_KEY; + const apiKey = getApiKey(options, "OPENAI_API_KEY"); - const tools = input.tools?.map((tool: ChatTool) => ({ - type: "function", - function: { - name: tool.name, - description: tool.description, - parameters: tool.input, - }, - })); + const tools = transformToolsToFunctions(input.tools); const request = new Request(`${BASE_URL}/chat/completions`, { method: "POST", @@ -35,25 +33,7 @@ export const runner = { return createHTTPJob(request, async (response: Response) => { if (input.stream) { - return (async function* () { - const eventStream = response - .body!.pipeThrough(new TextDecoderStream()) - .pipeThrough(new EventSourceParserStream()); - const reader = eventStream.getReader(); - - try { - for (;;) { - const { done, value } = await reader.read(); - if (done || value.data === "[DONE]") { - break; - } - const chunk = JSON.parse(value.data); - yield { raw: chunk }; - } - } finally { - reader.releaseLock(); - } - })(); + return createStreamingGenerator(response); } const data = await response.json(); @@ -66,13 +46,7 @@ export const runner = { tool_calls: data.choices[0].message.tool_calls, }, ], - usage: data.usage - ? { - promptTokens: data.usage.prompt_tokens, - completionTokens: data.usage.completion_tokens, - totalTokens: data.usage.total_tokens, - } - : undefined, + usage: transformUsageData(data.usage), }; }); }, @@ -81,7 +55,7 @@ export const runner = { input?: ModelsJob["input"], options?: ModelsJob["options"], ) => { - const apiKey = options?.apiKey || process.env.OPENAI_API_KEY; + const apiKey = getApiKey(options, "OPENAI_API_KEY"); const request = new Request(`${BASE_URL}/models`, { method: "GET", diff --git a/packages/fluent-ai/src/job/openrouter.ts b/packages/fluent-ai/src/job/openrouter.ts index 1d03633..ea7e584 100644 --- a/packages/fluent-ai/src/job/openrouter.ts +++ b/packages/fluent-ai/src/job/openrouter.ts @@ -1,21 +1,19 @@ -import { EventSourceParserStream } from "eventsource-parser/stream"; -import type { ChatJob, ChatTool } from "~/src/job/schema"; +import type { ChatJob } from "~/src/job/schema"; import { createHTTPJob } from "~/src/job/http"; +import { + getApiKey, + transformToolsToFunctions, + transformUsageData, + createStreamingGenerator, +} from "~/src/job/utils"; const BASE_URL = "https://openrouter.ai/api/v1"; export const runner = { chat: async (input: ChatJob["input"], options?: ChatJob["options"]) => { - const apiKey = options?.apiKey || process.env.OPENROUTER_API_KEY; + const apiKey = getApiKey(options, "OPENROUTER_API_KEY"); - const tools = input.tools?.map((tool: ChatTool) => ({ - type: "function", - function: { - name: tool.name, - description: tool.description, - parameters: tool.input, - }, - })); + const tools = transformToolsToFunctions(input.tools); const request = new Request(`${BASE_URL}/chat/completions`, { method: "POST", @@ -35,25 +33,7 @@ export const runner = { return createHTTPJob(request, async (response: Response) => { if (input.stream) { - return (async function* () { - const eventStream = response - .body!.pipeThrough(new TextDecoderStream()) - .pipeThrough(new EventSourceParserStream()); - const reader = eventStream.getReader(); - - try { - for (;;) { - const { done, value } = await reader.read(); - if (done || value.data === "[DONE]") { - break; - } - const chunk = JSON.parse(value.data); - yield { raw: chunk }; - } - } finally { - reader.releaseLock(); - } - })(); + return createStreamingGenerator(response); } const data = await response.json(); @@ -66,13 +46,7 @@ export const runner = { tool_calls: data.choices[0].message.tool_calls, }, ], - usage: data.usage - ? { - promptTokens: data.usage.prompt_tokens, - completionTokens: data.usage.completion_tokens, - totalTokens: data.usage.total_tokens, - } - : undefined, + usage: transformUsageData(data.usage), }; }); }, diff --git a/packages/fluent-ai/src/job/utils.ts b/packages/fluent-ai/src/job/utils.ts new file mode 100644 index 0000000..148759d --- /dev/null +++ b/packages/fluent-ai/src/job/utils.ts @@ -0,0 +1,62 @@ +import { EventSourceParserStream } from "eventsource-parser/stream"; +import type { ChatTool } from "~/src/job/schema"; + +/** + * Gets API key from options or environment variable + */ +export function getApiKey( + options: { apiKey?: string } | undefined, + envVarName: string, +): string | undefined { + return options?.apiKey || process.env[envVarName]; +} + +/** + * Transforms internal ChatTool format to OpenAI-compatible function calling format + */ +export function transformToolsToFunctions(tools?: ChatTool[]) { + return tools?.map((tool: ChatTool) => ({ + type: "function", + function: { + name: tool.name, + description: tool.description, + parameters: tool.input, + }, + })); +} + +/** + * Transforms OpenAI-style usage data to internal format + */ +export function transformUsageData(usage?: any) { + return usage + ? { + promptTokens: usage.prompt_tokens, + completionTokens: usage.completion_tokens, + totalTokens: usage.total_tokens, + } + : undefined; +} + +/** + * Creates an async generator for OpenAI-compatible streaming responses + */ +export async function* createStreamingGenerator(response: Response) { + const eventStream = response + .body!.pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()); + const reader = eventStream.getReader(); + + try { + for (;;) { + const { done, value } = await reader.read(); + if (done || value.data === "[DONE]") { + break; + } + const chunk = JSON.parse(value.data); + yield { raw: chunk }; + } + } finally { + reader.releaseLock(); + } +} diff --git a/packages/fluent-ai/src/job/voyage.ts b/packages/fluent-ai/src/job/voyage.ts index 747a782..b4d431f 100644 --- a/packages/fluent-ai/src/job/voyage.ts +++ b/packages/fluent-ai/src/job/voyage.ts @@ -1,5 +1,6 @@ import type { EmbeddingJob } from "~/src/job/schema"; import { createHTTPJob } from "~/src/job/http"; +import { getApiKey } from "~/src/job/utils"; const BASE_URL = "https://api.voyageai.com/v1"; @@ -8,7 +9,7 @@ export const runner = { input: EmbeddingJob["input"], options?: EmbeddingJob["options"], ) => { - const apiKey = options?.apiKey || process.env.VOYAGE_API_KEY; + const apiKey = getApiKey(options, "VOYAGE_API_KEY"); const request = new Request(`${BASE_URL}/embeddings`, { method: "POST", From acdc7a72d08373a049a94cc3377698f08c54b1c1 Mon Sep 17 00:00:00 2001 From: Zuo Wang Date: Tue, 25 Nov 2025 23:59:50 +0800 Subject: [PATCH 4/4] refactor: remove unnecessary comments and improve loop syntax in utils --- packages/fluent-ai/src/job/utils.ts | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/packages/fluent-ai/src/job/utils.ts b/packages/fluent-ai/src/job/utils.ts index 148759d..92ec3f0 100644 --- a/packages/fluent-ai/src/job/utils.ts +++ b/packages/fluent-ai/src/job/utils.ts @@ -1,9 +1,6 @@ import { EventSourceParserStream } from "eventsource-parser/stream"; import type { ChatTool } from "~/src/job/schema"; -/** - * Gets API key from options or environment variable - */ export function getApiKey( options: { apiKey?: string } | undefined, envVarName: string, @@ -11,9 +8,6 @@ export function getApiKey( return options?.apiKey || process.env[envVarName]; } -/** - * Transforms internal ChatTool format to OpenAI-compatible function calling format - */ export function transformToolsToFunctions(tools?: ChatTool[]) { return tools?.map((tool: ChatTool) => ({ type: "function", @@ -25,9 +19,6 @@ export function transformToolsToFunctions(tools?: ChatTool[]) { })); } -/** - * Transforms OpenAI-style usage data to internal format - */ export function transformUsageData(usage?: any) { return usage ? { @@ -38,9 +29,6 @@ export function transformUsageData(usage?: any) { : undefined; } -/** - * Creates an async generator for OpenAI-compatible streaming responses - */ export async function* createStreamingGenerator(response: Response) { const eventStream = response .body!.pipeThrough(new TextDecoderStream()) @@ -48,7 +36,7 @@ export async function* createStreamingGenerator(response: Response) { const reader = eventStream.getReader(); try { - for (;;) { + while (true) { const { done, value } = await reader.read(); if (done || value.data === "[DONE]") { break;