diff --git a/workspaces/server/src/websocket/commands/remove.ts b/workspaces/server/src/websocket/commands/remove.ts index 28eb7a80..b8fc84d2 100644 --- a/workspaces/server/src/websocket/commands/remove.ts +++ b/workspaces/server/src/websocket/commands/remove.ts @@ -2,107 +2,129 @@ import type { PayloadsList } from "@nodesecure/cache"; // Import Internal Dependencies +import { context } from "../websocket.als.js"; import type { - WebSocketContext, - WebSocketResponse + WebSocketResponse, + WebSocketContext } from "../websocket.types.js"; export async function* remove( - spec: string, - context: WebSocketContext + spec: string ): AsyncGenerator { - const { cache, logger } = context; + const ctx = context.getStore()!; - const { mru, lru, current, lastUsed, root, availables } = await cache.payloadsList(); - delete lastUsed[spec]; - if (availables.includes(spec)) { - logger.info("[ws|command.remove] remove from availables"); - cache.removePayload(spec); - const updatedList: PayloadsList = { - mru, - current, - lru, - lastUsed: { - ...lastUsed - }, - root, - availables: availables.filter((iterSpec) => iterSpec !== spec) - }; - await cache.updatePayloadsList(updatedList); - - yield { - status: "RELOAD", - cache: updatedList - }; - - return; + const cacheList = await ctx.cache.payloadsList(); + let updatedList: PayloadsList; + if (cacheList.availables.includes(spec)) { + updatedList = await removeFromAvailables(spec, ctx, cacheList); + } + else { + updatedList = await removeFromMruOrLru(spec, ctx, cacheList); } - logger.debug(`[ws|command.remove](lru: ${lru}|current: ${current})`); + yield { + status: "RELOAD", + cache: updatedList + }; +} + +async function removeFromAvailables( + spec: string, + context: WebSocketContext, + cacheList: PayloadsList +): Promise { + const { cache, logger } = context; + const { availables, lastUsed, ...rest } = cacheList; + logger.info("[ws|command.remove] remove from availables"); + const { [spec]: _, ...updatedLastUsed } = lastUsed; + const updatedList: PayloadsList = { + ...rest, + lastUsed: updatedLastUsed, + availables: availables.filter((iterSpec) => iterSpec !== spec) + }; + + await cache.updatePayloadsList(updatedList); + cache.removePayload(spec); + + return updatedList; +} + +async function removeFromMruOrLru( + spec: string, + context: WebSocketContext, + cacheList: PayloadsList +): Promise { + const { logger, cache } = context; + const { mru, lru, current } = cacheList; + + logger.debug(`[ws|command.remove](lru: ${lru}|current: ${current})`); if (mru.length === 1 && lru.length === 0) { throw new Error("Cannot remove the last package."); } const mruIndex = mru.findIndex((iterSpec) => iterSpec === spec); const lruIndex = lru.findIndex((iterSpec) => iterSpec === spec); - if (mruIndex === -1 && lruIndex === -1) { throw new Error("Package not found in cache."); } - if (mruIndex > -1) { - logger.info("[ws|command.remove] removed from mru"); - const updatedMru = mru.filter((iterSpec) => iterSpec !== spec); - if (lru.length > 0) { - // We need to move the first lru package to the mru list - const olderLruPkg = lru.sort((a, b) => { - const aDate = lastUsed[a]; - const bDate = lastUsed[b]; - - return aDate - bDate; - }); - updatedMru.push(olderLruPkg[0]); - lru.splice(lru.indexOf(olderLruPkg[0]), 1); - } - - const updatedList: PayloadsList = { - mru: updatedMru, - lru, - lastUsed: { - ...lastUsed - }, - current: current === spec ? updatedMru[0] : current, - root, - availables - }; - await cache.updatePayloadsList(updatedList); - - yield { - status: "RELOAD", - cache: updatedList - }; - } - else { - logger.info("[ws|command.remove] removed from lru"); - const updatedLru = lru.filter((iterSpec) => iterSpec !== spec); - const updatedList: PayloadsList = { - mru, - lru: updatedLru, - availables, - lastUsed: { - ...lastUsed - }, - current, - root - }; - await cache.updatePayloadsList(updatedList); - - yield { - status: "RELOAD", - cache: updatedList - }; - } + const isInMru = mruIndex > -1; + logger.info(`[ws|command.remove] removing from ${isInMru ? "MRU" : "LRU"}`); + const updatedList = isInMru ? + removeFromMru(spec, cacheList) : + removeFromLru(spec, cacheList); + await cache.updatePayloadsList(updatedList); cache.removePayload(spec); + + return updatedList; +} + +function removeFromMru( + spec: string, + cacheList: PayloadsList +): PayloadsList { + const { mru, lru, current, lastUsed, root, availables } = cacheList; + + const updatedMru = mru.filter((iterSpec) => iterSpec !== spec); + let updatedLru = lru; + + if (lru.length > 0) { + const sortedLru = [...lru].sort((a, b) => lastUsed[a] - lastUsed[b]); + const olderLruPkg = sortedLru[0]; + updatedMru.push(olderLruPkg); + updatedLru = lru.filter((iterSpec) => iterSpec !== olderLruPkg); + } + + const { [spec]: _, ...updatedLastUsed } = lastUsed; + const updatedList: PayloadsList = { + mru: updatedMru, + lru: updatedLru, + lastUsed: updatedLastUsed, + current: current === spec ? updatedMru[0] : current, + root, + availables + }; + + return updatedList; +} + +function removeFromLru( + spec: string, + cacheList: PayloadsList +): PayloadsList { + const { mru, lru, current, lastUsed, root, availables } = cacheList; + + const { [spec]: _, ...updatedLastUsed } = lastUsed; + const updatedList: PayloadsList = { + mru, + lru: lru.filter((iterSpec) => iterSpec !== spec), + availables, + lastUsed: updatedLastUsed, + current, + root + }; + + return updatedList; } diff --git a/workspaces/server/src/websocket/commands/search.ts b/workspaces/server/src/websocket/commands/search.ts index 8dbec753..3296a205 100644 --- a/workspaces/server/src/websocket/commands/search.ts +++ b/workspaces/server/src/websocket/commands/search.ts @@ -1,110 +1,146 @@ // Import Third-party Dependencies import * as scanner from "@nodesecure/scanner"; -import type { PayloadsList } from "@nodesecure/cache"; +import type { PayloadsList, appCache } from "@nodesecure/cache"; // Import Internal Dependencies +import { context } from "../websocket.als.js"; import type { - WebSocketContext, WebSocketResponse } from "../websocket.types.js"; export async function* search( - spec: string, - context: WebSocketContext + spec: string ): AsyncGenerator { - const { logger, cache } = context; - - const cachedPayload = cache.getPayloadOrNull(spec); - if (cachedPayload) { - logger.info("[ws|command.search] one entry found in cache"); - const cacheList = await cache.payloadsList(); - if (cacheList.mru.includes(spec)) { - logger.info("[ws|command.search] payload is already in the MRU"); - const updatedList: PayloadsList = { - ...cacheList, - current: spec, - lastUsed: { ...cacheList.lastUsed, [spec]: Date.now() } - }; - await cache.updatePayloadsList(updatedList); - yield { - status: "PAYLOAD" as const, - payload: cachedPayload - }; - - if (cache.startFromZero) { - yield { - status: "RELOAD" as const, - cache: updatedList - }; - cache.startFromZero = false; - } - - return; - } - - const { mru, lru, availables, lastUsed, ...updatedCache } = await cache.removeLastMRU(); - const updatedList: PayloadsList = { - ...updatedCache, - mru: [...new Set([...mru, spec])], - current: spec, - lru: lru.filter((pckg) => pckg !== spec), - availables: availables.filter((pckg) => pckg !== spec), - lastUsed: { ...lastUsed, [spec]: Date.now() } - }; - await cache.updatePayloadsList(updatedList); + const foundInCache = yield* searchInCache(spec); + if (foundInCache) { + return; + } - yield { - status: "PAYLOAD" as const, - payload: cachedPayload - }; - yield { - status: "RELOAD" as const, - cache: updatedList - }; + const { logger } = context.getStore()!; + logger.info("[ws|command.search] scan starting"); + yield { + status: "SCAN" as const, + spec + }; - cache.startFromZero = false; + const payload = await scanner.from( + spec, + { maxDepth: 4 } + ); + logger.info("[ws|command.search] scan completed"); - return; + yield* saveInCache(payload); +} + +async function* searchInCache( + spec: string +): AsyncGenerator { + const { logger, cache } = context.getStore()!; + + const payload = cache.getPayloadOrNull(spec); + if (!payload) { + return false; } - // at this point we don't have the payload in cache so we have to scan it. - logger.info(`[ws|command.search](scan ${spec} in progress)`); - yield { status: "SCAN" as const, spec }; + logger.info("[ws|command.search] fetching cache list"); + const cacheList = await cache.payloadsList(); - const payload = await scanner.from(spec, { maxDepth: 4 }); - const name = payload.rootDependencyName; - const version = Object.keys(payload.dependencies[name].versions)[0]; + const isInMru = cacheList.mru.includes(spec); + logger.info(`[ws|command.search] payload detected in ${isInMru ? "MRU" : "LRU/Availables"}`); - { - // save the payload in cache - const inScanPackageSpec = `${name}@${version}`; - logger.info(`[ws|command.search](scan ${inScanPackageSpec} done|cache: updated)`); - - // update the payloads list - const { mru, lru, availables, lastUsed, ...appCache } = await cache.removeLastMRU(); - mru.push(inScanPackageSpec); - cache.updatePayload(inScanPackageSpec, payload); - const updatedList: PayloadsList = { - ...appCache, - mru: [...new Set(mru)], - lru, - availables, - lastUsed: { ...lastUsed, [inScanPackageSpec]: Date.now() }, - current: inScanPackageSpec - }; - await cache.updatePayloadsList(updatedList); + let cachePayloadList: PayloadsList; + if (isInMru) { + cachePayloadList = await handleMruCache(spec, cache, cacheList); + } + else { + cachePayloadList = await handleLruOrAvailableCache(spec, cache); + } - yield { - status: "PAYLOAD" as const, - payload - }; + yield { + status: "PAYLOAD" as const, + payload + }; + if (!isInMru || cache.startFromZero) { yield { status: "RELOAD" as const, - cache: updatedList + cache: cachePayloadList }; + } - cache.startFromZero = false; + return true; +} - logger.info("[ws|command.search](data sent to client|cache: updated)"); - } +async function handleMruCache( + spec: string, + cache: typeof appCache, + cacheList: PayloadsList +): Promise { + const updatedList: PayloadsList = { + ...cacheList, + current: spec, + lastUsed: { ...cacheList.lastUsed, [spec]: Date.now() } + }; + + await cache.updatePayloadsList(updatedList); + + return updatedList; +} + +async function handleLruOrAvailableCache( + spec: string, + cache: typeof appCache +): Promise { + const { + mru, lru, availables, lastUsed, + ...updatedCache + } = await cache.removeLastMRU(); + const updatedList: PayloadsList = { + ...updatedCache, + mru: [...new Set([...mru, spec])], + current: spec, + lru: lru.filter((pckg) => pckg !== spec), + availables: availables.filter((pckg) => pckg !== spec), + lastUsed: { ...lastUsed, [spec]: Date.now() } + }; + + await cache.updatePayloadsList(updatedList); + cache.startFromZero = false; + + return updatedList; +} + +async function* saveInCache( + payload: scanner.Payload +): AsyncGenerator { + const { logger, cache } = context.getStore()!; + + const name = payload.rootDependencyName; + const version = Object.keys(payload.dependencies[name].versions)[0]; + const spec = `${name}@${version}`; + + const { mru, lru, availables, lastUsed, ...appCache } = await cache.removeLastMRU(); + mru.push(spec); + cache.updatePayload(spec, payload); + const updatedList: PayloadsList = { + ...appCache, + mru: [...new Set(mru)], + lru, + availables, + lastUsed: { ...lastUsed, [spec]: Date.now() }, + current: spec + }; + await cache.updatePayloadsList(updatedList); + + yield { + status: "PAYLOAD" as const, + payload + }; + yield { + status: "RELOAD" as const, + cache: updatedList + }; + + cache.startFromZero = false; + + logger.info("[ws|command.search] cache updated"); } diff --git a/workspaces/server/src/websocket/index.ts b/workspaces/server/src/websocket/index.ts index b45b2e7c..bef4ad27 100644 --- a/workspaces/server/src/websocket/index.ts +++ b/workspaces/server/src/websocket/index.ts @@ -7,6 +7,7 @@ import { appCache } from "@nodesecure/cache"; import { logger } from "../logger.js"; import { search } from "./commands/search.js"; import { remove } from "./commands/remove.js"; +import { context } from "./websocket.als.js"; import type { WebSocketResponse, WebSocketContext, @@ -43,22 +44,24 @@ export class WebSocketServerInstanciator { const commandName = message.commandName; logger.info(`[ws|command.${commandName.toLowerCase()}] ${message.spec}`); - try { - const socketMessages = match(message) - .with({ commandName: "SEARCH" }, (command) => search(command.spec, ctx)) - .with({ commandName: "REMOVE" }, (command) => remove(command.spec, ctx)) - .exhaustive(); + context.run(ctx, async() => { + try { + const socketMessages = match(message) + .with({ commandName: "SEARCH" }, (command) => search(command.spec)) + .with({ commandName: "REMOVE" }, (command) => remove(command.spec)) + .exhaustive(); - for await (const message of socketMessages) { - sendSocketResponse(socket, message); + for await (const message of socketMessages) { + sendSocketResponse(socket, message); + } } - } - catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error); + catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); - logger.error(`[ws|command.${commandName}](error: ${errorMessage})`); - logger.debug(error); - } + logger.error(`[ws|command.${commandName}](error: ${errorMessage})`); + logger.debug(error); + } + }); } async initializeServer( diff --git a/workspaces/server/src/websocket/websocket.als.ts b/workspaces/server/src/websocket/websocket.als.ts new file mode 100644 index 00000000..ad00d7ee --- /dev/null +++ b/workspaces/server/src/websocket/websocket.als.ts @@ -0,0 +1,9 @@ +// Import Node.js Dependencies +import { AsyncLocalStorage } from "node:async_hooks"; + +// Import Internal Dependencies +import type { + WebSocketContext +} from "./websocket.types.js"; + +export const context = new AsyncLocalStorage();