Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added .beads/issues.jsonl
Empty file.
43 changes: 33 additions & 10 deletions src/api/middleware/load-shedding.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class LoadShedder {
constructor(options = {}) {
// Thresholds
this.memoryThreshold = options.memoryThreshold || 0.85; // 85%
this.heapThreshold = options.heapThreshold || 0.90; // 90%
this.heapThreshold = options.heapThreshold || 0.95; // 95% (increased from 90% to prevent false positives from temporary allocation spikes)
this.activeRequestsThreshold = options.activeRequestsThreshold || 1000;

// State
Expand Down Expand Up @@ -96,7 +96,7 @@ function getLoadShedder(options) {
if (!instance) {
// Read from environment variables if not provided
const defaultOptions = {
heapThreshold: Number.parseFloat(process.env.LOAD_SHEDDING_HEAP_THRESHOLD || "0.90"),
heapThreshold: Number.parseFloat(process.env.LOAD_SHEDDING_HEAP_THRESHOLD || "0.95"),
memoryThreshold: Number.parseFloat(process.env.LOAD_SHEDDING_MEMORY_THRESHOLD || "0.85"),
activeRequestsThreshold: Number.parseInt(
process.env.LOAD_SHEDDING_ACTIVE_REQUESTS_THRESHOLD || "1000",
Expand All @@ -108,6 +108,26 @@ function getLoadShedder(options) {
return instance;
}

/**
* Initialize load shedder and log configuration
* Call this at server startup to ensure configuration is logged
*/
function initializeLoadShedder(options) {
const shedder = getLoadShedder(options);

// Log configuration
logger.info({
enabled: true,
thresholds: {
heapThreshold: (shedder.heapThreshold * 100).toFixed(2),
memoryThreshold: (shedder.memoryThreshold * 100).toFixed(2),
activeRequestsThreshold: shedder.activeRequestsThreshold,
}
}, "Load shedding initialized");

return shedder;
}

/**
* Load shedding middleware
*/
Expand All @@ -132,22 +152,25 @@ function loadSheddingMiddleware(req, res, next) {
// Track active request
shedder.activeRequests++;

// Decrement on response finish
res.on("finish", () => {
shedder.activeRequests--;
});

res.on("close", () => {
if (shedder.activeRequests > 0) {
// Use flag to prevent double-decrement race condition
let decremented = false;
const decrementOnce = () => {
if (!decremented) {
decremented = true;
shedder.activeRequests--;
}
});
};

// Both events might fire, but only decrement once
res.on("finish", decrementOnce);
res.on("close", decrementOnce);

next();
}

module.exports = {
LoadShedder,
getLoadShedder,
initializeLoadShedder,
loadSheddingMiddleware,
};
35 changes: 30 additions & 5 deletions src/api/router.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,24 @@ router.post("/v1/messages", rateLimiter, async (req, res, next) => {
// Parse SSE stream from provider and forward to client
const reader = result.stream.getReader();
const decoder = new TextDecoder();
let buffer = '';
const bufferChunks = []; // Use array to avoid string concatenation overhead

try {
while (true) {
const { done, value } = await reader.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });
const chunk = decoder.decode(value, { stream: true });
bufferChunks.push(chunk);

// Join buffer and split by lines
const buffer = bufferChunks.join('');
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // Keep incomplete line in buffer

// Keep last incomplete line in buffer chunks
const remaining = lines.pop() || '';
bufferChunks.length = 0;
if (remaining) bufferChunks.push(remaining);

for (const line of lines) {
if (line.trim()) {
Expand All @@ -162,21 +170,38 @@ router.post("/v1/messages", rateLimiter, async (req, res, next) => {
}

// Send any remaining buffer
if (buffer.trim()) {
res.write(buffer + '\n');
const remaining = bufferChunks.join('');
if (remaining.trim()) {
res.write(remaining + '\n');
}

metrics.recordResponse(200);
res.end();
return;
} catch (streamError) {
logger.error({ error: streamError }, "Error streaming response");

// Cancel stream on error
try {
await reader.cancel();
} catch (cancelError) {
logger.debug({ error: cancelError }, "Failed to cancel stream");
}

if (!res.headersSent) {
res.status(500).json({ error: "Streaming error" });
} else {
res.end();
}
return;
} finally {
// CRITICAL: Always release lock
try {
reader.releaseLock();
} catch (releaseError) {
// Lock may already be released, ignore
logger.debug({ error: releaseError }, "Stream lock already released");
}
}
}

Expand Down
36 changes: 36 additions & 0 deletions src/cache/prompt.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,17 @@ class PromptCache {
this.ttlMs =
Number.isInteger(options.ttlMs) && options.ttlMs > 0 ? options.ttlMs : 300000;

// Add pruning interval (default: 5 minutes)
this.pruneIntervalMs =
Number.isInteger(options.pruneIntervalMs) && options.pruneIntervalMs > 0
? options.pruneIntervalMs
: 300000;
this.pruneTimer = null;

// Initialize persistent cache database
if (this.enabled) {
this.initDatabase();
this.startPruning();
}
}

Expand Down Expand Up @@ -325,10 +333,38 @@ class PromptCache {
}
}

startPruning() {
if (this.pruneTimer) return; // Already started

this.pruneTimer = setInterval(() => {
try {
this.pruneExpired();
logger.debug("Prompt cache pruning completed");
} catch (error) {
logger.warn({ error }, "Failed to prune cache in background");
}
}, this.pruneIntervalMs);

// Don't prevent process exit
this.pruneTimer.unref();

logger.info({ intervalMs: this.pruneIntervalMs }, "Prompt cache pruning started");
}

stopPruning() {
if (this.pruneTimer) {
clearInterval(this.pruneTimer);
this.pruneTimer = null;
logger.debug("Prompt cache pruning stopped");
}
}

// Cleanup method
close() {
this.stopPruning(); // Stop pruning first
if (this.db) {
try {
this.pruneExpired(); // Final cleanup
this.db.close();
} catch (error) {
logger.warn({ err: error }, "Failed to close cache database");
Expand Down
18 changes: 18 additions & 0 deletions src/clients/databricks.js
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,24 @@ function estimateCostSavings(inputTokens, outputTokens) {
return inputCost + outputCost;
}

/**
* Destroy HTTP agents (for graceful shutdown)
*/
function destroyHttpAgents() {
try {
if (httpAgent) {
httpAgent.destroy();
}
if (httpsAgent) {
httpsAgent.destroy();
}
logger.info("HTTP agents destroyed");
} catch (error) {
logger.warn({ error }, "Failed to destroy HTTP agents");
}
}

module.exports = {
invokeModel,
destroyHttpAgents,
};
2 changes: 1 addition & 1 deletion src/context/budget.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ function enforceBudget(payload, options = {}) {
};
}

// Clone payload to avoid modifying original
// Clone payload only when compression is needed (avoids unnecessary allocation)
let optimized = JSON.parse(JSON.stringify(payload));
let strategy = [];

Expand Down
68 changes: 65 additions & 3 deletions src/context/compression.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ function compressHistory(messages, options = {}) {

const finalMessages = [...compressed, ...recentCompressed];

// Log compression stats
const originalLength = JSON.stringify(messages).length;
const compressedLength = JSON.stringify(finalMessages).length;
// Log compression stats - estimate sizes without expensive JSON.stringify
const originalLength = estimateMessagesSize(messages);
const compressedLength = estimateMessagesSize(finalMessages);
const saved = originalLength - compressedLength;

if (saved > 1000) {
Expand Down Expand Up @@ -387,6 +387,68 @@ function needsCompression(messages, threshold = 15) {
return messages && messages.length > threshold;
}

/**
* Estimate size of messages array without full JSON serialization
*
* Provides a rough size estimation that's much faster than JSON.stringify
* while being accurate enough for compression statistics.
*
* @param {Array} messages - Messages to estimate
* @returns {number} Estimated size in characters
*/
function estimateMessagesSize(messages) {
if (!messages || !Array.isArray(messages)) return 0;

let totalSize = 0;

for (const msg of messages) {
// Base overhead for message structure
totalSize += 50;

// Role field
if (msg.role) totalSize += msg.role.length;

// Content estimation
if (typeof msg.content === 'string') {
totalSize += msg.content.length;
} else if (Array.isArray(msg.content)) {
for (const block of msg.content) {
totalSize += 20; // Block overhead

if (block.type) totalSize += block.type.length;

if (block.text) {
totalSize += block.text.length;
} else if (block.content) {
if (typeof block.content === 'string') {
totalSize += block.content.length;
} else if (Array.isArray(block.content)) {
for (const item of block.content) {
if (typeof item === 'string') {
totalSize += item.length;
} else if (item.text) {
totalSize += item.text.length;
}
}
}
}

// Tool use fields
if (block.name) totalSize += block.name.length;
if (block.id) totalSize += block.id.length;
if (block.tool_use_id) totalSize += block.tool_use_id.length;

// Input estimation (rough)
if (block.input) {
totalSize += JSON.stringify(block.input).length;
}
}
}
}

return totalSize;
}

module.exports = {
compressHistory,
compressMessage,
Expand Down
6 changes: 5 additions & 1 deletion src/memory/extractor.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ async function extractMemories(assistantResponse, conversationMessages, context

for (const entityName of entities) {
// Track entity
store.trackEntity('code', entityName, { source: 'extraction' });
store.trackEntity({
type: 'code',
name: entityName,
context: { source: 'extraction' }
});

const memory = await createMemoryWithSurprise({
content: `Entity: ${entityName}`,
Expand Down
14 changes: 9 additions & 5 deletions src/memory/search.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,20 @@ function prepareFTS5Query(query) {
// Step 3: Check if query contains FTS5 operators (AND, OR, NOT)
const hasFTS5Operators = /\b(AND|OR|NOT)\b/i.test(cleaned);

// Step 4: Remove or escape remaining FTS5 special characters
// Characters: * ( ) < > - : [ ]
// Strategy: Remove them since they're rarely useful in memory search
cleaned = cleaned.replace(/[*()<>\-:\[\]]/g, ' ');
// Step 4: ENHANCED - Remove ALL special characters that could break FTS5
// Keep only: letters, numbers, spaces
// Remove: * ( ) < > - : [ ] | , + = ? ! ; / \ @ # $ % ^ & { }
cleaned = cleaned.replace(/[*()<>\-:\[\]|,+=?!;\/\\@#$%^&{}]/g, ' ');
cleaned = cleaned.replace(/\s+/g, ' ').trim();

// Step 5: Escape double quotes (FTS5 uses "" for literal quote)
cleaned = cleaned.replace(/"/g, '""');

// Step 6: Wrap in quotes for phrase search (safest approach)
// Step 6: Additional safety - remove any remaining non-alphanumeric except spaces
cleaned = cleaned.replace(/[^\w\s""]/g, ' ');
cleaned = cleaned.replace(/\s+/g, ' ').trim();

// Step 7: Wrap in quotes for phrase search (safest approach)
if (!hasFTS5Operators) {
// Treat as literal phrase search
cleaned = `"${cleaned}"`;
Expand Down
Loading
Loading