diff --git a/src/renderer/features/agents/main/active-chat.tsx b/src/renderer/features/agents/main/active-chat.tsx
index 80c949bb..70cf9b28 100644
--- a/src/renderer/features/agents/main/active-chat.tsx
+++ b/src/renderer/features/agents/main/active-chat.tsx
@@ -183,7 +183,7 @@ import {
 } from "../search"
 import { agentChatStore } from "../stores/agent-chat-store"
 import { EMPTY_QUEUE, useMessageQueueStore } from "../stores/message-queue-store"
-import { clearSubChatCaches, isRollingBackAtom, rollbackHandlerAtom, syncMessagesWithStatusAtom } from "../stores/message-store"
+import { clearSubChatCaches, isRollingBackAtom, messageTokenDataAtom, rollbackHandlerAtom, syncMessagesWithStatusAtom } from "../stores/message-store"
 import { useStreamingStatusStore } from "../stores/streaming-status-store"
 import {
   useAgentSubChatStore,
@@ -2572,21 +2572,16 @@ const ChatViewInner = memo(function ChatViewInner({
     [messages],
   )
 
-  // Pre-compute token data for ChatInputArea to avoid passing unstable messages array
-  // This prevents ChatInputArea from re-rendering on every streaming chunk
-  const messageTokenData = useMemo(() => {
-    let totalInputTokens = 0
-    let totalOutputTokens = 0
-    let totalCostUsd = 0
-    for (const msg of messages) {
-      if (msg.metadata) {
-        totalInputTokens += msg.metadata.inputTokens || 0
-        totalOutputTokens += msg.metadata.outputTokens || 0
-        totalCostUsd += msg.metadata.totalCostUsd || 0
-      }
-    }
-    return { totalInputTokens, totalOutputTokens, totalCostUsd, messageCount: messages.length }
-  }, [messages])
+  // Token data from centralized store - uses cached O(1) lookup during streaming
+  // Only recalculates when message count changes or last message's metadata updates
+  const storeTokenData = useAtomValue(messageTokenDataAtom)
+  // Map to expected field names for ChatInputArea
+  const messageTokenData = useMemo(() => ({
+    totalInputTokens: storeTokenData.inputTokens,
+    totalOutputTokens: storeTokenData.outputTokens,
+    totalCostUsd: storeTokenData.totalCostUsd,
+    messageCount: storeTokenData.messageCount,
+  }), [storeTokenData.inputTokens, storeTokenData.outputTokens, storeTokenData.totalCostUsd, storeTokenData.messageCount])
 
   // Track previous streaming state to detect stream stop
   const prevIsStreamingRef = useRef(isStreaming)
diff --git a/src/renderer/features/agents/stores/agent-chat-store.ts b/src/renderer/features/agents/stores/agent-chat-store.ts
index 40334128..206b31c9 100644
--- a/src/renderer/features/agents/stores/agent-chat-store.ts
+++ b/src/renderer/features/agents/stores/agent-chat-store.ts
@@ -20,7 +20,35 @@ export const agentChatStore = {
 
   has: (id: string) => chats.has(id),
 
+  /**
+   * Abort any active stream for a sub-chat and mark it in manuallyAborted.
+   * Stopping is skipped for unknown IDs; the manuallyAborted flag is set regardless.
+   * delete() calls this itself, so callers only need it to stop without deleting.
+   */
+  abort: (id: string) => {
+    const chat = chats.get(id)
+    if (chat) {
+      // Ignore errors during abort - chat may already be stopped
+      const logStopError = (e: unknown) =>
+        console.debug(`[agentChatStore] Error stopping chat ${id}:`, e)
+      try {
+        // Chat.stop() aborts the current stream. If it returns a promise, a
+        // rejection bypasses try/catch, so attach a handler as well.
+        void Promise.resolve(chat.stop()).catch(logStopError)
+      } catch (e) {
+        logStopError(e)
+      }
+    }
+    manuallyAborted.set(id, true)
+  },
+
+  /**
+   * Delete a chat instance and all associated data.
+   * Aborts any active stream first, so callers need not call abort() beforehand.
+ */ delete: (id: string) => { + // Abort first to ensure stream is stopped + agentChatStore.abort(id) chats.delete(id) streamIds.delete(id) parentChatIds.delete(id) diff --git a/src/renderer/features/agents/stores/message-store.ts b/src/renderer/features/agents/stores/message-store.ts index 9bc93a3e..8a1528db 100644 --- a/src/renderer/features/agents/stores/message-store.ts +++ b/src/renderer/features/agents/stores/message-store.ts @@ -2,6 +2,7 @@ import { atom } from "jotai" import { atomFamily } from "jotai/utils" +import { agentChatStore } from "./agent-chat-store" // Types export interface MessagePart { @@ -51,6 +52,48 @@ export const messageAtomFamily = atomFamily((_messageId: string) => // Track active message IDs per subChat for cleanup const activeMessageIdsByChat = new Map>() +// Track text part cache keys per message for O(1) cleanup +const textPartKeysByMessage = new Map>() + +// ============================================================================ +// LRU CACHE EVICTION - Prevents memory leaks from accumulated subChat caches +// ============================================================================ +// Uses Map's insertion order for O(1) LRU operations. +// Delete + set moves item to end; first key is oldest. 
+// ============================================================================
+const MAX_CACHED_SUBCHATS = 10
+const subChatLRU = new Map<string, true>()
+
+// Track the current (most recent) subChat to avoid unnecessary LRU updates
+let currentLRUSubChat: string | null = null
+
+/** Marks subChatId as most recently used and evicts caches beyond the cap. */
+function touchSubChat(subChatId: string) {
+  // Skip if this is already the most recent subChat (common case during streaming)
+  if (subChatId === currentLRUSubChat) {
+    return
+  }
+  currentLRUSubChat = subChatId
+
+  // Delete and re-add to move to end (most recently used); delete is a no-op
+  // for an unseen ID. This is O(1) because Map maintains insertion order.
+  subChatLRU.delete(subChatId)
+  subChatLRU.set(subChatId, true)
+
+  // Evict oldest subchats if we exceed the threshold
+  while (subChatLRU.size > MAX_CACHED_SUBCHATS) {
+    const oldestSubChatId = subChatLRU.keys().next().value
+    // Compare against undefined, not truthiness: an empty-string ID would
+    // otherwise never be deleted and this loop would never terminate.
+    if (oldestSubChatId === undefined) break
+    subChatLRU.delete(oldestSubChatId)
+    // CRITICAL: Abort any active stream before clearing caches so evicted
+    // chats do not keep streaming in the background
+    agentChatStore.abort(oldestSubChatId)
+    clearSubChatCaches(oldestSubChatId)
+  }
+}
+
 // Ordered list of message IDs (for rendering order)
 export const messageIdsAtom = atom([])
 
@@ -114,6 +157,16 @@ export const textPartAtomFamily = atomFamily((key: string) => {
   const [messageId, partIndexStr] = key.split(":")
   const partIndex = parseInt(partIndexStr!, 10)
 
+  // Track this key for O(1) cleanup when message is removed
+  if (messageId) {
+    let keys = textPartKeysByMessage.get(messageId)
+    if (!keys) {
+      keys = new Set()
+      textPartKeysByMessage.set(messageId, keys)
+    }
+    keys.add(key)
+  }
+
   return atom((get) => {
     const message = get(messageAtomFamily(messageId!))
     const parts = message?.parts || []
@@ -439,6 +492,7 @@ type TokenData = {
   cacheWriteTokens: number
   reasoningTokens: number
   totalTokens: number
+  totalCostUsd: number
   messageCount: number
   // Track last message's output tokens to detect when streaming completes
lastMsgOutputTokens: number @@ -474,6 +528,7 @@ export const messageTokenDataAtom = atom((get) => { let cacheReadTokens = 0 let cacheWriteTokens = 0 let reasoningTokens = 0 + let totalCostUsd = 0 for (const id of ids) { const msg = get(messageAtomFamily(id)) @@ -483,6 +538,7 @@ export const messageTokenDataAtom = atom((get) => { if (metadata) { inputTokens += metadata.inputTokens || 0 outputTokens += metadata.outputTokens || 0 + totalCostUsd += metadata.totalCostUsd || 0 // These fields are not in current MessageMetadata but kept for future compatibility cacheReadTokens += metadata.cacheReadInputTokens || 0 cacheWriteTokens += metadata.cacheCreationInputTokens || 0 @@ -497,6 +553,7 @@ export const messageTokenDataAtom = atom((get) => { cacheWriteTokens, reasoningTokens, totalTokens: inputTokens + outputTokens, + totalCostUsd, messageCount: ids.length, lastMsgOutputTokens, } @@ -581,6 +638,9 @@ export const syncMessagesWithStatusAtom = atom( } const currentSubChatId = subChatId ?? prevSubChatId + // Track this subChat in LRU list and evict old caches if needed + touchSubChat(currentSubChatId) + // Update status only if changed const prevStatus = get(chatStatusAtom) if (status !== prevStatus) { @@ -622,24 +682,58 @@ export const syncMessagesWithStatusAtom = atom( set(messageRolesAtom, newRoles) } + // ======================================================================== + // OPTIMIZED MESSAGE SYNC - O(1) during streaming, O(n) only when needed + // ======================================================================== + // Key insight: During streaming, only the LAST message changes. + // We only need to check all messages when: + // 1. IDs changed (new message added/removed) + // 2. First sync for this subChat (atoms not populated) + // + // During streaming (same IDs, atoms populated), only check the last message. + // This reduces the loop from O(n) to O(1) for streaming updates. 
+    // ========================================================================
+
+    const isStreaming = status === "streaming" || status === "submitted"
+    const previousIds = activeMessageIdsByChat.get(currentSubChatId)
+    const isFirstSync = !previousIds || previousIds.size === 0
+
+    // Full sync when IDs changed or nothing is cached yet; otherwise only the last
+    // message (streaming chunk or metadata update). slice(-1) is [] when empty.
+    const messagesToCheck: Message[] =
+      isFirstSync || idsChanged ? messages : messages.slice(-1)
+
+    // Copy plain objects and arrays only. Spreading a string (e.g. a text tool
+    // output) gives an index-keyed object, and spreading an array drops Array-ness.
+    const cloneNested = (value: unknown): unknown => {
+      if (Array.isArray(value)) return [...value]
+      if (value === null || typeof value !== "object") return value
+      const proto = Object.getPrototypeOf(value)
+      return proto === Object.prototype || proto === null ? { ...value } : value
+    }
+    const nestedPartKeys = ["input", "output", "result", "error"]
+
     // Update individual message atoms ONLY if they changed
-    // This is the key optimization - only changed messages trigger re-renders
     // CRITICAL: AI SDK mutates objects in-place, so we MUST create a new reference
     // for Jotai to detect the change (it uses Object.is() for comparison)
-    // We need to deep clone the message because:
-    // 1. msg object itself is mutated in-place
-    // 2. msg.parts array is mutated in-place
-    // 3. Individual part objects inside parts are mutated in-place
-    for (const msg of messages) {
+    for (const msg of messagesToCheck) {
       const currentAtomValue = get(messageAtomFamily(msg.id))
       const msgChanged = hasMessageChanged(currentSubChatId, msg.id, msg)
 
       // CRITICAL FIX: Also update if atom is null (not yet populated)
       if (msgChanged || !currentAtomValue) {
         // Deep clone message with new parts array and new part objects
+        // Nested containers (input, output, result, error) are copied as well so
+        // Jotai detects changes and components re-render properly
         const clonedMsg = {
           ...msg,
-          parts: msg.parts?.map((part: any) => ({ ...part, input: part.input ? { ...part.input } : undefined })),
+          parts: msg.parts?.map((part: any) => {
+            const clonedPart = { ...part }
+            for (const key of nestedPartKeys) {
+              if (key in clonedPart) clonedPart[key] = cloneNested(clonedPart[key])
+            }
+            return clonedPart
+          }),
         }
         set(messageAtomFamily(msg.id), clonedMsg)
       }
@@ -647,14 +741,25 @@ export const syncMessagesWithStatusAtom = atom(
 
     // Cleanup removed message atoms to prevent memory leaks
     const newIdsSet = new Set(newIds)
-    const previousIds = activeMessageIdsByChat.get(currentSubChatId) ?? new Set()
+    const prevIdsForCleanup = activeMessageIdsByChat.get(currentSubChatId) ?? new Set()
 
-    for (const oldId of previousIds) {
+    for (const oldId of prevIdsForCleanup) {
       if (!newIdsSet.has(oldId)) {
         // Message was removed - cleanup its atom and caches
         messageAtomFamily.remove(oldId)
         previousMessageState.delete(`${currentSubChatId}:${oldId}`)
         assistantIdsCacheByChat.delete(`${currentSubChatId}:${oldId}`)
+        // Clean up text part cache entries and atoms via the tracked keys
+        const textKeys = textPartKeysByMessage.get(oldId)
+        if (textKeys) {
+          for (const key of textKeys) {
+            textPartCache.delete(key)
+            textPartAtomFamily.remove(key)
+          }
+          textPartKeysByMessage.delete(oldId)
+        }
+        // Clean up message structure cache
+        messageStructureCache.delete(oldId)
       }
     }
 
@@ -662,7 +767,7 @@ export const syncMessagesWithStatusAtom = atom(
     activeMessageIdsByChat.set(currentSubChatId, newIdsSet)
 
     // Update streaming message ID
-    if (status === "streaming" || status === "submitted") {
+    if (isStreaming) {
       const lastId = newIds[newIds.length - 1] ?? null
       set(streamingMessageIdAtom, lastId)
     } else {
@@ -685,18 +790,31 @@ export const syncMessagesAtom = atom(
 
 // Clear all caches for a specific subChat (call when unmounting/switching)
 export function clearSubChatCaches(subChatId: string) {
-  // Clear message atoms
+  // Clear message atoms and related caches
   const activeIds = activeMessageIdsByChat.get(subChatId)
   if (activeIds) {
     for (const id of activeIds) {
       messageAtomFamily.remove(id)
       previousMessageState.delete(`${subChatId}:${id}`)
       assistantIdsCacheByChat.delete(`${subChatId}:${id}`)
+
+      // Clear text part cache entries and atoms via the tracked keys
+      const textKeys = textPartKeysByMessage.get(id)
+      if (textKeys) {
+        for (const key of textKeys) {
+          textPartCache.delete(key)
+          textPartAtomFamily.remove(key)
+        }
+        textPartKeysByMessage.delete(id)
+      }
+
+      // Clear message structure cache
+      messageStructureCache.delete(id)
     }
     activeMessageIdsByChat.delete(subChatId)
   }
 
-  // Clear other caches
+  // Clear other caches keyed by subChatId
   userMessageIdsCacheByChat.delete(subChatId)
messageGroupsCacheByChat.delete(subChatId) lastAssistantCacheByChat.delete(subChatId)