Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}

const encoder = new TextEncoder()
let executorInstance: any = null
const abortController = new AbortController()
let isStreamClosed = false

const stream = new ReadableStream<Uint8Array>({
Expand Down Expand Up @@ -688,11 +688,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
onBlockStart,
onBlockComplete,
onStream,
onExecutorCreated: (executor) => {
executorInstance = executor
},
},
loggingSession,
abortSignal: abortController.signal,
})

if (result.status === 'paused') {
Expand Down Expand Up @@ -769,11 +767,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
cancel() {
isStreamClosed = true
logger.info(`[${requestId}] Client aborted SSE stream, cancelling executor`)

if (executorInstance && typeof executorInstance.cancel === 'function') {
executorInstance.cancel()
}
logger.info(
`[${requestId}] Client aborted SSE stream, signalling cancellation via AbortController`
)
abortController.abort()
},
})

Expand Down
8 changes: 4 additions & 4 deletions apps/sim/executor/execution/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class ExecutionEngine {
this.initializeQueue(triggerBlockId)

while (this.hasWork()) {
if (this.context.isCancelled && this.executing.size === 0) {
if (this.context.abortSignal?.aborted && this.executing.size === 0) {
break
}
await this.processQueue()
Expand All @@ -54,7 +54,7 @@ export class ExecutionEngine {
this.context.metadata.endTime = new Date(endTime).toISOString()
this.context.metadata.duration = endTime - startTime

if (this.context.isCancelled) {
if (this.context.abortSignal?.aborted) {
return {
success: false,
output: this.finalOutput,
Expand All @@ -75,7 +75,7 @@ export class ExecutionEngine {
this.context.metadata.endTime = new Date(endTime).toISOString()
this.context.metadata.duration = endTime - startTime

if (this.context.isCancelled) {
if (this.context.abortSignal?.aborted) {
return {
success: false,
output: this.finalOutput,
Expand Down Expand Up @@ -234,7 +234,7 @@ export class ExecutionEngine {

private async processQueue(): Promise<void> {
while (this.readyQueue.length > 0) {
if (this.context.isCancelled) {
if (this.context.abortSignal?.aborted) {
break
}
const nodeId = this.dequeue()
Expand Down
13 changes: 1 addition & 12 deletions apps/sim/executor/execution/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ export class DAGExecutor {
private workflowInput: WorkflowInput
private workflowVariables: Record<string, unknown>
private contextExtensions: ContextExtensions
private isCancelled = false
private dagBuilder: DAGBuilder

constructor(options: DAGExecutorOptions) {
Expand All @@ -54,13 +53,6 @@ export class DAGExecutor {
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)

// Link cancellation flag to context
Object.defineProperty(context, 'isCancelled', {
get: () => this.isCancelled,
enumerable: true,
configurable: true,
})

const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
loopOrchestrator.setContextExtensions(this.contextExtensions)
Expand All @@ -82,10 +74,6 @@ export class DAGExecutor {
return await engine.run(triggerBlockId)
}

cancel(): void {
this.isCancelled = true
}

async continueExecution(
_pendingBlocks: string[],
context: ExecutionContext
Expand Down Expand Up @@ -180,6 +168,7 @@ export class DAGExecutor {
onStream: this.contextExtensions.onStream,
onBlockStart: this.contextExtensions.onBlockStart,
onBlockComplete: this.contextExtensions.onBlockComplete,
abortSignal: this.contextExtensions.abortSignal,
}

if (this.contextExtensions.resumeFromSnapshot) {
Expand Down
1 change: 0 additions & 1 deletion apps/sim/executor/execution/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ export interface ExecutionCallbacks {
blockType: string,
output: any
) => Promise<void>
onExecutorCreated?: (executor: any) => void
}

export interface SerializableExecutionState {
Expand Down
5 changes: 5 additions & 0 deletions apps/sim/executor/execution/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ export interface ContextExtensions {
dagIncomingEdges?: Record<string, string[]>
snapshotState?: SerializableExecutionState
metadata?: ExecutionMetadata
/**
* AbortSignal for cancellation support.
* When aborted, the execution should stop gracefully.
*/
abortSignal?: AbortSignal
onStream?: (streamingExecution: unknown) => Promise<void>
onBlockStart?: (
blockId: string,
Expand Down
50 changes: 23 additions & 27 deletions apps/sim/executor/handlers/wait/wait-handler.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,37 @@
import { createLogger } from '@/lib/logs/console/logger'
import { BlockType } from '@/executor/constants'
import type { BlockHandler, ExecutionContext } from '@/executor/types'
import type { SerializedBlock } from '@/serializer/types'

const logger = createLogger('WaitBlockHandler')

/**
* Helper function to sleep for a specified number of milliseconds
* On client-side: checks for cancellation every 100ms (non-blocking for UI)
* On server-side: simple sleep without polling (server execution can't be cancelled mid-flight)
* Helper function to sleep for a specified number of milliseconds with AbortSignal support.
* The sleep will be cancelled immediately when the AbortSignal is aborted.
*/
const sleep = async (ms: number, checkCancelled?: () => boolean): Promise<boolean> => {
const isClientSide = typeof window !== 'undefined'

if (!isClientSide) {
await new Promise((resolve) => setTimeout(resolve, ms))
return true
const sleep = async (ms: number, signal?: AbortSignal): Promise<boolean> => {
if (signal?.aborted) {
return false
}

const chunkMs = 100
let elapsed = 0
return new Promise((resolve) => {
let timeoutId: NodeJS.Timeout | undefined

while (elapsed < ms) {
if (checkCancelled?.()) {
return false
const onAbort = () => {
if (timeoutId) {
clearTimeout(timeoutId)
}
resolve(false)
}

const sleepTime = Math.min(chunkMs, ms - elapsed)
await new Promise((resolve) => setTimeout(resolve, sleepTime))
elapsed += sleepTime
}
if (signal) {
signal.addEventListener('abort', onAbort, { once: true })
}

return true
timeoutId = setTimeout(() => {
if (signal) {
signal.removeEventListener('abort', onAbort)
}
resolve(true)
}, ms)
})
}

/**
Expand Down Expand Up @@ -65,11 +65,7 @@ export class WaitBlockHandler implements BlockHandler {
throw new Error(`Wait time exceeds maximum of ${maxDisplay}`)
}

const checkCancelled = () => {
return (ctx as any).isCancelled === true
}

const completed = await sleep(waitMs, checkCancelled)
const completed = await sleep(waitMs, ctx.abortSignal)

if (!completed) {
return {
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/executor/orchestrators/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ export class LoopOrchestrator {
}
}

if (ctx.isCancelled) {
if (ctx.abortSignal?.aborted) {
logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration })
return this.createExitResult(ctx, loopId, scope)
}
Expand Down
8 changes: 6 additions & 2 deletions apps/sim/executor/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,12 @@ export interface ExecutionContext {
output: any
) => Promise<void>

// Cancellation support
isCancelled?: boolean
/**
* AbortSignal for cancellation support.
* When the signal is aborted, execution should stop gracefully.
* This is triggered when the SSE client disconnects.
*/
abortSignal?: AbortSignal

// Dynamically added nodes that need to be scheduled (e.g., from parallel expansion)
pendingDynamicNodes?: string[]
Expand Down
14 changes: 8 additions & 6 deletions apps/sim/lib/workflows/executor/execution-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ export interface ExecuteWorkflowCoreOptions {
callbacks: ExecutionCallbacks
loggingSession: LoggingSession
skipLogCreation?: boolean // For resume executions - reuse existing log entry
/**
* AbortSignal for cancellation support.
* When aborted (e.g., client disconnects from SSE), execution stops gracefully.
*/
abortSignal?: AbortSignal
}

function parseVariableValueByType(value: any, type: string): any {
Expand Down Expand Up @@ -98,11 +103,11 @@ function parseVariableValueByType(value: any, type: string): any {
export async function executeWorkflowCore(
options: ExecuteWorkflowCoreOptions
): Promise<ExecutionResult> {
const { snapshot, callbacks, loggingSession, skipLogCreation } = options
const { snapshot, callbacks, loggingSession, skipLogCreation, abortSignal } = options
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
metadata
const { onBlockStart, onBlockComplete, onStream, onExecutorCreated } = callbacks
const { onBlockStart, onBlockComplete, onStream } = callbacks

const providedWorkspaceId = metadata.workspaceId
if (!providedWorkspaceId) {
Expand Down Expand Up @@ -326,6 +331,7 @@ export async function executeWorkflowCore(
dagIncomingEdges: snapshot.state?.dagIncomingEdges,
snapshotState: snapshot.state,
metadata,
abortSignal,
}

const executorInstance = new Executor({
Expand All @@ -349,10 +355,6 @@ export async function executeWorkflowCore(
}
}

if (onExecutorCreated) {
onExecutorCreated(executorInstance)
}

const result = (await executorInstance.execute(
workflowId,
resolvedTriggerBlockId
Expand Down
Loading