diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index dd70158d38..0020ab00cd 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -496,7 +496,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } const encoder = new TextEncoder() - let executorInstance: any = null + const abortController = new AbortController() let isStreamClosed = false const stream = new ReadableStream({ @@ -688,11 +688,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: onBlockStart, onBlockComplete, onStream, - onExecutorCreated: (executor) => { - executorInstance = executor - }, }, loggingSession, + abortSignal: abortController.signal, }) if (result.status === 'paused') { @@ -769,11 +767,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }, cancel() { isStreamClosed = true - logger.info(`[${requestId}] Client aborted SSE stream, cancelling executor`) - - if (executorInstance && typeof executorInstance.cancel === 'function') { - executorInstance.cancel() - } + logger.info( + `[${requestId}] Client aborted SSE stream, signalling cancellation via AbortController` + ) + abortController.abort() }, }) diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts index bf33df5961..b7b11e3b5e 100644 --- a/apps/sim/executor/execution/engine.ts +++ b/apps/sim/executor/execution/engine.ts @@ -39,7 +39,7 @@ export class ExecutionEngine { this.initializeQueue(triggerBlockId) while (this.hasWork()) { - if (this.context.isCancelled && this.executing.size === 0) { + if (this.context.abortSignal?.aborted && this.executing.size === 0) { break } await this.processQueue() @@ -54,7 +54,7 @@ export class ExecutionEngine { this.context.metadata.endTime = new Date(endTime).toISOString() this.context.metadata.duration = endTime - startTime - if (this.context.isCancelled) { + if (this.context.abortSignal?.aborted) { return { success: false, output: this.finalOutput, @@ -75,7 +75,7 @@ export class ExecutionEngine { this.context.metadata.endTime = new Date(endTime).toISOString() this.context.metadata.duration = endTime - startTime - if (this.context.isCancelled) { + if (this.context.abortSignal?.aborted) { return { success: false, output: this.finalOutput, @@ -234,7 +234,7 @@ export class ExecutionEngine { private async processQueue(): Promise { while (this.readyQueue.length > 0) { - if (this.context.isCancelled) { + if (this.context.abortSignal?.aborted) { break } const nodeId = this.dequeue() diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index 2f6e21573a..4f13bb53c7 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -37,7 +37,6 @@ export class DAGExecutor { private workflowInput: WorkflowInput private workflowVariables: Record private contextExtensions: ContextExtensions - private isCancelled = false private dagBuilder: DAGBuilder constructor(options: DAGExecutorOptions) { @@ -54,13 +53,6 @@ export class DAGExecutor { const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges) const { context, state } = this.createExecutionContext(workflowId, triggerBlockId) - // Link cancellation flag to context - Object.defineProperty(context, 'isCancelled', { - get: () => this.isCancelled, - enumerable: true, - configurable: true, - }) - const resolver = new VariableResolver(this.workflow, this.workflowVariables, state) const loopOrchestrator = new LoopOrchestrator(dag, state, resolver) loopOrchestrator.setContextExtensions(this.contextExtensions) @@ -82,10 +74,6 @@ export class DAGExecutor { return await engine.run(triggerBlockId) } - cancel(): void { - this.isCancelled = true - } - async continueExecution( _pendingBlocks: string[], context: ExecutionContext @@ -180,6 +168,7 @@ export class DAGExecutor { onStream: this.contextExtensions.onStream, onBlockStart: this.contextExtensions.onBlockStart, onBlockComplete: this.contextExtensions.onBlockComplete, + abortSignal: this.contextExtensions.abortSignal, } if (this.contextExtensions.resumeFromSnapshot) { diff --git a/apps/sim/executor/execution/snapshot.ts b/apps/sim/executor/execution/snapshot.ts index 60ffdbd484..dfa0d1cc37 100644 --- a/apps/sim/executor/execution/snapshot.ts +++ b/apps/sim/executor/execution/snapshot.ts @@ -34,7 +34,6 @@ export interface ExecutionCallbacks { blockType: string, output: any ) => Promise - onExecutorCreated?: (executor: any) => void } export interface SerializableExecutionState { diff --git a/apps/sim/executor/execution/types.ts b/apps/sim/executor/execution/types.ts index 041efa6e4a..0c5ac50e31 100644 --- a/apps/sim/executor/execution/types.ts +++ b/apps/sim/executor/execution/types.ts @@ -22,6 +22,11 @@ export interface ContextExtensions { dagIncomingEdges?: Record snapshotState?: SerializableExecutionState metadata?: ExecutionMetadata + /** + * AbortSignal for cancellation support. + * When aborted, the execution should stop gracefully. + */ + abortSignal?: AbortSignal onStream?: (streamingExecution: unknown) => Promise onBlockStart?: ( blockId: string, diff --git a/apps/sim/executor/handlers/wait/wait-handler.ts b/apps/sim/executor/handlers/wait/wait-handler.ts index 5b7545a904..2151590f36 100644 --- a/apps/sim/executor/handlers/wait/wait-handler.ts +++ b/apps/sim/executor/handlers/wait/wait-handler.ts @@ -1,37 +1,37 @@ -import { createLogger } from '@/lib/logs/console/logger' import { BlockType } from '@/executor/constants' import type { BlockHandler, ExecutionContext } from '@/executor/types' import type { SerializedBlock } from '@/serializer/types' -const logger = createLogger('WaitBlockHandler') - /** - * Helper function to sleep for a specified number of milliseconds - * On client-side: checks for cancellation every 100ms (non-blocking for UI) - * On server-side: simple sleep without polling (server execution can't be cancelled mid-flight) + * Helper function to sleep for a specified number of milliseconds with AbortSignal support. + * The sleep will be cancelled immediately when the AbortSignal is aborted. */ -const sleep = async (ms: number, checkCancelled?: () => boolean): Promise => { - const isClientSide = typeof window !== 'undefined' - - if (!isClientSide) { - await new Promise((resolve) => setTimeout(resolve, ms)) - return true +const sleep = async (ms: number, signal?: AbortSignal): Promise => { + if (signal?.aborted) { + return false } - const chunkMs = 100 - let elapsed = 0 + return new Promise((resolve) => { + let timeoutId: NodeJS.Timeout | undefined - while (elapsed < ms) { - if (checkCancelled?.()) { - return false + const onAbort = () => { + if (timeoutId) { + clearTimeout(timeoutId) + } + resolve(false) } - const sleepTime = Math.min(chunkMs, ms - elapsed) - await new Promise((resolve) => setTimeout(resolve, sleepTime)) - elapsed += sleepTime - } + if (signal) { + signal.addEventListener('abort', onAbort, { once: true }) + } - return true + timeoutId = setTimeout(() => { + if (signal) { + signal.removeEventListener('abort', onAbort) + } + resolve(true) + }, ms) + }) } /** @@ -65,11 +65,7 @@ export class WaitBlockHandler implements BlockHandler { throw new Error(`Wait time exceeds maximum of ${maxDisplay}`) } - const checkCancelled = () => { - return (ctx as any).isCancelled === true - } - - const completed = await sleep(waitMs, checkCancelled) + const completed = await sleep(waitMs, ctx.abortSignal) if (!completed) { return { diff --git a/apps/sim/executor/orchestrators/loop.ts b/apps/sim/executor/orchestrators/loop.ts index 2817214702..f8da0d1a2d 100644 --- a/apps/sim/executor/orchestrators/loop.ts +++ b/apps/sim/executor/orchestrators/loop.ts @@ -229,7 +229,7 @@ export class LoopOrchestrator { } } - if (ctx.isCancelled) { + if (ctx.abortSignal?.aborted) { logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration }) return this.createExitResult(ctx, loopId, scope) } diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index cdfdd2478b..f33b49195f 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -222,8 +222,12 @@ export interface ExecutionContext { output: any ) => Promise - // Cancellation support - isCancelled?: boolean + /** + * AbortSignal for cancellation support. + * When the signal is aborted, execution should stop gracefully. + * This is triggered when the SSE client disconnects. + */ + abortSignal?: AbortSignal // Dynamically added nodes that need to be scheduled (e.g., from parallel expansion) pendingDynamicNodes?: string[] diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index 26673e831b..79b5c6d504 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -32,6 +32,11 @@ export interface ExecuteWorkflowCoreOptions { callbacks: ExecutionCallbacks loggingSession: LoggingSession skipLogCreation?: boolean // For resume executions - reuse existing log entry + /** + * AbortSignal for cancellation support. + * When aborted (e.g., client disconnects from SSE), execution stops gracefully. + */ + abortSignal?: AbortSignal } function parseVariableValueByType(value: any, type: string): any { @@ -98,11 +103,11 @@ function parseVariableValueByType(value: any, type: string): any { export async function executeWorkflowCore( options: ExecuteWorkflowCoreOptions ): Promise { - const { snapshot, callbacks, loggingSession, skipLogCreation } = options + const { snapshot, callbacks, loggingSession, skipLogCreation, abortSignal } = options const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } = metadata - const { onBlockStart, onBlockComplete, onStream, onExecutorCreated } = callbacks + const { onBlockStart, onBlockComplete, onStream } = callbacks const providedWorkspaceId = metadata.workspaceId if (!providedWorkspaceId) { @@ -326,6 +331,7 @@ export async function executeWorkflowCore( dagIncomingEdges: snapshot.state?.dagIncomingEdges, snapshotState: snapshot.state, metadata, + abortSignal, } const executorInstance = new Executor({ @@ -349,10 +355,6 @@ export async function executeWorkflowCore( } } - if (onExecutorCreated) { - onExecutorCreated(executorInstance) - } - const result = (await executorInstance.execute( workflowId, resolvedTriggerBlockId