diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl
new file mode 100644
index 0000000..e69de29
diff --git a/src/api/middleware/load-shedding.js b/src/api/middleware/load-shedding.js
index bc7b75b..2c1901a 100644
--- a/src/api/middleware/load-shedding.js
+++ b/src/api/middleware/load-shedding.js
@@ -15,7 +15,7 @@ class LoadShedder {
   constructor(options = {}) {
     // Thresholds
     this.memoryThreshold = options.memoryThreshold || 0.85; // 85%
-    this.heapThreshold = options.heapThreshold || 0.90; // 90%
+    this.heapThreshold = options.heapThreshold || 0.95; // 95% (increased from 90% to prevent false positives from temporary allocation spikes)
     this.activeRequestsThreshold = options.activeRequestsThreshold || 1000;

     // State
@@ -96,7 +96,7 @@ function getLoadShedder(options) {
   if (!instance) {
     // Read from environment variables if not provided
     const defaultOptions = {
-      heapThreshold: Number.parseFloat(process.env.LOAD_SHEDDING_HEAP_THRESHOLD || "0.90"),
+      heapThreshold: Number.parseFloat(process.env.LOAD_SHEDDING_HEAP_THRESHOLD || "0.95"),
       memoryThreshold: Number.parseFloat(process.env.LOAD_SHEDDING_MEMORY_THRESHOLD || "0.85"),
       activeRequestsThreshold: Number.parseInt(
         process.env.LOAD_SHEDDING_ACTIVE_REQUESTS_THRESHOLD || "1000",
@@ -108,6 +108,26 @@ function getLoadShedder(options) {
   return instance;
 }

+/**
+ * Initialize load shedder and log configuration
+ * Call this at server startup to ensure configuration is logged
+ */
+function initializeLoadShedder(options) {
+  const shedder = getLoadShedder(options);
+
+  // Log configuration
+  logger.info({
+    enabled: true,
+    thresholds: {
+      heapThreshold: (shedder.heapThreshold * 100).toFixed(2),
+      memoryThreshold: (shedder.memoryThreshold * 100).toFixed(2),
+      activeRequestsThreshold: shedder.activeRequestsThreshold,
+    }
+  }, "Load shedding initialized");
+
+  return shedder;
+}
+
 /**
  * Load shedding middleware
  */
@@ -132,16 +152,18 @@ function loadSheddingMiddleware(req, res, next) {
   // Track active request
   shedder.activeRequests++;

-  // Decrement on response finish
-  res.on("finish", () => {
-    shedder.activeRequests--;
-  });
-
-  res.on("close", () => {
-    if (shedder.activeRequests > 0) {
+  // Use flag to prevent double-decrement race condition
+  let decremented = false;
+  const decrementOnce = () => {
+    if (!decremented) {
+      decremented = true;
       shedder.activeRequests--;
     }
-  });
+  };
+
+  // Both events might fire, but only decrement once
+  res.on("finish", decrementOnce);
+  res.on("close", decrementOnce);

   next();
 }
@@ -149,5 +171,6 @@
 module.exports = {
   LoadShedder,
   getLoadShedder,
+  initializeLoadShedder,
   loadSheddingMiddleware,
 };
diff --git a/src/api/router.js b/src/api/router.js
index 7e8fb93..c7304f6 100644
--- a/src/api/router.js
+++ b/src/api/router.js
@@ -138,16 +138,24 @@ router.post("/v1/messages", rateLimiter, async (req, res, next) => {
     // Parse SSE stream from provider and forward to client
     const reader = result.stream.getReader();
     const decoder = new TextDecoder();
-    let buffer = '';
+    const bufferChunks = []; // Use array to avoid string concatenation overhead

     try {
       while (true) {
         const { done, value } = await reader.read();
         if (done) break;

-        buffer += decoder.decode(value, { stream: true });
+        const chunk = decoder.decode(value, { stream: true });
+        bufferChunks.push(chunk);
+
+        // Join buffer and split by lines
+        const buffer = bufferChunks.join('');
         const lines = buffer.split('\n');
-        buffer = lines.pop() || ''; // Keep incomplete line in buffer
+
+        // Keep last incomplete line in buffer chunks
+        const remaining = lines.pop() || '';
+        bufferChunks.length = 0;
+        if (remaining) bufferChunks.push(remaining);

         for (const line of lines) {
           if (line.trim()) {
@@ -162,8 +170,9 @@ router.post("/v1/messages", rateLimiter, async (req, res, next) => {
       }

       // Send any remaining buffer
-      if (buffer.trim()) {
-        res.write(buffer + '\n');
+      const remaining = bufferChunks.join('');
+      if (remaining.trim()) {
+        res.write(remaining + '\n');
       }

       metrics.recordResponse(200);
@@ -171,12 +180,28 @@ router.post("/v1/messages", rateLimiter, async (req, res, next) => {
       return;
     } catch (streamError) {
       logger.error({ error: streamError }, "Error streaming response");
+
+      // Cancel stream on error
+      try {
+        await reader.cancel();
+      } catch (cancelError) {
+        logger.debug({ error: cancelError }, "Failed to cancel stream");
+      }
+
       if (!res.headersSent) {
         res.status(500).json({ error: "Streaming error" });
       } else {
         res.end();
       }
       return;
+    } finally {
+      // CRITICAL: Always release lock
+      try {
+        reader.releaseLock();
+      } catch (releaseError) {
+        // Lock may already be released, ignore
+        logger.debug({ error: releaseError }, "Stream lock already released");
+      }
     }
   }

diff --git a/src/cache/prompt.js b/src/cache/prompt.js
index 65df13a..4fb8e72 100644
--- a/src/cache/prompt.js
+++ b/src/cache/prompt.js
@@ -40,9 +40,17 @@ class PromptCache {
     this.ttlMs =
       Number.isInteger(options.ttlMs) && options.ttlMs > 0 ? options.ttlMs : 300000;

+    // Add pruning interval (default: 5 minutes)
+    this.pruneIntervalMs =
+      Number.isInteger(options.pruneIntervalMs) && options.pruneIntervalMs > 0
+        ? options.pruneIntervalMs
+        : 300000;
+    this.pruneTimer = null;
+
     // Initialize persistent cache database
     if (this.enabled) {
       this.initDatabase();
+      this.startPruning();
     }
   }

@@ -325,10 +333,38 @@ class PromptCache {
     }
   }

+  startPruning() {
+    if (this.pruneTimer) return; // Already started
+
+    this.pruneTimer = setInterval(() => {
+      try {
+        this.pruneExpired();
+        logger.debug("Prompt cache pruning completed");
+      } catch (error) {
+        logger.warn({ error }, "Failed to prune cache in background");
+      }
+    }, this.pruneIntervalMs);
+
+    // Don't prevent process exit
+    this.pruneTimer.unref();
+
+    logger.info({ intervalMs: this.pruneIntervalMs }, "Prompt cache pruning started");
+  }
+
+  stopPruning() {
+    if (this.pruneTimer) {
+      clearInterval(this.pruneTimer);
+      this.pruneTimer = null;
+      logger.debug("Prompt cache pruning stopped");
+    }
+  }
+
   // Cleanup method
   close() {
+    this.stopPruning(); // Stop pruning first
     if (this.db) {
       try {
+        this.pruneExpired(); // Final cleanup
         this.db.close();
       } catch (error) {
         logger.warn({ err: error }, "Failed to close cache database");
diff --git a/src/clients/databricks.js b/src/clients/databricks.js
index 82daecc..a29d795 100644
--- a/src/clients/databricks.js
+++ b/src/clients/databricks.js
@@ -1123,6 +1123,24 @@ function estimateCostSavings(inputTokens, outputTokens) {
   return inputCost + outputCost;
 }

+/**
+ * Destroy HTTP agents (for graceful shutdown)
+ */
+function destroyHttpAgents() {
+  try {
+    if (httpAgent) {
+      httpAgent.destroy();
+    }
+    if (httpsAgent) {
+      httpsAgent.destroy();
+    }
+    logger.info("HTTP agents destroyed");
+  } catch (error) {
+    logger.warn({ error }, "Failed to destroy HTTP agents");
+  }
+}
+
 module.exports = {
   invokeModel,
+  destroyHttpAgents,
 };
diff --git a/src/context/budget.js b/src/context/budget.js
index 77e1c61..8702996 100644
--- a/src/context/budget.js
+++ b/src/context/budget.js
@@ -75,7 +75,7 @@ function enforceBudget(payload, options = {}) {
     };
   }

-  // Clone payload to avoid modifying original
+  // Deep clone payload so the optimization passes below never mutate the caller's original
   let optimized = JSON.parse(JSON.stringify(payload));
   let strategy = [];

diff --git a/src/context/compression.js b/src/context/compression.js
index 60fa7b5..518aaba 100644
--- a/src/context/compression.js
+++ b/src/context/compression.js
@@ -67,9 +67,9 @@ function compressHistory(messages, options = {}) {

   const finalMessages = [...compressed, ...recentCompressed];

-  // Log compression stats
-  const originalLength = JSON.stringify(messages).length;
-  const compressedLength = JSON.stringify(finalMessages).length;
+  // Log compression stats - estimate sizes without expensive JSON.stringify
+  const originalLength = estimateMessagesSize(messages);
+  const compressedLength = estimateMessagesSize(finalMessages);
   const saved = originalLength - compressedLength;

   if (saved > 1000) {
@@ -387,6 +387,68 @@ function needsCompression(messages, threshold = 15) {
   return messages && messages.length > threshold;
 }

+/**
+ * Estimate size of messages array without full JSON serialization
+ *
+ * Provides a rough size estimation that's much faster than JSON.stringify
+ * while being accurate enough for compression statistics.
+ *
+ * @param {Array} messages - Messages to estimate
+ * @returns {number} Estimated size in characters
+ */
+function estimateMessagesSize(messages) {
+  if (!messages || !Array.isArray(messages)) return 0;
+
+  let totalSize = 0;
+
+  for (const msg of messages) {
+    // Base overhead for message structure
+    totalSize += 50;
+
+    // Role field
+    if (msg.role) totalSize += msg.role.length;
+
+    // Content estimation
+    if (typeof msg.content === 'string') {
+      totalSize += msg.content.length;
+    } else if (Array.isArray(msg.content)) {
+      for (const block of msg.content) {
+        totalSize += 20; // Block overhead
+
+        if (block.type) totalSize += block.type.length;
+
+        if (block.text) {
+          totalSize += block.text.length;
+        } else if (block.content) {
+          if (typeof block.content === 'string') {
+            totalSize += block.content.length;
+          } else if (Array.isArray(block.content)) {
+            for (const item of block.content) {
+              if (typeof item === 'string') {
+                totalSize += item.length;
+              } else if (item.text) {
+                totalSize += item.text.length;
+              }
+            }
+          }
+        }
+
+        // Tool use fields
+        if (block.name) totalSize += block.name.length;
+        if (block.id) totalSize += block.id.length;
+        if (block.tool_use_id) totalSize += block.tool_use_id.length;
+
+        // Input estimation (rough)
+        if (block.input) {
+          totalSize += JSON.stringify(block.input).length;
+        }
+      }
+    }
+  }
+
+  return totalSize;
+}
+
 module.exports = {
   compressHistory,
   compressMessage,
diff --git a/src/memory/extractor.js b/src/memory/extractor.js
index c675fde..2c7e8b7 100644
--- a/src/memory/extractor.js
+++ b/src/memory/extractor.js
@@ -102,7 +102,11 @@ async function extractMemories(assistantResponse, conversationMessages, context

     for (const entityName of entities) {
       // Track entity
-      store.trackEntity('code', entityName, { source: 'extraction' });
+      store.trackEntity({
+        type: 'code',
+        name: entityName,
+        context: { source: 'extraction' }
+      });

       const memory = await createMemoryWithSurprise({
         content: `Entity: ${entityName}`,
diff --git a/src/memory/search.js b/src/memory/search.js
index 8d40bd9..42a6ee0 100644
--- a/src/memory/search.js
+++ b/src/memory/search.js
@@ -113,16 +113,20 @@ function prepareFTS5Query(query) {
   // Step 3: Check if query contains FTS5 operators (AND, OR, NOT)
   const hasFTS5Operators = /\b(AND|OR|NOT)\b/i.test(cleaned);

-  // Step 4: Remove or escape remaining FTS5 special characters
-  // Characters: * ( ) < > - : [ ]
-  // Strategy: Remove them since they're rarely useful in memory search
-  cleaned = cleaned.replace(/[*()<>\-:\[\]]/g, ' ');
+  // Step 4: ENHANCED - Remove ALL special characters that could break FTS5
+  // Keep only: letters, numbers, spaces
+  // Remove: * ( ) < > - : [ ] | , + = ? ! ; / \ @ # $ % ^ & { }
+  cleaned = cleaned.replace(/[*()<>\-:\[\]|,+=?!;\/\\@#$%^&{}]/g, ' ');
   cleaned = cleaned.replace(/\s+/g, ' ').trim();

   // Step 5: Escape double quotes (FTS5 uses "" for literal quote)
   cleaned = cleaned.replace(/"/g, '""');

-  // Step 6: Wrap in quotes for phrase search (safest approach)
+  // Step 6: Additional safety - strip anything that is not a word character, space, or quote
+  cleaned = cleaned.replace(/[^\w\s""]/g, ' ');
+  cleaned = cleaned.replace(/\s+/g, ' ').trim();
+
+  // Step 7: Wrap in quotes for phrase search (safest approach)
   if (!hasFTS5Operators) {
     // Treat as literal phrase search
     cleaned = `"${cleaned}"`;
diff --git a/src/observability/metrics.js b/src/observability/metrics.js
index 2dece6f..c36645c 100644
--- a/src/observability/metrics.js
+++ b/src/observability/metrics.js
@@ -56,8 +56,18 @@ class MetricsCollector {
     // Histogram buckets for latency (in ms)
     this.latencyBuckets = [10, 50, 100, 200, 500, 1000, 2000, 5000, 10000];

-    // Performance: Pre-allocate latency buffer
-    this.maxLatencyBuffer = 10000;
+    // Performance: Bounded latency buffer (cap reduced from 10000 to 1000 samples)
+    this.maxLatencyBuffer = 1000;
+  }
+
+  /**
+   * Add value to a bounded buffer, evicting the oldest entry (prevents unbounded growth)
+   */
+  addToBuffer(buffer, value, maxSize) {
+    buffer.push(value);
+    if (buffer.length > maxSize) {
+      buffer.shift(); // Remove oldest entry
+    }
   }

   /**
@@ -79,10 +89,8 @@ class MetricsCollector {
     const endpointCount = this.requestsByEndpoint.get(endpoint) || 0;
     this.requestsByEndpoint.set(endpoint, endpointCount + 1);

-    // Record latency (with buffer limit for memory)
-    if (this.requestLatencies.length < this.maxLatencyBuffer) {
-      this.requestLatencies.push(durationMs);
-    }
+    // Record latency with bounded buffer
+    this.addToBuffer(this.requestLatencies, durationMs, this.maxLatencyBuffer);
   }

   /**
@@ -141,8 +149,8 @@ class MetricsCollector {
     const count = this.providerSuccesses.get(provider) || 0;
     this.providerSuccesses.set(provider, count + 1);

-    if (provider === "ollama" && this.ollamaLatencies.length < 10000) {
-      this.ollamaLatencies.push(latencyMs);
+    if (provider === "ollama") {
+      this.addToBuffer(this.ollamaLatencies, latencyMs, this.maxLatencyBuffer);
     }
   }

@@ -168,9 +176,7 @@ class MetricsCollector {
    */
   recordFallbackSuccess(latencyMs) {
     this.fallbackSuccesses++;
-    if (this.fallbackLatencies.length < 10000) {
-      this.fallbackLatencies.push(latencyMs);
-    }
+    this.addToBuffer(this.fallbackLatencies, latencyMs, this.maxLatencyBuffer);
   }

   /**
diff --git a/src/orchestrator/index.js b/src/orchestrator/index.js
index ddbc873..8ac6639 100644
--- a/src/orchestrator/index.js
+++ b/src/orchestrator/index.js
@@ -2604,8 +2604,8 @@ async function processMessage({ payload, headers, session, options = {} }) {
   let cacheKey = null;
   let cachedResponse = null;
   if (promptCache.isEnabled()) {
-    const cacheSeedPayload = JSON.parse(JSON.stringify(cleanPayload));
-    const { key, entry } = promptCache.lookup(cacheSeedPayload);
+    // cleanPayload is already a deep clone from sanitizePayload, no need to clone again
+    const { key, entry } = promptCache.lookup(cleanPayload);
     cacheKey = key;
     if (entry?.value) {
       try {
diff --git a/src/server.js b/src/server.js
index b5cb777..78e944b 100644
--- a/src/server.js
+++ b/src/server.js
@@ -8,7 +8,7 @@ const { budgetMiddleware } = require("./api/middleware/budget");
 const { metricsMiddleware } = require("./api/middleware/metrics");
 const { requestLoggingMiddleware } = require("./api/middleware/request-logging");
 const { errorHandlingMiddleware, notFoundHandler } = require("./api/middleware/error-handling");
-const { loadSheddingMiddleware } = require("./api/middleware/load-shedding");
+const { loadSheddingMiddleware, initializeLoadShedder } = require("./api/middleware/load-shedding");
 const { livenessCheck, readinessCheck } = require("./api/health");
 const { getMetricsCollector } = require("./observability/metrics");
 const { getShutdownManager } = require("./server/shutdown");
@@ -44,6 +44,9 @@ registerAgentTaskTool();
 function createApp() {
   const app = express();

+  // Initialize load shedder (log configuration)
+  initializeLoadShedder();
+
   // Load shedding (protect against overload)
   app.use(loadSheddingMiddleware);

@@ -100,6 +103,12 @@ function createApp() {
     res.json(registry.getAll());
   });

+  app.get("/metrics/load-shedding", (req, res) => {
+    const { getLoadShedder } = require("./api/middleware/load-shedding");
+    const shedder = getLoadShedder();
+    res.json(shedder.getMetrics());
+  });
+
   app.use(router);

   // 404 handler (must be after all routes)
@@ -117,6 +126,11 @@ function start() {
     console.log(`Claude→Databricks proxy listening on http://localhost:${config.port}`);
   });

+  // Start session cleanup manager
+  const { getSessionCleanupManager } = require("./sessions/cleanup");
+  const sessionCleanup = getSessionCleanupManager();
+  sessionCleanup.start();
+
   // Setup graceful shutdown
   const shutdownManager = getShutdownManager();
   shutdownManager.registerServer(server);
diff --git a/src/server/shutdown.js b/src/server/shutdown.js
index 1e2440f..e617cc0 100644
--- a/src/server/shutdown.js
+++ b/src/server/shutdown.js
@@ -107,8 +107,31 @@
       }
     }

-    // Step 3: Close database connections
-    logger.info("Step 3: Closing database connections");
+    // Step 3: Stop background tasks
+    logger.info("Step 3: Stopping background tasks");
+    try {
+      const { getSessionCleanupManager } = require("../sessions/cleanup");
+      const sessionCleanup = getSessionCleanupManager();
+      if (sessionCleanup) {
+        sessionCleanup.stop();
+      }
+    } catch (err) {
+      logger.warn({ err }, "Error stopping session cleanup");
+    }
+
+    // Step 4: Close cache databases
+    logger.info("Step 4: Closing cache databases");
+    try {
+      const promptCache = require("../cache/prompt");
+      if (promptCache && typeof promptCache.close === 'function') {
+        promptCache.close();
+      }
+    } catch (err) {
+      logger.warn({ err }, "Error closing prompt cache");
+    }
+
+    // Step 5: Close database connections
+    logger.info("Step 5: Closing database connections");
     try {
       const budgetManager = getBudgetManager();
       if (budgetManager) {
@@ -118,8 +141,19 @@
       logger.warn({ err }, "Error closing budget manager");
     }

-    // Step 4: Final cleanup
-    logger.info("Step 4: Final cleanup");
+    // Step 6: Destroy HTTP agents
+    logger.info("Step 6: Destroying HTTP agents");
+    try {
+      const databricks = require("../clients/databricks");
+      if (databricks && typeof databricks.destroyHttpAgents === 'function') {
+        databricks.destroyHttpAgents();
+      }
+    } catch (err) {
+      logger.warn({ err }, "Error destroying HTTP agents");
+    }
+
+    // Step 7: Final cleanup
+    logger.info("Step 7: Final cleanup");
     clearTimeout(forceTimer);

     const duration = Date.now() - startTime;
diff --git a/src/sessions/cleanup.js b/src/sessions/cleanup.js
new file mode 100644
index 0000000..50f0b65
--- /dev/null
+++ b/src/sessions/cleanup.js
@@ -0,0 +1,53 @@
+const logger = require("../logger");
+const { cleanupOldSessions, cleanupOldHistory } = require("./store");
+
+class SessionCleanupManager {
+  constructor(options = {}) {
+    this.enabled = options.enabled !== false;
+    this.intervalMs = options.intervalMs || 3600000; // 1 hour
+    this.sessionMaxAgeMs = options.sessionMaxAgeMs || 7 * 24 * 60 * 60 * 1000; // 7 days
+    this.historyMaxAgeMs = options.historyMaxAgeMs || 30 * 24 * 60 * 60 * 1000; // 30 days
+    this.timer = null;
+  }
+
+  start() {
+    if (!this.enabled || this.timer) return;
+
+    this.runCleanup(); // Run immediately
+
+    this.timer = setInterval(() => this.runCleanup(), this.intervalMs);
+    this.timer.unref();
+
+    logger.info({
+      intervalMs: this.intervalMs,
+      sessionMaxAgeMs: this.sessionMaxAgeMs
+    }, "Session cleanup started");
+  }
+
+  runCleanup() {
+    try {
+      const sessionsDeleted = cleanupOldSessions(this.sessionMaxAgeMs);
+      const historyDeleted = cleanupOldHistory(this.historyMaxAgeMs);
+      logger.info({ sessionsDeleted, historyDeleted }, "Session cleanup completed");
+    } catch (error) {
+      logger.error({ error }, "Session cleanup failed");
+    }
+  }
+
+  stop() {
+    if (this.timer) {
+      clearInterval(this.timer);
+      this.timer = null;
+      logger.info("Session cleanup stopped");
+    }
+  }
+}
+
+let instance = null;
+
+function getSessionCleanupManager(options) {
+  if (!instance) instance = new SessionCleanupManager(options);
+  return instance;
+}
+
+module.exports = { SessionCleanupManager, getSessionCleanupManager };
diff --git a/src/sessions/store.js b/src/sessions/store.js
index e112d13..fc0bd4f 100644
--- a/src/sessions/store.js
+++ b/src/sessions/store.js
@@ -25,6 +25,14 @@ const insertHistoryStmt = db.prepare(
   `INSERT INTO session_history (session_id, role, type, status, content, metadata, timestamp)
    VALUES (@session_id, @role, @type, @status, @content, @metadata, @timestamp)`,
 );
+const cleanupOldSessionsStmt = db.prepare(`
+  DELETE FROM sessions
+  WHERE updated_at < ?
+`);
+const cleanupOldHistoryStmt = db.prepare(`
+  DELETE FROM session_history
+  WHERE timestamp < ?
+`);

 function parseJSON(value, fallback) {
   if (value === null || value === undefined) return fallback;
@@ -170,10 +178,26 @@ function deleteSession(sessionId) {
   deleteHistory(sessionId);
 }

+function cleanupOldSessions(maxAgeMs = 7 * 24 * 60 * 60 * 1000) {
+  const cutoffTime = Date.now() - maxAgeMs;
+  const result = cleanupOldSessionsStmt.run(cutoffTime);
+  logger.info({ deleted: result.changes, maxAgeMs }, "Cleaned up old sessions");
+  return result.changes;
+}
+
+function cleanupOldHistory(maxAgeMs = 30 * 24 * 60 * 60 * 1000) {
+  const cutoffTime = Date.now() - maxAgeMs;
+  const result = cleanupOldHistoryStmt.run(cutoffTime);
+  logger.info({ deleted: result.changes, maxAgeMs }, "Cleaned up old history");
+  return result.changes;
+}
+
 module.exports = {
   getSession,
   getOrCreateSession,
   upsertSession,
   appendSessionTurn,
   deleteSession,
+  cleanupOldSessions,
+  cleanupOldHistory,
 };
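
---
Reviewer notes. The sketches below illustrate the patterns this patch relies on; none of this code is part of the diff, and any helper names not present in the patch are hypothetical.

1. The finish/close fix in load-shedding.js is an instance of a general "run cleanup exactly once" guard: both events can fire for the same response, so the decrement must be idempotent. A minimal sketch (onceOnly is a hypothetical helper):

function onceOnly(fn) {
  let called = false;
  return (...args) => {
    if (called) return; // second and later calls are no-ops
    called = true;
    fn(...args);
  };
}

// Mirrors the middleware: whichever of "finish"/"close" fires first wins.
//   const release = onceOnly(() => { shedder.activeRequests--; });
//   res.on("finish", release);
//   res.on("close", release);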
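
2. The router change buffers decoded SSE chunks and splits on newlines, carrying the trailing partial line into the next read. The same logic as a standalone helper (makeLineSplitter is hypothetical, assuming '\n'-delimited lines):

function makeLineSplitter(onLine) {
  let tail = "";
  return {
    push(chunk) {
      const lines = (tail + chunk).split("\n");
      tail = lines.pop() || ""; // last element is always the incomplete remainder
      for (const line of lines) {
        if (line.trim()) onLine(line);
      }
    },
    flush() {
      if (tail.trim()) onLine(tail); // emit whatever is left once the stream ends
      tail = "";
    },
  };
}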
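
3. The cancel-on-error and releaseLock-in-finally additions amount to a cleanup contract for ReadableStream readers. Condensed into one function (drainStream and handleChunk are hypothetical):

async function drainStream(stream, handleChunk) {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      handleChunk(decoder.decode(value, { stream: true }));
    }
  } catch (err) {
    // Tell the source to stop producing; a rejected cancel() is not actionable here.
    await reader.cancel().catch(() => {});
    throw err;
  } finally {
    // releaseLock() throws if the lock is already gone, so guard it,
    // matching the try/catch the patch adds around it.
    try {
      reader.releaseLock();
    } catch {
      // already released
    }
  }
}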
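
4. PromptCache.startPruning() and SessionCleanupManager.start() share the same background-timer shape: guard against double-start, swallow per-tick errors, and unref() the timer so it cannot keep the process alive. The invariants in isolation (doPeriodicWork is hypothetical):

let timer = null;

function startBackgroundTask(doPeriodicWork, intervalMs) {
  if (timer) return; // already started; start() must be idempotent

  timer = setInterval(() => {
    try {
      doPeriodicWork();
    } catch (err) {
      // A failing tick must never take the process down.
      console.warn("background task failed", err);
    }
  }, intervalMs);

  timer.unref(); // an idle timer should not block process exit
}

function stopBackgroundTask() {
  if (timer) {
    clearInterval(timer);
    timer = null;
  }
}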
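
5. estimateMessagesSize() trades exactness for speed, so it is worth spot-checking against the JSON.stringify length it replaces. A quick harness (the sample payload is made up, and this assumes estimateMessagesSize ends up exported from src/context/compression.js; the diff truncates that export list):

const { estimateMessagesSize } = require("./src/context/compression");

const messages = [
  { role: "user", content: "How do I enable load shedding?" },
  {
    role: "assistant",
    content: [
      { type: "text", text: "Set LOAD_SHEDDING_HEAP_THRESHOLD=0.95." },
      { type: "tool_use", id: "tu_1", name: "read_file", input: { path: "src/server.js" } },
    ],
  },
];

const exact = JSON.stringify(messages).length;
const estimate = estimateMessagesSize(messages);
console.log({ exact, estimate, ratio: (estimate / exact).toFixed(2) });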
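
6. For the tightened prepareFTS5Query(), expected behavior by my reading of the new steps, ignoring whatever Steps 1-2 do upstream (they are not shown in the diff); worth turning into test cases:

// Special characters collapse to spaces, '"' doubles to '""', and the result
// is wrapped as a phrase query unless it already contains AND/OR/NOT.
prepareFTS5Query('load-shedding: res.on("close")');
//   => '"load shedding res on ""close"""'  (safe literal phrase for MATCH)
prepareFTS5Query('memory AND cache');
//   => 'memory AND cache'                  (operators detected, left unquoted)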
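
7. addToBuffer() caps memory with push()/shift(); shift() is O(n) per eviction, which is fine at maxLatencyBuffer = 1000. If the cap ever needs to grow, a write-index ring buffer avoids the shift cost entirely (sketch only, not part of the patch):

class RingBuffer {
  constructor(capacity) {
    this.items = new Array(capacity);
    this.capacity = capacity;
    this.next = 0;  // next write position
    this.count = 0; // number of valid entries
  }

  push(value) {
    this.items[this.next] = value; // overwrites the oldest entry once full
    this.next = (this.next + 1) % this.capacity;
    this.count = Math.min(this.count + 1, this.capacity);
  }

  toArray() {
    // Oldest-first view, e.g. for percentile calculations.
    const start = this.count < this.capacity ? 0 : this.next;
    return Array.from({ length: this.count }, (_, i) => this.items[(start + i) % this.capacity]);
  }
}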