diff --git a/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto b/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto index 7e9c65b3a4a..1f6352e9e88 100644 --- a/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto +++ b/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto @@ -57,6 +57,7 @@ message ControlRequest { EmptyRequest emptyRequest = 56; PrepareCheckpointRequest prepareCheckpointRequest = 57; QueryStatisticsRequest queryStatisticsRequest = 58; + EndIterationRequest endIterationRequest = 59; // request for testing Ping ping = 100; @@ -271,4 +272,8 @@ message PrepareCheckpointRequest{ message QueryStatisticsRequest{ repeated core.ActorVirtualIdentity filterByWorkers = 1; +} + +message EndIterationRequest{ + core.ActorVirtualIdentity worker = 1 [(scalapb.field).no_box = true]; } \ No newline at end of file diff --git a/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/workerservice.proto b/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/workerservice.proto index 97e90b52e57..89b70af2ac6 100644 --- a/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/workerservice.proto +++ b/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/workerservice.proto @@ -47,6 +47,8 @@ service WorkerService { rpc EndWorker(EmptyRequest) returns (EmptyReturn); rpc StartChannel(EmptyRequest) returns (EmptyReturn); rpc EndChannel(EmptyRequest) returns (EmptyReturn); + rpc EndIteration(EndIterationRequest) returns (EmptyReturn); + rpc NextIteration(EmptyRequest) returns (EmptyReturn); rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn); rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatedValue); rpc NoOperation(EmptyRequest) returns (EmptyReturn); diff --git a/core/amber/src/main/python/core/architecture/handlers/control/end_iteration_handler.py b/core/amber/src/main/python/core/architecture/handlers/control/end_iteration_handler.py new file mode 100644 index 00000000000..7351b6771ed --- /dev/null +++ b/core/amber/src/main/python/core/architecture/handlers/control/end_iteration_handler.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from core.architecture.handlers.control.control_handler_base import ControlHandler +from proto.edu.uci.ics.amber.engine.architecture.rpc import ( + EmptyReturn, + EndIterationRequest, +) +from core.models.internal_marker import EndIteration + + +class EndIterationHandler(ControlHandler): + async def end_iteration(self, req: EndIterationRequest) -> EmptyReturn: + self.context.tuple_processing_manager.current_internal_marker = EndIteration( + req.worker + ) + return EmptyReturn() diff --git a/core/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py b/core/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py index ef173cceffc..f52f3ad0a6d 100644 --- a/core/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py +++ b/core/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py @@ -27,6 +27,7 @@ ) from core.architecture.handlers.control.start_channel_handler import StartChannelHandler from core.architecture.handlers.control.end_channel_handler import EndChannelHandler +from core.architecture.handlers.control.end_iteration_handler import EndIterationHandler from core.architecture.handlers.control.end_worker_handler import EndWorkerHandler from core.architecture.handlers.control.evaluate_expression_handler import ( EvaluateExpressionHandler, @@ -63,6 +64,7 @@ class AsyncRPCHandlerInitializer( EndWorkerHandler, StartChannelHandler, EndChannelHandler, + EndIterationHandler, NoOperationHandler, ): pass diff --git a/core/amber/src/main/python/core/models/internal_marker.py b/core/amber/src/main/python/core/models/internal_marker.py index 6c9c80bafc4..1a6302f35a3 100644 --- a/core/amber/src/main/python/core/models/internal_marker.py +++ b/core/amber/src/main/python/core/models/internal_marker.py @@ -15,6 +15,9 @@ # specific language governing permissions and limitations # under the License. +from dataclasses import dataclass +from proto.edu.uci.ics.amber.core import ActorVirtualIdentity + class InternalMarker: """ @@ -31,3 +34,8 @@ class StartChannel(InternalMarker): class EndChannel(InternalMarker): pass + + +@dataclass +class EndIteration(InternalMarker): + worker: ActorVirtualIdentity diff --git a/core/amber/src/main/python/core/models/operator.py b/core/amber/src/main/python/core/models/operator.py index fcbf642575b..3c10a0b727d 100644 --- a/core/amber/src/main/python/core/models/operator.py +++ b/core/amber/src/main/python/core/models/operator.py @@ -64,6 +64,13 @@ def close(self) -> None: """ pass + def reset(self) -> None: + """ + Reset the operator to its initial state. + """ + self.close() + self.open() + def process_state(self, state: State, port: int) -> Optional[State]: """ Process an input State from the given link. @@ -238,6 +245,7 @@ def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike def on_finish(self, port: int) -> Iterator[Optional[TableLike]]: table = Table(self.__table_data[port]) + self.__table_data.clear() yield from self.process_table(table, port) @abstractmethod diff --git a/core/amber/src/main/python/core/runnables/data_processor.py b/core/amber/src/main/python/core/runnables/data_processor.py index 7130c7676b8..e06688ea77b 100644 --- a/core/amber/src/main/python/core/runnables/data_processor.py +++ b/core/amber/src/main/python/core/runnables/data_processor.py @@ -24,7 +24,7 @@ from typing import Iterator, Optional from core.architecture.managers import Context from core.models import ExceptionInfo, State, TupleLike, InternalMarker -from core.models.internal_marker import StartChannel, EndChannel +from core.models.internal_marker import StartChannel, EndChannel, EndIteration from core.models.table import all_output_to_tuple from core.util import Stoppable from core.util.console_message.replace_print import replace_print @@ -74,7 +74,9 @@ def process_internal_marker(self, internal_marker: InternalMarker) -> None: ): if isinstance(internal_marker, StartChannel): self._set_output_state(executor.produce_state_on_start(port_id)) - elif isinstance(internal_marker, EndChannel): + elif isinstance(internal_marker, EndChannel) or isinstance( + internal_marker, EndIteration + ): self._set_output_state(executor.produce_state_on_finish(port_id)) self._switch_context() self._set_output_tuple(executor.on_finish(port_id)) diff --git a/core/amber/src/main/python/core/runnables/main_loop.py b/core/amber/src/main/python/core/runnables/main_loop.py index 5ec02e81f56..2b4b0fd9027 100644 --- a/core/amber/src/main/python/core/runnables/main_loop.py +++ b/core/amber/src/main/python/core/runnables/main_loop.py @@ -31,7 +31,7 @@ InternalQueue, Tuple, ) -from core.models.internal_marker import StartChannel, EndChannel +from core.models.internal_marker import StartChannel, EndChannel, EndIteration from core.models.internal_queue import ( DataElement, DCMElement, @@ -55,6 +55,7 @@ EmbeddedControlMessage, AsyncRpcContext, ControlRequest, + EndIterationRequest, ) from proto.edu.uci.ics.amber.engine.architecture.worker import ( WorkerState, @@ -65,6 +66,7 @@ ChannelIdentity, EmbeddedControlMessageIdentity, ) +from core.util import set_one_of class MainLoop(StoppableQueueBlockingRunnable): @@ -284,6 +286,17 @@ def _process_end_channel(self) -> None: ) self.complete() + def _process_end_iteration(self) -> None: + worker_id = self.context.tuple_processing_manager.current_internal_marker.worker + self.process_input_state() + self.process_input_tuple() + self._send_ecm_to_data_channels( + "EndIteration", + EmbeddedControlMessageType.PORT_ALIGNMENT, + EndIterationRequest(worker_id), + ) + self.context.executor_manager.executor.reset() + def _process_ecm(self, ecm_element: ECMElement): """ Processes a received ECM and handles synchronization, @@ -335,21 +348,25 @@ def _process_ecm(self, ecm_element: ECMElement): { StartChannel: self._process_start_channel, EndChannel: self._process_end_channel, + EndIteration: self._process_end_iteration, }[type(self.context.tuple_processing_manager.current_internal_marker)]() def _send_ecm_to_data_channels( - self, method_name: str, alignment: EmbeddedControlMessageType + self, + method: str, + alignment: EmbeddedControlMessageType, + request: ControlRequest = EmptyRequest(), ) -> None: for active_channel_id in self.context.output_manager.get_output_channel_ids(): if not active_channel_id.is_control: ecm = EmbeddedControlMessage( - EmbeddedControlMessageIdentity(method_name), + EmbeddedControlMessageIdentity(method), alignment, [], { active_channel_id.to_worker_id.name: ControlInvocation( - method_name, - ControlRequest(empty_request=EmptyRequest()), + method, + set_one_of(ControlRequest, request), AsyncRpcContext( ActorVirtualIdentity(), ActorVirtualIdentity() ), diff --git a/core/amber/src/main/python/proto/edu/uci/ics/amber/engine/architecture/rpc/__init__.py b/core/amber/src/main/python/proto/edu/uci/ics/amber/engine/architecture/rpc/__init__.py index ac8e98914a1..e74f2e79071 100644 --- a/core/amber/src/main/python/proto/edu/uci/ics/amber/engine/architecture/rpc/__init__.py +++ b/core/amber/src/main/python/proto/edu/uci/ics/amber/engine/architecture/rpc/__init__.py @@ -140,6 +140,9 @@ class ControlRequest(betterproto.Message): query_statistics_request: "QueryStatisticsRequest" = betterproto.message_field( 58, group="sealed_value" ) + end_iteration_request: "EndIterationRequest" = betterproto.message_field( + 59, group="sealed_value" + ) ping: "Ping" = betterproto.message_field(100, group="sealed_value") """request for testing""" @@ -404,6 +407,11 @@ class QueryStatisticsRequest(betterproto.Message): ) +@dataclass(eq=False, repr=False) +class EndIterationRequest(betterproto.Message): + worker: "___core__.ActorVirtualIdentity" = betterproto.message_field(1) + + @dataclass(eq=False, repr=False) class ControlReturn(betterproto.Message): """The generic return message""" @@ -996,6 +1004,40 @@ async def end_channel( metadata=metadata, ) + async def end_iteration( + self, + end_iteration_request: "EndIterationRequest", + *, + timeout: Optional[float] = None, + deadline: Optional["Deadline"] = None, + metadata: Optional["MetadataLike"] = None + ) -> "EmptyReturn": + return await self._unary_unary( + "/edu.uci.ics.amber.engine.architecture.rpc.WorkerService/EndIteration", + end_iteration_request, + EmptyReturn, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ) + + async def next_iteration( + self, + empty_request: "EmptyRequest", + *, + timeout: Optional[float] = None, + deadline: Optional["Deadline"] = None, + metadata: Optional["MetadataLike"] = None + ) -> "EmptyReturn": + return await self._unary_unary( + "/edu.uci.ics.amber.engine.architecture.rpc.WorkerService/NextIteration", + empty_request, + EmptyReturn, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ) + async def debug_command( self, debug_command_request: "DebugCommandRequest", @@ -1551,6 +1593,14 @@ async def start_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn": async def end_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + async def end_iteration( + self, end_iteration_request: "EndIterationRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def next_iteration(self, empty_request: "EmptyRequest") -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + async def debug_command( self, debug_command_request: "DebugCommandRequest" ) -> "EmptyReturn": @@ -1684,6 +1734,20 @@ async def __rpc_end_channel( response = await self.end_channel(request) await stream.send_message(response) + async def __rpc_end_iteration( + self, stream: "grpclib.server.Stream[EndIterationRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.end_iteration(request) + await stream.send_message(response) + + async def __rpc_next_iteration( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.next_iteration(request) + await stream.send_message(response) + async def __rpc_debug_command( self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]" ) -> None: @@ -1810,6 +1874,18 @@ def __mapping__(self) -> Dict[str, grpclib.const.Handler]: EmptyRequest, EmptyReturn, ), + "/edu.uci.ics.amber.engine.architecture.rpc.WorkerService/EndIteration": grpclib.const.Handler( + self.__rpc_end_iteration, + grpclib.const.Cardinality.UNARY_UNARY, + EndIterationRequest, + EmptyReturn, + ), + "/edu.uci.ics.amber.engine.architecture.rpc.WorkerService/NextIteration": grpclib.const.Handler( + self.__rpc_next_iteration, + grpclib.const.Cardinality.UNARY_UNARY, + EmptyRequest, + EmptyReturn, + ), "/edu.uci.ics.amber.engine.architecture.rpc.WorkerService/DebugCommand": grpclib.const.Handler( self.__rpc_debug_command, grpclib.const.Cardinality.UNARY_UNARY, diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala index 023c6b5738f..c742b72c995 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -259,6 +259,10 @@ class OutputManager( outputIterator.appendSpecialTupleToEnd(FinalizeExecutor()) } + def finalizeIteration(worker: ActorVirtualIdentity): Unit = { + outputIterator.appendSpecialTupleToEnd(FinalizeIteration(worker)) + } + /** * This method is only used for ensuring correct region execution. Some operators may have input port dependency * relationships, for which we currently use a two-phase region execution scheme. (See `RegionExecutionCoordinator` diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala index d36fd8b1a1e..c9b30fd8e2a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala @@ -24,6 +24,7 @@ import edu.uci.ics.amber.core.executor.OperatorExecutor import edu.uci.ics.amber.core.state.State import edu.uci.ics.amber.core.tuple.{ FinalizeExecutor, + FinalizeIteration, FinalizePort, SchemaEnforceable, Tuple, @@ -62,9 +63,10 @@ import edu.uci.ics.amber.core.virtualidentity.{ EmbeddedControlMessageIdentity } import edu.uci.ics.amber.core.workflow.PortIdentity -import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import edu.uci.ics.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.METHOD_END_CHANNEL -import io.grpc.MethodDescriptor +import edu.uci.ics.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{ + METHOD_END_CHANNEL, + METHOD_END_ITERATION +} import java.util.concurrent.LinkedBlockingQueue @@ -163,7 +165,7 @@ class DataProcessor( if (outputTuple == null) return outputTuple match { case FinalizeExecutor() => - sendECMToDataChannels(METHOD_END_CHANNEL, PORT_ALIGNMENT) + sendECMToDataChannels(METHOD_END_CHANNEL.getBareMethodName, PORT_ALIGNMENT) // Send Completed signal to worker actor. executor.close() adaptiveBatchingMonitor.stopAdaptiveBatching() @@ -185,6 +187,13 @@ class DataProcessor( PortCompletedRequest(portId, input), asyncRPCClient.mkContext(CONTROLLER) ) + case FinalizeIteration(worker: ActorVirtualIdentity) => + sendECMToDataChannels( + METHOD_END_ITERATION.getBareMethodName, + PORT_ALIGNMENT, + EndIterationRequest(worker) + ) + executor.reset() case schemaEnforceable: SchemaEnforceable => val portIdentity = outputPortOpt.getOrElse(outputManager.getSingleOutputPortIdentity) val tuple = schemaEnforceable.enforceSchema(outputManager.getPort(portIdentity).schema) @@ -270,23 +279,54 @@ class DataProcessor( } } + def processOnStart(): Unit = { + val portId = inputGateway.getChannel(inputManager.currentChannelId).getPortId + try { + val outputState = executor.produceStateOnStart(portId.id) + if (outputState.isDefined) { + outputManager.emitState(outputState.get) + } + } catch safely { + case e => + handleExecutorException(e) + } + } + + def processOnFinish(): Unit = { + val portId = inputGateway.getChannel(inputManager.currentChannelId).getPortId + try { + val outputState = executor.produceStateOnFinish(portId.id) + if (outputState.isDefined) { + outputManager.emitState(outputState.get) + } + outputManager.outputIterator.setTupleOutput( + executor.onFinishMultiPort(portId.id) + ) + } catch safely { + case e => + // forward input tuple to the user and pause DP thread + handleExecutorException(e) + } + } + def sendECMToDataChannels( - method: MethodDescriptor[EmptyRequest, EmptyReturn], - alignment: EmbeddedControlMessageType + method: String, + alignment: EmbeddedControlMessageType, + request: ControlRequest = EmptyRequest() ): Unit = { outputManager.flush() outputGateway.getActiveChannels .filter(!_.isControl) .foreach { activeChannelId => asyncRPCClient.sendECMToChannel( - EmbeddedControlMessageIdentity(method.getBareMethodName), + EmbeddedControlMessageIdentity(method), alignment, Set(), Map( activeChannelId.toWorkerId.name -> ControlInvocation( - method.getBareMethodName, - EmptyRequest(), + method, + request, AsyncRPCContext(ActorVirtualIdentity(""), ActorVirtualIdentity("")), -1 ) @@ -296,7 +336,7 @@ class DataProcessor( } } - def handleExecutorException(e: Throwable): Unit = { + private[this] def handleExecutorException(e: Throwable): Unit = { asyncRPCClient.controllerInterface.consoleMessageTriggered( ConsoleMessageTriggeredRequest(mkConsoleMessage(actorId, e)), asyncRPCClient.mkContext(CONTROLLER) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala index bf5363e6e61..30c896a73a4 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala @@ -47,6 +47,8 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor) with EndHandler with StartChannelHandler with EndChannelHandler + with EndIterationHandler + with NextIterationHandler with AssignPortHandler with AddInputChannelHandler with FlushNetworkBufferHandler diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/EndChannelHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/EndChannelHandler.scala index 863dd59f5c7..465a1e179e0 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/EndChannelHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/EndChannelHandler.scala @@ -20,11 +20,17 @@ package edu.uci.ics.amber.engine.architecture.worker.promisehandlers import com.twitter.util.Future -import edu.uci.ics.amber.core.tuple.FinalizePort -import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest} +import edu.uci.ics.amber.core.tuple.{FinalizeIteration, FinalizePort} +import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.PORT_ALIGNMENT +import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + EmptyRequest, + EndIterationRequest +} import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import edu.uci.ics.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.METHOD_END_ITERATION import edu.uci.ics.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer -import edu.uci.ics.amber.error.ErrorUtils.safely +import edu.uci.ics.amber.operator.loop.LoopStartOpExec trait EndChannelHandler { this: DataProcessorRPCHandlerInitializer => @@ -37,20 +43,7 @@ trait EndChannelHandler { val portId = dp.inputGateway.getChannel(channelId).getPortId dp.inputManager.getPort(portId).completed = true dp.inputManager.initBatch(channelId, Array.empty) - try { - val outputState = dp.executor.produceStateOnFinish(portId.id) - if (outputState.isDefined) { - dp.outputManager.emitState(outputState.get) - } - dp.outputManager.outputIterator.setTupleOutput( - dp.executor.onFinishMultiPort(portId.id) - ) - } catch safely { - case e => - // forward input tuple to the user and pause DP thread - dp.handleExecutorException(e) - } - + dp.processOnFinish() dp.outputManager.outputIterator.appendSpecialTupleToEnd( FinalizePort(portId, input = true) ) @@ -59,8 +52,12 @@ trait EndChannelHandler { // Need this check for handling input port dependency relationships. // See documentation of isMissingOutputPort if (!dp.outputManager.isMissingOutputPort) { - // assuming all the output ports finalize after all input ports are finalized. - dp.outputManager.finalizeOutput() + dp.executor match { + case executor: LoopStartOpExec if executor.checkCondition() => + dp.outputManager.finalizeIteration(dp.actorId) + case _ => + dp.outputManager.finalizeOutput() + } } } EmptyReturn() diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala new file mode 100644 index 00000000000..cca2d0a8f14 --- /dev/null +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package edu.uci.ics.amber.engine.architecture.worker.promisehandlers + +import com.twitter.util.Future +import edu.uci.ics.amber.core.tuple.FinalizeIteration +import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + EmptyRequest, + EndIterationRequest +} +import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import edu.uci.ics.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer +import edu.uci.ics.amber.operator.loop.LoopEndOpExec + +trait EndIterationHandler { + this: DataProcessorRPCHandlerInitializer => + + override def endIteration( + request: EndIterationRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + dp.executor match { + case _: LoopEndOpExec => + workerInterface.nextIteration(EmptyRequest(), mkContext(request.worker)) + case _ => + dp.processOnFinish() + dp.outputManager.finalizeIteration(request.worker) + } + EmptyReturn() + } +} diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala new file mode 100644 index 00000000000..5a55a2eb615 --- /dev/null +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package edu.uci.ics.amber.engine.architecture.worker.promisehandlers + +import com.twitter.util.Future +import edu.uci.ics.amber.core.tuple.FinalizeIteration +import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.PORT_ALIGNMENT +import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + EmptyRequest, + EndIterationRequest +} +import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import edu.uci.ics.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.METHOD_END_ITERATION +import edu.uci.ics.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer +import edu.uci.ics.amber.operator.loop.LoopStartOpExec + +trait NextIterationHandler { + this: DataProcessorRPCHandlerInitializer => + + override def nextIteration( + request: EmptyRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + dp.processOnFinish() + if (dp.executor.asInstanceOf[LoopStartOpExec].checkCondition()) { + dp.outputManager.finalizeIteration(dp.actorId) + } else { + dp.outputManager.finalizeOutput() + } + EmptyReturn() + } +} diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/StartChannelHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/StartChannelHandler.scala index 36918015388..227c029290a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/StartChannelHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/StartChannelHandler.scala @@ -25,7 +25,6 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContex import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.EmptyReturn import edu.uci.ics.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.METHOD_START_CHANNEL import edu.uci.ics.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer -import edu.uci.ics.amber.error.ErrorUtils.safely trait StartChannelHandler { this: DataProcessorRPCHandlerInitializer => @@ -34,17 +33,8 @@ trait StartChannelHandler { request: EmptyRequest, ctx: AsyncRPCContext ): Future[EmptyReturn] = { - val portId = dp.inputGateway.getChannel(dp.inputManager.currentChannelId).getPortId - dp.sendECMToDataChannels(METHOD_START_CHANNEL, NO_ALIGNMENT) - try { - val outputState = dp.executor.produceStateOnStart(portId.id) - if (outputState.isDefined) { - dp.outputManager.emitState(outputState.get) - } - } catch safely { - case e => - dp.handleExecutorException(e) - } + dp.sendECMToDataChannels(METHOD_START_CHANNEL.getBareMethodName, NO_ALIGNMENT) + dp.processOnStart() EmptyReturn() } } diff --git a/core/gui/src/assets/operator_images/LoopEnd.png b/core/gui/src/assets/operator_images/LoopEnd.png new file mode 100644 index 00000000000..ee0f9ab6fac Binary files /dev/null and b/core/gui/src/assets/operator_images/LoopEnd.png differ diff --git a/core/gui/src/assets/operator_images/LoopStart.png b/core/gui/src/assets/operator_images/LoopStart.png new file mode 100644 index 00000000000..7e5be023cdf Binary files /dev/null and b/core/gui/src/assets/operator_images/LoopStart.png differ diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/OperatorExecutor.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/OperatorExecutor.scala index a3ef8ea917c..16706a59c21 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/OperatorExecutor.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/OperatorExecutor.scala @@ -56,4 +56,9 @@ trait OperatorExecutor { def close(): Unit = {} + def reset(): Unit = { + close() + open() + } + } diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/TupleLike.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/TupleLike.scala index 0cf030f964a..c0053fb1948 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/TupleLike.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/TupleLike.scala @@ -19,6 +19,7 @@ package edu.uci.ics.amber.core.tuple +import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity import edu.uci.ics.amber.core.workflow.PortIdentity import scala.jdk.CollectionConverters.CollectionHasAsScala @@ -41,6 +42,7 @@ trait InternalMarker extends TupleLike { final case class FinalizePort(portId: PortIdentity, input: Boolean) extends InternalMarker final case class FinalizeExecutor() extends InternalMarker +final case class FinalizeIteration(worker: ActorVirtualIdentity) extends InternalMarker trait SeqTupleLike extends TupleLike with SchemaEnforceable { override def inMemSize: Long = ??? diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala index 95bb2f87d61..69dd89d9d45 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala @@ -50,6 +50,7 @@ import edu.uci.ics.amber.operator.intersect.IntersectOpDesc import edu.uci.ics.amber.operator.intervalJoin.IntervalJoinOpDesc import edu.uci.ics.amber.operator.keywordSearch.KeywordSearchOpDesc import edu.uci.ics.amber.operator.limit.LimitOpDesc +import edu.uci.ics.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} import edu.uci.ics.amber.operator.machineLearning.Scorer.MachineLearningScorerOpDesc import edu.uci.ics.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{ SklearnAdvancedKNNClassifierTrainerOpDesc, @@ -216,6 +217,8 @@ trait StateTransferFunc new Type(value = classOf[AsterixDBSourceOpDesc], name = "AsterixDBSource"), new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"), new Type(value = classOf[LimitOpDesc], name = "Limit"), + new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"), + new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"), new Type(value = classOf[SleepOpDesc], name = "Sleep"), new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"), new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"), diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpDesc.scala new file mode 100644 index 00000000000..e86f27d3ab2 --- /dev/null +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpDesc.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package edu.uci.ics.amber.operator.loop + +import edu.uci.ics.amber.core.executor.OpExecWithClassName +import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import edu.uci.ics.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import edu.uci.ics.amber.operator.LogicalOp + +class LoopEndOpDesc extends LogicalOp { + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName("edu.uci.ics.amber.operator.loop.LoopEndOpExec") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop End", + "Loop End", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) +} diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpExec.scala new file mode 100644 index 00000000000..ad201f9c30a --- /dev/null +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpExec.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package edu.uci.ics.amber.operator.loop + +import edu.uci.ics.amber.core.executor.OperatorExecutor +import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike} + +class LoopEndOpExec extends OperatorExecutor { + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) +} diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpDesc.scala new file mode 100644 index 00000000000..561da5fee88 --- /dev/null +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpDesc.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package edu.uci.ics.amber.operator.loop + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import edu.uci.ics.amber.core.executor.OpExecWithClassName +import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import edu.uci.ics.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import edu.uci.ics.amber.operator.LogicalOp +import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import edu.uci.ics.amber.util.JSONUtils.objectMapper + +class LoopStartOpDesc extends LogicalOp { + + @JsonProperty(required = true) + @JsonSchemaTitle("Iteration Number") + var iteration: Int = _ + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName( + "edu.uci.ics.amber.operator.loop.LoopStartOpExec", + objectMapper.writeValueAsString(this) + ) + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop Start", + "Loop Start", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + +} diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpExec.scala new file mode 100644 index 00000000000..edc4b049247 --- /dev/null +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpExec.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package edu.uci.ics.amber.operator.loop + +import edu.uci.ics.amber.core.executor.OperatorExecutor +import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike} +import edu.uci.ics.amber.util.JSONUtils.objectMapper +import scala.collection.mutable + +class LoopStartOpExec(descString: String) extends OperatorExecutor { + private val desc: LoopStartOpDesc = objectMapper.readValue(descString, classOf[LoopStartOpDesc]) + private val data = new mutable.ArrayBuffer[Tuple] + private var currentIteration = 0 + + def checkCondition(): Boolean = { + desc.iteration > currentIteration + } + + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { + data.append(tuple) + Iterator.empty + } + + override def onFinish(port: Int): Iterator[TupleLike] = { + currentIteration += 1 + data.iterator + } + +}