Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class WorkflowScheduler(
// CostBasedRegionPlanGenerator considers costs to try to find an optimal plan.
new CostBasedScheduleGenerator(
workflowContext,
physicalPlan,
physicalPlan.copy(executionMode = workflowContext.workflowSettings.executionMode),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this setting should be part of Physical plan. The information is already in workflowContext, why duplicate it again in physical plan?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, please do not add such configuration to physical plan.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before doing this PR, Xiaozhen, Chen, and I discussed this and agreed that this flag should be part of the physical plan. Given how the CostBasedScheduleGenerator uses blocking output links in the physical plan to reason about regions, this information belongs in the physical plan. We are not duplicating information in the physical plan; we use this information to generate a new physical plan. In fact, the physical plans for streaming mode and batch mode are different, because the physical plan includes the links, and in batch mode every output link becomes a blocking link.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot agree.

I think physical plan should be exactly same, no matter how we execute it. If it got changed then we need to revisit previous decisions. The execution plan will be different, based on how we cut the regions and how we do materializations.

Copy link
Contributor Author

@aglinxinyuan aglinxinyuan Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think it’s true that the physical plan will be exactly the same regardless of how we execute it. If you look at the line 52, this.physicalPlan = updatedPhysicalPlan, the physical plan clearly changes before and after CostBasedScheduleGenerator, so the physical plan changes based on the execution plan.
We may need input from @Xiao-zhen-Liu to understand why Pasta needs to modify the physical plan and whether this behavior is intentional.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pasta does not modify the physical plan. We can still keep the physical plan immutable for this PR by pushing the logic of marking all edges as materialized inside Pasta.

actorId
).generate()
this.schedule = generatedSchedule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,11 @@ class WorkflowCompiler(
val (physicalPlan, outputPortsNeedingStorage) =
expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None)

context.workflowSettings =
WorkflowSettings(context.workflowSettings.dataTransferBatchSize, outputPortsNeedingStorage)
context.workflowSettings = WorkflowSettings(
context.workflowSettings.dataTransferBatchSize,
outputPortsNeedingStorage,
context.workflowSettings.executionMode
)

Workflow(context, logicalPlan, physicalPlan)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ import org.apache.pekko.actor.{ActorSystem, Props}
import org.apache.pekko.testkit.{ImplicitSender, TestKit}
import org.apache.pekko.util.Timeout
import org.apache.texera.amber.clustering.SingleNodeListener
import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext, WorkflowSettings}
import org.apache.texera.amber.core.workflow.{
ExecutionMode,
PortIdentity,
WorkflowContext,
WorkflowSettings
}
import org.apache.texera.amber.engine.architecture.controller._
import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._
import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
Expand Down Expand Up @@ -119,7 +124,10 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for headerlessCsv workflow" in {
val expectedBatchSize = 1

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val customWorkflowSettings = WorkflowSettings(
dataTransferBatchSize = expectedBatchSize,
executionMode = ExecutionMode.STREAMING
)

val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
Expand All @@ -141,7 +149,10 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for headerlessCsv->keyword workflow" in {
val expectedBatchSize = 500

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val customWorkflowSettings = WorkflowSettings(
dataTransferBatchSize = expectedBatchSize,
executionMode = ExecutionMode.STREAMING
)

val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
Expand Down Expand Up @@ -171,7 +182,10 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for csv->keyword->count workflow" in {
val expectedBatchSize = 100

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val customWorkflowSettings = WorkflowSettings(
dataTransferBatchSize = expectedBatchSize,
executionMode = ExecutionMode.STREAMING
)

val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
Expand Down Expand Up @@ -209,7 +223,10 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy workflow" in {
val expectedBatchSize = 300

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val customWorkflowSettings = WorkflowSettings(
dataTransferBatchSize = expectedBatchSize,
executionMode = ExecutionMode.STREAMING
)

val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
Expand Down Expand Up @@ -250,7 +267,10 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for csv->(csv->)->join workflow" in {
val expectedBatchSize = 1

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val customWorkflowSettings = WorkflowSettings(
dataTransferBatchSize = expectedBatchSize,
executionMode = ExecutionMode.STREAMING
)

val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
Expand Down
5 changes: 4 additions & 1 deletion common/config/src/main/resources/gui.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ gui {
default-data-transfer-batch-size = 400
default-data-transfer-batch-size = ${?GUI_WORKFLOW_WORKSPACE_DEFAULT_DATA_TRANSFER_BATCH_SIZE}

# default execution mode for workflows, can be either BATCH or STREAMING
default-execution-mode = STREAMING
default-execution-mode = ${?GUI_WORKFLOW_WORKSPACE_DEFAULT_EXECUTION_MODE}

# whether selecting files from datasets instead of the local file system.
# The user system must be enabled to make this flag work!
selecting-files-from-datasets-enabled = true
selecting-files-from-datasets-enabled = ${?GUI_WORKFLOW_WORKSPACE_SELECTING_FILES_FROM_DATASETS_ENABLED}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ object GuiConfig {
conf.getBoolean("gui.workflow-workspace.auto-attribute-correction-enabled")
val guiWorkflowWorkspaceDefaultDataTransferBatchSize: Int =
conf.getInt("gui.workflow-workspace.default-data-transfer-batch-size")
val guiWorkflowWorkspaceDefaultExecutionMode: String =
conf.getString("gui.workflow-workspace.default-execution-mode")
val guiWorkflowWorkspaceSelectingFilesFromDatasetsEnabled: Boolean =
conf.getBoolean("gui.workflow-workspace.selecting-files-from-datasets-enabled")
val guiWorkflowWorkspaceWorkflowExecutionsTrackingEnabled: Boolean =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.core.workflow;

public enum ExecutionMode {
STREAMING,
BATCH;

public static ExecutionMode fromString(String value) { return valueOf(value); }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ import scala.jdk.CollectionConverters.{CollectionHasAsScala, IteratorHasAsScala}

case class PhysicalPlan(
operators: Set[PhysicalOp],
links: Set[PhysicalLink]
links: Set[PhysicalLink],
executionMode: ExecutionMode = ExecutionMode.STREAMING
) extends LazyLogging {

@transient private lazy val operatorMap: Map[PhysicalOpIdentity, PhysicalOp] =
Expand Down Expand Up @@ -245,6 +246,7 @@ case class PhysicalPlan(
getOperator(physicalOp.id).isInputLinkDependee(
link
) || getOperator(upstreamPhysicalOpId).isOutputLinkBlocking(link)
|| executionMode == ExecutionMode.BATCH
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ object WorkflowContext {
val DEFAULT_EXECUTION_ID: ExecutionIdentity = ExecutionIdentity(1L)
val DEFAULT_WORKFLOW_ID: WorkflowIdentity = WorkflowIdentity(1L)
val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings(
dataTransferBatchSize = 400 // TODO: make this configurable
dataTransferBatchSize = 400,
executionMode = ExecutionMode.STREAMING
)
}
class WorkflowContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ package org.apache.texera.amber.core.workflow

case class WorkflowSettings(
dataTransferBatchSize: Int,
outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty
outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty,
executionMode: ExecutionMode
)
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class ConfigResource {
"timetravelEnabled" -> GuiConfig.guiWorkflowWorkspaceTimetravelEnabled,
"productionSharedEditingServer" -> GuiConfig.guiWorkflowWorkspaceProductionSharedEditingServer,
"defaultDataTransferBatchSize" -> GuiConfig.guiWorkflowWorkspaceDefaultDataTransferBatchSize,
"defaultExecutionMode" -> GuiConfig.guiWorkflowWorkspaceDefaultExecutionMode,
"workflowEmailNotificationEnabled" -> GuiConfig.guiWorkflowWorkspaceWorkflowEmailNotificationEnabled,
"sharingComputingUnitEnabled" -> ComputingUnitConfig.sharingComputingUnitEnabled,
"operatorConsoleMessageBufferSize" -> GuiConfig.guiWorkflowWorkspaceOperatorConsoleMessageBufferSize,
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/app/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ import { AdminSettingsComponent } from "./dashboard/component/admin/settings/adm
import { FormlyRepeatDndComponent } from "./common/formly/repeat-dnd/repeat-dnd.component";
import { NzInputNumberModule } from "ng-zorro-antd/input-number";
import { NzCheckboxModule } from "ng-zorro-antd/checkbox";
import { NzRadioModule } from "ng-zorro-antd/radio";

registerLocaleData(en);

Expand Down Expand Up @@ -344,6 +345,7 @@ registerLocaleData(en);
NzProgressModule,
NzInputNumberModule,
NzCheckboxModule,
NzRadioModule,
],
providers: [
provideNzI18n(en_US),
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/app/common/service/gui-config.service.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import { Injectable } from "@angular/core";
import { Observable, of } from "rxjs";
import { GuiConfig } from "../type/gui-config";
import { ExecutionMode } from "../type/workflow";

/**
* Mock GuiConfigService for testing purposes.
Expand All @@ -42,6 +43,7 @@ export class MockGuiConfigService {
productionSharedEditingServer: false,
pythonLanguageServerPort: "3000",
defaultDataTransferBatchSize: 100,
defaultExecutionMode: ExecutionMode.STREAMING,
workflowEmailNotificationEnabled: false,
sharingComputingUnitEnabled: false,
operatorConsoleMessageBufferSize: 1000,
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/app/common/type/gui-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
import { ExecutionMode } from "./workflow";

// Please refer to core/config/src/main/resources/gui.conf for the definition of each config item
export interface GuiConfig {
Expand All @@ -33,6 +34,7 @@ export interface GuiConfig {
productionSharedEditingServer: boolean;
pythonLanguageServerPort: string;
defaultDataTransferBatchSize: number;
defaultExecutionMode: ExecutionMode;
workflowEmailNotificationEnabled: boolean;
sharingComputingUnitEnabled: boolean;
operatorConsoleMessageBufferSize: number;
Expand Down
6 changes: 6 additions & 0 deletions frontend/src/app/common/type/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@
import { WorkflowMetadata } from "../../dashboard/type/workflow-metadata.interface";
import { CommentBox, OperatorLink, OperatorPredicate, Point } from "../../workspace/types/workflow-common.interface";

export enum ExecutionMode {
STREAMING = "STREAMING",
BATCH = "BATCH",
}

export interface WorkflowSettings {
dataTransferBatchSize: number;
executionMode: ExecutionMode;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

//All times in test Workflows are in PST because our local machine's timezone is PST

import { Workflow, WorkflowContent } from "../../common/type/workflow";
import { ExecutionMode, Workflow, WorkflowContent } from "../../common/type/workflow";
import { DashboardEntry } from "../type/dashboard-entry";
import { DashboardProject } from "../type/dashboard-project.interface";

Expand All @@ -39,7 +39,7 @@ export const testWorkflowContent = (operatorTypes: string[]): WorkflowContent =>
commentBoxes: [],
links: [],
operatorPositions: {},
settings: { dataTransferBatchSize: 400 },
settings: { dataTransferBatchSize: 400, executionMode: ExecutionMode.STREAMING },
});

export const testWorkflow1: Workflow = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import { DashboardEntry, UserInfo } from "../../../type/dashboard-entry";
import { UserService } from "../../../../common/service/user/user.service";
import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
import { NotificationService } from "../../../../common/service/notification/notification.service";
import { WorkflowContent } from "../../../../common/type/workflow";
import { ExecutionMode, WorkflowContent } from "../../../../common/type/workflow";
import { NzUploadFile } from "ng-zorro-antd/upload";
import * as JSZip from "jszip";
import { FiltersComponent } from "../filters/filters.component";
Expand Down Expand Up @@ -230,7 +230,10 @@ export class UserWorkflowComponent implements AfterViewInit {
commentBoxes: [],
links: [],
operatorPositions: {},
settings: { dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize },
settings: {
dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize,
executionMode: this.config.env.defaultExecutionMode,
},
};
let localPid = this.pid;
this.workflowPersistService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,25 @@
specific language governing permissions and limitations
under the License.
-->

<div class="settings-container">
<h4>Workflow Settings</h4>
<form
[formGroup]="settingsForm"
class="form-inline">
<b>Execution Mode:</b>
<nz-radio-group formControlName="executionMode">
<label
nz-radio
[nzValue]="ExecutionMode.STREAMING"
>Streaming</label
>
<br />
<label
nz-radio
[nzValue]="ExecutionMode.BATCH"
>Batch</label
>
</nz-radio-group>
<br />
<div class="form-group">
<label for="dataTransferBatchSize">Data Transfer Batch Size:</label>
<input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@
padding: 10px;
}

h4 {
margin-bottom: 15px;
}

.form-inline {
display: flex;
flex-direction: column;
Expand Down
Loading
Loading