diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala index 9dcf3ad4bfc..fbe5e73a1da 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -45,7 +45,7 @@ class WorkflowScheduler( // CostBasedRegionPlanGenerator considers costs to try to find an optimal plan. new CostBasedScheduleGenerator( workflowContext, - physicalPlan, + physicalPlan.copy(executionMode = workflowContext.workflowSettings.executionMode), actorId ).generate() this.schedule = generatedSchedule diff --git a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala index 50d07a98194..7f17e1aebd7 100644 --- a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala +++ b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala @@ -151,8 +151,9 @@ class WorkflowCompiler( val (physicalPlan, outputPortsNeedingStorage) = expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None) - context.workflowSettings = - WorkflowSettings(context.workflowSettings.dataTransferBatchSize, outputPortsNeedingStorage) + // copy() carries over every other setting (e.g. executionMode) instead of re-listing fields + context.workflowSettings = + context.workflowSettings.copy(outputPortsNeedingStorage = outputPortsNeedingStorage) Workflow(context, logicalPlan, physicalPlan) } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala index e9b830bdfdc..b018a1110eb 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala +++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala @@ -23,7 +23,12 @@ import org.apache.pekko.actor.{ActorSystem, Props} import org.apache.pekko.testkit.{ImplicitSender, TestKit} import org.apache.pekko.util.Timeout import org.apache.texera.amber.clustering.SingleNodeListener -import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext, WorkflowSettings} +import org.apache.texera.amber.core.workflow.{ + ExecutionMode, + PortIdentity, + WorkflowContext, + WorkflowSettings +} import org.apache.texera.amber.engine.architecture.controller._ import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._ import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER @@ -119,7 +124,10 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for headerlessCsv workflow" in { val expectedBatchSize = 1 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) + val customWorkflowSettings = WorkflowSettings( + dataTransferBatchSize = expectedBatchSize, + executionMode = ExecutionMode.STREAMING + ) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -141,7 +149,10 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for headerlessCsv->keyword workflow" in { val expectedBatchSize = 500 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) + val customWorkflowSettings = WorkflowSettings( + dataTransferBatchSize = expectedBatchSize, + executionMode = ExecutionMode.STREAMING + ) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -171,7 +182,10 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for csv->keyword->count workflow" in { val expectedBatchSize = 100 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) + val customWorkflowSettings = 
WorkflowSettings( + dataTransferBatchSize = expectedBatchSize, + executionMode = ExecutionMode.STREAMING + ) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -209,7 +223,10 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy workflow" in { val expectedBatchSize = 300 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) + val customWorkflowSettings = WorkflowSettings( + dataTransferBatchSize = expectedBatchSize, + executionMode = ExecutionMode.STREAMING + ) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -250,7 +267,10 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for csv->(csv->)->join workflow" in { val expectedBatchSize = 1 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) + val customWorkflowSettings = WorkflowSettings( + dataTransferBatchSize = expectedBatchSize, + executionMode = ExecutionMode.STREAMING + ) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) diff --git a/common/config/src/main/resources/gui.conf b/common/config/src/main/resources/gui.conf index 8039441b130..5020c40b854 100644 --- a/common/config/src/main/resources/gui.conf +++ b/common/config/src/main/resources/gui.conf @@ -63,8 +63,11 @@ gui { default-data-transfer-batch-size = 400 default-data-transfer-batch-size = ${?GUI_WORKFLOW_WORKSPACE_DEFAULT_DATA_TRANSFER_BATCH_SIZE} + # default execution mode for workflows, can be either BATCH or STREAMING + default-execution-mode = STREAMING + default-execution-mode = ${?GUI_WORKFLOW_WORKSPACE_DEFAULT_EXECUTION_MODE} + # whether selecting files from datasets instead of the local file system. - # The user system must be enabled to make this flag work! 
selecting-files-from-datasets-enabled = true selecting-files-from-datasets-enabled = ${?GUI_WORKFLOW_WORKSPACE_SELECTING_FILES_FROM_DATASETS_ENABLED} diff --git a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala index 14016f43743..adc789c9843 100644 --- a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala @@ -47,6 +47,8 @@ object GuiConfig { conf.getBoolean("gui.workflow-workspace.auto-attribute-correction-enabled") val guiWorkflowWorkspaceDefaultDataTransferBatchSize: Int = conf.getInt("gui.workflow-workspace.default-data-transfer-batch-size") + val guiWorkflowWorkspaceDefaultExecutionMode: String = + conf.getString("gui.workflow-workspace.default-execution-mode") val guiWorkflowWorkspaceSelectingFilesFromDatasetsEnabled: Boolean = conf.getBoolean("gui.workflow-workspace.selecting-files-from-datasets-enabled") val guiWorkflowWorkspaceWorkflowExecutionsTrackingEnabled: Boolean = diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java new file mode 100644 index 00000000000..4ebdda7c2a1 --- /dev/null +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.core.workflow; + +/** Execution mode that can be selected for a workflow. */ +public enum ExecutionMode { + STREAMING, + BATCH; + + /** + * Resolves a mode from its name, ignoring surrounding whitespace and letter case. + * + * @param value the mode name, for example "STREAMING" or "batch" + * @return the matching mode + * @throws IllegalArgumentException if value is null or does not name a mode + */ + public static ExecutionMode fromString(String value) { + if (value == null) { + throw new IllegalArgumentException("Execution mode must not be null"); + } + return valueOf(value.trim().toUpperCase(java.util.Locale.ROOT)); + } +} diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala index 5a2a2a61b28..ad3f24e97bb 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala @@ -38,7 +38,8 @@ import scala.jdk.CollectionConverters.{CollectionHasAsScala, IteratorHasAsScala} case class PhysicalPlan( operators: Set[PhysicalOp], - links: Set[PhysicalLink] + links: Set[PhysicalLink], + executionMode: ExecutionMode = ExecutionMode.STREAMING ) extends LazyLogging { @transient private lazy val operatorMap: Map[PhysicalOpIdentity, PhysicalOp] = @@ -245,6 +246,7 @@ case class PhysicalPlan( getOperator(physicalOp.id).isInputLinkDependee( link ) || getOperator(upstreamPhysicalOpId).isOutputLinkBlocking(link) + || executionMode == ExecutionMode.BATCH ) } } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala index ee7659d0ca9..1e68620ed5e 100644 --- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala @@ -30,7 +30,8 @@ object WorkflowContext { val DEFAULT_EXECUTION_ID: ExecutionIdentity = ExecutionIdentity(1L) val DEFAULT_WORKFLOW_ID: WorkflowIdentity = WorkflowIdentity(1L) val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings( - dataTransferBatchSize = 400 // TODO: make this configurable + dataTransferBatchSize = 400, + executionMode = ExecutionMode.STREAMING ) } class WorkflowContext( diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala index 88ebcb068f4..f5b4a610fd3 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala @@ -21,5 +21,14 @@ package org.apache.texera.amber.core.workflow +/** + * Settings attached to a workflow. + * + * @param dataTransferBatchSize batch size used for data transfer + * @param outputPortsNeedingStorage output ports needing storage; empty unless specified + * @param executionMode execution mode of the workflow; defaults to STREAMING so that call sites + * which do not specify a mode remain valid + */ case class WorkflowSettings( dataTransferBatchSize: Int, - outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty + outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty, + executionMode: ExecutionMode = ExecutionMode.STREAMING ) diff --git a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala index 30c657746fb..b7517d81eb7 100644 --- a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala +++ b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala @@ -46,6 +46,7 @@ class ConfigResource { "timetravelEnabled" -> GuiConfig.guiWorkflowWorkspaceTimetravelEnabled, "productionSharedEditingServer" -> GuiConfig.guiWorkflowWorkspaceProductionSharedEditingServer, "defaultDataTransferBatchSize" -> 
GuiConfig.guiWorkflowWorkspaceDefaultDataTransferBatchSize, + "defaultExecutionMode" -> GuiConfig.guiWorkflowWorkspaceDefaultExecutionMode, "workflowEmailNotificationEnabled" -> GuiConfig.guiWorkflowWorkspaceWorkflowEmailNotificationEnabled, "sharingComputingUnitEnabled" -> ComputingUnitConfig.sharingComputingUnitEnabled, "operatorConsoleMessageBufferSize" -> GuiConfig.guiWorkflowWorkspaceOperatorConsoleMessageBufferSize, diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts index 73feecfdba3..9ddf4bbcc2e 100644 --- a/frontend/src/app/app.module.ts +++ b/frontend/src/app/app.module.ts @@ -181,6 +181,7 @@ import { AdminSettingsComponent } from "./dashboard/component/admin/settings/adm import { FormlyRepeatDndComponent } from "./common/formly/repeat-dnd/repeat-dnd.component"; import { NzInputNumberModule } from "ng-zorro-antd/input-number"; import { NzCheckboxModule } from "ng-zorro-antd/checkbox"; +import { NzRadioModule } from "ng-zorro-antd/radio"; registerLocaleData(en); @@ -344,6 +345,7 @@ registerLocaleData(en); NzProgressModule, NzInputNumberModule, NzCheckboxModule, + NzRadioModule, ], providers: [ provideNzI18n(en_US), diff --git a/frontend/src/app/common/service/gui-config.service.mock.ts b/frontend/src/app/common/service/gui-config.service.mock.ts index daa8adfd224..bccaf99a9f4 100644 --- a/frontend/src/app/common/service/gui-config.service.mock.ts +++ b/frontend/src/app/common/service/gui-config.service.mock.ts @@ -20,6 +20,7 @@ import { Injectable } from "@angular/core"; import { Observable, of } from "rxjs"; import { GuiConfig } from "../type/gui-config"; +import { ExecutionMode } from "../type/workflow"; /** * Mock GuiConfigService for testing purposes. 
@@ -42,6 +43,7 @@ export class MockGuiConfigService { productionSharedEditingServer: false, pythonLanguageServerPort: "3000", defaultDataTransferBatchSize: 100, + defaultExecutionMode: ExecutionMode.STREAMING, workflowEmailNotificationEnabled: false, sharingComputingUnitEnabled: false, operatorConsoleMessageBufferSize: 1000, diff --git a/frontend/src/app/common/type/gui-config.ts b/frontend/src/app/common/type/gui-config.ts index b47dfa0ab1b..d8786c1dc08 100644 --- a/frontend/src/app/common/type/gui-config.ts +++ b/frontend/src/app/common/type/gui-config.ts @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ +import { ExecutionMode } from "./workflow"; // Please refer to core/config/src/main/resources/gui.conf for the definition of each config item export interface GuiConfig { @@ -33,6 +34,7 @@ export interface GuiConfig { productionSharedEditingServer: boolean; pythonLanguageServerPort: string; defaultDataTransferBatchSize: number; + defaultExecutionMode: ExecutionMode; workflowEmailNotificationEnabled: boolean; sharingComputingUnitEnabled: boolean; operatorConsoleMessageBufferSize: number; diff --git a/frontend/src/app/common/type/workflow.ts b/frontend/src/app/common/type/workflow.ts index 6792df9d7d0..8bc03eed8a4 100644 --- a/frontend/src/app/common/type/workflow.ts +++ b/frontend/src/app/common/type/workflow.ts @@ -20,8 +20,14 @@ import { WorkflowMetadata } from "../../dashboard/type/workflow-metadata.interface"; import { CommentBox, OperatorLink, OperatorPredicate, Point } from "../../workspace/types/workflow-common.interface"; +export enum ExecutionMode { + STREAMING = "STREAMING", + BATCH = "BATCH", +} + export interface WorkflowSettings { dataTransferBatchSize: number; + executionMode: ExecutionMode; } /** diff --git a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts index e87b5acd22a..c5a7d9804ad 100644 --- 
a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts +++ b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts @@ -19,7 +19,7 @@ //All times in test Workflows are in PST because our local machine's timezone is PST -import { Workflow, WorkflowContent } from "../../common/type/workflow"; +import { ExecutionMode, Workflow, WorkflowContent } from "../../common/type/workflow"; import { DashboardEntry } from "../type/dashboard-entry"; import { DashboardProject } from "../type/dashboard-project.interface"; @@ -39,7 +39,7 @@ export const testWorkflowContent = (operatorTypes: string[]): WorkflowContent => commentBoxes: [], links: [], operatorPositions: {}, - settings: { dataTransferBatchSize: 400 }, + settings: { dataTransferBatchSize: 400, executionMode: ExecutionMode.STREAMING }, }); export const testWorkflow1: Workflow = { diff --git a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts index 4903e70bcb4..75f61a223ce 100644 --- a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts +++ b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts @@ -31,7 +31,7 @@ import { DashboardEntry, UserInfo } from "../../../type/dashboard-entry"; import { UserService } from "../../../../common/service/user/user.service"; import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; import { NotificationService } from "../../../../common/service/notification/notification.service"; -import { WorkflowContent } from "../../../../common/type/workflow"; +import { ExecutionMode, WorkflowContent } from "../../../../common/type/workflow"; import { NzUploadFile } from "ng-zorro-antd/upload"; import * as JSZip from "jszip"; import { FiltersComponent } from "../filters/filters.component"; @@ -230,7 +230,10 @@ export class UserWorkflowComponent implements AfterViewInit { commentBoxes: 
[], links: [], operatorPositions: {}, - settings: { dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize }, + settings: { + dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize, + executionMode: this.config.env.defaultExecutionMode, + }, }; let localPid = this.pid; this.workflowPersistService diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html index af16bb0dee1..2b9cad3f8a0 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html @@ -16,12 +16,25 @@ specific language governing permissions and limitations under the License. --> -
-

Workflow Settings

+ Execution Mode: + + +
+ +
+
{ - if (this.settingsForm.valid) { - this.confirmUpdateDataTransferBatchSize(value.dataTransferBatchSize); - } - }); + ngOnInit(): void { + this.settingsForm + .get("dataTransferBatchSize")! + .valueChanges.pipe(untilDestroyed(this)) + .subscribe((batchSize: number) => { + if (this.settingsForm.get("dataTransferBatchSize")!.valid) { + this.confirmUpdateDataTransferBatchSize(batchSize); + } + }); + + this.settingsForm + .get("executionMode")! + .valueChanges.pipe(untilDestroyed(this)) + .subscribe((mode: ExecutionMode) => { + this.updateExecutionMode(mode); + }); this.workflowActionService .workflowChanged() .pipe(untilDestroyed(this)) .subscribe(() => { - this.currentDataTransferBatchSize = - this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize || - this.config.env.defaultDataTransferBatchSize; + const settings = this.workflowActionService.getWorkflowContent().settings; this.settingsForm.patchValue( - { dataTransferBatchSize: this.currentDataTransferBatchSize }, + { + dataTransferBatchSize: settings.dataTransferBatchSize, + executionMode: settings.executionMode, + }, { emitEvent: false } ); }); @@ -85,13 +93,22 @@ export class SettingsComponent implements OnInit { } public persistWorkflow(): void { - this.isSaving = true; this.workflowPersistService .persistWorkflow(this.workflowActionService.getWorkflow()) .pipe(untilDestroyed(this)) .subscribe({ error: (e: unknown) => this.notificationService.error((e as Error).message), - }) - .add(() => (this.isSaving = false)); + }); } + + /** + * Stores the selected execution mode in the workflow settings, then persists the workflow. + */ + public updateExecutionMode(mode: ExecutionMode): void { + this.workflowActionService.updateExecutionMode(mode); + this.persistWorkflow(); + } + + /** Makes the enum reachable as a component member (presumably for the template). */ + protected readonly ExecutionMode = ExecutionMode; } diff --git a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts 
b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts index ddee22465aa..32a5e3a2db4 100644 --- a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts +++ b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts @@ -21,7 +21,7 @@ import { Injectable } from "@angular/core"; import * as joint from "jointjs"; import { BehaviorSubject, merge, Observable, Subject } from "rxjs"; -import { Workflow, WorkflowContent, WorkflowSettings } from "../../../../common/type/workflow"; +import { ExecutionMode, Workflow, WorkflowContent, WorkflowSettings } from "../../../../common/type/workflow"; import { WorkflowMetadata } from "../../../../dashboard/type/workflow-metadata.interface"; import { Comment, @@ -127,6 +127,7 @@ export class WorkflowActionService { private getDefaultSettings(): WorkflowSettings { return { dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize, + executionMode: this.config.env.defaultExecutionMode, }; } @@ -807,6 +808,14 @@ export class WorkflowActionService { } } + /** + * Replaces the current workflow settings with a copy whose execution mode is `mode`. + */ + public updateExecutionMode(mode: ExecutionMode): void { + const nextSettings = { ...this.workflowSettings, executionMode: mode }; + this.setWorkflowSettings(nextSettings); + } + public clearWorkflow(): void { this.destroySharedModel(); this.setWorkflowMetadata(undefined);