From 0279ec77fedfc9436026fa8aa25119d1eb492386 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Sun, 25 May 2025 17:04:33 -0700 Subject: [PATCH 01/29] add env vars for cluster mode --- .../amber/src/main/resources/application.conf | 11 +++++ .../ics/amber/engine/common/AmberConfig.scala | 6 +++ .../amber/engine/common/AmberRuntime.scala | 45 ++++++++----------- .../ics/texera/web/ComputingUnitMaster.scala | 33 ++------------ .../ics/texera/web/ComputingUnitWorker.scala | 26 +---------- 5 files changed, 41 insertions(+), 80 deletions(-) diff --git a/core/amber/src/main/resources/application.conf b/core/amber/src/main/resources/application.conf index aea7d2dfc39..1c01f9cacc1 100644 --- a/core/amber/src/main/resources/application.conf +++ b/core/amber/src/main/resources/application.conf @@ -57,6 +57,17 @@ reconfiguration { enable-transactional-reconfiguration = ${?RECONFIGURATION_ENABLE_TRANSACTIONAL_RECONFIGURATION} } +clustering{ + enabled = false + enabled = ${?CLUSTERING_ENABLED} + + master-ip-address = "localhost" + master-ip-address = ${?CLUSTERING_MASTER_IP_ADDRESS} + + master-port = 2552 + master-port = ${?CLUSTERING_MASTER_PORT} +} + cache { # [false, true] enabled = true diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberConfig.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberConfig.scala index cdf2fcf452f..cafc414db9d 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberConfig.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberConfig.scala @@ -60,6 +60,12 @@ object AmberConfig { val creditPollingIntervalInMs: Int = getConfSource.getInt("flow-control.credit-poll-interval-in-ms") + + // clustering + val amberClusterEnabled: Boolean = getConfSource.getBoolean("clustering.enabled") + val masterIpAddress: String = getConfSource.getString("clustering.master-ip-address") + val masterPort: Int = getConfSource.getInt("clustering.master-port") + // Network buffering val defaultDataTransferBatchSize: Int = getConfSource.getInt("network-buffering.default-data-transfer-batch-size") diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala index 8efc76e42f1..34d2109d84f 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala @@ -26,8 +26,6 @@ import com.typesafe.config.{Config, ConfigFactory} import edu.uci.ics.amber.clustering.ClusterListener import edu.uci.ics.amber.engine.architecture.messaginglayer.DeadLetterMonitorActor -import java.io.{BufferedReader, InputStreamReader} -import java.net.URL import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.FiniteDuration @@ -61,54 +59,47 @@ object AmberRuntime { _actorSystem.scheduler.scheduleWithFixedDelay(initialDelay, delay)(() => call) } - private def getNodeIpAddress: String = { - try { - val query = new URL("http://checkip.amazonaws.com") - val in = new BufferedReader(new InputStreamReader(query.openStream())) - in.readLine() - } catch { - case e: Exception => throw e - } - } - def startActorMaster(clusterMode: Boolean): Unit = { - var localIpAddress = "localhost" + var masterIpAddress = "localhost" + var masterPort = 2552 if (clusterMode) { - localIpAddress = getNodeIpAddress + masterIpAddress = AmberConfig.masterIpAddress + masterPort = AmberConfig.masterPort } val masterConfig = ConfigFactory .parseString(s""" - akka.remote.artery.canonical.port = 2552 - akka.remote.artery.canonical.hostname = $localIpAddress - akka.cluster.seed-nodes = [ "akka://Amber@$localIpAddress:2552" ] + akka.remote.artery.canonical.port = $masterPort + akka.remote.artery.canonical.hostname = $masterIpAddress + akka.cluster.seed-nodes = [ "akka://Amber@$masterIpAddress:$masterPort" ] """) .withFallback(akkaConfig) .resolve() - AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress) + AmberConfig.masterNodeAddr = createMasterAddress(masterIpAddress, masterPort) createAmberSystem(masterConfig) } def akkaConfig: Config = ConfigFactory.load("cluster").withFallback(defaultApplication()).resolve() - private def createMasterAddress(addr: String): Address = Address("akka", "Amber", addr, 2552) + private def createMasterAddress(addr: String, port:Int): Address = Address("akka", "Amber", addr, port) - def startActorWorker(mainNodeAddress: Option[String]): Unit = { - val addr = mainNodeAddress.getOrElse("localhost") - var localIpAddress = "localhost" - if (mainNodeAddress.isDefined) { - localIpAddress = getNodeIpAddress + def startActorWorker(clusterMode:Boolean): Unit = { + var masterIpAddress = "localhost" + var masterPort = 2552 + if (clusterMode) { + masterIpAddress = AmberConfig.masterIpAddress + masterPort = AmberConfig.masterPort } val workerConfig = ConfigFactory .parseString(s""" - akka.remote.artery.canonical.hostname = $localIpAddress + akka.remote.artery.canonical.hostname = $masterIpAddress akka.remote.artery.canonical.port = 0 - akka.cluster.seed-nodes = [ "akka://Amber@$addr:2552" ] + akka.cluster.seed-nodes = [ "akka://Amber@$masterIpAddress:$masterPort" ] """) .withFallback(akkaConfig) .resolve() - AmberConfig.masterNodeAddr = createMasterAddress(addr) + AmberConfig.masterNodeAddr = createMasterAddress(masterIpAddress, masterPort) createAmberSystem(workerConfig) } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala index 1c14caf9240..721e0efa3a1 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala @@ -22,35 +22,30 @@ package edu.uci.ics.texera.web import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig} +import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig -import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{ - COMPLETED, - FAILED -} +import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{COMPLETED, FAILED} import edu.uci.ics.amber.engine.common.AmberRuntime.scheduleRecurringCallThroughActorSystem import edu.uci.ics.amber.engine.common.Utils.{maptoStatusCode, objectMapper} import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime, Utils} -import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity import edu.uci.ics.texera.auth.SessionUser import edu.uci.ics.texera.dao.SqlServer -import edu.uci.ics.texera.web.auth.JwtAuth.setupJwtAuth import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions +import edu.uci.ics.texera.web.auth.JwtAuth.setupJwtAuth import edu.uci.ics.texera.web.resource.WorkflowWebsocketResource import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService import io.dropwizard.Configuration import io.dropwizard.setup.{Bootstrap, Environment} import io.dropwizard.websockets.WebsocketBundle -import org.apache.commons.jcs3.access.exception.InvalidArgumentException import org.eclipse.jetty.server.session.SessionHandler import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter import java.net.URI import java.time.Duration -import scala.annotation.tailrec import scala.concurrent.duration.DurationInt object ComputingUnitMaster { @@ -70,29 +65,9 @@ object ComputingUnitMaster { ) } - type OptionMap = Map[Symbol, Any] - - def parseArgs(args: Array[String]): OptionMap = { - @tailrec - def nextOption(map: OptionMap, list: List[String]): OptionMap = { - list match { - case Nil => map - case "--cluster" :: value :: tail => - nextOption(map ++ Map(Symbol("cluster") -> value.toBoolean), tail) - case option :: tail => - throw new InvalidArgumentException("unknown command-line arg") - } - } - - nextOption(Map(), args.toList) - } - def main(args: Array[String]): Unit = { - val argMap = parseArgs(args) - - val clusterMode = argMap.get(Symbol("cluster")).asInstanceOf[Option[Boolean]].getOrElse(false) // start actor system master node - AmberRuntime.startActorMaster(clusterMode) + AmberRuntime.startActorMaster(AmberConfig.amberClusterEnabled) // start web server new ComputingUnitMaster().run( "server", diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala index 0830e242a9b..12182352c38 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala @@ -19,35 +19,13 @@ package edu.uci.ics.texera.web -import edu.uci.ics.amber.engine.common.AmberRuntime -import org.apache.commons.jcs3.access.exception.InvalidArgumentException - -import scala.annotation.tailrec +import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime} object ComputingUnitWorker { - type OptionMap = Map[Symbol, Any] - - def parseArgs(args: Array[String]): OptionMap = { - @tailrec - def nextOption(map: OptionMap, list: List[String]): OptionMap = { - list match { - case Nil => map - case "--serverAddr" :: value :: tail => - nextOption(map ++ Map(Symbol("serverAddr") -> value), tail) - case option :: tail => - throw new InvalidArgumentException("unknown command-line arg") - } - } - - nextOption(Map(), args.toList) - } - def main(args: Array[String]): Unit = { - val argMap = parseArgs(args) - // start actor system worker node - AmberRuntime.startActorWorker(argMap.get(Symbol("serverAddr")).asInstanceOf[Option[String]]) + AmberRuntime.startActorWorker(AmberConfig.amberClusterEnabled) } } From 133684b0a0a9c9a2d6c0bbb5a5bdc2ca79083c7f Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Sun, 25 May 2025 19:02:08 -0700 Subject: [PATCH 02/29] Update AmberRuntime.scala --- .../scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala index 34d2109d84f..92ca5e9d082 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala @@ -86,14 +86,16 @@ object AmberRuntime { def startActorWorker(clusterMode:Boolean): Unit = { var masterIpAddress = "localhost" + var nodeIpAddress = "localhost" var masterPort = 2552 if (clusterMode) { masterIpAddress = AmberConfig.masterIpAddress masterPort = AmberConfig.masterPort + nodeIpAddress = "0.0.0.0" } val workerConfig = ConfigFactory .parseString(s""" - akka.remote.artery.canonical.hostname = $masterIpAddress + akka.remote.artery.canonical.hostname = $nodeIpAddress akka.remote.artery.canonical.port = 0 akka.cluster.seed-nodes = [ "akka://Amber@$masterIpAddress:$masterPort" ] """) From 21d9984bc9e9d542471a0d09ac6c31ce365c42ab Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Sun, 25 May 2025 20:48:43 -0700 Subject: [PATCH 03/29] Update AmberRuntime.scala --- .../edu/uci/ics/amber/engine/common/AmberRuntime.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala index 92ca5e9d082..dfb07053dce 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala @@ -28,6 +28,7 @@ import edu.uci.ics.amber.engine.architecture.messaginglayer.DeadLetterMonitorAct import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.FiniteDuration +import scala.sys.process._ object AmberRuntime { @@ -86,16 +87,16 @@ object AmberRuntime { def startActorWorker(clusterMode:Boolean): Unit = { var masterIpAddress = "localhost" - var nodeIpAddress = "localhost" var masterPort = 2552 + var nodeIp = "localhost" if (clusterMode) { masterIpAddress = AmberConfig.masterIpAddress masterPort = AmberConfig.masterPort - nodeIpAddress = "0.0.0.0" + nodeIp = "hostname -i".!!.trim // only supported by linux/unix } val workerConfig = ConfigFactory .parseString(s""" - akka.remote.artery.canonical.hostname = $nodeIpAddress + akka.remote.artery.canonical.hostname = $nodeIp akka.remote.artery.canonical.port = 0 akka.cluster.seed-nodes = [ "akka://Amber@$masterIpAddress:$masterPort" ] """) From dbdcd02e321ffb127a6b21ee40cb02f6a56fb968 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Mon, 26 May 2025 00:32:23 -0700 Subject: [PATCH 04/29] update --- .../src/main/resources/kubernetes.conf | 7 +- .../ics/texera/service/KubernetesConfig.scala | 3 +- .../ComputingUnitManagingResource.scala | 77 +++++++--- .../service/util/KubernetesClient.scala | 138 +++++++++++++++++- .../computing-unit-selection.component.html | 22 +++ .../computing-unit-selection.component.scss | 8 +- .../computing-unit-selection.component.ts | 4 +- ...orkflow-computing-unit-managing.service.ts | 6 +- 8 files changed, 237 insertions(+), 28 deletions(-) diff --git a/core/computing-unit-managing-service/src/main/resources/kubernetes.conf b/core/computing-unit-managing-service/src/main/resources/kubernetes.conf index 45171ad2ac7..c5f336e9186 100644 --- a/core/computing-unit-managing-service/src/main/resources/kubernetes.conf +++ b/core/computing-unit-managing-service/src/main/resources/kubernetes.conf @@ -22,8 +22,11 @@ kubernetes { compute-unit-service-name = "workflow-computing-unit-svc" compute-unit-service-name = ${?KUBERNETES_COMPUTE_UNIT_SERVICE_NAME} - image-name = "bobbai/texera-workflow-computing-unit:dev" - image-name = ${?KUBERNETES_IMAGE_NAME} + master-image-name = "bobbai/texera-workflow-computing-unit:dev" + master-image-name = ${?KUBERNETES_MASTER_IMAGE_NAME} + + worker-image-name = "bobbai/texera-workflow-computing-unit:dev" + worker-image-name = ${?KUBERNETES_WORKER_IMAGE_NAME} image-pull-policy = "Always" image-pull-policy = ${?KUBERNETES_IMAGE_PULL_POLICY} diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala index e37d8c5bbe0..9cd69e1f663 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala @@ -28,7 +28,8 @@ object KubernetesConfig { // Access the Kubernetes settings with environment variable fallback val computeUnitServiceName: String = conf.getString("kubernetes.compute-unit-service-name") val computeUnitPoolNamespace: String = conf.getString("kubernetes.compute-unit-pool-namespace") - val computeUnitImageName: String = conf.getString("kubernetes.image-name") + val computeUnitMasterImageName: String = conf.getString("kubernetes.master-image-name") + val computeUnitWorkerImageName: String = conf.getString("kubernetes.worker-image-name") val computingUnitImagePullPolicy: String = conf.getString("kubernetes.image-pull-policy") val computeUnitPortNumber: Int = conf.getInt("kubernetes.port-num") diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala index 9dc238b8d3e..77a62fe8503 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala @@ -93,7 +93,8 @@ object ComputingUnitManagingResource { cpuLimit: String, memoryLimit: String, gpuLimit: String, - jvmMemorySize: String + jvmMemorySize: String, + numNodes: Int ) case class WorkflowComputingUnitResourceLimit( @@ -231,7 +232,12 @@ class ComputingUnitManagingResource { val computingUnit = new WorkflowComputingUnit() val userToken = JwtAuth.jwtToken(jwtClaims(user.user, dayToMin(TOKEN_EXPIRE_TIME_IN_DAYS))) computingUnit.setUid(user.getUid) - computingUnit.setName(param.name) + val name = if(param.numNodes > 0){ + param.name + "-cluster" + }else{ + param.name + } + computingUnit.setName(name) computingUnit.setCreationTime(new Timestamp(System.currentTimeMillis())) // Insert using the DAO @@ -241,17 +247,30 @@ class ComputingUnitManagingResource { val cuid = ctx.lastID().intValue() val insertedUnit = wcDao.fetchOneByCuid(cuid) - // Create the pod with the generated CUID - val pod = KubernetesClient.createPod( - cuid, - param.cpuLimit, - param.memoryLimit, - param.gpuLimit, - computingUnitEnvironmentVariables ++ Map( - EnvironmentalVariable.ENV_USER_JWT_TOKEN -> userToken, - EnvironmentalVariable.ENV_JAVA_OPTS -> s"-Xmx${param.jvmMemorySize}" + val pod = if(param.numNodes > 0){ + KubernetesClient.createCluster( + cuid, + param.cpuLimit, + param.memoryLimit, + param.numNodes, + computingUnitEnvironmentVariables ++ Map( + EnvironmentalVariable.ENV_USER_JWT_TOKEN -> userToken, + EnvironmentalVariable.ENV_JAVA_OPTS -> s"-Xmx${param.jvmMemorySize}" + ) ) - ) + }else{ + // Create the pod with the generated CUID + KubernetesClient.createPod( + cuid, + param.cpuLimit, + param.memoryLimit, + param.gpuLimit, + computingUnitEnvironmentVariables ++ Map( + EnvironmentalVariable.ENV_USER_JWT_TOKEN -> userToken, + EnvironmentalVariable.ENV_JAVA_OPTS -> s"-Xmx${param.jvmMemorySize}" + ) + ) + } // Return the dashboard response DashboardWorkflowComputingUnit( @@ -286,14 +305,28 @@ class ComputingUnitManagingResource { .filter(_.getTerminateTime == null) // Filter out terminated units units.map { unit => + val isCluster = unit.getName.endsWith("-cluster") val cuid = unit.getCuid.intValue() val podName = KubernetesClient.generatePodName(cuid) val pod = KubernetesClient.getPodByName(podName) + val status = if(isCluster){ + // master pod (Option[Pod]) + all worker pods + val phases = (pod.toSeq ++ KubernetesClient.getClusterPodsById(cuid)) + .map(_.getStatus.getPhase) // Seq[String] like Seq("Running", "Running", ...) + + phases.distinct match { + case Seq(singlePhase) => singlePhase // all identical → return it + case _ => "Unknown" // mixed or empty + } + }else{ + pod.map(_.getStatus.getPhase).getOrElse("Unknown") + } + DashboardWorkflowComputingUnit( computingUnit = unit, uri = KubernetesClient.generatePodURI(cuid), - status = pod.map(_.getStatus.getPhase).getOrElse("Unknown"), + status = status, metrics = getComputingUnitMetrics(cuid), resourceLimits = getComputingUnitResourceLimit(cuid) ) @@ -323,16 +356,20 @@ class ComputingUnitManagingResource { .build() } - KubernetesClient.deletePod(cuid) - - // If successful, update the database withTransaction(context) { ctx => val cuDao = new WorkflowComputingUnitDao(ctx.configuration()) - val units = cuDao.fetchByCuid(cuid) - - units.forEach(unit => unit.setTerminateTime(new Timestamp(System.currentTimeMillis()))) - cuDao.update(units) + val cu = cuDao.fetchOneByCuid(cuid) + val isCluster = cu.getName.endsWith("-cluster") + if(isCluster){ + KubernetesClient.deleteCluster(cuid) + }else{ + KubernetesClient.deletePod(cuid) + } + // If successful, update the database + cu.setTerminateTime(new Timestamp(System.currentTimeMillis())) + cuDao.update(cu) } + Response.ok().build() } diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala index 32e986d919b..d6db81a93c3 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala @@ -21,6 +21,7 @@ package edu.uci.ics.texera.service.util import edu.uci.ics.texera.service.KubernetesConfig import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.apps.{StatefulSet, StatefulSetBuilder, StatefulSetSpecBuilder} import io.fabric8.kubernetes.api.model.metrics.v1beta1.PodMetricsList import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientBuilder} @@ -39,10 +40,26 @@ object KubernetesClient { def generatePodName(cuid: Int): String = s"$podNamePrefix-$cuid" + def generateClusterMasterServiceName(cuid:Int) = s"${generatePodName(cuid)}-master" + + def generateStatefulSetName(cuid: Int): String = s"${generatePodName(cuid)}-workers" + def getPodByName(podName: String): Option[Pod] = { Option(client.pods().inNamespace(namespace).withName(podName).get()) } + def getClusterPodsById(cuid: Int): Array[Pod] = { + client + .pods() + .inNamespace(namespace) + .withLabel("type", "computing-unit") + .withLabel("cuid", cuid.toString) + .list() + .getItems + .asScala + .toArray + } + def getPodMetrics(cuid: Int): Map[String, String] = { val podMetricsList: PodMetricsList = client.top().pods().metrics(namespace) val targetPodName = generatePodName(cuid) @@ -74,6 +91,118 @@ object KubernetesClient { .getOrElse(Map.empty[String, String]) } + + // --------------------------------------------------------------------------- + // Cluster lifecycle helpers + // --------------------------------------------------------------------------- + def createCluster( + cuid: Int, + cpuLimit: String, + memoryLimit: String, + numNodes: Int, + envVars: Map[String, Any] + ): Pod = { + val masterIp = generatePodURI(cuid) + val enrichedEnv = envVars ++ Map( + "CLUSTERING_ENABLED" -> "true", + "CLUSTERING_MASTER_IP_ADDRESS" -> masterIp + ) + + val master = createPod(cuid, cpuLimit, memoryLimit, gpuLimit = "0", enrichedEnv) + createClusterMasterService(cuid) + createStatefulSet(cuid, cpuLimit, memoryLimit, numNodes, enrichedEnv) + master // return master pod + } + + def deleteCluster(cuid: Int): Unit = { + deletePod(cuid) + deleteClusterMasterService(cuid) + deleteStatefulSet(cuid) + } + + // --------------------------------------------------------------------------- + // Kubernetes resource creators + // --------------------------------------------------------------------------- + + private def createClusterMasterService(cuid: Int): Service = { + val serviceName = generateClusterMasterServiceName(cuid) + val service = new ServiceBuilder() + .withNewMetadata() + .withName(serviceName) + .withNamespace(namespace) + .endMetadata() + .withNewSpec() + .withClusterIP("None") // headless for DNS discovery + .addNewPort() + .withPort(2552) + .endPort() + .addToSelector("type", "computing-unit") + .addToSelector("cuid", cuid.toString) + .addToSelector("role", "master") + .endSpec() + .build() + + client.services().inNamespace(namespace).create(service) + } + + private def createStatefulSet( + cuid: Int, + cpuLimit: String, + memoryLimit: String, + numNodes: Int, + envVars: Map[String, Any] + ): StatefulSet = { + val envList = envVars.map { case (k, v) => + new EnvVarBuilder().withName(k).withValue(v.toString).build() + }.toList.asJava + + val resources = new ResourceRequirementsBuilder() + .addToLimits("cpu", new Quantity(cpuLimit)) + .addToLimits("memory", new Quantity(memoryLimit)) + .build() + + val container = new ContainerBuilder() + .withName("computing-unit-worker") + .withImage(KubernetesConfig.computeUnitWorkerImageName) + .withImagePullPolicy(KubernetesConfig.computingUnitImagePullPolicy) + .addNewPort().withContainerPort(KubernetesConfig.computeUnitPortNumber).endPort() + .withEnv(envList) + .withResources(resources) + .build() + + val sts = new StatefulSetBuilder() + .withNewMetadata() + .withName(generateStatefulSetName(cuid)) + .withNamespace(namespace) + .endMetadata() + .withSpec( + new StatefulSetSpecBuilder() + .withServiceName(generatePodName(cuid)) + .withReplicas(numNodes) + .withSelector( + new LabelSelectorBuilder() + .addToMatchLabels("type", "computing-unit") + .addToMatchLabels("cuid", cuid.toString) + .addToMatchLabels("role", "worker") + .build() + ) + .withNewTemplate() + .withNewMetadata() + .addToLabels("type", "computing-unit") + .addToLabels("cuid", cuid.toString) + .addToLabels("role", "worker") + .endMetadata() + .withNewSpec() + .withContainers(container) + .endSpec() + .endTemplate() + .build() + ) + .build() + + client.apps().statefulSets().inNamespace(namespace).create(sts) + } + def createPod( cuid: Int, cpuLimit: String, @@ -116,6 +245,7 @@ object KubernetesClient { .addToLabels("type", "computing-unit") .addToLabels("cuid", cuid.toString) .addToLabels("name", podName) + .addToLabels("role", "master") // Start building the pod spec val specBuilder = podBuilder @@ -131,7 +261,7 @@ object KubernetesClient { val pod = specBuilder .addNewContainer() .withName("computing-unit-master") - .withImage(KubernetesConfig.computeUnitImageName) + .withImage(KubernetesConfig.computeUnitMasterImageName) .withImagePullPolicy(KubernetesConfig.computingUnitImagePullPolicy) .addNewPort() .withContainerPort(KubernetesConfig.computeUnitPortNumber) @@ -150,4 +280,10 @@ object KubernetesClient { def deletePod(cuid: Int): Unit = { client.pods().inNamespace(namespace).withName(generatePodName(cuid)).delete() } + + private def deleteClusterMasterService(cuid: Int): Unit = + client.services().inNamespace(namespace).withName(generateClusterMasterServiceName(cuid)).delete() + + private def deleteStatefulSet(cuid: Int): Unit = + client.apps().statefulSets().inNamespace(namespace).withName(generateStatefulSetName(cuid)).delete() } diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html index c9b554ed67f..aeab2677769 100644 --- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html +++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html @@ -208,6 +208,28 @@ +
+ Select #Worker Nodes + + + + +
+ +
+ + +
+
diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.scss b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.scss index 22f63167fa7..90a2d650edb 100644 --- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.scss +++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.scss @@ -170,10 +170,16 @@ .memory-selection, .cpu-selection, -.gpu-selection { +.gpu-selection, +.node-selection +{ width: 100%; } +.cluster-warning { + font-size: 0.9em; +} + .jvm-memory-slider { width: 100%; margin: 10px 0; diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.ts b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.ts index ea576386480..297ac572411 100644 --- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.ts +++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.ts @@ -53,6 +53,7 @@ export class ComputingUnitSelectionComponent implements OnInit, OnChanges { selectedCpu: string = ""; selectedGpu: string = "0"; // Default to no GPU selectedJvmMemorySize: string = "1G"; // Initial JVM memory size + selectedNumNodes: number = 0 // JVM memory slider configuration jvmMemorySliderValue: number = 1; // Initial value in GB @@ -65,6 +66,7 @@ export class ComputingUnitSelectionComponent implements OnInit, OnChanges { cpuOptions: string[] = []; memoryOptions: string[] = []; gpuOptions: string[] = []; // Add GPU options array + nodeOptions: number[] = [0,1,2,3,4,5,6,7,8] // Add property to track user-initiated termination private isUserTerminatingUnit = false; @@ -270,7 +272,7 @@ export class ComputingUnitSelectionComponent implements OnInit, OnChanges { const computeJvmMemory = this.selectedJvmMemorySize; this.computingUnitService - .createComputingUnit(computeUnitName, computeCPU, computeMemory, computeGPU, computeJvmMemory) + .createComputingUnit(computeUnitName, computeCPU, computeMemory, computeGPU, computeJvmMemory, this.selectedNumNodes) .pipe(untilDestroyed(this)) .subscribe({ next: (unit: DashboardWorkflowComputingUnit) => { diff --git a/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts b/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts index b5664514d7b..90322a98a9f 100644 --- a/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts +++ b/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts @@ -43,6 +43,7 @@ export class WorkflowComputingUnitManagingService { * @param gpuLimit The gpu resource limit for the computing unit. * @param jvmMemorySize The JVM memory size (e.g. "1G", "2G") * @param unitType + * @param numNodes * @returns An Observable of the created WorkflowComputingUnit. */ public createComputingUnit( @@ -51,9 +52,10 @@ export class WorkflowComputingUnitManagingService { memoryLimit: string, gpuLimit: string = "0", jvmMemorySize: string = "1G", - unitType: string = "k8s_pod" + numNodes: number = 0, + unitType: string = "k8s_pod", ): Observable { - const body = { name, cpuLimit, memoryLimit, gpuLimit, jvmMemorySize, unitType }; + const body = { name, cpuLimit, memoryLimit, gpuLimit, jvmMemorySize, unitType, numNodes }; return this.http.post( `${AppSettings.getApiEndpoint()}/${COMPUTING_UNIT_CREATE_URL}`, From 632740cbea3d48bb3c4a42e5476b3e85de9055f8 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Mon, 26 May 2025 10:25:53 -0700 Subject: [PATCH 05/29] Update workflow-computing-unit-manager-service-account.yaml --- ...kflow-computing-unit-manager-service-account.yaml | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-service-account.yaml b/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-service-account.yaml index 44dce0cf877..ef698226c23 100644 --- a/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-service-account.yaml +++ b/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-service-account.yaml @@ -29,11 +29,17 @@ metadata: namespace: {{ .Values.workflowComputingUnitPool.namespace }} rules: - apiGroups: [""] - resources: ["pods"] + resources: ["pods", "services"] # added services verbs: ["get", "list", "watch", "create", "delete"] - - apiGroups: ["metrics.k8s.io"] # Added metrics permissions + + - apiGroups: ["metrics.k8s.io"] resources: ["pods"] - verbs: ["list", "get"] # Added metrics permissions + verbs: ["get", "list"] + + - apiGroups: ["apps"] + resources: ["statefulsets"] + verbs: ["get", "list", "watch", "create", "delete"] + --- apiVersion: rbac.authorization.k8s.io/v1 From 092938d23e0f4fd33173d58ea67651dccaacbfd4 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Mon, 26 May 2025 10:54:39 -0700 Subject: [PATCH 06/29] Update workflow-computing-unit-manager-deployment.yaml --- .../workflow-computing-unit-manager-deployment.yaml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-deployment.yaml b/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-deployment.yaml index 2226bc67168..0762fb25dd2 100644 --- a/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-deployment.yaml +++ b/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-deployment.yaml @@ -45,8 +45,10 @@ spec: value: {{ .Values.workflowComputingUnitPool.namespace }} - name: KUBERNETES_COMPUTE_UNIT_SERVICE_NAME value: {{ .Values.workflowComputingUnitPool.name }}-svc - - name: KUBERNETES_IMAGE_NAME - value: {{ .Values.workflowComputingUnitPool.imageName }} + - name: KUBERNETES_MASTER_IMAGE_NAME + value: {{ .Values.workflowComputingUnitPool.masterImageName }} + - name: KUBERNETES_WORKER_IMAGE_NAME + value: {{ .Values.workflowComputingUnitPool.workerImageName }} # TexeraDB Access - name: STORAGE_JDBC_URL value: jdbc:postgresql://{{ .Release.Name }}-postgresql:5432/texera_db?currentSchema=texera_db,public From 9c6b74d19f24a84b4414045136b3750d4bdd0f8f Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Mon, 26 May 2025 11:28:32 -0700 Subject: [PATCH 07/29] update --- .../ComputingUnitManagingResource.scala | 7 ++- deployment/computing-unit-worker.dockerfile | 46 +++++++++++++++---- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala index 77a62fe8503..d3e53d2fade 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala @@ -311,13 +311,12 @@ class ComputingUnitManagingResource { val pod = KubernetesClient.getPodByName(podName) val status = if(isCluster){ - // master pod (Option[Pod]) + all worker pods val phases = (pod.toSeq ++ KubernetesClient.getClusterPodsById(cuid)) - .map(_.getStatus.getPhase) // Seq[String] like Seq("Running", "Running", ...) + .map(_.getStatus.getPhase) phases.distinct match { - case Seq(singlePhase) => singlePhase // all identical → return it - case _ => "Unknown" // mixed or empty + case Seq(singlePhase) => singlePhase // all identical + case _ => "Unknown" // mixed } }else{ pod.map(_.getStatus.getPhase).getOrElse("Unknown") diff --git a/deployment/computing-unit-worker.dockerfile b/deployment/computing-unit-worker.dockerfile index 5d744610eaf..075682fea15 100644 --- a/deployment/computing-unit-worker.dockerfile +++ b/deployment/computing-unit-worker.dockerfile @@ -43,23 +43,51 @@ FROM eclipse-temurin:11-jre-jammy AS runtime WORKDIR /core/amber +COPY --from=build /core/amber/r-requirements.txt /tmp/r-requirements.txt COPY --from=build /core/amber/requirements.txt /tmp/requirements.txt COPY --from=build /core/amber/operator-requirements.txt /tmp/operator-requirements.txt -# Install Python runtime and dependencies +# Install Python & R runtime dependencies RUN apt-get update && apt-get install -y \ python3-pip \ python3-dev \ libpq-dev \ + gfortran \ + curl \ + build-essential \ + libreadline-dev \ + libncurses-dev \ + libssl-dev \ + libxml2-dev \ + xorg-dev \ + libbz2-dev \ + liblzma-dev \ + libpcre++-dev \ + libpango1.0-dev \ + libcurl4-openssl-dev \ + unzip \ && apt-get clean -RUN pip3 install --upgrade pip setuptools wheel -RUN pip3 install python-lsp-server python-lsp-server[websockets] - -# Install requirements with a fallback for wordcloud -RUN pip3 install -r /tmp/requirements.txt -RUN pip3 install --no-cache-dir --find-links https://pypi.org/simple/ -r /tmp/operator-requirements.txt || \ - pip3 install --no-cache-dir wordcloud==1.9.2 +# Install R and needed libraries +ENV R_VERSION=4.3.3 +RUN curl -O https://cran.r-project.org/src/base/R-4/R-${R_VERSION}.tar.gz && \ + tar -xf R-${R_VERSION}.tar.gz && \ + cd R-${R_VERSION} && \ + ./configure --prefix=/usr/local \ + --enable-R-shlib \ + --with-blas \ + --with-lapack && \ + make -j 4 && \ + make install && \ + cd .. && \ + rm -rf R-${R_VERSION}* && R --version && pip3 install --upgrade pip setuptools wheel && \ + pip3 install -r /tmp/requirements.txt && \ + pip3 install -r /tmp/operator-requirements.txt && \ + pip3 install -r /tmp/r-requirements.txt +RUN Rscript -e "options(repos = c(CRAN = 'https://cran.r-project.org')); \ + install.packages(c('coro', 'arrow', 'dplyr'), \ + Ncpus = parallel::detectCores())" +ENV LD_LIBRARY_PATH=/usr/local/lib/R/lib:$LD_LIBRARY_PATH # Copy the built texera binary from the build phase COPY --from=build /core/amber/target/texera-0.1-SNAPSHOT /core/amber @@ -67,6 +95,8 @@ COPY --from=build /core/amber/target/texera-0.1-SNAPSHOT /core/amber COPY --from=build /core/amber/src/main/resources /core/amber/src/main/resources COPY --from=build /core/workflow-core/src/main/resources /core/workflow-core/src/main/resources COPY --from=build /core/file-service/src/main/resources /core/file-service/src/main/resources +# Copy code for python & R UDF +COPY --from=build /core/amber/src/main/python /core/amber/src/main/python CMD ["bin/computing-unit-worker"] From 0be9bd93d331ffeacba5eac16e71054a86643787 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Mon, 26 May 2025 13:58:46 -0700 Subject: [PATCH 08/29] update --- .../src/main/resources/kubernetes.conf | 3 + .../ics/texera/service/KubernetesConfig.scala | 1 + .../ComputingUnitManagingResource.scala | 6 +- .../service/util/KubernetesClient.scala | 83 +++++++++-- core/gui/src/app/app.module.ts | 132 +++++++++--------- .../computing-unit-selection.component.html | 59 +++++--- .../computing-unit-selection.component.ts | 4 +- ...orkflow-computing-unit-managing.service.ts | 4 +- 8 files changed, 189 insertions(+), 103 deletions(-) diff --git a/core/computing-unit-managing-service/src/main/resources/kubernetes.conf b/core/computing-unit-managing-service/src/main/resources/kubernetes.conf index c5f336e9186..c8b48ad8cfd 100644 --- a/core/computing-unit-managing-service/src/main/resources/kubernetes.conf +++ b/core/computing-unit-managing-service/src/main/resources/kubernetes.conf @@ -28,6 +28,9 @@ kubernetes { worker-image-name = "bobbai/texera-workflow-computing-unit:dev" worker-image-name = ${?KUBERNETES_WORKER_IMAGE_NAME} + storage-class-name = "standard" + storage-class-name = ${?KUBERNETES_STORAGE_CLASS_NAME} + image-pull-policy = "Always" image-pull-policy = ${?KUBERNETES_IMAGE_PULL_POLICY} diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala index 9cd69e1f663..e2ea126186b 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala @@ -31,6 +31,7 @@ object KubernetesConfig { val computeUnitMasterImageName: String = conf.getString("kubernetes.master-image-name") val computeUnitWorkerImageName: String = conf.getString("kubernetes.worker-image-name") val computingUnitImagePullPolicy: String = conf.getString("kubernetes.image-pull-policy") + val computingUnitStorageClassName: String = conf.getString("kubernetes.storage-class-name") val computeUnitPortNumber: Int = conf.getInt("kubernetes.port-num") diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala index d3e53d2fade..1dbaec51223 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala @@ -94,7 +94,8 @@ object ComputingUnitManagingResource { memoryLimit: String, gpuLimit: String, jvmMemorySize: String, - numNodes: Int + numNodes: Int, + diskLimit: String ) case class WorkflowComputingUnitResourceLimit( @@ -252,6 +253,7 @@ class ComputingUnitManagingResource { cuid, param.cpuLimit, param.memoryLimit, + param.diskLimit, param.numNodes, computingUnitEnvironmentVariables ++ Map( EnvironmentalVariable.ENV_USER_JWT_TOKEN -> userToken, @@ -259,7 +261,7 @@ class ComputingUnitManagingResource { ) ) }else{ - // Create the pod with the generated CUID + // Create the pod with the generated CUID (ignoring disk limit for now) KubernetesClient.createPod( cuid, param.cpuLimit, diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala index d6db81a93c3..93905ec2c9c 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala @@ -40,6 +40,8 @@ object KubernetesClient { def generatePodName(cuid: Int): String = s"$podNamePrefix-$cuid" + def generateVolumeName(cuid:Int) = s"${generatePodName(cuid)}-pvc" + def generateClusterMasterServiceName(cuid:Int) = s"${generatePodName(cuid)}-master" def generateStatefulSetName(cuid: Int): String = s"${generatePodName(cuid)}-workers" @@ -95,10 +97,45 @@ object KubernetesClient { // --------------------------------------------------------------------------- // Cluster lifecycle helpers // --------------------------------------------------------------------------- + + def createVolume(cuid: Int, diskLimit: String): Volume = { + val pvcName = generateVolumeName(cuid) + + // Build / create PVC if it doesn't exist yet + val pvc = new PersistentVolumeClaimBuilder() + .withNewMetadata() + .withName(pvcName) + .withNamespace(namespace) + .addToLabels("type", "computing-unit") + .addToLabels("cuid", cuid.toString) + .endMetadata() + .withNewSpec() + .withAccessModes("ReadWriteOnce") + .withNewResources() + .addToRequests("storage", new Quantity(diskLimit)) + .endResources() + .withStorageClassName(KubernetesConfig.computingUnitStorageClassName) + .endSpec() + .build() + + // idempotent create / update + client.persistentVolumeClaims().inNamespace(namespace).create(pvc) + + // Return a Volume that points to the PVC so callers can mount it + new VolumeBuilder() + .withName(pvcName) + .withNewPersistentVolumeClaim() + .withClaimName(pvcName) + .endPersistentVolumeClaim() + .build() + } + + def createCluster( cuid: Int, cpuLimit: String, memoryLimit: String, + diskLimit: String, numNodes: Int, envVars: Map[String, Any] ): Pod = { @@ -107,10 +144,10 @@ object KubernetesClient { "CLUSTERING_ENABLED" -> "true", "CLUSTERING_MASTER_IP_ADDRESS" -> masterIp ) - - val master = createPod(cuid, cpuLimit, memoryLimit, gpuLimit = "0", enrichedEnv) + val volume = createVolume(cuid, diskLimit) + val master = createPod(cuid, cpuLimit, memoryLimit, gpuLimit = "0", enrichedEnv, Some(volume)) createClusterMasterService(cuid) - createStatefulSet(cuid, cpuLimit, memoryLimit, numNodes, enrichedEnv) + createStatefulSet(cuid, cpuLimit, memoryLimit, numNodes, enrichedEnv, volume) master // return master pod } @@ -118,6 +155,7 @@ object KubernetesClient { deletePod(cuid) deleteClusterMasterService(cuid) deleteStatefulSet(cuid) + deleteVolume(cuid) } // --------------------------------------------------------------------------- @@ -150,7 +188,8 @@ object KubernetesClient { cpuLimit: String, memoryLimit: String, numNodes: Int, - envVars: Map[String, Any] + envVars: Map[String, Any], + volume: Volume ): StatefulSet = { val envList = envVars.map { case (k, v) => new EnvVarBuilder().withName(k).withValue(v.toString).build() @@ -165,6 +204,7 @@ object KubernetesClient { .withName("computing-unit-worker") .withImage(KubernetesConfig.computeUnitWorkerImageName) .withImagePullPolicy(KubernetesConfig.computingUnitImagePullPolicy) + .addNewVolumeMount().withName(volume.getName).withMountPath("/core/amber/user-resources").endVolumeMount() .addNewPort().withContainerPort(KubernetesConfig.computeUnitPortNumber).endPort() .withEnv(envList) .withResources(resources) @@ -208,7 +248,8 @@ object KubernetesClient { cpuLimit: String, memoryLimit: String, gpuLimit: String, - envVars: Map[String, Any] + envVars: Map[String, Any], + attachVolume: Option[Volume] = None ): Pod = { val podName = generatePodName(cuid) if (getPodByName(podName).isDefined) { @@ -247,6 +288,22 @@ object KubernetesClient { .addToLabels("name", podName) .addToLabels("role", "master") + // -------------- CONTAINER ------------- + val containerB = new ContainerBuilder() + .withName("computing-unit-master") + .withImage(KubernetesConfig.computeUnitMasterImageName) + .withImagePullPolicy(KubernetesConfig.computingUnitImagePullPolicy) + .addNewPort().withContainerPort(KubernetesConfig.computeUnitPortNumber).endPort() + .withEnv(envList) + .withResources(resourceBuilder.build()) + + // mount PVC at /data if provided + attachVolume.foreach { v => + containerB.addNewVolumeMount().withName(v.getName).withMountPath("/core/amber/user-resources").endVolumeMount() + } + + val container = containerB.build() + // Start building the pod spec val specBuilder = podBuilder .endMetadata() @@ -259,16 +316,7 @@ object KubernetesClient { // Complete the pod spec val pod = specBuilder - .addNewContainer() - .withName("computing-unit-master") - .withImage(KubernetesConfig.computeUnitMasterImageName) - .withImagePullPolicy(KubernetesConfig.computingUnitImagePullPolicy) - .addNewPort() - .withContainerPort(KubernetesConfig.computeUnitPortNumber) - .endPort() - .withEnv(envList) - .withResources(resourceBuilder.build()) - .endContainer() + .withContainers(container) .withHostname(podName) .withSubdomain(KubernetesConfig.computeUnitServiceName) .endSpec() @@ -281,6 +329,11 @@ object KubernetesClient { client.pods().inNamespace(namespace).withName(generatePodName(cuid)).delete() } + + private def deleteVolume(cuid: Int): Unit = { + client.persistentVolumeClaims().inNamespace(namespace).withName(generateVolumeName(cuid)).delete() + } + private def deleteClusterMasterService(cuid: Int): Unit = client.services().inNamespace(namespace).withName(generateClusterMasterServiceName(cuid)).delete() diff --git a/core/gui/src/app/app.module.ts b/core/gui/src/app/app.module.ts index fe57e86673a..ff638ff2549 100644 --- a/core/gui/src/app/app.module.ts +++ b/core/gui/src/app/app.module.ts @@ -167,6 +167,7 @@ import { NzDividerModule } from "ng-zorro-antd/divider"; import { NzProgressModule } from "ng-zorro-antd/progress"; import { ComputingUnitSelectionComponent } from "./workspace/component/power-button/computing-unit-selection.component"; import { NzSliderModule } from "ng-zorro-antd/slider"; +import {NzInputNumberModule} from "ng-zorro-antd/input-number"; registerLocaleData(en); @@ -258,71 +259,72 @@ registerLocaleData(en); HubSearchResultComponent, ComputingUnitSelectionComponent, ], - imports: [ - BrowserModule, - AppRoutingModule, - HttpClientModule, - JwtModule.forRoot({ - config: { - tokenGetter: AuthService.getAccessToken, - skipWhenExpired: false, - throwNoTokenError: false, - disallowedRoutes: ["forum/api/users"], - }, - }), - BrowserAnimationsModule, - RouterModule.forRoot([]), - FormsModule, - ReactiveFormsModule, - FormlyModule.forRoot(TEXERA_FORMLY_CONFIG), - FormlyNgZorroAntdModule, - OverlayModule, - NzDatePickerModule, - NzDropDownModule, - NzButtonModule, - NzAutocompleteModule, - NzIconModule, - NzFormModule, - NzListModule, - NzInputModule, - NzPopoverModule, - NzCollapseModule, - NzToolTipModule, - NzTableModule, - NzSelectModule, - NzSpaceModule, - NzBadgeModule, - NzUploadModule, - NgxJsonViewerModule, - NzMessageModule, - NzModalModule, - NzCardModule, - NzTagModule, - NzPopconfirmModule, - NzAvatarModule, - NzTabsModule, - NzPaginationModule, - NzCommentModule, - ColorPickerModule, - NzSwitchModule, - NzLayoutModule, - NzSliderModule, - MarkdownModule.forRoot(), - DragDropModule, - NzAlertModule, - NzResizableModule, - NzSpinModule, - NgxFileDropModule, - NzTreeModule, - NzTreeViewModule, - NzNoAnimationModule, - TreeModule, - SocialLoginModule, - GoogleSigninButtonModule, - NzEmptyModule, - NzDividerModule, - NzProgressModule, - ], + imports: [ + BrowserModule, + AppRoutingModule, + HttpClientModule, + JwtModule.forRoot({ + config: { + tokenGetter: AuthService.getAccessToken, + skipWhenExpired: false, + throwNoTokenError: false, + disallowedRoutes: ["forum/api/users"], + }, + }), + BrowserAnimationsModule, + RouterModule.forRoot([]), + FormsModule, + ReactiveFormsModule, + FormlyModule.forRoot(TEXERA_FORMLY_CONFIG), + FormlyNgZorroAntdModule, + OverlayModule, + NzDatePickerModule, + NzDropDownModule, + NzButtonModule, + NzAutocompleteModule, + NzIconModule, + NzFormModule, + NzListModule, + NzInputModule, + NzPopoverModule, + NzCollapseModule, + NzToolTipModule, + NzTableModule, + NzSelectModule, + NzSpaceModule, + NzBadgeModule, + NzUploadModule, + NgxJsonViewerModule, + NzMessageModule, + NzModalModule, + NzCardModule, + NzTagModule, + NzPopconfirmModule, + NzAvatarModule, + NzTabsModule, + NzPaginationModule, + NzCommentModule, + ColorPickerModule, + NzSwitchModule, + NzLayoutModule, + NzSliderModule, + MarkdownModule.forRoot(), + DragDropModule, + NzAlertModule, + NzResizableModule, + NzSpinModule, + NgxFileDropModule, + NzTreeModule, + NzTreeViewModule, + NzNoAnimationModule, + TreeModule, + SocialLoginModule, + GoogleSigninButtonModule, + NzEmptyModule, + NzDividerModule, + NzProgressModule, + NzInputNumberModule, + ], providers: [ provideNzI18n(en_US), AuthGuardService, diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html index aeab2677769..25d03cabae3 100644 --- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html +++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html @@ -208,27 +208,48 @@
-
- Select #Worker Nodes - - - - +
+ Cluster Mode +
-
- - -
+ + + +
+ # Worker Nodes + + +
+ + +
+ Disk Size + Gi +
+ + +
+ + +
+
{ diff --git a/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts b/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts index 90322a98a9f..52bd1f43e83 100644 --- a/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts +++ b/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts @@ -42,6 +42,7 @@ export class WorkflowComputingUnitManagingService { * @param memoryLimit The memory resource limit for the computing unit. * @param gpuLimit The gpu resource limit for the computing unit. * @param jvmMemorySize The JVM memory size (e.g. "1G", "2G") + * @param diskLimit * @param unitType * @param numNodes * @returns An Observable of the created WorkflowComputingUnit. @@ -52,10 +53,11 @@ export class WorkflowComputingUnitManagingService { memoryLimit: string, gpuLimit: string = "0", jvmMemorySize: string = "1G", + diskLimit: string = "auto", numNodes: number = 0, unitType: string = "k8s_pod", ): Observable { - const body = { name, cpuLimit, memoryLimit, gpuLimit, jvmMemorySize, unitType, numNodes }; + const body = { name, cpuLimit, memoryLimit, gpuLimit, jvmMemorySize, unitType, numNodes, diskLimit }; return this.http.post( `${AppSettings.getApiEndpoint()}/${COMPUTING_UNIT_CREATE_URL}`, From a1cf6f2ea997663805d727f67bc5de99dd526c67 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Mon, 26 May 2025 15:43:19 -0700 Subject: [PATCH 09/29] update --- .../ComputingUnitManagingResource.scala | 4 +- .../service/util/KubernetesClient.scala | 2 +- .../computing-unit-selection.component.html | 97 ++++++++++--------- .../computing-unit-selection.component.scss | 3 +- .../computing-unit-selection.component.ts | 7 +- 5 files changed, 61 insertions(+), 52 deletions(-) diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala index 1dbaec51223..46eb68f8aa7 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala @@ -233,7 +233,7 @@ class ComputingUnitManagingResource { val computingUnit = new WorkflowComputingUnit() val userToken = JwtAuth.jwtToken(jwtClaims(user.user, dayToMin(TOKEN_EXPIRE_TIME_IN_DAYS))) computingUnit.setUid(user.getUid) - val name = if(param.numNodes > 0){ + val name = if(param.numNodes > 1){ param.name + "-cluster" }else{ param.name @@ -248,7 +248,7 @@ class ComputingUnitManagingResource { val cuid = ctx.lastID().intValue() val insertedUnit = wcDao.fetchOneByCuid(cuid) - val pod = if(param.numNodes > 0){ + val pod = if(param.numNodes > 1){ KubernetesClient.createCluster( cuid, param.cpuLimit, diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala index 93905ec2c9c..18faa0f1bac 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala @@ -147,7 +147,7 @@ object KubernetesClient { val volume = createVolume(cuid, diskLimit) val master = createPod(cuid, cpuLimit, memoryLimit, gpuLimit = "0", enrichedEnv, Some(volume)) createClusterMasterService(cuid) - createStatefulSet(cuid, cpuLimit, memoryLimit, numNodes, enrichedEnv, volume) + createStatefulSet(cuid, cpuLimit, memoryLimit, numNodes - 1, enrichedEnv, volume) master // return master pod } diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html index 25d03cabae3..a348a97ec28 100644 --- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html +++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html @@ -173,7 +173,18 @@
- Computing Unit Name +
+ Computing Unit Name +
+ Create Cluster + + +
+
+ + + + +
+ # of Nodes + + +
+ + +
+ Shared Disk Size + + + +
+
+ +
+ Per-node Spec: +
+
Select Memory Size
-
- Cluster Mode - -
- - - - -
- # Worker Nodes - - -
- - -
- Disk Size - Gi -
- - -
- - -
-
-
Select #GPU(s)
+ + +
+ + +
-
+
+ + +
+ +
+ + +
- - - -
- # of Nodes - - -
+ + + +
+ # of Nodes + + +
- -
- Shared Disk Size - - - -
-
+ +
+ Shared Disk Size + + + +
+
-
- Per-node Spec: -
+
+ Per-node Spec: +
Select RAM Size @@ -238,38 +242,45 @@
-
-
- Select #GPU(s) - -
- - - - -
+
+
+ Select #GPU(s) + +
+ + + + +
- Shared Memory Size - - - - + + +
-
- JVM Memory Size: {{selectedJvmMemorySize}} - - -
+
+ JVM Memory Size: {{selectedJvmMemorySize}} + + +
@@ -333,7 +344,9 @@
-
+
1; } diff --git a/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts b/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts index 503b928f95f..208d3f689e4 100644 --- a/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts +++ b/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts @@ -122,7 +122,18 @@ export class WorkflowComputingUnitManagingService { diskLimit: string, numNodes: number ): Observable { - return this.createComputingUnit(name, cpuLimit, memoryLimit, gpuLimit, jvmMemorySize, shmSize, "", diskLimit, numNodes, "kubernetes"); + return this.createComputingUnit( + name, + cpuLimit, + memoryLimit, + gpuLimit, + jvmMemorySize, + shmSize, + "", + diskLimit, + numNodes, + "kubernetes" + ); } /** @@ -133,7 +144,7 @@ export class WorkflowComputingUnitManagingService { * @returns An Observable of the created WorkflowComputingUnit. */ public createLocalComputingUnit(name: string, uri: string): Observable { - return this.createComputingUnit(name, "NaN", "NaN", undefined, "NaN", undefined, uri,"NaN", 1, "local"); + return this.createComputingUnit(name, "NaN", "NaN", undefined, "NaN", undefined, uri, "NaN", 1, "local"); } /** From 6e613242b50f9e36c8ef4f37400806af25758193 Mon Sep 17 00:00:00 2001 From: Shengquan Ni Date: Fri, 30 May 2025 10:04:32 -0700 Subject: [PATCH 17/29] Update computing-unit-selection.component.html --- .../power-button/computing-unit-selection.component.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html index 588d69d2f41..1dbdd102931 100644 --- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html +++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html @@ -352,7 +352,7 @@ nzMessage="This will start a cluster with {{ selectedNumNodes }} nodes, allocating a total of {{ parseResourceNumber(selectedCpu) * selectedNumNodes }}{{parseResourceUnit(selectedCpu)}} CPU, {{ parseResourceNumber(selectedMemory) * selectedNumNodes }}{{parseResourceUnit(selectedMemory)}} Memory, - and a {{ selectedDiskSize }}GiB disk shared by all nodes." + and a {{ selectedDiskSize }}GiB disk shared by all nodes. Advanced configurations (e.g. GPU, Shared Memory, etc.) will be ignored." nzShowIcon>
From 24cec03bf49a86547521e4f9c4b3b2bc1217f42d Mon Sep 17 00:00:00 2001 From: Shengquan Ni Date: Fri, 30 May 2025 10:09:21 -0700 Subject: [PATCH 18/29] Update computing-unit-selection.component.html --- .../power-button/computing-unit-selection.component.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html index 1dbdd102931..c927e18d696 100644 --- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html +++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html @@ -368,7 +368,7 @@
Date: Fri, 30 May 2025 17:37:48 -0700 Subject: [PATCH 19/29] Update ComputingUnitManagingResource.scala --- .../service/resource/ComputingUnitManagingResource.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala index 19e59fc20a3..911fed2dd28 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala @@ -281,9 +281,9 @@ class ComputingUnitManagingResource { s"Memory quantity '${param.memoryLimit}' is not allowed. " + s"Valid options: ${memoryLimitOptions.mkString(", ")}" ) - if (!gpuLimitOptions.contains(param.gpuLimit)) + if (param.gpuLimit.isDefined && !gpuLimitOptions.contains(param.gpuLimit)) throw new ForbiddenException( - s"GPU quantity '${param.gpuLimit}' is not allowed. " + + s"GPU quantity '${param.gpuLimit.get}' is not allowed. " + s"Valid options: ${gpuLimitOptions.mkString(", ")}" ) From 5e25b55927cbca83a631b159b5aa9b20b3b930f4 Mon Sep 17 00:00:00 2001 From: Shengquan Ni Date: Fri, 30 May 2025 17:38:41 -0700 Subject: [PATCH 20/29] Update computing-unit-selection.component.html --- .../power-button/computing-unit-selection.component.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html index c927e18d696..9735137620a 100644 --- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html +++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html @@ -201,7 +201,7 @@
- Shared Disk Size + Total Disk Size Date: Fri, 30 May 2025 18:52:42 -0700 Subject: [PATCH 21/29] Update computing-unit-selection.component.html --- .../power-button/computing-unit-selection.component.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html index 9735137620a..7c8de3ff625 100644 --- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html +++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html @@ -267,7 +267,7 @@
- Shared Memory Size + Shared Memory for IPC Date: Tue, 3 Jun 2025 22:11:22 -0700 Subject: [PATCH 22/29] Update ComputingUnitManagingResource.scala --- .../service/resource/ComputingUnitManagingResource.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala index 911fed2dd28..97c0af40657 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala @@ -151,7 +151,8 @@ class ComputingUnitManagingResource { .parse(unit.getResource) .as[JsObject] .value("numNodes") - .as[Int] > 1 + .asOpt[Int] + .getOrElse(1) > 1 // for backward compatibility } private def getSupportedComputingUnitTypes: List[String] = { From 72d37e58e821aff5eee0eec76acbc4f9f6721897 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Tue, 3 Jun 2025 22:23:31 -0700 Subject: [PATCH 23/29] Update ComputingUnitManagingResource.scala --- .../service/resource/ComputingUnitManagingResource.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala index 97c0af40657..9780b0a924a 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala @@ -149,8 +149,9 @@ class ComputingUnitManagingResource { private def isCluster(unit: WorkflowComputingUnit): Boolean = { Json .parse(unit.getResource) - .as[JsObject] - .value("numNodes") + .asOpt[JsObject] + .getOrElse(JsObject.empty) + .\("numNodes") .asOpt[Int] .getOrElse(1) > 1 // for backward compatibility } From a3eff4b6311f8484dfd1fd46ed46747f835a76f1 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Tue, 3 Jun 2025 22:29:37 -0700 Subject: [PATCH 24/29] Update ComputingUnitManagingResource.scala --- .../texera/service/resource/ComputingUnitManagingResource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala index 9780b0a924a..b95f9f518cf 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala @@ -151,7 +151,7 @@ class ComputingUnitManagingResource { .parse(unit.getResource) .asOpt[JsObject] .getOrElse(JsObject.empty) - .\("numNodes") + .\("numNodes") // lookup operation .asOpt[Int] .getOrElse(1) > 1 // for backward compatibility } From 0d2618d341fa588ce682133b6ade370ef70bd551 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Wed, 4 Jun 2025 13:24:31 -0700 Subject: [PATCH 25/29] Update KubernetesClient.scala --- .../uci/ics/texera/service/util/KubernetesClient.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala index 97f2ee64108..bf32a15a749 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala @@ -101,9 +101,6 @@ object KubernetesClient { .getOrElse(Map.empty[String, String]) } - // --------------------------------------------------------------------------- - // Cluster lifecycle helpers - // --------------------------------------------------------------------------- def createVolume(cuid: Int, diskLimit: String): Volume = { val pvcName = generateVolumeName(cuid) @@ -164,10 +161,6 @@ object KubernetesClient { deleteVolume(cuid) } - // --------------------------------------------------------------------------- - // Kubernetes resource creators - // --------------------------------------------------------------------------- - private def createClusterMasterService(cuid: Int): Service = { val serviceName = generateClusterMasterServiceName(cuid) val service = new ServiceBuilder() @@ -305,7 +298,6 @@ object KubernetesClient { .addToLabels("name", podName) .addToLabels("role", "master") - // -------------- CONTAINER ------------- val containerBuilder = new ContainerBuilder() .withName("computing-unit-master") .withImage(KubernetesConfig.computeUnitMasterImageName) From ae032b8145d9b438fc666b9b7e06c93d646eb906 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Wed, 4 Jun 2025 13:37:02 -0700 Subject: [PATCH 26/29] wip --- .../service/util/KubernetesClient.scala | 1 - .../computing-unit-selection.component.html | 131 ++++++++++-------- .../computing-unit-selection.component.scss | 4 + 3 files changed, 74 insertions(+), 62 deletions(-) diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala index bf32a15a749..8fa5c705a50 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala @@ -101,7 +101,6 @@ object KubernetesClient { .getOrElse(Map.empty[String, String]) } - def createVolume(cuid: Int, diskLimit: String): Volume = { val pvcName = generateVolumeName(cuid) diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html index 7c8de3ff625..79b7ddb97f2 100644 --- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html +++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html @@ -242,68 +242,77 @@
-
-
- Select #GPU(s) - -
- - - - -
+ + +
+
+ Select #GPU(s) + +
+ + + + +
-
-
- - Shared Memory for IPC - - - - -
-
- - - - - -
-
+
+
+ + Shared Memory for IPC + + + + +
+
+ + + + + +
+
+
+
JVM Memory Size: {{selectedJvmMemorySize}} Date: Sun, 29 Jun 2025 14:29:28 -0700 Subject: [PATCH 27/29] address conflicts --- .../uci/ics/amber/engine/common/AmberRuntime.scala | 10 +++++----- .../edu/uci/ics/texera/web/ComputingUnitMaster.scala | 11 ++++------- .../edu/uci/ics/texera/web/ComputingUnitWorker.scala | 5 +++-- .../edu/uci/ics/amber/config/ApplicationConfig.scala | 5 +++++ 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala index 0a9fa45da8e..0770b6162d1 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala @@ -21,7 +21,7 @@ package edu.uci.ics.amber.engine.common import akka.actor.{ActorSystem, Address, Cancellable, DeadLetter, Props} import akka.serialization.{Serialization, SerializationExtension} -import edu.uci.ics.amber.config.AkkaConfig +import edu.uci.ics.amber.config.{AkkaConfig, ApplicationConfig} import com.typesafe.config.{Config, ConfigFactory} import edu.uci.ics.amber.clustering.ClusterListener import edu.uci.ics.amber.engine.architecture.messaginglayer.DeadLetterMonitorActor @@ -64,8 +64,8 @@ object AmberRuntime { var masterIpAddress = "localhost" var masterPort = 2552 if (clusterMode) { - masterIpAddress = AmberConfig.masterIpAddress - masterPort = AmberConfig.masterPort + masterIpAddress = ApplicationConfig.masterIpAddress + masterPort = ApplicationConfig.masterPort } val masterConfig = ConfigFactory @@ -90,8 +90,8 @@ object AmberRuntime { var masterPort = 2552 var nodeIp = "localhost" if (clusterMode) { - masterIpAddress = AmberConfig.masterIpAddress - masterPort = AmberConfig.masterPort + masterIpAddress = ApplicationConfig.masterIpAddress + masterPort = ApplicationConfig.masterPort nodeIp = "hostname -i".!!.trim // only supported by linux/unix } val workerConfig = ConfigFactory diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala index b6bc0127d0d..09c61e5fe9b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala @@ -23,18 +23,15 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.config.{ApplicationConfig, StorageConfig} import edu.uci.ics.amber.core.storage.DocumentFactory +import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig -import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{ - COMPLETED, - FAILED -} +import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{COMPLETED, FAILED} import edu.uci.ics.amber.engine.common.AmberRuntime.scheduleRecurringCallThroughActorSystem import edu.uci.ics.amber.engine.common.Utils.maptoStatusCode import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage import edu.uci.ics.amber.engine.common.{AmberRuntime, Utils} -import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity import edu.uci.ics.amber.util.JSONUtils.objectMapper import edu.uci.ics.amber.util.ObjectMapperUtils import edu.uci.ics.texera.auth.SessionUser @@ -42,8 +39,8 @@ import edu.uci.ics.texera.config.UserSystemConfig import edu.uci.ics.texera.dao.SqlServer import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions import edu.uci.ics.texera.web.auth.JwtAuth.setupJwtAuth -import edu.uci.ics.texera.web.resource.{WebsocketPayloadSizeTuner, WorkflowWebsocketResource} import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource +import edu.uci.ics.texera.web.resource.{WebsocketPayloadSizeTuner, WorkflowWebsocketResource} import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService import io.dropwizard.Configuration import io.dropwizard.setup.{Bootstrap, Environment} @@ -74,7 +71,7 @@ object ComputingUnitMaster { def main(args: Array[String]): Unit = { // start actor system master node - AmberRuntime.startActorMaster(AmberConfig.amberClusterEnabled) + AmberRuntime.startActorMaster(ApplicationConfig.amberClusterEnabled) // start web server new ComputingUnitMaster().run( "server", diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala index 12182352c38..d90c5c50f73 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala @@ -19,13 +19,14 @@ package edu.uci.ics.texera.web -import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime} +import edu.uci.ics.amber.config.ApplicationConfig +import edu.uci.ics.amber.engine.common.AmberRuntime object ComputingUnitWorker { def main(args: Array[String]): Unit = { // start actor system worker node - AmberRuntime.startActorWorker(AmberConfig.amberClusterEnabled) + AmberRuntime.startActorWorker(ApplicationConfig.amberClusterEnabled) } } diff --git a/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala b/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala index fd007da281a..22a67133cd5 100644 --- a/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala +++ b/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala @@ -57,6 +57,11 @@ object ApplicationConfig { val creditPollingIntervalInMs: Int = getConfSource.getInt("flow-control.credit-poll-interval-in-ms") + // clustering + val amberClusterEnabled: Boolean = getConfSource.getBoolean("clustering.enabled") + val masterIpAddress: String = getConfSource.getString("clustering.master-ip-address") + val masterPort: Int = getConfSource.getInt("clustering.master-port") + // Network buffering val defaultDataTransferBatchSize: Int = getConfSource.getInt("network-buffering.default-data-transfer-batch-size") From 9e08d9423fdc0c1b58e379a6052bb71f00295caf Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Sun, 29 Jun 2025 14:34:28 -0700 Subject: [PATCH 28/29] reformat --- .../scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala | 5 ++++- .../service/resource/ComputingUnitManagingResource.scala | 1 - 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala index 09c61e5fe9b..81a93f94ff2 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala @@ -26,7 +26,10 @@ import edu.uci.ics.amber.core.storage.DocumentFactory import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig -import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{COMPLETED, FAILED} +import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{ + COMPLETED, + FAILED +} import edu.uci.ics.amber.engine.common.AmberRuntime.scheduleRecurringCallThroughActorSystem import edu.uci.ics.amber.engine.common.Utils.maptoStatusCode import edu.uci.ics.amber.engine.common.client.AmberClient diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala index 4a2332bfe7a..ae3ddf378d7 100644 --- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala +++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala @@ -25,7 +25,6 @@ import edu.uci.ics.texera.auth.{JwtAuth, SessionUser} import edu.uci.ics.texera.config.{ComputingUnitConfig, KubernetesConfig} import edu.uci.ics.texera.dao.SqlServer import edu.uci.ics.texera.dao.SqlServer.withTransaction -import edu.uci.ics.texera.dao.jooq.generated.enums.WorkflowComputingUnitTypeEnum import edu.uci.ics.texera.dao.jooq.generated.tables.daos.WorkflowComputingUnitDao import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowComputingUnit import edu.uci.ics.texera.dao.jooq.generated.enums.WorkflowComputingUnitTypeEnum From e4616284444425e5d4b8e39e2be5f29804e87ef9 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <121989255@qq.com> Date: Sun, 29 Jun 2025 14:42:12 -0700 Subject: [PATCH 29/29] add r support to worker container --- deployment/computing-unit-worker.dockerfile | 49 +++++++++++++++++---- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/deployment/computing-unit-worker.dockerfile b/deployment/computing-unit-worker.dockerfile index 0c936eaf94a..323fe3ec2da 100644 --- a/deployment/computing-unit-worker.dockerfile +++ b/deployment/computing-unit-worker.dockerfile @@ -43,29 +43,60 @@ FROM eclipse-temurin:11-jre-jammy AS runtime WORKDIR /core/amber +COPY --from=build /core/amber/r-requirements.txt /tmp/r-requirements.txt COPY --from=build /core/amber/requirements.txt /tmp/requirements.txt COPY --from=build /core/amber/operator-requirements.txt /tmp/operator-requirements.txt -# Install Python runtime and dependencies +# Install Python & R runtime dependencies RUN apt-get update && apt-get install -y \ python3-pip \ python3-dev \ libpq-dev \ + gfortran \ + curl \ + build-essential \ + libreadline-dev \ + libncurses-dev \ + libssl-dev \ + libxml2-dev \ + xorg-dev \ + libbz2-dev \ + liblzma-dev \ + libpcre++-dev \ + libpango1.0-dev \ + libcurl4-openssl-dev \ + unzip \ && apt-get clean -RUN pip3 install --upgrade pip setuptools wheel -RUN pip3 install python-lsp-server python-lsp-server[websockets] - -# Install requirements with a fallback for wordcloud -RUN pip3 install -r /tmp/requirements.txt -RUN pip3 install --no-cache-dir --find-links https://pypi.org/simple/ -r /tmp/operator-requirements.txt || \ - pip3 install --no-cache-dir wordcloud==1.9.2 +# Install R and needed libraries +ENV R_VERSION=4.3.3 +RUN curl -O https://cran.r-project.org/src/base/R-4/R-${R_VERSION}.tar.gz && \ + tar -xf R-${R_VERSION}.tar.gz && \ + cd R-${R_VERSION} && \ + ./configure --prefix=/usr/local \ + --enable-R-shlib \ + --with-blas \ + --with-lapack && \ + make -j 4 && \ + make install && \ + cd .. && \ + rm -rf R-${R_VERSION}* && R --version && pip3 install --upgrade pip setuptools wheel && \ + pip3 install -r /tmp/requirements.txt && \ + pip3 install -r /tmp/operator-requirements.txt && \ + pip3 install -r /tmp/r-requirements.txt +RUN Rscript -e "options(repos = c(CRAN = 'https://cran.r-project.org')); \ + install.packages(c('coro', 'arrow', 'dplyr'), \ + Ncpus = parallel::detectCores())" +ENV LD_LIBRARY_PATH=/usr/local/lib/R/lib:$LD_LIBRARY_PATH # Copy the built texera binary from the build phase +COPY --from=build /.git /.git COPY --from=build /core/amber/target/texera-0.1-SNAPSHOT /core/amber # Copy resources directories under /core from build phase -COPY --from=build /core/amber/src/main/resources /core/amber/src/main/resources COPY --from=build /core/config/src/main/resources /core/config/src/main/resources +COPY --from=build /core/amber/src/main/resources /core/amber/src/main/resources +# Copy code for python & R UDF +COPY --from=build /core/amber/src/main/python /core/amber/src/main/python CMD ["bin/computing-unit-worker"]