From 0b429396a4ab2f02280094d013a25b723feb9437 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Fri, 2 Jan 2026 21:19:36 -0800 Subject: [PATCH 01/22] init --- .../amber/core/workflow/WorkflowSettings.scala | 3 ++- frontend/src/app/common/type/workflow.ts | 1 + .../user-workflow/user-workflow.component.ts | 5 ++++- .../left-panel/settings/settings.component.html | 3 +-- .../left-panel/settings/settings.component.scss | 4 ---- .../left-panel/settings/settings.component.ts | 17 ++++++++++++++--- .../model/workflow-action.service.ts | 5 +++++ 7 files changed, 27 insertions(+), 11 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala index 88ebcb068f4..3b1ab8ae8b6 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala @@ -21,5 +21,6 @@ package org.apache.texera.amber.core.workflow case class WorkflowSettings( dataTransferBatchSize: Int, - outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty + outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty, + batchProcessing: Boolean = false ) diff --git a/frontend/src/app/common/type/workflow.ts b/frontend/src/app/common/type/workflow.ts index 6792df9d7d0..c959183f574 100644 --- a/frontend/src/app/common/type/workflow.ts +++ b/frontend/src/app/common/type/workflow.ts @@ -22,6 +22,7 @@ import { CommentBox, OperatorLink, OperatorPredicate, Point } from "../../worksp export interface WorkflowSettings { dataTransferBatchSize: number; + batchProcessing: boolean; } /** diff --git a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts index 4903e70bcb4..0000567c365 100644 --- a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts +++ b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts @@ -230,7 +230,10 @@ export class UserWorkflowComponent implements AfterViewInit { commentBoxes: [], links: [], operatorPositions: {}, - settings: { dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize }, + settings: { + dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize, + batchProcessing: false + }, }; let localPid = this.pid; this.workflowPersistService diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html index af16bb0dee1..85f4b2af092 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html @@ -16,9 +16,7 @@ specific language governing permissions and limitations under the License. --> -
-

Workflow Settings

@@ -36,5 +34,6 @@

Workflow Settings

Data Transfer Batch Size size must be at least 1.
+ diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.scss b/frontend/src/app/workspace/component/left-panel/settings/settings.component.scss index 3dfd30e0462..16a468e92df 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.scss +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.scss @@ -21,10 +21,6 @@ padding: 10px; } -h4 { - margin-bottom: 15px; -} - .form-inline { display: flex; flex-direction: column; diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts index ead28857ff5..dfd4b845a93 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts @@ -35,7 +35,6 @@ import { GuiConfigService } from "../../../../common/service/gui-config.service" export class SettingsComponent implements OnInit { settingsForm!: FormGroup; currentDataTransferBatchSize!: number; - isSaving: boolean = false; constructor( private fb: FormBuilder, @@ -53,6 +52,7 @@ export class SettingsComponent implements OnInit { this.settingsForm = this.fb.group({ dataTransferBatchSize: [this.currentDataTransferBatchSize, [Validators.required, Validators.min(1)]], + batchProcessing: [this.workflowActionService.getWorkflowContent().settings.batchProcessing], }); this.settingsForm.valueChanges.pipe(untilDestroyed(this)).subscribe(value => { @@ -61,6 +61,14 @@ export class SettingsComponent implements OnInit { } }); + this.settingsForm + .get('batchProcessing')! + .valueChanges + .pipe(untilDestroyed(this)) + .subscribe((enabled: boolean) => { + this.updateBatchProcessing(enabled); + }); + this.workflowActionService .workflowChanged() .pipe(untilDestroyed(this)) @@ -85,13 +93,16 @@ export class SettingsComponent implements OnInit { } public persistWorkflow(): void { - this.isSaving = true; this.workflowPersistService .persistWorkflow(this.workflowActionService.getWorkflow()) .pipe(untilDestroyed(this)) .subscribe({ error: (e: unknown) => this.notificationService.error((e as Error).message), }) - .add(() => (this.isSaving = false)); + } + + public updateBatchProcessing(enabled: boolean) { + this.workflowActionService.updateBatchProcessing(enabled); + this.persistWorkflow(); } } diff --git a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts index ddee22465aa..4c960d027c5 100644 --- a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts +++ b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts @@ -127,6 +127,7 @@ export class WorkflowActionService { private getDefaultSettings(): WorkflowSettings { return { dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize, + batchProcessing: false, }; } @@ -807,6 +808,10 @@ export class WorkflowActionService { } } + public updateBatchProcessing(enabled: boolean): void { + this.setWorkflowSettings({ ...this.workflowSettings, batchProcessing: enabled }); + } + public clearWorkflow(): void { this.destroySharedModel(); this.setWorkflowMetadata(undefined); From 151aa938ed694dd06edd209e38ef199728b92d9d Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 6 Jan 2026 20:16:40 -0800 Subject: [PATCH 02/22] update --- .../texera/workflow/WorkflowCompiler.scala | 2 +- .../amber/core/workflow/WorkflowContext.scala | 3 ++- .../core/workflow/WorkflowSettings.scala | 2 +- .../left-panel/settings/settings.component.ts | 26 +++++++++---------- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala index 50d07a98194..4e53bc170dd 100644 --- a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala +++ b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala @@ -152,7 +152,7 @@ class WorkflowCompiler( expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None) context.workflowSettings = - WorkflowSettings(context.workflowSettings.dataTransferBatchSize, outputPortsNeedingStorage) + WorkflowSettings(context.workflowSettings.dataTransferBatchSize, outputPortsNeedingStorage, context.workflowSettings.batchProcessing) Workflow(context, logicalPlan, physicalPlan) } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala index ee7659d0ca9..04ce26d7cb1 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala @@ -30,7 +30,8 @@ object WorkflowContext { val DEFAULT_EXECUTION_ID: ExecutionIdentity = ExecutionIdentity(1L) val DEFAULT_WORKFLOW_ID: WorkflowIdentity = WorkflowIdentity(1L) val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings( - dataTransferBatchSize = 400 // TODO: make this configurable + dataTransferBatchSize = 400, + batchProcessing = false ) } class WorkflowContext( diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala index 3b1ab8ae8b6..17885c5db11 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala @@ -22,5 +22,5 @@ package org.apache.texera.amber.core.workflow case class WorkflowSettings( dataTransferBatchSize: Int, outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty, - batchProcessing: Boolean = false + batchProcessing: Boolean ) diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts index dfd4b845a93..953c18ce97e 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts @@ -46,20 +46,20 @@ export class SettingsComponent implements OnInit { ) {} ngOnInit(): void { - this.currentDataTransferBatchSize = - this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize || - this.config.env.defaultDataTransferBatchSize; - this.settingsForm = this.fb.group({ - dataTransferBatchSize: [this.currentDataTransferBatchSize, [Validators.required, Validators.min(1)]], + dataTransferBatchSize: [this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize, [Validators.required, Validators.min(1)]], batchProcessing: [this.workflowActionService.getWorkflowContent().settings.batchProcessing], }); - this.settingsForm.valueChanges.pipe(untilDestroyed(this)).subscribe(value => { - if (this.settingsForm.valid) { - this.confirmUpdateDataTransferBatchSize(value.dataTransferBatchSize); - } - }); + this.settingsForm + .get('dataTransferBatchSize')! + .valueChanges + .pipe(untilDestroyed(this)) + .subscribe((batchSize: number) => { + if (this.settingsForm.get('dataTransferBatchSize')!.valid) { + this.confirmUpdateDataTransferBatchSize(batchSize); + } + }); this.settingsForm .get('batchProcessing')! @@ -73,11 +73,9 @@ export class SettingsComponent implements OnInit { .workflowChanged() .pipe(untilDestroyed(this)) .subscribe(() => { - this.currentDataTransferBatchSize = - this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize || - this.config.env.defaultDataTransferBatchSize; this.settingsForm.patchValue( - { dataTransferBatchSize: this.currentDataTransferBatchSize }, + { dataTransferBatchSize: this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize, + batchProcessing: this.workflowActionService.getWorkflowContent().settings.batchProcessing}, { emitEvent: false } ); }); From 9b3b253efc0b4833316f9ddc33cc329ac61715eb Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 6 Jan 2026 21:46:26 -0800 Subject: [PATCH 03/22] update --- .../engine/architecture/controller/WorkflowScheduler.scala | 2 +- .../org/apache/texera/amber/core/workflow/PhysicalPlan.scala | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala index 9dcf3ad4bfc..dec3fc3c7a9 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -45,7 +45,7 @@ class WorkflowScheduler( // CostBasedRegionPlanGenerator considers costs to try to find an optimal plan. new CostBasedScheduleGenerator( workflowContext, - physicalPlan, + physicalPlan.copy(batchProcessing = workflowContext.workflowSettings.batchProcessing), actorId ).generate() this.schedule = generatedSchedule diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala index 5a2a2a61b28..31d61edcbdb 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala @@ -38,7 +38,8 @@ import scala.jdk.CollectionConverters.{CollectionHasAsScala, IteratorHasAsScala} case class PhysicalPlan( operators: Set[PhysicalOp], - links: Set[PhysicalLink] + links: Set[PhysicalLink], + batchProcessing: Boolean = false ) extends LazyLogging { @transient private lazy val operatorMap: Map[PhysicalOpIdentity, PhysicalOp] = @@ -245,6 +246,7 @@ case class PhysicalPlan( getOperator(physicalOp.id).isInputLinkDependee( link ) || getOperator(upstreamPhysicalOpId).isOutputLinkBlocking(link) + || batchProcessing ) } } From 75dc0ddbe78f02046dce5035f7efd6454f43c3dd Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 6 Jan 2026 21:47:39 -0800 Subject: [PATCH 04/22] update --- .../org/apache/texera/workflow/WorkflowCompiler.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala index 4e53bc170dd..4d76743c30d 100644 --- a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala +++ b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala @@ -151,8 +151,11 @@ class WorkflowCompiler( val (physicalPlan, outputPortsNeedingStorage) = expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None) - context.workflowSettings = - WorkflowSettings(context.workflowSettings.dataTransferBatchSize, outputPortsNeedingStorage, context.workflowSettings.batchProcessing) + context.workflowSettings = WorkflowSettings( + context.workflowSettings.dataTransferBatchSize, + outputPortsNeedingStorage, + context.workflowSettings.batchProcessing + ) Workflow(context, logicalPlan, physicalPlan) } From 6a24f867919154da8af7d1364a9be9fe94f29d12 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 6 Jan 2026 21:48:19 -0800 Subject: [PATCH 05/22] update --- .../user-workflow/user-workflow.component.ts | 2 +- .../settings/settings.component.html | 6 ++++- .../left-panel/settings/settings.component.ts | 25 +++++++++++-------- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts index 0000567c365..28db9a2dfe4 100644 --- a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts +++ b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts @@ -232,7 +232,7 @@ export class UserWorkflowComponent implements AfterViewInit { operatorPositions: {}, settings: { dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize, - batchProcessing: false + batchProcessing: false, }, }; let localPid = this.pid; diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html index 85f4b2af092..577a9db7437 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html @@ -34,6 +34,10 @@ Data Transfer Batch Size size must be at least 1. - + diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts index 953c18ce97e..6524e3d2cf7 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts @@ -47,24 +47,25 @@ export class SettingsComponent implements OnInit { ngOnInit(): void { this.settingsForm = this.fb.group({ - dataTransferBatchSize: [this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize, [Validators.required, Validators.min(1)]], + dataTransferBatchSize: [ + this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize, + [Validators.required, Validators.min(1)], + ], batchProcessing: [this.workflowActionService.getWorkflowContent().settings.batchProcessing], }); this.settingsForm - .get('dataTransferBatchSize')! - .valueChanges - .pipe(untilDestroyed(this)) + .get("dataTransferBatchSize")! + .valueChanges.pipe(untilDestroyed(this)) .subscribe((batchSize: number) => { - if (this.settingsForm.get('dataTransferBatchSize')!.valid) { + if (this.settingsForm.get("dataTransferBatchSize")!.valid) { this.confirmUpdateDataTransferBatchSize(batchSize); } }); this.settingsForm - .get('batchProcessing')! - .valueChanges - .pipe(untilDestroyed(this)) + .get("batchProcessing")! + .valueChanges.pipe(untilDestroyed(this)) .subscribe((enabled: boolean) => { this.updateBatchProcessing(enabled); }); @@ -74,8 +75,10 @@ export class SettingsComponent implements OnInit { .pipe(untilDestroyed(this)) .subscribe(() => { this.settingsForm.patchValue( - { dataTransferBatchSize: this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize, - batchProcessing: this.workflowActionService.getWorkflowContent().settings.batchProcessing}, + { + dataTransferBatchSize: this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize, + batchProcessing: this.workflowActionService.getWorkflowContent().settings.batchProcessing, + }, { emitEvent: false } ); }); @@ -96,7 +99,7 @@ export class SettingsComponent implements OnInit { .pipe(untilDestroyed(this)) .subscribe({ error: (e: unknown) => this.notificationService.error((e as Error).message), - }) + }); } public updateBatchProcessing(enabled: boolean) { From 548b2f832028ac43a83622017a988439165f4b38 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 7 Jan 2026 00:21:09 -0800 Subject: [PATCH 06/22] update --- .../amber/engine/e2e/BatchSizePropagationSpec.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala index e9b830bdfdc..8b0bd201662 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala @@ -119,7 +119,7 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for headerlessCsv workflow" in { val expectedBatchSize = 1 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) + val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, batchProcessing = false) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -141,7 +141,7 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for headerlessCsv->keyword workflow" in { val expectedBatchSize = 500 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) + val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, batchProcessing = false) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -171,7 +171,7 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for csv->keyword->count workflow" in { val expectedBatchSize = 100 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) + val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, batchProcessing = false) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -209,7 +209,7 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy workflow" in { val expectedBatchSize = 300 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) + val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, batchProcessing = false) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -250,7 +250,7 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for csv->(csv->)->join workflow" in { val expectedBatchSize = 1 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) + val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, batchProcessing = false) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) From 3b5f52fe54fc8a6f6d0bf35244c3760cfd75ed62 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Thu, 8 Jan 2026 17:22:26 -0800 Subject: [PATCH 07/22] update --- frontend/src/app/app.module.ts | 138 +++++++++--------- .../settings/settings.component.html | 14 +- 2 files changed, 79 insertions(+), 73 deletions(-) diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts index 73feecfdba3..cb1848c455e 100644 --- a/frontend/src/app/app.module.ts +++ b/frontend/src/app/app.module.ts @@ -181,6 +181,7 @@ import { AdminSettingsComponent } from "./dashboard/component/admin/settings/adm import { FormlyRepeatDndComponent } from "./common/formly/repeat-dnd/repeat-dnd.component"; import { NzInputNumberModule } from "ng-zorro-antd/input-number"; import { NzCheckboxModule } from "ng-zorro-antd/checkbox"; +import { NzRadioModule } from "ng-zorro-antd/radio"; registerLocaleData(en); @@ -277,74 +278,75 @@ registerLocaleData(en); ComputingUnitSelectionComponent, AdminSettingsComponent, ], - imports: [ - BrowserModule, - AppRoutingModule, - HttpClientModule, - JwtModule.forRoot({ - config: { - tokenGetter: AuthService.getAccessToken, - skipWhenExpired: false, - throwNoTokenError: false, - disallowedRoutes: ["forum/api/users"], - }, - }), - BrowserAnimationsModule, - RouterModule, - FormsModule, - ReactiveFormsModule, - FormlyModule.forRoot(TEXERA_FORMLY_CONFIG), - FormlyNgZorroAntdModule, - OverlayModule, - NzDatePickerModule, - NzDropDownModule, - NzButtonModule, - NzAutocompleteModule, - NzIconModule, - NzFormModule, - NzListModule, - NzInputModule, - NzPopoverModule, - NzCollapseModule, - NzToolTipModule, - NzTableModule, - NzSelectModule, - NzSpaceModule, - NzBadgeModule, - NzUploadModule, - NgxJsonViewerModule, - NzMessageModule, - NzModalModule, - NzDescriptionsModule, - NzCardModule, - NzTagModule, - NzPopconfirmModule, - NzAvatarModule, - NzTabsModule, - NzPaginationModule, - NzCommentModule, - ColorPickerModule, - NzSwitchModule, - NzLayoutModule, - NzSliderModule, - MarkdownModule.forRoot(), - DragDropModule, - NzAlertModule, - NzResizableModule, - NzSpinModule, - NgxFileDropModule, - NzTreeModule, - NzTreeViewModule, - NzNoAnimationModule, - TreeModule, - SocialLoginModule, - GoogleSigninButtonModule, - NzEmptyModule, - NzDividerModule, - NzProgressModule, - NzInputNumberModule, - NzCheckboxModule, - ], + imports: [ + BrowserModule, + AppRoutingModule, + HttpClientModule, + JwtModule.forRoot({ + config: { + tokenGetter: AuthService.getAccessToken, + skipWhenExpired: false, + throwNoTokenError: false, + disallowedRoutes: ["forum/api/users"], + }, + }), + BrowserAnimationsModule, + RouterModule, + FormsModule, + ReactiveFormsModule, + FormlyModule.forRoot(TEXERA_FORMLY_CONFIG), + FormlyNgZorroAntdModule, + OverlayModule, + NzDatePickerModule, + NzDropDownModule, + NzButtonModule, + NzAutocompleteModule, + NzIconModule, + NzFormModule, + NzListModule, + NzInputModule, + NzPopoverModule, + NzCollapseModule, + NzToolTipModule, + NzTableModule, + NzSelectModule, + NzSpaceModule, + NzBadgeModule, + NzUploadModule, + NgxJsonViewerModule, + NzMessageModule, + NzModalModule, + NzDescriptionsModule, + NzCardModule, + NzTagModule, + NzPopconfirmModule, + NzAvatarModule, + NzTabsModule, + NzPaginationModule, + NzCommentModule, + ColorPickerModule, + NzSwitchModule, + NzLayoutModule, + NzSliderModule, + MarkdownModule.forRoot(), + DragDropModule, + NzAlertModule, + NzResizableModule, + NzSpinModule, + NgxFileDropModule, + NzTreeModule, + NzTreeViewModule, + NzNoAnimationModule, + TreeModule, + SocialLoginModule, + GoogleSigninButtonModule, + NzEmptyModule, + NzDividerModule, + NzProgressModule, + NzInputNumberModule, + NzCheckboxModule, + NzRadioModule, + ], providers: [ provideNzI18n(en_US), AuthGuardService, diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html index 577a9db7437..d069e9469f7 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html @@ -20,6 +20,15 @@
+ Execution Mode: + + + + +
+
+
+
-
From a1119bc0abc8ce3206df14c6e1ec40837c6fd528 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 10 Jan 2026 01:34:58 -0800 Subject: [PATCH 08/22] update --- .../controller/WorkflowScheduler.scala | 2 +- .../texera/workflow/WorkflowCompiler.scala | 2 +- .../engine/e2e/BatchSizePropagationSpec.scala | 12 ++--- .../amber/core/workflow/ExecutionMode.java | 6 +++ .../amber/core/workflow/PhysicalPlan.scala | 4 +- .../amber/core/workflow/WorkflowContext.scala | 2 +- .../core/workflow/WorkflowSettings.scala | 8 +-- frontend/src/app/common/type/workflow.ts | 7 ++- .../user-workflow/user-workflow.component.ts | 52 ++++++++++--------- .../settings/settings.component.html | 10 ++-- .../left-panel/settings/settings.component.ts | 20 +++---- .../model/workflow-action.service.ts | 38 +++++++------- 12 files changed, 89 insertions(+), 74 deletions(-) create mode 100644 common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala index dec3fc3c7a9..fbe5e73a1da 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -45,7 +45,7 @@ class WorkflowScheduler( // CostBasedRegionPlanGenerator considers costs to try to find an optimal plan. new CostBasedScheduleGenerator( workflowContext, - physicalPlan.copy(batchProcessing = workflowContext.workflowSettings.batchProcessing), + physicalPlan.copy(executionMode = workflowContext.workflowSettings.executionMode), actorId ).generate() this.schedule = generatedSchedule diff --git a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala index 4d76743c30d..7f17e1aebd7 100644 --- a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala +++ b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala @@ -154,7 +154,7 @@ class WorkflowCompiler( context.workflowSettings = WorkflowSettings( context.workflowSettings.dataTransferBatchSize, outputPortsNeedingStorage, - context.workflowSettings.batchProcessing + context.workflowSettings.executionMode ) Workflow(context, logicalPlan, physicalPlan) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala index 8b0bd201662..35a81752bad 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala @@ -23,7 +23,7 @@ import org.apache.pekko.actor.{ActorSystem, Props} import org.apache.pekko.testkit.{ImplicitSender, TestKit} import org.apache.pekko.util.Timeout import org.apache.texera.amber.clustering.SingleNodeListener -import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext, WorkflowSettings} +import org.apache.texera.amber.core.workflow.{ExecutionMode, PortIdentity, WorkflowContext, WorkflowSettings} import org.apache.texera.amber.engine.architecture.controller._ import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._ import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER @@ -119,7 +119,7 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for headerlessCsv workflow" in { val expectedBatchSize = 1 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, batchProcessing = false) + val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, executionMode = ExecutionMode.STREAMING) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -141,7 +141,7 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for headerlessCsv->keyword workflow" in { val expectedBatchSize = 500 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, batchProcessing = false) + val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, executionMode = ExecutionMode.STREAMING) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -171,7 +171,7 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for csv->keyword->count workflow" in { val expectedBatchSize = 100 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, batchProcessing = false) + val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, executionMode = ExecutionMode.STREAMING) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -209,7 +209,7 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy workflow" in { val expectedBatchSize = 300 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, batchProcessing = false) + val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, executionMode = ExecutionMode.STREAMING) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -250,7 +250,7 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for csv->(csv->)->join workflow" in { val expectedBatchSize = 1 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, batchProcessing = false) + val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, executionMode = ExecutionMode.STREAMING) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java new file mode 100644 index 00000000000..3731c72b2b2 --- /dev/null +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java @@ -0,0 +1,6 @@ +package org.apache.texera.amber.core.workflow; + +public enum ExecutionMode { + STREAMING, + BATCH +} \ No newline at end of file diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala index 31d61edcbdb..ad3f24e97bb 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala @@ -39,7 +39,7 @@ import scala.jdk.CollectionConverters.{CollectionHasAsScala, IteratorHasAsScala} case class PhysicalPlan( operators: Set[PhysicalOp], links: Set[PhysicalLink], - batchProcessing: Boolean = false + executionMode: ExecutionMode = ExecutionMode.STREAMING ) extends LazyLogging { @transient private lazy val operatorMap: Map[PhysicalOpIdentity, PhysicalOp] = @@ -246,7 +246,7 @@ case class PhysicalPlan( getOperator(physicalOp.id).isInputLinkDependee( link ) || getOperator(upstreamPhysicalOpId).isOutputLinkBlocking(link) - || batchProcessing + || executionMode == ExecutionMode.BATCH ) } } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala index 04ce26d7cb1..1e68620ed5e 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala @@ -31,7 +31,7 @@ object WorkflowContext { val DEFAULT_WORKFLOW_ID: WorkflowIdentity = WorkflowIdentity(1L) val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings( dataTransferBatchSize = 400, - batchProcessing = false + executionMode = ExecutionMode.STREAMING ) } class WorkflowContext( diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala index 17885c5db11..e71f16932f6 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala @@ -19,8 +19,10 @@ package org.apache.texera.amber.core.workflow + + case class WorkflowSettings( - dataTransferBatchSize: Int, - outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty, - batchProcessing: Boolean + dataTransferBatchSize: Int, + outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty, + executionMode: ExecutionMode ) diff --git a/frontend/src/app/common/type/workflow.ts b/frontend/src/app/common/type/workflow.ts index c959183f574..7965aa9d48d 100644 --- a/frontend/src/app/common/type/workflow.ts +++ b/frontend/src/app/common/type/workflow.ts @@ -20,9 +20,14 @@ import { WorkflowMetadata } from "../../dashboard/type/workflow-metadata.interface"; import { CommentBox, OperatorLink, OperatorPredicate, Point } from "../../workspace/types/workflow-common.interface"; +export enum ExecutionMode { + STREAMING, + BATCH, +} + export interface WorkflowSettings { dataTransferBatchSize: number; - batchProcessing: boolean; + executionMode: ExecutionMode; } /** diff --git a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts index 28db9a2dfe4..0e706a659d7 100644 --- a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts +++ b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts @@ -17,34 +17,38 @@ * under the License. */ -import { AfterViewInit, Component, Input, ViewChild } from "@angular/core"; -import { Router } from "@angular/router"; -import { NzModalService } from "ng-zorro-antd/modal"; -import { firstValueFrom, from, lastValueFrom, Observable, of } from "rxjs"; +import {AfterViewInit, Component, Input, ViewChild} from "@angular/core"; +import {Router} from "@angular/router"; +import {NzModalService} from "ng-zorro-antd/modal"; +import {firstValueFrom, from, lastValueFrom, Observable, of} from "rxjs"; import { DEFAULT_WORKFLOW_NAME, WorkflowPersistService, } from "../../../../common/service/workflow-persist/workflow-persist.service"; -import { NgbdModalAddProjectWorkflowComponent } from "../user-project/user-project-section/ngbd-modal-add-project-workflow/ngbd-modal-add-project-workflow.component"; -import { NgbdModalRemoveProjectWorkflowComponent } from "../user-project/user-project-section/ngbd-modal-remove-project-workflow/ngbd-modal-remove-project-workflow.component"; -import { DashboardEntry, UserInfo } from "../../../type/dashboard-entry"; -import { UserService } from "../../../../common/service/user/user.service"; -import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; -import { NotificationService } from "../../../../common/service/notification/notification.service"; -import { WorkflowContent } from "../../../../common/type/workflow"; -import { NzUploadFile } from "ng-zorro-antd/upload"; +import { + NgbdModalAddProjectWorkflowComponent +} from "../user-project/user-project-section/ngbd-modal-add-project-workflow/ngbd-modal-add-project-workflow.component"; +import { + NgbdModalRemoveProjectWorkflowComponent +} from "../user-project/user-project-section/ngbd-modal-remove-project-workflow/ngbd-modal-remove-project-workflow.component"; +import {DashboardEntry, UserInfo} from "../../../type/dashboard-entry"; +import {UserService} from "../../../../common/service/user/user.service"; +import {UntilDestroy, untilDestroyed} from "@ngneat/until-destroy"; +import {NotificationService} from "../../../../common/service/notification/notification.service"; +import {ExecutionMode, WorkflowContent} from "../../../../common/type/workflow"; +import {NzUploadFile} from "ng-zorro-antd/upload"; import * as JSZip from "jszip"; -import { FiltersComponent } from "../filters/filters.component"; -import { SearchResultsComponent } from "../search-results/search-results.component"; -import { SearchService } from "../../../service/user/search.service"; -import { SortMethod } from "../../../type/sort-method"; -import { isDefined } from "../../../../common/util/predicate"; -import { UserProjectService } from "../../../service/user/project/user-project.service"; -import { map, mergeMap, switchMap, tap } from "rxjs/operators"; -import { DashboardWorkflow } from "../../../type/dashboard-workflow.interface"; -import { DownloadService } from "../../../service/user/download/download.service"; -import { DASHBOARD_USER_WORKSPACE } from "../../../../app-routing.constant"; -import { GuiConfigService } from "../../../../common/service/gui-config.service"; +import {FiltersComponent} from "../filters/filters.component"; +import {SearchResultsComponent} from "../search-results/search-results.component"; +import {SearchService} from "../../../service/user/search.service"; +import {SortMethod} from "../../../type/sort-method"; +import {isDefined} from "../../../../common/util/predicate"; +import {UserProjectService} from "../../../service/user/project/user-project.service"; +import {map, mergeMap, switchMap, tap} from "rxjs/operators"; +import {DashboardWorkflow} from "../../../type/dashboard-workflow.interface"; +import {DownloadService} from "../../../service/user/download/download.service"; +import {DASHBOARD_USER_WORKSPACE} from "../../../../app-routing.constant"; +import {GuiConfigService} from "../../../../common/service/gui-config.service"; /** * Saved-workflow-section component contains information and functionality @@ -232,7 +236,7 @@ export class UserWorkflowComponent implements AfterViewInit { operatorPositions: {}, settings: { dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize, - batchProcessing: false, + executionMode: ExecutionMode.STREAMING, }, }; let localPid = this.pid; diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html index d069e9469f7..c5a52f1c6ba 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html @@ -21,14 +21,12 @@ [formGroup]="settingsForm" class="form-inline"> Execution Mode: - - - + + +
+

-
-
-
{ - this.updateBatchProcessing(enabled); + .subscribe((mode: ExecutionMode) => { + this.updateExecutionMode(mode); }); this.workflowActionService @@ -77,7 +75,7 @@ export class SettingsComponent implements OnInit { this.settingsForm.patchValue( { dataTransferBatchSize: this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize, - batchProcessing: this.workflowActionService.getWorkflowContent().settings.batchProcessing, + executionMode: this.workflowActionService.getWorkflowContent().settings.executionMode, }, { emitEvent: false } ); @@ -102,8 +100,10 @@ export class SettingsComponent implements OnInit { }); } - public updateBatchProcessing(enabled: boolean) { - this.workflowActionService.updateBatchProcessing(enabled); + public updateExecutionMode(mode: ExecutionMode) { + this.workflowActionService.updateExecutionMode(mode); this.persistWorkflow(); } + + protected readonly ExecutionMode = ExecutionMode; } diff --git a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts index 4c960d027c5..4b3ce64b33f 100644 --- a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts +++ b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts @@ -17,12 +17,12 @@ * under the License. */ -import { Injectable } from "@angular/core"; +import {Injectable} from "@angular/core"; import * as joint from "jointjs"; -import { BehaviorSubject, merge, Observable, Subject } from "rxjs"; -import { Workflow, WorkflowContent, WorkflowSettings } from "../../../../common/type/workflow"; -import { WorkflowMetadata } from "../../../../dashboard/type/workflow-metadata.interface"; +import {BehaviorSubject, merge, Observable, Subject} from "rxjs"; +import {ExecutionMode, Workflow, WorkflowContent, WorkflowSettings} from "../../../../common/type/workflow"; +import {WorkflowMetadata} from "../../../../dashboard/type/workflow-metadata.interface"; import { Comment, CommentBox, @@ -32,18 +32,18 @@ import { Point, PortDescription, } from "../../../types/workflow-common.interface"; -import { JointUIService } from "../../joint-ui/joint-ui.service"; -import { OperatorMetadataService } from "../../operator-metadata/operator-metadata.service"; -import { UndoRedoService } from "../../undo-redo/undo-redo.service"; -import { WorkflowUtilService } from "../util/workflow-util.service"; -import { JointGraphWrapper } from "./joint-graph-wrapper"; -import { SyncTexeraModel } from "./sync-texera-model"; -import { WorkflowGraph, WorkflowGraphReadonly } from "./workflow-graph"; -import { filter } from "rxjs/operators"; -import { isDefined } from "../../../../common/util/predicate"; -import { User } from "../../../../common/type/user"; -import { SharedModelChangeHandler } from "./shared-model-change-handler"; -import { GuiConfigService } from "../../../../common/service/gui-config.service"; +import {JointUIService} from "../../joint-ui/joint-ui.service"; +import {OperatorMetadataService} from "../../operator-metadata/operator-metadata.service"; +import {UndoRedoService} from "../../undo-redo/undo-redo.service"; +import {WorkflowUtilService} from "../util/workflow-util.service"; +import {JointGraphWrapper} from "./joint-graph-wrapper"; +import {SyncTexeraModel} from "./sync-texera-model"; +import {WorkflowGraph, WorkflowGraphReadonly} from "./workflow-graph"; +import {filter} from "rxjs/operators"; +import {isDefined} from "../../../../common/util/predicate"; +import {User} from "../../../../common/type/user"; +import {SharedModelChangeHandler} from "./shared-model-change-handler"; +import {GuiConfigService} from "../../../../common/service/gui-config.service"; export const DEFAULT_WORKFLOW_NAME = "Untitled Workflow"; export const DEFAULT_WORKFLOW = { @@ -127,7 +127,7 @@ export class WorkflowActionService { private getDefaultSettings(): WorkflowSettings { return { dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize, - batchProcessing: false, + executionMode: ExecutionMode.STREAMING, }; } @@ -808,8 +808,8 @@ export class WorkflowActionService { } } - public updateBatchProcessing(enabled: boolean): void { - this.setWorkflowSettings({ ...this.workflowSettings, batchProcessing: enabled }); + public updateExecutionMode(mode: ExecutionMode): void { + this.setWorkflowSettings({ ...this.workflowSettings, executionMode: mode }); } public clearWorkflow(): void { From 7a7c82b3da096213f09a0f3b0e29fce523013134 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 10 Jan 2026 01:37:42 -0800 Subject: [PATCH 09/22] update --- .../left-panel/settings/settings.component.ts | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts index ab0c267fa40..1f7234a8f36 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts @@ -33,7 +33,7 @@ import {ExecutionMode} from "../../../../common/type/workflow"; styleUrls: ["./settings.component.scss"], }) export class SettingsComponent implements OnInit { - settingsForm!: FormGroup; + settingsForm: FormGroup; constructor( private fb: FormBuilder, @@ -41,17 +41,14 @@ export class SettingsComponent implements OnInit { private workflowPersistService: WorkflowPersistService, private userService: UserService, private notificationService: NotificationService, - ) {} - - ngOnInit(): void { + ) { this.settingsForm = this.fb.group({ - dataTransferBatchSize: [ - this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize, - [Validators.required, Validators.min(1)], - ], - executionMode: [this.workflowActionService.getWorkflowContent().settings.executionMode], + dataTransferBatchSize: [[Validators.required, Validators.min(1)],], + executionMode: [], }); + } + ngOnInit(): void { this.settingsForm .get("dataTransferBatchSize")! .valueChanges.pipe(untilDestroyed(this)) From fcd9ceb4a0e661a168131bb9e09536582309ff49 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Mon, 12 Jan 2026 12:13:24 -0800 Subject: [PATCH 10/22] update --- .../component/left-panel/settings/settings.component.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts index 1f7234a8f36..61782167e40 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts @@ -43,8 +43,8 @@ export class SettingsComponent implements OnInit { private notificationService: NotificationService, ) { this.settingsForm = this.fb.group({ - dataTransferBatchSize: [[Validators.required, Validators.min(1)],], - executionMode: [], + dataTransferBatchSize: [this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize, [Validators.required, Validators.min(1)],], + executionMode: [this.workflowActionService.getWorkflowContent().settings.executionMode], }); } From 4b9877fb40cd134e6b818f38d221e23a2fe4f936 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 13 Jan 2026 16:27:01 -0800 Subject: [PATCH 11/22] update --- .../amber/core/workflow/ExecutionMode.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java index 3731c72b2b2..1c0212c1c4a 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.texera.amber.core.workflow; public enum ExecutionMode { From ef3255135156289239728346ac95ade15e4ef342 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 13 Jan 2026 16:29:37 -0800 Subject: [PATCH 12/22] fix fmt --- .../engine/e2e/BatchSizePropagationSpec.scala | 25 +++++++++++++++---- .../core/workflow/WorkflowSettings.scala | 8 +++--- .../settings/settings.component.html | 18 +++++++++---- .../left-panel/settings/settings.component.ts | 9 ++++--- 4 files changed, 42 insertions(+), 18 deletions(-) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala index 35a81752bad..2f022d7431a 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala @@ -119,7 +119,10 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for headerlessCsv workflow" in { val expectedBatchSize = 1 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, executionMode = ExecutionMode.STREAMING) + val customWorkflowSettings = WorkflowSettings( + dataTransferBatchSize = expectedBatchSize, + executionMode = ExecutionMode.STREAMING + ) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -141,7 +144,10 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for headerlessCsv->keyword workflow" in { val expectedBatchSize = 500 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, executionMode = ExecutionMode.STREAMING) + val customWorkflowSettings = WorkflowSettings( + dataTransferBatchSize = expectedBatchSize, + executionMode = ExecutionMode.STREAMING + ) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -171,7 +177,10 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for csv->keyword->count workflow" in { val expectedBatchSize = 100 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, executionMode = ExecutionMode.STREAMING) + val customWorkflowSettings = WorkflowSettings( + dataTransferBatchSize = expectedBatchSize, + executionMode = ExecutionMode.STREAMING + ) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -209,7 +218,10 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy workflow" in { val expectedBatchSize = 300 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, executionMode = ExecutionMode.STREAMING) + val customWorkflowSettings = WorkflowSettings( + dataTransferBatchSize = expectedBatchSize, + executionMode = ExecutionMode.STREAMING + ) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) @@ -250,7 +262,10 @@ class BatchSizePropagationSpec "Engine" should "propagate the correct batch size for csv->(csv->)->join workflow" in { val expectedBatchSize = 1 - val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize, executionMode = ExecutionMode.STREAMING) + val customWorkflowSettings = WorkflowSettings( + dataTransferBatchSize = expectedBatchSize, + executionMode = ExecutionMode.STREAMING + ) val context = new WorkflowContext(workflowSettings = customWorkflowSettings) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala index e71f16932f6..f5b4a610fd3 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala @@ -19,10 +19,8 @@ package org.apache.texera.amber.core.workflow - - case class WorkflowSettings( - dataTransferBatchSize: Int, - outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty, - executionMode: ExecutionMode + dataTransferBatchSize: Int, + outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty, + executionMode: ExecutionMode ) diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html index c5a52f1c6ba..611022b967a 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html @@ -21,12 +21,20 @@ [formGroup]="settingsForm" class="form-inline"> Execution Mode: - - -
- + + +
+
-
+
Date: Tue, 13 Jan 2026 16:32:41 -0800 Subject: [PATCH 13/22] fix fmt --- .../engine/e2e/BatchSizePropagationSpec.scala | 7 +- frontend/src/app/app.module.ts | 138 +++++++++--------- .../user-workflow/user-workflow.component.ts | 50 +++---- .../model/workflow-action.service.ts | 32 ++-- 4 files changed, 114 insertions(+), 113 deletions(-) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala index 2f022d7431a..b018a1110eb 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala @@ -23,7 +23,12 @@ import org.apache.pekko.actor.{ActorSystem, Props} import org.apache.pekko.testkit.{ImplicitSender, TestKit} import org.apache.pekko.util.Timeout import org.apache.texera.amber.clustering.SingleNodeListener -import org.apache.texera.amber.core.workflow.{ExecutionMode, PortIdentity, WorkflowContext, WorkflowSettings} +import org.apache.texera.amber.core.workflow.{ + ExecutionMode, + PortIdentity, + WorkflowContext, + WorkflowSettings +} import org.apache.texera.amber.engine.architecture.controller._ import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._ import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts index cb1848c455e..9ddf4bbcc2e 100644 --- a/frontend/src/app/app.module.ts +++ b/frontend/src/app/app.module.ts @@ -278,75 +278,75 @@ registerLocaleData(en); ComputingUnitSelectionComponent, AdminSettingsComponent, ], - imports: [ - BrowserModule, - AppRoutingModule, - HttpClientModule, - JwtModule.forRoot({ - config: { - tokenGetter: AuthService.getAccessToken, - skipWhenExpired: false, - throwNoTokenError: false, - disallowedRoutes: ["forum/api/users"], - }, - }), - BrowserAnimationsModule, - RouterModule, - FormsModule, - ReactiveFormsModule, - FormlyModule.forRoot(TEXERA_FORMLY_CONFIG), - FormlyNgZorroAntdModule, - OverlayModule, - NzDatePickerModule, - NzDropDownModule, - NzButtonModule, - NzAutocompleteModule, - NzIconModule, - NzFormModule, - NzListModule, - NzInputModule, - NzPopoverModule, - NzCollapseModule, - NzToolTipModule, - NzTableModule, - NzSelectModule, - NzSpaceModule, - NzBadgeModule, - NzUploadModule, - NgxJsonViewerModule, - NzMessageModule, - NzModalModule, - NzDescriptionsModule, - NzCardModule, - NzTagModule, - NzPopconfirmModule, - NzAvatarModule, - NzTabsModule, - NzPaginationModule, - NzCommentModule, - ColorPickerModule, - NzSwitchModule, - NzLayoutModule, - NzSliderModule, - MarkdownModule.forRoot(), - DragDropModule, - NzAlertModule, - NzResizableModule, - NzSpinModule, - NgxFileDropModule, - NzTreeModule, - NzTreeViewModule, - NzNoAnimationModule, - TreeModule, - SocialLoginModule, - GoogleSigninButtonModule, - NzEmptyModule, - NzDividerModule, - NzProgressModule, - NzInputNumberModule, - NzCheckboxModule, - NzRadioModule, - ], + imports: [ + BrowserModule, + AppRoutingModule, + HttpClientModule, + JwtModule.forRoot({ + config: { + tokenGetter: AuthService.getAccessToken, + skipWhenExpired: false, + throwNoTokenError: false, + disallowedRoutes: ["forum/api/users"], + }, + }), + BrowserAnimationsModule, + RouterModule, + FormsModule, + ReactiveFormsModule, + FormlyModule.forRoot(TEXERA_FORMLY_CONFIG), + FormlyNgZorroAntdModule, + OverlayModule, + NzDatePickerModule, + NzDropDownModule, + NzButtonModule, + NzAutocompleteModule, + NzIconModule, + NzFormModule, + NzListModule, + NzInputModule, + NzPopoverModule, + NzCollapseModule, + NzToolTipModule, + NzTableModule, + NzSelectModule, + NzSpaceModule, + NzBadgeModule, + NzUploadModule, + NgxJsonViewerModule, + NzMessageModule, + NzModalModule, + NzDescriptionsModule, + NzCardModule, + NzTagModule, + NzPopconfirmModule, + NzAvatarModule, + NzTabsModule, + NzPaginationModule, + NzCommentModule, + ColorPickerModule, + NzSwitchModule, + NzLayoutModule, + NzSliderModule, + MarkdownModule.forRoot(), + DragDropModule, + NzAlertModule, + NzResizableModule, + NzSpinModule, + NgxFileDropModule, + NzTreeModule, + NzTreeViewModule, + NzNoAnimationModule, + TreeModule, + SocialLoginModule, + GoogleSigninButtonModule, + NzEmptyModule, + NzDividerModule, + NzProgressModule, + NzInputNumberModule, + NzCheckboxModule, + NzRadioModule, + ], providers: [ provideNzI18n(en_US), AuthGuardService, diff --git a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts index 0e706a659d7..e41736e0ace 100644 --- a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts +++ b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts @@ -17,38 +17,34 @@ * under the License. */ -import {AfterViewInit, Component, Input, ViewChild} from "@angular/core"; -import {Router} from "@angular/router"; -import {NzModalService} from "ng-zorro-antd/modal"; -import {firstValueFrom, from, lastValueFrom, Observable, of} from "rxjs"; +import { AfterViewInit, Component, Input, ViewChild } from "@angular/core"; +import { Router } from "@angular/router"; +import { NzModalService } from "ng-zorro-antd/modal"; +import { firstValueFrom, from, lastValueFrom, Observable, of } from "rxjs"; import { DEFAULT_WORKFLOW_NAME, WorkflowPersistService, } from "../../../../common/service/workflow-persist/workflow-persist.service"; -import { - NgbdModalAddProjectWorkflowComponent -} from "../user-project/user-project-section/ngbd-modal-add-project-workflow/ngbd-modal-add-project-workflow.component"; -import { - NgbdModalRemoveProjectWorkflowComponent -} from "../user-project/user-project-section/ngbd-modal-remove-project-workflow/ngbd-modal-remove-project-workflow.component"; -import {DashboardEntry, UserInfo} from "../../../type/dashboard-entry"; -import {UserService} from "../../../../common/service/user/user.service"; -import {UntilDestroy, untilDestroyed} from "@ngneat/until-destroy"; -import {NotificationService} from "../../../../common/service/notification/notification.service"; -import {ExecutionMode, WorkflowContent} from "../../../../common/type/workflow"; -import {NzUploadFile} from "ng-zorro-antd/upload"; +import { NgbdModalAddProjectWorkflowComponent } from "../user-project/user-project-section/ngbd-modal-add-project-workflow/ngbd-modal-add-project-workflow.component"; +import { NgbdModalRemoveProjectWorkflowComponent } from "../user-project/user-project-section/ngbd-modal-remove-project-workflow/ngbd-modal-remove-project-workflow.component"; +import { DashboardEntry, UserInfo } from "../../../type/dashboard-entry"; +import { UserService } from "../../../../common/service/user/user.service"; +import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; +import { NotificationService } from "../../../../common/service/notification/notification.service"; +import { ExecutionMode, WorkflowContent } from "../../../../common/type/workflow"; +import { NzUploadFile } from "ng-zorro-antd/upload"; import * as JSZip from "jszip"; -import {FiltersComponent} from "../filters/filters.component"; -import {SearchResultsComponent} from "../search-results/search-results.component"; -import {SearchService} from "../../../service/user/search.service"; -import {SortMethod} from "../../../type/sort-method"; -import {isDefined} from "../../../../common/util/predicate"; -import {UserProjectService} from "../../../service/user/project/user-project.service"; -import {map, mergeMap, switchMap, tap} from "rxjs/operators"; -import {DashboardWorkflow} from "../../../type/dashboard-workflow.interface"; -import {DownloadService} from "../../../service/user/download/download.service"; -import {DASHBOARD_USER_WORKSPACE} from "../../../../app-routing.constant"; -import {GuiConfigService} from "../../../../common/service/gui-config.service"; +import { FiltersComponent } from "../filters/filters.component"; +import { SearchResultsComponent } from "../search-results/search-results.component"; +import { SearchService } from "../../../service/user/search.service"; +import { SortMethod } from "../../../type/sort-method"; +import { isDefined } from "../../../../common/util/predicate"; +import { UserProjectService } from "../../../service/user/project/user-project.service"; +import { map, mergeMap, switchMap, tap } from "rxjs/operators"; +import { DashboardWorkflow } from "../../../type/dashboard-workflow.interface"; +import { DownloadService } from "../../../service/user/download/download.service"; +import { DASHBOARD_USER_WORKSPACE } from "../../../../app-routing.constant"; +import { GuiConfigService } from "../../../../common/service/gui-config.service"; /** * Saved-workflow-section component contains information and functionality diff --git a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts index 4b3ce64b33f..5b67c23f189 100644 --- a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts +++ b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts @@ -17,12 +17,12 @@ * under the License. */ -import {Injectable} from "@angular/core"; +import { Injectable } from "@angular/core"; import * as joint from "jointjs"; -import {BehaviorSubject, merge, Observable, Subject} from "rxjs"; -import {ExecutionMode, Workflow, WorkflowContent, WorkflowSettings} from "../../../../common/type/workflow"; -import {WorkflowMetadata} from "../../../../dashboard/type/workflow-metadata.interface"; +import { BehaviorSubject, merge, Observable, Subject } from "rxjs"; +import { ExecutionMode, Workflow, WorkflowContent, WorkflowSettings } from "../../../../common/type/workflow"; +import { WorkflowMetadata } from "../../../../dashboard/type/workflow-metadata.interface"; import { Comment, CommentBox, @@ -32,18 +32,18 @@ import { Point, PortDescription, } from "../../../types/workflow-common.interface"; -import {JointUIService} from "../../joint-ui/joint-ui.service"; -import {OperatorMetadataService} from "../../operator-metadata/operator-metadata.service"; -import {UndoRedoService} from "../../undo-redo/undo-redo.service"; -import {WorkflowUtilService} from "../util/workflow-util.service"; -import {JointGraphWrapper} from "./joint-graph-wrapper"; -import {SyncTexeraModel} from "./sync-texera-model"; -import {WorkflowGraph, WorkflowGraphReadonly} from "./workflow-graph"; -import {filter} from "rxjs/operators"; -import {isDefined} from "../../../../common/util/predicate"; -import {User} from "../../../../common/type/user"; -import {SharedModelChangeHandler} from "./shared-model-change-handler"; -import {GuiConfigService} from "../../../../common/service/gui-config.service"; +import { JointUIService } from "../../joint-ui/joint-ui.service"; +import { OperatorMetadataService } from "../../operator-metadata/operator-metadata.service"; +import { UndoRedoService } from "../../undo-redo/undo-redo.service"; +import { WorkflowUtilService } from "../util/workflow-util.service"; +import { JointGraphWrapper } from "./joint-graph-wrapper"; +import { SyncTexeraModel } from "./sync-texera-model"; +import { WorkflowGraph, WorkflowGraphReadonly } from "./workflow-graph"; +import { filter } from "rxjs/operators"; +import { isDefined } from "../../../../common/util/predicate"; +import { User } from "../../../../common/type/user"; +import { SharedModelChangeHandler } from "./shared-model-change-handler"; +import { GuiConfigService } from "../../../../common/service/gui-config.service"; export const DEFAULT_WORKFLOW_NAME = "Untitled Workflow"; export const DEFAULT_WORKFLOW = { From 3530e71d78bd3a494e011406c4d1bf433601866d Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 13 Jan 2026 21:30:43 -0800 Subject: [PATCH 14/22] fix test --- .../dashboard/component/user-dashboard-test-fixtures.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts index e87b5acd22a..f821eb32441 100644 --- a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts +++ b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts @@ -19,9 +19,9 @@ //All times in test Workflows are in PST because our local machine's timezone is PST -import { Workflow, WorkflowContent } from "../../common/type/workflow"; -import { DashboardEntry } from "../type/dashboard-entry"; -import { DashboardProject } from "../type/dashboard-project.interface"; +import {ExecutionMode, Workflow, WorkflowContent} from "../../common/type/workflow"; +import {DashboardEntry} from "../type/dashboard-entry"; +import {DashboardProject} from "../type/dashboard-project.interface"; //the Date class creates unix timestamp based on local timezone, therefore test workflow time needs to be in local timezone const oneDay = 86400000; @@ -39,7 +39,7 @@ export const testWorkflowContent = (operatorTypes: string[]): WorkflowContent => commentBoxes: [], links: [], operatorPositions: {}, - settings: { dataTransferBatchSize: 400 }, + settings: { dataTransferBatchSize: 400 , executionMode: ExecutionMode.STREAMING }, }); export const testWorkflow1: Workflow = { From 2e17500b7ecfa5a1310ab864b90472bf498fc9cd Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 13 Jan 2026 21:40:35 -0800 Subject: [PATCH 15/22] fix test --- .../dashboard/component/user-dashboard-test-fixtures.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts index f821eb32441..c5a7d9804ad 100644 --- a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts +++ b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts @@ -19,9 +19,9 @@ //All times in test Workflows are in PST because our local machine's timezone is PST -import {ExecutionMode, Workflow, WorkflowContent} from "../../common/type/workflow"; -import {DashboardEntry} from "../type/dashboard-entry"; -import {DashboardProject} from "../type/dashboard-project.interface"; +import { ExecutionMode, Workflow, WorkflowContent } from "../../common/type/workflow"; +import { DashboardEntry } from "../type/dashboard-entry"; +import { DashboardProject } from "../type/dashboard-project.interface"; //the Date class creates unix timestamp based on local timezone, therefore test workflow time needs to be in local timezone const oneDay = 86400000; @@ -39,7 +39,7 @@ export const testWorkflowContent = (operatorTypes: string[]): WorkflowContent => commentBoxes: [], links: [], operatorPositions: {}, - settings: { dataTransferBatchSize: 400 , executionMode: ExecutionMode.STREAMING }, + settings: { dataTransferBatchSize: 400, executionMode: ExecutionMode.STREAMING }, }); export const testWorkflow1: Workflow = { From a67c326787c8651ecc37473cde5910959ab28e68 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 14 Jan 2026 23:58:50 -0800 Subject: [PATCH 16/22] add defaultExecutionMode --- frontend/src/app/common/type/gui-config.ts | 2 ++ .../service/workflow-graph/model/workflow-action.service.ts | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/frontend/src/app/common/type/gui-config.ts b/frontend/src/app/common/type/gui-config.ts index b47dfa0ab1b..b840e7347c8 100644 --- a/frontend/src/app/common/type/gui-config.ts +++ b/frontend/src/app/common/type/gui-config.ts @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ +import {ExecutionMode} from "./workflow"; // Please refer to core/config/src/main/resources/gui.conf for the definition of each config item export interface GuiConfig { @@ -33,6 +34,7 @@ export interface GuiConfig { productionSharedEditingServer: boolean; pythonLanguageServerPort: string; defaultDataTransferBatchSize: number; + defaultExecutionMode: ExecutionMode; workflowEmailNotificationEnabled: boolean; sharingComputingUnitEnabled: boolean; operatorConsoleMessageBufferSize: number; diff --git a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts index 5b67c23f189..32a5e3a2db4 100644 --- a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts +++ b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts @@ -127,7 +127,7 @@ export class WorkflowActionService { private getDefaultSettings(): WorkflowSettings { return { dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize, - executionMode: ExecutionMode.STREAMING, + executionMode: this.config.env.defaultExecutionMode, }; } From dc873292fe5156950931737a9102b31e87d2ee0b Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Thu, 15 Jan 2026 00:48:05 -0800 Subject: [PATCH 17/22] add defaultExecutionMode --- .../component/user/user-workflow/user-workflow.component.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts index e41736e0ace..75f61a223ce 100644 --- a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts +++ b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts @@ -232,7 +232,7 @@ export class UserWorkflowComponent implements AfterViewInit { operatorPositions: {}, settings: { dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize, - executionMode: ExecutionMode.STREAMING, + executionMode: this.config.env.defaultExecutionMode, }, }; let localPid = this.pid; From 95a14c842f132312e1bf4a026990742dc751f496 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Thu, 15 Jan 2026 17:08:40 -0800 Subject: [PATCH 18/22] add defaultExecutionMode --- common/config/src/main/resources/gui.conf | 5 ++++- .../main/scala/org/apache/texera/config/GuiConfig.scala | 2 ++ .../apache/texera/amber/core/workflow/ExecutionMode.java | 4 +++- .../apache/texera/service/resource/ConfigResource.scala | 1 + .../src/app/common/service/gui-config.service.mock.ts | 2 ++ frontend/src/app/common/type/gui-config.ts | 2 +- frontend/src/app/common/type/workflow.ts | 4 ++-- .../component/left-panel/settings/settings.component.html | 8 ++++---- .../component/left-panel/settings/settings.component.ts | 1 + 9 files changed, 20 insertions(+), 9 deletions(-) diff --git a/common/config/src/main/resources/gui.conf b/common/config/src/main/resources/gui.conf index 8039441b130..5020c40b854 100644 --- a/common/config/src/main/resources/gui.conf +++ b/common/config/src/main/resources/gui.conf @@ -63,8 +63,11 @@ gui { default-data-transfer-batch-size = 400 default-data-transfer-batch-size = ${?GUI_WORKFLOW_WORKSPACE_DEFAULT_DATA_TRANSFER_BATCH_SIZE} + # default execution mode for workflows, can be either BATCH or STREAMING + default-execution-mode = STREAMING + default-execution-mode = ${?GUI_WORKFLOW_WORKSPACE_DEFAULT_EXECUTION_MODE} + # whether selecting files from datasets instead of the local file system. - # The user system must be enabled to make this flag work! selecting-files-from-datasets-enabled = true selecting-files-from-datasets-enabled = ${?GUI_WORKFLOW_WORKSPACE_SELECTING_FILES_FROM_DATASETS_ENABLED} diff --git a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala index 14016f43743..adc789c9843 100644 --- a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala @@ -47,6 +47,8 @@ object GuiConfig { conf.getBoolean("gui.workflow-workspace.auto-attribute-correction-enabled") val guiWorkflowWorkspaceDefaultDataTransferBatchSize: Int = conf.getInt("gui.workflow-workspace.default-data-transfer-batch-size") + val guiWorkflowWorkspaceDefaultExecutionMode: String = + conf.getString("gui.workflow-workspace.default-execution-mode") val guiWorkflowWorkspaceSelectingFilesFromDatasetsEnabled: Boolean = conf.getBoolean("gui.workflow-workspace.selecting-files-from-datasets-enabled") val guiWorkflowWorkspaceWorkflowExecutionsTrackingEnabled: Boolean = diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java index 1c0212c1c4a..4ebdda7c2a1 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java @@ -21,5 +21,7 @@ public enum ExecutionMode { STREAMING, - BATCH + BATCH; + + public static ExecutionMode fromString(String value) { return valueOf(value); } } \ No newline at end of file diff --git a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala index 30c657746fb..b7517d81eb7 100644 --- a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala +++ b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala @@ -46,6 +46,7 @@ class ConfigResource { "timetravelEnabled" -> GuiConfig.guiWorkflowWorkspaceTimetravelEnabled, "productionSharedEditingServer" -> GuiConfig.guiWorkflowWorkspaceProductionSharedEditingServer, "defaultDataTransferBatchSize" -> GuiConfig.guiWorkflowWorkspaceDefaultDataTransferBatchSize, + "defaultExecutionMode" -> GuiConfig.guiWorkflowWorkspaceDefaultExecutionMode, "workflowEmailNotificationEnabled" -> GuiConfig.guiWorkflowWorkspaceWorkflowEmailNotificationEnabled, "sharingComputingUnitEnabled" -> ComputingUnitConfig.sharingComputingUnitEnabled, "operatorConsoleMessageBufferSize" -> GuiConfig.guiWorkflowWorkspaceOperatorConsoleMessageBufferSize, diff --git a/frontend/src/app/common/service/gui-config.service.mock.ts b/frontend/src/app/common/service/gui-config.service.mock.ts index daa8adfd224..bccaf99a9f4 100644 --- a/frontend/src/app/common/service/gui-config.service.mock.ts +++ b/frontend/src/app/common/service/gui-config.service.mock.ts @@ -20,6 +20,7 @@ import { Injectable } from "@angular/core"; import { Observable, of } from "rxjs"; import { GuiConfig } from "../type/gui-config"; +import { ExecutionMode } from "../type/workflow"; /** * Mock GuiConfigService for testing purposes. @@ -42,6 +43,7 @@ export class MockGuiConfigService { productionSharedEditingServer: false, pythonLanguageServerPort: "3000", defaultDataTransferBatchSize: 100, + defaultExecutionMode: ExecutionMode.STREAMING, workflowEmailNotificationEnabled: false, sharingComputingUnitEnabled: false, operatorConsoleMessageBufferSize: 1000, diff --git a/frontend/src/app/common/type/gui-config.ts b/frontend/src/app/common/type/gui-config.ts index b840e7347c8..d8786c1dc08 100644 --- a/frontend/src/app/common/type/gui-config.ts +++ b/frontend/src/app/common/type/gui-config.ts @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import {ExecutionMode} from "./workflow"; +import { ExecutionMode } from "./workflow"; // Please refer to core/config/src/main/resources/gui.conf for the definition of each config item export interface GuiConfig { diff --git a/frontend/src/app/common/type/workflow.ts b/frontend/src/app/common/type/workflow.ts index 7965aa9d48d..8bc03eed8a4 100644 --- a/frontend/src/app/common/type/workflow.ts +++ b/frontend/src/app/common/type/workflow.ts @@ -21,8 +21,8 @@ import { WorkflowMetadata } from "../../dashboard/type/workflow-metadata.interfa import { CommentBox, OperatorLink, OperatorPredicate, Point } from "../../workspace/types/workflow-common.interface"; export enum ExecutionMode { - STREAMING, - BATCH, + STREAMING = "STREAMING", + BATCH = "BATCH", } export interface WorkflowSettings { diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html index 611022b967a..2b9cad3f8a0 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html @@ -24,14 +24,14 @@ Streaming
Batch

diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts index d7ab644a896..b9d196945be 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts @@ -72,6 +72,7 @@ export class SettingsComponent implements OnInit { .workflowChanged() .pipe(untilDestroyed(this)) .subscribe(() => { + console.log("workflow changed", this.workflowActionService.getWorkflowContent().settings.executionMode); this.settingsForm.patchValue( { dataTransferBatchSize: this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize, From db9df6c4122bbf91bfd45db7e3caf624de245e60 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 17 Jan 2026 21:58:36 -0800 Subject: [PATCH 19/22] rename --- .../amber/engine/e2e/BatchSizePropagationSpec.scala | 10 +++++----- .../texera/amber/core/workflow/ExecutionMode.java | 4 ++-- .../texera/amber/core/workflow/PhysicalPlan.scala | 4 ++-- .../texera/amber/core/workflow/WorkflowContext.scala | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala index b018a1110eb..93e94b4fa91 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala @@ -126,7 +126,7 @@ class BatchSizePropagationSpec val customWorkflowSettings = WorkflowSettings( dataTransferBatchSize = expectedBatchSize, - executionMode = ExecutionMode.STREAMING + executionMode = ExecutionMode.PIPELINED ) val context = @@ -151,7 +151,7 @@ class BatchSizePropagationSpec val customWorkflowSettings = WorkflowSettings( dataTransferBatchSize = expectedBatchSize, - executionMode = ExecutionMode.STREAMING + executionMode = ExecutionMode.PIPELINED ) val context = @@ -184,7 +184,7 @@ class BatchSizePropagationSpec val customWorkflowSettings = WorkflowSettings( dataTransferBatchSize = expectedBatchSize, - executionMode = ExecutionMode.STREAMING + executionMode = ExecutionMode.PIPELINED ) val context = @@ -225,7 +225,7 @@ class BatchSizePropagationSpec val customWorkflowSettings = WorkflowSettings( dataTransferBatchSize = expectedBatchSize, - executionMode = ExecutionMode.STREAMING + executionMode = ExecutionMode.PIPELINED ) val context = @@ -269,7 +269,7 @@ class BatchSizePropagationSpec val customWorkflowSettings = WorkflowSettings( dataTransferBatchSize = expectedBatchSize, - executionMode = ExecutionMode.STREAMING + executionMode = ExecutionMode.PIPELINED ) val context = diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java index 4ebdda7c2a1..c02690e4e56 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java @@ -20,8 +20,8 @@ package org.apache.texera.amber.core.workflow; public enum ExecutionMode { - STREAMING, - BATCH; + PIPELINED, + MATERIALIZED; public static ExecutionMode fromString(String value) { return valueOf(value); } } \ No newline at end of file diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala index ad3f24e97bb..70d645793f6 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalPlan.scala @@ -39,7 +39,7 @@ import scala.jdk.CollectionConverters.{CollectionHasAsScala, IteratorHasAsScala} case class PhysicalPlan( operators: Set[PhysicalOp], links: Set[PhysicalLink], - executionMode: ExecutionMode = ExecutionMode.STREAMING + executionMode: ExecutionMode = ExecutionMode.PIPELINED ) extends LazyLogging { @transient private lazy val operatorMap: Map[PhysicalOpIdentity, PhysicalOp] = @@ -246,7 +246,7 @@ case class PhysicalPlan( getOperator(physicalOp.id).isInputLinkDependee( link ) || getOperator(upstreamPhysicalOpId).isOutputLinkBlocking(link) - || executionMode == ExecutionMode.BATCH + || executionMode == ExecutionMode.MATERIALIZED ) } } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala index 1e68620ed5e..dc4edf27824 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala @@ -31,7 +31,7 @@ object WorkflowContext { val DEFAULT_WORKFLOW_ID: WorkflowIdentity = WorkflowIdentity(1L) val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings( dataTransferBatchSize = 400, - executionMode = ExecutionMode.STREAMING + executionMode = ExecutionMode.PIPELINED ) } class WorkflowContext( From f67eb2ecf50e5ba18d50257d9953a1e3bb0eb782 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 17 Jan 2026 22:01:48 -0800 Subject: [PATCH 20/22] rename --- frontend/src/app/common/service/gui-config.service.mock.ts | 2 +- frontend/src/app/common/type/workflow.ts | 4 ++-- .../app/dashboard/component/user-dashboard-test-fixtures.ts | 2 +- .../component/left-panel/settings/settings.component.html | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/frontend/src/app/common/service/gui-config.service.mock.ts b/frontend/src/app/common/service/gui-config.service.mock.ts index bccaf99a9f4..179259c5a97 100644 --- a/frontend/src/app/common/service/gui-config.service.mock.ts +++ b/frontend/src/app/common/service/gui-config.service.mock.ts @@ -43,7 +43,7 @@ export class MockGuiConfigService { productionSharedEditingServer: false, pythonLanguageServerPort: "3000", defaultDataTransferBatchSize: 100, - defaultExecutionMode: ExecutionMode.STREAMING, + defaultExecutionMode: ExecutionMode.PIPELINED, workflowEmailNotificationEnabled: false, sharingComputingUnitEnabled: false, operatorConsoleMessageBufferSize: 1000, diff --git a/frontend/src/app/common/type/workflow.ts b/frontend/src/app/common/type/workflow.ts index 8bc03eed8a4..8e1c1c7e85b 100644 --- a/frontend/src/app/common/type/workflow.ts +++ b/frontend/src/app/common/type/workflow.ts @@ -21,8 +21,8 @@ import { WorkflowMetadata } from "../../dashboard/type/workflow-metadata.interfa import { CommentBox, OperatorLink, OperatorPredicate, Point } from "../../workspace/types/workflow-common.interface"; export enum ExecutionMode { - STREAMING = "STREAMING", - BATCH = "BATCH", + PIPELINED = "PIPELINED", + MATERIALIZED = "MATERIALIZED", } export interface WorkflowSettings { diff --git a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts index c5a7d9804ad..09a158e336f 100644 --- a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts +++ b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts @@ -39,7 +39,7 @@ export const testWorkflowContent = (operatorTypes: string[]): WorkflowContent => commentBoxes: [], links: [], operatorPositions: {}, - settings: { dataTransferBatchSize: 400, executionMode: ExecutionMode.STREAMING }, + settings: { dataTransferBatchSize: 400, executionMode: ExecutionMode.PIPELINED }, }); export const testWorkflow1: Workflow = { diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html index 2b9cad3f8a0..25330bfbf9a 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html @@ -24,13 +24,13 @@
From 722fab1593ffd9ffe32b412a86f6078787199a32 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 17 Jan 2026 22:02:59 -0800 Subject: [PATCH 21/22] rename --- .../component/left-panel/settings/settings.component.html | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html index 25330bfbf9a..ceeea480af7 100644 --- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html +++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html @@ -25,13 +25,13 @@ Pipelined
Materialized
From 6195115425dede954e60d919f3fc61cc4164176f Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 17 Jan 2026 22:03:52 -0800 Subject: [PATCH 22/22] rename --- common/config/src/main/resources/gui.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/config/src/main/resources/gui.conf b/common/config/src/main/resources/gui.conf index 5020c40b854..d58d94ac7b9 100644 --- a/common/config/src/main/resources/gui.conf +++ b/common/config/src/main/resources/gui.conf @@ -63,8 +63,8 @@ gui { default-data-transfer-batch-size = 400 default-data-transfer-batch-size = ${?GUI_WORKFLOW_WORKSPACE_DEFAULT_DATA_TRANSFER_BATCH_SIZE} - # default execution mode for workflows, can be either BATCH or STREAMING - default-execution-mode = STREAMING + # default execution mode for workflows, can be either MATERIALIZED or PIPELINED + default-execution-mode = PIPELINED default-execution-mode = ${?GUI_WORKFLOW_WORKSPACE_DEFAULT_EXECUTION_MODE} # whether selecting files from datasets instead of the local file system.