Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions apps/sim/app/api/workflows/[id]/execute/cancel/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { requestCancellation } from '@/lib/execution/cancellation'

const CancelExecutionSchema = z.object({
executionId: z.string().uuid(),
})

export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'

export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
await params

const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
}

let body: any = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Avoid using any type. Use unknown instead for better type safety

Suggested change
let body: any = {}
let body: unknown = {}

Context Used: Context from dashboard - TypeScript conventions and type safety (source)

Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/sim/app/api/workflows/[id]/execute/cancel/route.ts
Line: 21:21

Comment:
**style:** Avoid using `any` type. Use `unknown` instead for better type safety

```suggestion
  let body: unknown = {}
```

**Context Used:** Context from `dashboard` - TypeScript conventions and type safety ([source](https://app.greptile.com/review/custom-context?memory=b4f0be8d-a787-4d5a-9098-a66b1449df25))

How can I resolve this? If you propose a fix, please make it concise.

try {
const text = await req.text()
if (text) {
body = JSON.parse(text)
}
} catch {
return NextResponse.json({ error: 'Invalid request body' }, { status: 400 })
}

const validation = CancelExecutionSchema.safeParse(body)
if (!validation.success) {
return NextResponse.json({ error: 'Invalid request body' }, { status: 400 })
}

const { executionId } = validation.data
const success = await requestCancellation(executionId)
return NextResponse.json({ success })
}
17 changes: 4 additions & 13 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { clearCancellation } from '@/lib/execution/cancellation'
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
Expand Down Expand Up @@ -496,7 +497,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}

const encoder = new TextEncoder()
let executorInstance: any = null
let isStreamClosed = false

const stream = new ReadableStream<Uint8Array>({
Expand Down Expand Up @@ -688,9 +688,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
onBlockStart,
onBlockComplete,
onStream,
onExecutorCreated: (executor) => {
executorInstance = executor
},
},
loggingSession,
})
Expand Down Expand Up @@ -757,24 +754,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
})
} finally {
await clearCancellation(executionId)

if (!isStreamClosed) {
try {
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
controller.close()
} catch {
// Stream already closed - nothing to do
// Stream already closed
}
}
}
},
cancel() {
isStreamClosed = true
logger.info(`[${requestId}] Client aborted SSE stream, cancelling executor`)

if (executorInstance && typeof executorInstance.cancel === 'function') {
executorInstance.cancel()
}
},
})

return new NextResponse(stream, {
Expand Down
145 changes: 94 additions & 51 deletions apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { db } from '@sim/db'
import { mcpServers } from '@sim/db/schema'
import { and, eq, inArray, isNull } from 'drizzle-orm'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { isCancellationRequested } from '@/lib/execution/cancellation'
import { createLogger } from '@/lib/logs/console/logger'
import {
BlockType,
Expand Down Expand Up @@ -32,6 +33,8 @@ import type { SubflowType } from '@/stores/workflows/workflow/types'

const logger = createLogger('BlockExecutor')

const CANCELLATION_CHECK_INTERVAL_MS = 1000

export class BlockExecutor {
constructor(
private blockHandlers: BlockHandler[],
Expand Down Expand Up @@ -548,10 +551,16 @@ export class BlockExecutor {
return
}

const [clientStream, executorStream] = stream.tee()
const { clientStream: controlledClientStream, consume } = this.createControlledStream(
ctx,
stream,
blockId,
responseFormat,
streamingExec
)

const processedClientStream = streamingResponseFormatProcessor.processStream(
clientStream,
controlledClientStream,
blockId,
selectedOutputs,
responseFormat
Expand All @@ -562,13 +571,6 @@ export class BlockExecutor {
stream: processedClientStream,
}

const executorConsumption = this.consumeExecutorStream(
executorStream,
streamingExec,
blockId,
responseFormat
)

const clientConsumption = (async () => {
try {
await ctx.onStream?.(clientStreamingExec)
Expand All @@ -577,7 +579,7 @@ export class BlockExecutor {
}
})()

await Promise.all([clientConsumption, executorConsumption])
await Promise.all([clientConsumption, consume()])
}

private async forwardStream(
Expand Down Expand Up @@ -605,57 +607,98 @@ export class BlockExecutor {
}
}

private async consumeExecutorStream(
stream: ReadableStream,
streamingExec: { execution: any },
private createControlledStream(
ctx: ExecutionContext,
sourceStream: ReadableStream,
blockId: string,
responseFormat: any
): Promise<void> {
const reader = stream.getReader()
const decoder = new TextDecoder()
responseFormat: any,
streamingExec: { execution: any }
): { clientStream: ReadableStream; consume: () => Promise<void> } {
let clientController: ReadableStreamDefaultController<Uint8Array> | null = null
let fullContent = ''

try {
while (true) {
const { done, value } = await reader.read()
if (done) break
fullContent += decoder.decode(value, { stream: true })
}
} catch (error) {
logger.error('Error reading executor stream for block', { blockId, error })
} finally {
try {
reader.releaseLock()
} catch {}
}

if (!fullContent) {
return
}
const clientStream = new ReadableStream<Uint8Array>({
start(controller) {
clientController = controller
},
})

const executionOutput = streamingExec.execution?.output
if (!executionOutput || typeof executionOutput !== 'object') {
return
}
const consume = async () => {
const reader = sourceStream.getReader()
const decoder = new TextDecoder()
let lastCancellationCheck = Date.now()

if (responseFormat) {
try {
const parsed = JSON.parse(fullContent.trim())

streamingExec.execution.output = {
...parsed,
tokens: executionOutput.tokens,
toolCalls: executionOutput.toolCalls,
providerTiming: executionOutput.providerTiming,
cost: executionOutput.cost,
model: executionOutput.model,
while (true) {
const now = Date.now()
if (ctx.executionId && now - lastCancellationCheck >= CANCELLATION_CHECK_INTERVAL_MS) {
lastCancellationCheck = now
const cancelled = await isCancellationRequested(ctx.executionId)
if (cancelled) {
ctx.isCancelled = true
try {
clientController?.close()
} catch {}
reader.cancel()
break
}
}

const { done, value } = await reader.read()
if (done) {
try {
clientController?.close()
} catch {}
break
}

fullContent += decoder.decode(value, { stream: true })
try {
clientController?.enqueue(value)
} catch {}
}
return
} catch (error) {
logger.warn('Failed to parse streamed content for response format', { blockId, error })
if (!ctx.isCancelled) {
logger.error('Error reading stream for block', { blockId, error })
}
try {
clientController?.close()
} catch {}
} finally {
try {
reader.releaseLock()
} catch {}
}

if (!fullContent) {
return
}

const executionOutput = streamingExec.execution?.output
if (!executionOutput || typeof executionOutput !== 'object') {
return
}

if (responseFormat) {
try {
const parsed = JSON.parse(fullContent.trim())
streamingExec.execution.output = {
...parsed,
tokens: executionOutput.tokens,
toolCalls: executionOutput.toolCalls,
providerTiming: executionOutput.providerTiming,
cost: executionOutput.cost,
model: executionOutput.model,
}
return
} catch (error) {
logger.warn('Failed to parse streamed content for response format', { blockId, error })
}
}

executionOutput.content = fullContent
}

executionOutput.content = fullContent
return { clientStream, consume }
}
}
16 changes: 14 additions & 2 deletions apps/sim/executor/execution/engine.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { isCancellationRequested } from '@/lib/execution/cancellation'
import { createLogger } from '@/lib/logs/console/logger'
import { BlockType } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
Expand Down Expand Up @@ -33,13 +34,24 @@ export class ExecutionEngine {
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
}

private async checkCancellation(): Promise<boolean> {
if (this.context.isCancelled) return true
const executionId = this.context.executionId
if (!executionId) return false
const cancelled = await isCancellationRequested(executionId)
if (cancelled) {
this.context.isCancelled = true
}
return cancelled
}

async run(triggerBlockId?: string): Promise<ExecutionResult> {
const startTime = Date.now()
try {
this.initializeQueue(triggerBlockId)

while (this.hasWork()) {
if (this.context.isCancelled && this.executing.size === 0) {
if ((await this.checkCancellation()) && this.executing.size === 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the best place to put this?

break
}
await this.processQueue()
Expand Down Expand Up @@ -234,7 +246,7 @@ export class ExecutionEngine {

private async processQueue(): Promise<void> {
while (this.readyQueue.length > 0) {
if (this.context.isCancelled) {
if (await this.checkCancellation()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check needs to move into redis

break
}
const nodeId = this.dequeue()
Expand Down
4 changes: 3 additions & 1 deletion apps/sim/executor/execution/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ export class DAGExecutor {
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)

// Link cancellation flag to context
Object.defineProperty(context, 'isCancelled', {
get: () => this.isCancelled,
set: (value: boolean) => {
this.isCancelled = value
},
enumerable: true,
configurable: true,
})
Expand Down
1 change: 0 additions & 1 deletion apps/sim/executor/execution/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ export interface ExecutionCallbacks {
blockType: string,
output: any
) => Promise<void>
onExecutorCreated?: (executor: any) => void
}

export interface SerializableExecutionState {
Expand Down
Loading
Loading