diff --git a/bun.lock b/bun.lock index 4539f24..9745ed2 100644 --- a/bun.lock +++ b/bun.lock @@ -1,5 +1,6 @@ { "lockfileVersion": 1, + "configVersion": 0, "workspaces": { "": { "name": "ai", diff --git a/packages/fluent-ai/src/job/fal.ts b/packages/fluent-ai/src/job/fal.ts index db5e3b7..02273c9 100644 --- a/packages/fluent-ai/src/job/fal.ts +++ b/packages/fluent-ai/src/job/fal.ts @@ -1,12 +1,13 @@ import type { ImageJob } from "~/src/job/schema"; import { createHTTPJob, downloadImages } from "~/src/job/http"; +import { getApiKey } from "~/src/job/utils"; // TODO: switch to fal queue api const BASE_URL = "https://fal.run"; export const runner = { image: async (input: ImageJob["input"], options?: ImageJob["options"]) => { - const apiKey = options?.apiKey || process.env.FAL_API_KEY; + const apiKey = getApiKey(options, "FAL_API_KEY"); const request = new Request(`${BASE_URL}/${input.model}`, { method: "POST", diff --git a/packages/fluent-ai/src/job/openai.ts b/packages/fluent-ai/src/job/openai.ts index 32699f0..5e618f7 100644 --- a/packages/fluent-ai/src/job/openai.ts +++ b/packages/fluent-ai/src/job/openai.ts @@ -1,21 +1,19 @@ -import { EventSourceParserStream } from "eventsource-parser/stream"; -import type { ChatJob, ChatTool, ModelsJob } from "~/src/job/schema"; +import type { ChatJob, ModelsJob } from "~/src/job/schema"; import { createHTTPJob } from "~/src/job/http"; +import { + getApiKey, + transformToolsToFunctions, + transformUsageData, + createStreamingGenerator, +} from "~/src/job/utils"; const BASE_URL = "https://api.openai.com/v1"; export const runner = { chat: async (input: ChatJob["input"], options?: ChatJob["options"]) => { - const apiKey = options?.apiKey || process.env.OPENAI_API_KEY; + const apiKey = getApiKey(options, "OPENAI_API_KEY"); - const tools = input.tools?.map((tool: ChatTool) => ({ - type: "function", - function: { - name: tool.name, - description: tool.description, - parameters: tool.input, - }, - })); + const tools = transformToolsToFunctions(input.tools); const request = new Request(`${BASE_URL}/chat/completions`, { method: "POST", @@ -35,25 +33,7 @@ export const runner = { return createHTTPJob(request, async (response: Response) => { if (input.stream) { - return (async function* () { - const eventStream = response - .body!.pipeThrough(new TextDecoderStream()) - .pipeThrough(new EventSourceParserStream()); - const reader = eventStream.getReader(); - - try { - for (;;) { - const { done, value } = await reader.read(); - if (done || value.data === "[DONE]") { - break; - } - const chunk = JSON.parse(value.data); - yield { raw: chunk }; - } - } finally { - reader.releaseLock(); - } - })(); + return createStreamingGenerator(response); } const data = await response.json(); @@ -66,13 +46,7 @@ export const runner = { tool_calls: data.choices[0].message.tool_calls, }, ], - usage: data.usage - ? { - promptTokens: data.usage.prompt_tokens, - completionTokens: data.usage.completion_tokens, - totalTokens: data.usage.total_tokens, - } - : undefined, + usage: transformUsageData(data.usage), }; }); }, @@ -81,7 +55,7 @@ export const runner = { input?: ModelsJob["input"], options?: ModelsJob["options"], ) => { - const apiKey = options?.apiKey || process.env.OPENAI_API_KEY; + const apiKey = getApiKey(options, "OPENAI_API_KEY"); const request = new Request(`${BASE_URL}/models`, { method: "GET", diff --git a/packages/fluent-ai/src/job/openrouter.ts b/packages/fluent-ai/src/job/openrouter.ts index 1d03633..ea7e584 100644 --- a/packages/fluent-ai/src/job/openrouter.ts +++ b/packages/fluent-ai/src/job/openrouter.ts @@ -1,21 +1,19 @@ -import { EventSourceParserStream } from "eventsource-parser/stream"; -import type { ChatJob, ChatTool } from "~/src/job/schema"; +import type { ChatJob } from "~/src/job/schema"; import { createHTTPJob } from "~/src/job/http"; +import { + getApiKey, + transformToolsToFunctions, + transformUsageData, + createStreamingGenerator, +} from "~/src/job/utils"; const BASE_URL = "https://openrouter.ai/api/v1"; export const runner = { chat: async (input: ChatJob["input"], options?: ChatJob["options"]) => { - const apiKey = options?.apiKey || process.env.OPENROUTER_API_KEY; + const apiKey = getApiKey(options, "OPENROUTER_API_KEY"); - const tools = input.tools?.map((tool: ChatTool) => ({ - type: "function", - function: { - name: tool.name, - description: tool.description, - parameters: tool.input, - }, - })); + const tools = transformToolsToFunctions(input.tools); const request = new Request(`${BASE_URL}/chat/completions`, { method: "POST", @@ -35,25 +33,7 @@ export const runner = { return createHTTPJob(request, async (response: Response) => { if (input.stream) { - return (async function* () { - const eventStream = response - .body!.pipeThrough(new TextDecoderStream()) - .pipeThrough(new EventSourceParserStream()); - const reader = eventStream.getReader(); - - try { - for (;;) { - const { done, value } = await reader.read(); - if (done || value.data === "[DONE]") { - break; - } - const chunk = JSON.parse(value.data); - yield { raw: chunk }; - } - } finally { - reader.releaseLock(); - } - })(); + return createStreamingGenerator(response); } const data = await response.json(); @@ -66,13 +46,7 @@ export const runner = { tool_calls: data.choices[0].message.tool_calls, }, ], - usage: data.usage - ? { - promptTokens: data.usage.prompt_tokens, - completionTokens: data.usage.completion_tokens, - totalTokens: data.usage.total_tokens, - } - : undefined, + usage: transformUsageData(data.usage), }; }); }, diff --git a/packages/fluent-ai/src/job/utils.ts b/packages/fluent-ai/src/job/utils.ts new file mode 100644 index 0000000..92ec3f0 --- /dev/null +++ b/packages/fluent-ai/src/job/utils.ts @@ -0,0 +1,50 @@ +import { EventSourceParserStream } from "eventsource-parser/stream"; +import type { ChatTool } from "~/src/job/schema"; + +export function getApiKey( + options: { apiKey?: string } | undefined, + envVarName: string, +): string | undefined { + return options?.apiKey || process.env[envVarName]; +} + +export function transformToolsToFunctions(tools?: ChatTool[]) { + return tools?.map((tool: ChatTool) => ({ + type: "function", + function: { + name: tool.name, + description: tool.description, + parameters: tool.input, + }, + })); +} + +export function transformUsageData(usage?: any) { + return usage + ? { + promptTokens: usage.prompt_tokens, + completionTokens: usage.completion_tokens, + totalTokens: usage.total_tokens, + } + : undefined; +} + +export async function* createStreamingGenerator(response: Response) { + const eventStream = response + .body!.pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()); + const reader = eventStream.getReader(); + + try { + while (true) { + const { done, value } = await reader.read(); + if (done || value.data === "[DONE]") { + break; + } + const chunk = JSON.parse(value.data); + yield { raw: chunk }; + } + } finally { + reader.releaseLock(); + } +} diff --git a/packages/fluent-ai/src/job/voyage.ts b/packages/fluent-ai/src/job/voyage.ts index 747a782..b4d431f 100644 --- a/packages/fluent-ai/src/job/voyage.ts +++ b/packages/fluent-ai/src/job/voyage.ts @@ -1,5 +1,6 @@ import type { EmbeddingJob } from "~/src/job/schema"; import { createHTTPJob } from "~/src/job/http"; +import { getApiKey } from "~/src/job/utils"; const BASE_URL = "https://api.voyageai.com/v1"; @@ -8,7 +9,7 @@ export const runner = { input: EmbeddingJob["input"], options?: EmbeddingJob["options"], ) => { - const apiKey = options?.apiKey || process.env.VOYAGE_API_KEY; + const apiKey = getApiKey(options, "VOYAGE_API_KEY"); const request = new Request(`${BASE_URL}/embeddings`, { method: "POST",