Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions src/renderer/features/agents/main/active-chat.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ import {
} from "../search"
import { agentChatStore } from "../stores/agent-chat-store"
import { EMPTY_QUEUE, useMessageQueueStore } from "../stores/message-queue-store"
import { clearSubChatCaches, isRollingBackAtom, rollbackHandlerAtom, syncMessagesWithStatusAtom } from "../stores/message-store"
import { clearSubChatCaches, isRollingBackAtom, messageTokenDataAtom, rollbackHandlerAtom, syncMessagesWithStatusAtom } from "../stores/message-store"
import { useStreamingStatusStore } from "../stores/streaming-status-store"
import {
useAgentSubChatStore,
Expand Down Expand Up @@ -2572,21 +2572,16 @@ const ChatViewInner = memo(function ChatViewInner({
[messages],
)

// Pre-compute token data for ChatInputArea to avoid passing unstable messages array
// This prevents ChatInputArea from re-rendering on every streaming chunk
const messageTokenData = useMemo(() => {
let totalInputTokens = 0
let totalOutputTokens = 0
let totalCostUsd = 0
for (const msg of messages) {
if (msg.metadata) {
totalInputTokens += msg.metadata.inputTokens || 0
totalOutputTokens += msg.metadata.outputTokens || 0
totalCostUsd += msg.metadata.totalCostUsd || 0
}
}
return { totalInputTokens, totalOutputTokens, totalCostUsd, messageCount: messages.length }
}, [messages])
// Token data from centralized store - uses cached O(1) lookup during streaming
// Only recalculates when message count changes or last message's metadata updates
const storeTokenData = useAtomValue(messageTokenDataAtom)
// Map to expected field names for ChatInputArea
const messageTokenData = useMemo(() => ({
totalInputTokens: storeTokenData.inputTokens,
totalOutputTokens: storeTokenData.outputTokens,
totalCostUsd: storeTokenData.totalCostUsd,
messageCount: storeTokenData.messageCount,
}), [storeTokenData.inputTokens, storeTokenData.outputTokens, storeTokenData.totalCostUsd, storeTokenData.messageCount])

// Track previous streaming state to detect stream stop
const prevIsStreamingRef = useRef(isStreaming)
Expand Down
24 changes: 24 additions & 0 deletions src/renderer/features/agents/stores/agent-chat-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,31 @@ export const agentChatStore = {

has: (id: string) => chats.has(id),

/**
 * Abort any active stream for a sub-chat.
 * Should be called before delete() to ensure proper cleanup.
 */
abort: (id: string) => {
  const chat = chats.get(id)
  try {
    // Chat.stop() tears down the in-flight stream, if any
    chat?.stop()
  } catch (err) {
    // Best-effort: the chat may already be stopped — never propagate
    console.debug(`[agentChatStore] Error stopping chat ${id}:`, err)
  }
  // Record the manual abort regardless of whether a chat instance existed
  manuallyAborted.set(id, true)
},

/**
* Delete a chat instance and all associated data.
* IMPORTANT: Call abort() first if the chat may be streaming.
*/
delete: (id: string) => {
// Abort first to ensure stream is stopped
agentChatStore.abort(id)
chats.delete(id)
streamIds.delete(id)
parentChatIds.delete(id)
Expand Down
140 changes: 128 additions & 12 deletions src/renderer/features/agents/stores/message-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import { atom } from "jotai"
import { atomFamily } from "jotai/utils"
import { agentChatStore } from "./agent-chat-store"

// Types
export interface MessagePart {
Expand Down Expand Up @@ -51,6 +52,48 @@ export const messageAtomFamily = atomFamily((_messageId: string) =>
// Track active message IDs per subChat for cleanup
const activeMessageIdsByChat = new Map<string, Set<string>>()

// Track text part cache keys per message for O(1) cleanup
const textPartKeysByMessage = new Map<string, Set<string>>()

// ============================================================================
// LRU CACHE EVICTION - Prevents memory leaks from accumulated subChat caches
// ============================================================================
// Uses Map's insertion order for O(1) LRU operations.
// Delete + set moves item to end; first key is oldest.
// ============================================================================
const MAX_CACHED_SUBCHATS = 10
const subChatLRU = new Map<string, true>()

// Track the current (most recent) subChat to avoid unnecessary LRU updates
let currentLRUSubChat: string | null = null

/**
 * Mark a subChat as most-recently-used and evict stale subChat caches.
 *
 * Uses the Map's insertion order as an LRU: delete + set moves an entry to
 * the end, and the first key returned by keys() is the oldest. Evicted
 * subChats have their stream aborted and all message caches cleared.
 *
 * @param subChatId - id of the subChat being synced right now
 */
function touchSubChat(subChatId: string) {
  // Fast path: already the most recent subChat (common case during
  // streaming, where the same subChat is touched on every chunk)
  if (subChatId === currentLRUSubChat) {
    return
  }
  currentLRUSubChat = subChatId

  // Move to the end of insertion order (most recently used).
  // Map.delete() is a safe no-op when the key is absent, so no has() check
  // is needed; both operations are O(1).
  subChatLRU.delete(subChatId)
  subChatLRU.set(subChatId, true)

  // Evict oldest subchats while we exceed the threshold.
  while (subChatLRU.size > MAX_CACHED_SUBCHATS) {
    const oldestSubChatId = subChatLRU.keys().next().value
    // BUGFIX: compare against undefined, not truthiness. A falsy-but-valid
    // key (e.g. the empty string) previously skipped the delete below,
    // leaving size unchanged and spinning this loop forever.
    if (oldestSubChatId === undefined) {
      break // defensive: map is unexpectedly empty
    }
    subChatLRU.delete(oldestSubChatId)
    // CRITICAL: Abort any active stream before clearing caches.
    // This prevents background streams from continuing to consume resources.
    agentChatStore.abort(oldestSubChatId)
    clearSubChatCaches(oldestSubChatId)
  }
}

// Ordered list of message IDs (for rendering order)
export const messageIdsAtom = atom<string[]>([])

Expand Down Expand Up @@ -114,6 +157,16 @@ export const textPartAtomFamily = atomFamily((key: string) => {
const [messageId, partIndexStr] = key.split(":")
const partIndex = parseInt(partIndexStr!, 10)

// Track this key for O(1) cleanup when message is removed
if (messageId) {
let keys = textPartKeysByMessage.get(messageId)
if (!keys) {
keys = new Set()
textPartKeysByMessage.set(messageId, keys)
}
keys.add(key)
}

return atom((get) => {
const message = get(messageAtomFamily(messageId!))
const parts = message?.parts || []
Expand Down Expand Up @@ -439,6 +492,7 @@ type TokenData = {
cacheWriteTokens: number
reasoningTokens: number
totalTokens: number
totalCostUsd: number
messageCount: number
// Track last message's output tokens to detect when streaming completes
lastMsgOutputTokens: number
Expand Down Expand Up @@ -474,6 +528,7 @@ export const messageTokenDataAtom = atom((get) => {
let cacheReadTokens = 0
let cacheWriteTokens = 0
let reasoningTokens = 0
let totalCostUsd = 0

for (const id of ids) {
const msg = get(messageAtomFamily(id))
Expand All @@ -483,6 +538,7 @@ export const messageTokenDataAtom = atom((get) => {
if (metadata) {
inputTokens += metadata.inputTokens || 0
outputTokens += metadata.outputTokens || 0
totalCostUsd += metadata.totalCostUsd || 0
// These fields are not in current MessageMetadata but kept for future compatibility
cacheReadTokens += metadata.cacheReadInputTokens || 0
cacheWriteTokens += metadata.cacheCreationInputTokens || 0
Expand All @@ -497,6 +553,7 @@ export const messageTokenDataAtom = atom((get) => {
cacheWriteTokens,
reasoningTokens,
totalTokens: inputTokens + outputTokens,
totalCostUsd,
messageCount: ids.length,
lastMsgOutputTokens,
}
Expand Down Expand Up @@ -581,6 +638,9 @@ export const syncMessagesWithStatusAtom = atom(
}
const currentSubChatId = subChatId ?? prevSubChatId

// Track this subChat in LRU list and evict old caches if needed
touchSubChat(currentSubChatId)

// Update status only if changed
const prevStatus = get(chatStatusAtom)
if (status !== prevStatus) {
Expand Down Expand Up @@ -622,47 +682,91 @@ export const syncMessagesWithStatusAtom = atom(
set(messageRolesAtom, newRoles)
}

// ========================================================================
// OPTIMIZED MESSAGE SYNC - O(1) during streaming, O(n) only when needed
// ========================================================================
// Key insight: During streaming, only the LAST message changes.
// We only need to check all messages when:
// 1. IDs changed (new message added/removed)
// 2. First sync for this subChat (atoms not populated)
//
// During streaming (same IDs, atoms populated), only check the last message.
// This reduces the loop from O(n) to O(1) for streaming updates.
// ========================================================================

const isStreaming = status === "streaming" || status === "submitted"
const previousIds = activeMessageIdsByChat.get(currentSubChatId)
const isFirstSync = !previousIds || previousIds.size === 0

// Determine which messages to check
let messagesToCheck: Message[]
if (isFirstSync || idsChanged) {
// Full sync needed - check all messages
messagesToCheck = messages
} else if (isStreaming && messages.length > 0) {
// Streaming optimization - only check last message
// The last message is the only one that changes during streaming
messagesToCheck = [messages[messages.length - 1]!]
} else {
// Not streaming and no ID changes - likely status change only
// Still check last message for metadata updates (token counts, etc.)
messagesToCheck = messages.length > 0 ? [messages[messages.length - 1]!] : []
}

// Update individual message atoms ONLY if they changed
// This is the key optimization - only changed messages trigger re-renders
// CRITICAL: AI SDK mutates objects in-place, so we MUST create a new reference
// for Jotai to detect the change (it uses Object.is() for comparison)
// We need to deep clone the message because:
// 1. msg object itself is mutated in-place
// 2. msg.parts array is mutated in-place
// 3. Individual part objects inside parts are mutated in-place
for (const msg of messages) {
for (const msg of messagesToCheck) {
const currentAtomValue = get(messageAtomFamily(msg.id))
const msgChanged = hasMessageChanged(currentSubChatId, msg.id, msg)

// CRITICAL FIX: Also update if atom is null (not yet populated)
if (msgChanged || !currentAtomValue) {
// Deep clone message with new parts array and new part objects
// Clone all nested objects (input, output, result, error) to ensure
// Jotai detects changes and components re-render properly
const clonedMsg = {
...msg,
parts: msg.parts?.map((part: any) => ({ ...part, input: part.input ? { ...part.input } : undefined })),
parts: msg.parts?.map((part: any) => ({
...part,
input: part.input ? { ...part.input } : undefined,
output: part.output ? { ...part.output } : undefined,
result: part.result ? { ...part.result } : undefined,
error: part.error ? { ...part.error } : undefined,
})),
}
set(messageAtomFamily(msg.id), clonedMsg)
}
}

// Cleanup removed message atoms to prevent memory leaks
const newIdsSet = new Set(newIds)
const previousIds = activeMessageIdsByChat.get(currentSubChatId) ?? new Set()
const prevIdsForCleanup = activeMessageIdsByChat.get(currentSubChatId) ?? new Set()

for (const oldId of previousIds) {
for (const oldId of prevIdsForCleanup) {
if (!newIdsSet.has(oldId)) {
// Message was removed - cleanup its atom and caches
messageAtomFamily.remove(oldId)
previousMessageState.delete(`${currentSubChatId}:${oldId}`)
assistantIdsCacheByChat.delete(`${currentSubChatId}:${oldId}`)
// Clean up text part cache entries for this message (O(1) using tracked keys)
const textKeys = textPartKeysByMessage.get(oldId)
if (textKeys) {
for (const key of textKeys) {
textPartCache.delete(key)
}
textPartKeysByMessage.delete(oldId)
}
// Clean up message structure cache
messageStructureCache.delete(oldId)
}
}

// Update active IDs tracking
activeMessageIdsByChat.set(currentSubChatId, newIdsSet)

// Update streaming message ID
if (status === "streaming" || status === "submitted") {
if (isStreaming) {
const lastId = newIds[newIds.length - 1] ?? null
set(streamingMessageIdAtom, lastId)
} else {
Expand All @@ -685,18 +789,30 @@ export const syncMessagesAtom = atom(

// Clear all caches for a specific subChat (call when unmounting/switching)
export function clearSubChatCaches(subChatId: string) {
// Clear message atoms
// Clear message atoms and related caches
const activeIds = activeMessageIdsByChat.get(subChatId)
if (activeIds) {
for (const id of activeIds) {
messageAtomFamily.remove(id)
previousMessageState.delete(`${subChatId}:${id}`)
assistantIdsCacheByChat.delete(`${subChatId}:${id}`)

// Clear text part cache entries for this message (O(1) using tracked keys)
const textKeys = textPartKeysByMessage.get(id)
if (textKeys) {
for (const key of textKeys) {
textPartCache.delete(key)
}
textPartKeysByMessage.delete(id)
}

// Clear message structure cache
messageStructureCache.delete(id)
}
activeMessageIdsByChat.delete(subChatId)
}

// Clear other caches
// Clear other caches keyed by subChatId
userMessageIdsCacheByChat.delete(subChatId)
messageGroupsCacheByChat.delete(subChatId)
lastAssistantCacheByChat.delete(subChatId)
Expand Down