From 0279ec77fedfc9436026fa8aa25119d1eb492386 Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Sun, 25 May 2025 17:04:33 -0700
Subject: [PATCH 01/29] add env vars for cluster mode
---
.../amber/src/main/resources/application.conf | 11 +++++
.../ics/amber/engine/common/AmberConfig.scala | 6 +++
.../amber/engine/common/AmberRuntime.scala | 45 ++++++++-----------
.../ics/texera/web/ComputingUnitMaster.scala | 33 ++------------
.../ics/texera/web/ComputingUnitWorker.scala | 26 +----------
5 files changed, 41 insertions(+), 80 deletions(-)
diff --git a/core/amber/src/main/resources/application.conf b/core/amber/src/main/resources/application.conf
index aea7d2dfc39..1c01f9cacc1 100644
--- a/core/amber/src/main/resources/application.conf
+++ b/core/amber/src/main/resources/application.conf
@@ -57,6 +57,17 @@ reconfiguration {
enable-transactional-reconfiguration = ${?RECONFIGURATION_ENABLE_TRANSACTIONAL_RECONFIGURATION}
}
+clustering{
+ enabled = false # [false, true]; when true, nodes use the master address/port below instead of localhost:2552
+ enabled = ${?CLUSTERING_ENABLED}
+ # host name or IP under which the master node is reachable; used to build the Akka seed-node address
+ master-ip-address = "localhost"
+ master-ip-address = ${?CLUSTERING_MASTER_IP_ADDRESS}
+ # Akka remoting port of the master node
+ master-port = 2552
+ master-port = ${?CLUSTERING_MASTER_PORT}
+}
+
cache {
# [false, true]
enabled = true
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberConfig.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberConfig.scala
index cdf2fcf452f..cafc414db9d 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberConfig.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberConfig.scala
@@ -60,6 +60,12 @@ object AmberConfig {
val creditPollingIntervalInMs: Int =
getConfSource.getInt("flow-control.credit-poll-interval-in-ms")
+
+ // clustering: cluster-mode switch plus the master's host and port, read from the "clustering" config section
+ val amberClusterEnabled: Boolean = getConfSource.getBoolean("clustering.enabled")
+ val masterIpAddress: String = getConfSource.getString("clustering.master-ip-address")
+ val masterPort: Int = getConfSource.getInt("clustering.master-port")
+
// Network buffering
val defaultDataTransferBatchSize: Int =
getConfSource.getInt("network-buffering.default-data-transfer-batch-size")
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
index 8efc76e42f1..34d2109d84f 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
@@ -26,8 +26,6 @@ import com.typesafe.config.{Config, ConfigFactory}
import edu.uci.ics.amber.clustering.ClusterListener
import edu.uci.ics.amber.engine.architecture.messaginglayer.DeadLetterMonitorActor
-import java.io.{BufferedReader, InputStreamReader}
-import java.net.URL
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration
@@ -61,54 +59,47 @@ object AmberRuntime {
_actorSystem.scheduler.scheduleWithFixedDelay(initialDelay, delay)(() => call)
}
- private def getNodeIpAddress: String = {
- try {
- val query = new URL("http://checkip.amazonaws.com")
- val in = new BufferedReader(new InputStreamReader(query.openStream()))
- in.readLine()
- } catch {
- case e: Exception => throw e
- }
- }
-
def startActorMaster(clusterMode: Boolean): Unit = {
- var localIpAddress = "localhost"
+ var masterIpAddress = "localhost"
+ var masterPort = 2552
if (clusterMode) {
- localIpAddress = getNodeIpAddress
+ masterIpAddress = AmberConfig.masterIpAddress
+ masterPort = AmberConfig.masterPort
}
val masterConfig = ConfigFactory
.parseString(s"""
- akka.remote.artery.canonical.port = 2552
- akka.remote.artery.canonical.hostname = $localIpAddress
- akka.cluster.seed-nodes = [ "akka://Amber@$localIpAddress:2552" ]
+ akka.remote.artery.canonical.port = $masterPort
+ akka.remote.artery.canonical.hostname = $masterIpAddress
+ akka.cluster.seed-nodes = [ "akka://Amber@$masterIpAddress:$masterPort" ]
""")
.withFallback(akkaConfig)
.resolve()
- AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress)
+ AmberConfig.masterNodeAddr = createMasterAddress(masterIpAddress, masterPort)
createAmberSystem(masterConfig)
}
def akkaConfig: Config =
ConfigFactory.load("cluster").withFallback(defaultApplication()).resolve()
- private def createMasterAddress(addr: String): Address = Address("akka", "Amber", addr, 2552)
+ private def createMasterAddress(addr: String, port:Int): Address = Address("akka", "Amber", addr, port)
- def startActorWorker(mainNodeAddress: Option[String]): Unit = {
- val addr = mainNodeAddress.getOrElse("localhost")
- var localIpAddress = "localhost"
- if (mainNodeAddress.isDefined) {
- localIpAddress = getNodeIpAddress
+ def startActorWorker(clusterMode:Boolean): Unit = {
+ var masterIpAddress = "localhost"
+ var masterPort = 2552
+ if (clusterMode) {
+ masterIpAddress = AmberConfig.masterIpAddress
+ masterPort = AmberConfig.masterPort
}
val workerConfig = ConfigFactory
.parseString(s"""
- akka.remote.artery.canonical.hostname = $localIpAddress
+ akka.remote.artery.canonical.hostname = $masterIpAddress
akka.remote.artery.canonical.port = 0
- akka.cluster.seed-nodes = [ "akka://Amber@$addr:2552" ]
+ akka.cluster.seed-nodes = [ "akka://Amber@$masterIpAddress:$masterPort" ]
""")
.withFallback(akkaConfig)
.resolve()
- AmberConfig.masterNodeAddr = createMasterAddress(addr)
+ AmberConfig.masterNodeAddr = createMasterAddress(masterIpAddress, masterPort)
createAmberSystem(workerConfig)
}
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
index 1c14caf9240..721e0efa3a1 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
@@ -22,35 +22,30 @@ package edu.uci.ics.texera.web
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig}
+import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig
-import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
- COMPLETED,
- FAILED
-}
+import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{COMPLETED, FAILED}
import edu.uci.ics.amber.engine.common.AmberRuntime.scheduleRecurringCallThroughActorSystem
import edu.uci.ics.amber.engine.common.Utils.{maptoStatusCode, objectMapper}
import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime, Utils}
-import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity
import edu.uci.ics.texera.auth.SessionUser
import edu.uci.ics.texera.dao.SqlServer
-import edu.uci.ics.texera.web.auth.JwtAuth.setupJwtAuth
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions
+import edu.uci.ics.texera.web.auth.JwtAuth.setupJwtAuth
import edu.uci.ics.texera.web.resource.WorkflowWebsocketResource
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService
import io.dropwizard.Configuration
import io.dropwizard.setup.{Bootstrap, Environment}
import io.dropwizard.websockets.WebsocketBundle
-import org.apache.commons.jcs3.access.exception.InvalidArgumentException
import org.eclipse.jetty.server.session.SessionHandler
import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter
import java.net.URI
import java.time.Duration
-import scala.annotation.tailrec
import scala.concurrent.duration.DurationInt
object ComputingUnitMaster {
@@ -70,29 +65,9 @@ object ComputingUnitMaster {
)
}
- type OptionMap = Map[Symbol, Any]
-
- def parseArgs(args: Array[String]): OptionMap = {
- @tailrec
- def nextOption(map: OptionMap, list: List[String]): OptionMap = {
- list match {
- case Nil => map
- case "--cluster" :: value :: tail =>
- nextOption(map ++ Map(Symbol("cluster") -> value.toBoolean), tail)
- case option :: tail =>
- throw new InvalidArgumentException("unknown command-line arg")
- }
- }
-
- nextOption(Map(), args.toList)
- }
-
def main(args: Array[String]): Unit = {
- val argMap = parseArgs(args)
-
- val clusterMode = argMap.get(Symbol("cluster")).asInstanceOf[Option[Boolean]].getOrElse(false)
// start actor system master node
- AmberRuntime.startActorMaster(clusterMode)
+ AmberRuntime.startActorMaster(AmberConfig.amberClusterEnabled)
// start web server
new ComputingUnitMaster().run(
"server",
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala
index 0830e242a9b..12182352c38 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala
@@ -19,35 +19,13 @@
package edu.uci.ics.texera.web
-import edu.uci.ics.amber.engine.common.AmberRuntime
-import org.apache.commons.jcs3.access.exception.InvalidArgumentException
-
-import scala.annotation.tailrec
+import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime}
object ComputingUnitWorker {
- type OptionMap = Map[Symbol, Any]
-
- def parseArgs(args: Array[String]): OptionMap = {
- @tailrec
- def nextOption(map: OptionMap, list: List[String]): OptionMap = {
- list match {
- case Nil => map
- case "--serverAddr" :: value :: tail =>
- nextOption(map ++ Map(Symbol("serverAddr") -> value), tail)
- case option :: tail =>
- throw new InvalidArgumentException("unknown command-line arg")
- }
- }
-
- nextOption(Map(), args.toList)
- }
-
def main(args: Array[String]): Unit = {
- val argMap = parseArgs(args)
-
// start actor system worker node
- AmberRuntime.startActorWorker(argMap.get(Symbol("serverAddr")).asInstanceOf[Option[String]])
+ AmberRuntime.startActorWorker(AmberConfig.amberClusterEnabled)
}
}
From 133684b0a0a9c9a2d6c0bbb5a5bdc2ca79083c7f Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Sun, 25 May 2025 19:02:08 -0700
Subject: [PATCH 02/29] Update AmberRuntime.scala
---
.../scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
index 34d2109d84f..92ca5e9d082 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
@@ -86,14 +86,16 @@ object AmberRuntime {
def startActorWorker(clusterMode:Boolean): Unit = {
var masterIpAddress = "localhost"
+ var nodeIpAddress = "localhost"
var masterPort = 2552
if (clusterMode) {
masterIpAddress = AmberConfig.masterIpAddress
masterPort = AmberConfig.masterPort
+ nodeIpAddress = "0.0.0.0"
}
val workerConfig = ConfigFactory
.parseString(s"""
- akka.remote.artery.canonical.hostname = $masterIpAddress
+ akka.remote.artery.canonical.hostname = $nodeIpAddress
akka.remote.artery.canonical.port = 0
akka.cluster.seed-nodes = [ "akka://Amber@$masterIpAddress:$masterPort" ]
""")
From 21d9984bc9e9d542471a0d09ac6c31ce365c42ab Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Sun, 25 May 2025 20:48:43 -0700
Subject: [PATCH 03/29] Update AmberRuntime.scala
---
.../edu/uci/ics/amber/engine/common/AmberRuntime.scala | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
index 92ca5e9d082..dfb07053dce 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
@@ -28,6 +28,7 @@ import edu.uci.ics.amber.engine.architecture.messaginglayer.DeadLetterMonitorAct
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration
+import scala.sys.process._
object AmberRuntime {
@@ -86,16 +87,16 @@ object AmberRuntime {
def startActorWorker(clusterMode:Boolean): Unit = {
var masterIpAddress = "localhost"
- var nodeIpAddress = "localhost"
var masterPort = 2552
+ var nodeIp = "localhost"
if (clusterMode) {
masterIpAddress = AmberConfig.masterIpAddress
masterPort = AmberConfig.masterPort
- nodeIpAddress = "0.0.0.0"
+ nodeIp = "hostname -i".!!.trim.split("\\s+").head // only supported by linux/unix; keep the first address if several are printed
}
val workerConfig = ConfigFactory
.parseString(s"""
- akka.remote.artery.canonical.hostname = $nodeIpAddress
+ akka.remote.artery.canonical.hostname = $nodeIp
akka.remote.artery.canonical.port = 0
akka.cluster.seed-nodes = [ "akka://Amber@$masterIpAddress:$masterPort" ]
""")
From dbdcd02e321ffb127a6b21ee40cb02f6a56fb968 Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Mon, 26 May 2025 00:32:23 -0700
Subject: [PATCH 04/29] update
---
.../src/main/resources/kubernetes.conf | 7 +-
.../ics/texera/service/KubernetesConfig.scala | 3 +-
.../ComputingUnitManagingResource.scala | 77 +++++++---
.../service/util/KubernetesClient.scala | 138 +++++++++++++++++-
.../computing-unit-selection.component.html | 22 +++
.../computing-unit-selection.component.scss | 8 +-
.../computing-unit-selection.component.ts | 4 +-
...orkflow-computing-unit-managing.service.ts | 6 +-
8 files changed, 237 insertions(+), 28 deletions(-)
diff --git a/core/computing-unit-managing-service/src/main/resources/kubernetes.conf b/core/computing-unit-managing-service/src/main/resources/kubernetes.conf
index 45171ad2ac7..c5f336e9186 100644
--- a/core/computing-unit-managing-service/src/main/resources/kubernetes.conf
+++ b/core/computing-unit-managing-service/src/main/resources/kubernetes.conf
@@ -22,8 +22,11 @@ kubernetes {
compute-unit-service-name = "workflow-computing-unit-svc"
compute-unit-service-name = ${?KUBERNETES_COMPUTE_UNIT_SERVICE_NAME}
- image-name = "bobbai/texera-workflow-computing-unit:dev"
- image-name = ${?KUBERNETES_IMAGE_NAME}
+ master-image-name = "bobbai/texera-workflow-computing-unit:dev"
+ master-image-name = ${?KUBERNETES_MASTER_IMAGE_NAME}
+
+ worker-image-name = "bobbai/texera-workflow-computing-unit:dev"
+ worker-image-name = ${?KUBERNETES_WORKER_IMAGE_NAME}
image-pull-policy = "Always"
image-pull-policy = ${?KUBERNETES_IMAGE_PULL_POLICY}
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala
index e37d8c5bbe0..9cd69e1f663 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala
@@ -28,7 +28,8 @@ object KubernetesConfig {
// Access the Kubernetes settings with environment variable fallback
val computeUnitServiceName: String = conf.getString("kubernetes.compute-unit-service-name")
val computeUnitPoolNamespace: String = conf.getString("kubernetes.compute-unit-pool-namespace")
- val computeUnitImageName: String = conf.getString("kubernetes.image-name")
+ val computeUnitMasterImageName: String = conf.getString("kubernetes.master-image-name")
+ val computeUnitWorkerImageName: String = conf.getString("kubernetes.worker-image-name")
val computingUnitImagePullPolicy: String = conf.getString("kubernetes.image-pull-policy")
val computeUnitPortNumber: Int = conf.getInt("kubernetes.port-num")
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
index 9dc238b8d3e..77a62fe8503 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
@@ -93,7 +93,8 @@ object ComputingUnitManagingResource {
cpuLimit: String,
memoryLimit: String,
gpuLimit: String,
- jvmMemorySize: String
+ jvmMemorySize: String,
+ numNodes: Int
)
case class WorkflowComputingUnitResourceLimit(
@@ -231,7 +232,12 @@ class ComputingUnitManagingResource {
val computingUnit = new WorkflowComputingUnit()
val userToken = JwtAuth.jwtToken(jwtClaims(user.user, dayToMin(TOKEN_EXPIRE_TIME_IN_DAYS)))
computingUnit.setUid(user.getUid)
- computingUnit.setName(param.name)
+ val name = if(param.numNodes > 0){
+ param.name + "-cluster"
+ }else{
+ param.name
+ }
+ computingUnit.setName(name)
computingUnit.setCreationTime(new Timestamp(System.currentTimeMillis()))
// Insert using the DAO
@@ -241,17 +247,30 @@ class ComputingUnitManagingResource {
val cuid = ctx.lastID().intValue()
val insertedUnit = wcDao.fetchOneByCuid(cuid)
- // Create the pod with the generated CUID
- val pod = KubernetesClient.createPod(
- cuid,
- param.cpuLimit,
- param.memoryLimit,
- param.gpuLimit,
- computingUnitEnvironmentVariables ++ Map(
- EnvironmentalVariable.ENV_USER_JWT_TOKEN -> userToken,
- EnvironmentalVariable.ENV_JAVA_OPTS -> s"-Xmx${param.jvmMemorySize}"
+ val pod = if(param.numNodes > 0){
+ KubernetesClient.createCluster(
+ cuid,
+ param.cpuLimit,
+ param.memoryLimit,
+ param.numNodes,
+ computingUnitEnvironmentVariables ++ Map(
+ EnvironmentalVariable.ENV_USER_JWT_TOKEN -> userToken,
+ EnvironmentalVariable.ENV_JAVA_OPTS -> s"-Xmx${param.jvmMemorySize}"
+ )
)
- )
+ }else{
+ // Create the pod with the generated CUID
+ KubernetesClient.createPod(
+ cuid,
+ param.cpuLimit,
+ param.memoryLimit,
+ param.gpuLimit,
+ computingUnitEnvironmentVariables ++ Map(
+ EnvironmentalVariable.ENV_USER_JWT_TOKEN -> userToken,
+ EnvironmentalVariable.ENV_JAVA_OPTS -> s"-Xmx${param.jvmMemorySize}"
+ )
+ )
+ }
// Return the dashboard response
DashboardWorkflowComputingUnit(
@@ -286,14 +305,28 @@ class ComputingUnitManagingResource {
.filter(_.getTerminateTime == null) // Filter out terminated units
units.map { unit =>
+ val isCluster = unit.getName.endsWith("-cluster")
val cuid = unit.getCuid.intValue()
val podName = KubernetesClient.generatePodName(cuid)
val pod = KubernetesClient.getPodByName(podName)
+ val status = if(isCluster){
+ // master pod (Option[Pod]) + all worker pods
+ val phases = (pod.toSeq ++ KubernetesClient.getClusterPodsById(cuid))
+ .map(_.getStatus.getPhase) // Seq[String] like Seq("Running", "Running", ...)
+
+ phases.distinct match {
+ case Seq(singlePhase) => singlePhase // all identical → return it
+ case _ => "Unknown" // mixed or empty
+ }
+ }else{
+ pod.map(_.getStatus.getPhase).getOrElse("Unknown")
+ }
+
DashboardWorkflowComputingUnit(
computingUnit = unit,
uri = KubernetesClient.generatePodURI(cuid),
- status = pod.map(_.getStatus.getPhase).getOrElse("Unknown"),
+ status = status,
metrics = getComputingUnitMetrics(cuid),
resourceLimits = getComputingUnitResourceLimit(cuid)
)
@@ -323,16 +356,20 @@ class ComputingUnitManagingResource {
.build()
}
- KubernetesClient.deletePod(cuid)
-
- // If successful, update the database
withTransaction(context) { ctx =>
val cuDao = new WorkflowComputingUnitDao(ctx.configuration())
- val units = cuDao.fetchByCuid(cuid)
-
- units.forEach(unit => unit.setTerminateTime(new Timestamp(System.currentTimeMillis())))
- cuDao.update(units)
+ val cu = cuDao.fetchOneByCuid(cuid)
+ val isCluster = cu.getName.endsWith("-cluster")
+ if(isCluster){
+ KubernetesClient.deleteCluster(cuid)
+ }else{
+ KubernetesClient.deletePod(cuid)
+ }
+ // If successful, update the database
+ cu.setTerminateTime(new Timestamp(System.currentTimeMillis()))
+ cuDao.update(cu)
}
+
Response.ok().build()
}
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
index 32e986d919b..d6db81a93c3 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
@@ -21,6 +21,7 @@ package edu.uci.ics.texera.service.util
import edu.uci.ics.texera.service.KubernetesConfig
import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.apps.{StatefulSet, StatefulSetBuilder, StatefulSetSpecBuilder}
import io.fabric8.kubernetes.api.model.metrics.v1beta1.PodMetricsList
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientBuilder}
@@ -39,10 +40,26 @@ object KubernetesClient {
def generatePodName(cuid: Int): String = s"$podNamePrefix-$cuid"
+ // Cluster resource names: "<pod>-master" (master service), "<pod>-workers" (worker StatefulSet).
+ def generateClusterMasterServiceName(cuid: Int): String = s"${generatePodName(cuid)}-master"
+ def generateStatefulSetName(cuid: Int): String = s"${generatePodName(cuid)}-workers"
+
def getPodByName(podName: String): Option[Pod] = {
Option(client.pods().inNamespace(namespace).withName(podName).get())
}
+ def getClusterPodsById(cuid: Int): Array[Pod] = {
+   // Matches every pod labelled with this cuid: the master pod from createPod carries the
+   // same "type"/"cuid" labels as the StatefulSet workers, so it is part of the result too.
+   val labelled = client
+     .pods()
+     .inNamespace(namespace)
+     .withLabel("type", "computing-unit")
+     .withLabel("cuid", cuid.toString)
+   val found = labelled.list().getItems
+   found.asScala.toArray
+ }
+
def getPodMetrics(cuid: Int): Map[String, String] = {
val podMetricsList: PodMetricsList = client.top().pods().metrics(namespace)
val targetPodName = generatePodName(cuid)
@@ -74,6 +91,118 @@ object KubernetesClient {
.getOrElse(Map.empty[String, String])
}
+
+ // Cluster lifecycle helpers.
+ // createCluster starts the master pod, the headless master service and the worker StatefulSet;
+ // if a later step fails, it removes what was already created. Returns the master pod.
+ def createCluster(
+     cuid: Int,
+     cpuLimit: String,
+     memoryLimit: String,
+     numNodes: Int,
+     envVars: Map[String, Any]
+ ): Pod = {
+   val enrichedEnv = envVars ++ Map(
+     "CLUSTERING_ENABLED" -> "true",
+     "CLUSTERING_MASTER_IP_ADDRESS" -> generatePodURI(cuid)
+   )
+   val master = createPod(cuid, cpuLimit, memoryLimit, gpuLimit = "0", enrichedEnv) // no GPU for clusters
+   try {
+     createClusterMasterService(cuid)
+     createStatefulSet(cuid, cpuLimit, memoryLimit, numNodes, enrichedEnv)
+   } catch { case scala.util.control.NonFatal(e) => scala.util.Try(deleteCluster(cuid)); throw e }
+   master
+ }
+
+ def deleteCluster(cuid: Int): Unit = {
+   // Same order as creation: master pod, headless master service, worker StatefulSet.
+   val steps = Seq[Int => Unit](deletePod, deleteClusterMasterService, deleteStatefulSet)
+   steps.foreach(step => step(cuid))
+ }
+
+ // ---------------------------------------------------------------------------
+ // Kubernetes resource creators
+ // ---------------------------------------------------------------------------
+
+ private def createClusterMasterService(cuid: Int): Service = { // Service that selects only this unit's master pod
+ val serviceName = generateClusterMasterServiceName(cuid)
+ val service = new ServiceBuilder()
+ .withNewMetadata()
+ .withName(serviceName)
+ .withNamespace(namespace)
+ .endMetadata()
+ .withNewSpec()
+ .withClusterIP("None") // headless for DNS discovery
+ .addNewPort()
+ .withPort(2552) // NOTE(review): hard-coded; equals the clustering.master-port default — confirm if that port is ever overridden
+ .endPort()
+ .addToSelector("type", "computing-unit")
+ .addToSelector("cuid", cuid.toString)
+ .addToSelector("role", "master") // createPod labels its pod role=master
+ .endSpec()
+ .build()
+ // create the Service in the computing-unit namespace and return the created object
+ client.services().inNamespace(namespace).create(service)
+ }
+
+ private def createStatefulSet(
+ cuid: Int,
+ cpuLimit: String,
+ memoryLimit: String,
+ numNodes: Int,
+ envVars: Map[String, Any]
+ ): StatefulSet = { // builds and creates the StatefulSet that runs this unit's worker pods
+ val envList = envVars.map { case (k, v) =>
+ new EnvVarBuilder().withName(k).withValue(v.toString).build()
+ }.toList.asJava
+ // only limits are set here (cpu and memory); no explicit requests
+ val resources = new ResourceRequirementsBuilder()
+ .addToLimits("cpu", new Quantity(cpuLimit))
+ .addToLimits("memory", new Quantity(memoryLimit))
+ .build()
+ // worker container: worker image, shared pull policy, the env vars passed in by the caller
+ val container = new ContainerBuilder()
+ .withName("computing-unit-worker")
+ .withImage(KubernetesConfig.computeUnitWorkerImageName)
+ .withImagePullPolicy(KubernetesConfig.computingUnitImagePullPolicy)
+ .addNewPort().withContainerPort(KubernetesConfig.computeUnitPortNumber).endPort()
+ .withEnv(envList)
+ .withResources(resources)
+ .build()
+ // selector and pod template use the same three labels: type, cuid, role=worker
+ val sts = new StatefulSetBuilder()
+ .withNewMetadata()
+ .withName(generateStatefulSetName(cuid))
+ .withNamespace(namespace)
+ .endMetadata()
+ .withSpec(
+ new StatefulSetSpecBuilder()
+ .withServiceName(generatePodName(cuid)) // NOTE(review): the visible code creates no Service with this name (the master Service is "<pod>-master") — confirm the governing Service
+ .withReplicas(numNodes) // one replica per requested worker node
+ .withSelector(
+ new LabelSelectorBuilder()
+ .addToMatchLabels("type", "computing-unit")
+ .addToMatchLabels("cuid", cuid.toString)
+ .addToMatchLabels("role", "worker")
+ .build()
+ )
+ .withNewTemplate()
+ .withNewMetadata()
+ .addToLabels("type", "computing-unit")
+ .addToLabels("cuid", cuid.toString)
+ .addToLabels("role", "worker")
+ .endMetadata()
+ .withNewSpec()
+ .withContainers(container)
+ .endSpec()
+ .endTemplate()
+ .build()
+ )
+ .build()
+ // create the StatefulSet in the computing-unit namespace and return the created object
+ client.apps().statefulSets().inNamespace(namespace).create(sts)
+ }
+
def createPod(
cuid: Int,
cpuLimit: String,
@@ -116,6 +245,7 @@ object KubernetesClient {
.addToLabels("type", "computing-unit")
.addToLabels("cuid", cuid.toString)
.addToLabels("name", podName)
+ .addToLabels("role", "master")
// Start building the pod spec
val specBuilder = podBuilder
@@ -131,7 +261,7 @@ object KubernetesClient {
val pod = specBuilder
.addNewContainer()
.withName("computing-unit-master")
- .withImage(KubernetesConfig.computeUnitImageName)
+ .withImage(KubernetesConfig.computeUnitMasterImageName)
.withImagePullPolicy(KubernetesConfig.computingUnitImagePullPolicy)
.addNewPort()
.withContainerPort(KubernetesConfig.computeUnitPortNumber)
@@ -150,4 +280,10 @@ object KubernetesClient {
def deletePod(cuid: Int): Unit = {
client.pods().inNamespace(namespace).withName(generatePodName(cuid)).delete()
}
+
+ private def deleteClusterMasterService(cuid: Int): Unit = // deletes the Service named by generateClusterMasterServiceName
+ client.services().inNamespace(namespace).withName(generateClusterMasterServiceName(cuid)).delete()
+ // deletes the StatefulSet named by generateStatefulSetName
+ private def deleteStatefulSet(cuid: Int): Unit =
+ client.apps().statefulSets().inNamespace(namespace).withName(generateStatefulSetName(cuid)).delete()
}
diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
index c9b554ed67f..aeab2677769 100644
--- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
+++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
@@ -208,6 +208,28 @@
+
+ Select #Worker Nodes
+
+
+
+
+
+
+ 0"
+ class="cluster-warning">
+
+
+
+
diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.scss b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.scss
index 22f63167fa7..90a2d650edb 100644
--- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.scss
+++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.scss
@@ -170,10 +170,16 @@
.memory-selection,
.cpu-selection,
-.gpu-selection {
+.gpu-selection,
+.node-selection
+{
width: 100%;
}
+.cluster-warning {
+ font-size: 0.9em;
+}
+
.jvm-memory-slider {
width: 100%;
margin: 10px 0;
diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.ts b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.ts
index ea576386480..297ac572411 100644
--- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.ts
+++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.ts
@@ -53,6 +53,7 @@ export class ComputingUnitSelectionComponent implements OnInit, OnChanges {
selectedCpu: string = "";
selectedGpu: string = "0"; // Default to no GPU
selectedJvmMemorySize: string = "1G"; // Initial JVM memory size
+ selectedNumNodes: number = 0; // worker nodes to request; 0 = single-pod unit
// JVM memory slider configuration
jvmMemorySliderValue: number = 1; // Initial value in GB
@@ -65,6 +66,7 @@ export class ComputingUnitSelectionComponent implements OnInit, OnChanges {
cpuOptions: string[] = [];
memoryOptions: string[] = [];
gpuOptions: string[] = []; // Add GPU options array
+ nodeOptions: number[] = [0, 1, 2, 3, 4, 5, 6, 7, 8]; // choices offered for the worker-node count
// Add property to track user-initiated termination
private isUserTerminatingUnit = false;
@@ -270,7 +272,7 @@ export class ComputingUnitSelectionComponent implements OnInit, OnChanges {
const computeJvmMemory = this.selectedJvmMemorySize;
this.computingUnitService
- .createComputingUnit(computeUnitName, computeCPU, computeMemory, computeGPU, computeJvmMemory)
+ .createComputingUnit(computeUnitName, computeCPU, computeMemory, computeGPU, computeJvmMemory, this.selectedNumNodes)
.pipe(untilDestroyed(this))
.subscribe({
next: (unit: DashboardWorkflowComputingUnit) => {
diff --git a/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts b/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts
index b5664514d7b..90322a98a9f 100644
--- a/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts
+++ b/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts
@@ -43,6 +43,7 @@ export class WorkflowComputingUnitManagingService {
* @param gpuLimit The gpu resource limit for the computing unit.
* @param jvmMemorySize The JVM memory size (e.g. "1G", "2G")
+ * @param numNodes The number of worker nodes (0 creates a single-pod unit, no cluster).
* @param unitType
* @returns An Observable of the created WorkflowComputingUnit.
*/
public createComputingUnit(
@@ -51,9 +52,10 @@ export class WorkflowComputingUnitManagingService {
memoryLimit: string,
gpuLimit: string = "0",
jvmMemorySize: string = "1G",
- unitType: string = "k8s_pod"
+ numNodes: number = 0,
+ unitType: string = "k8s_pod",
): Observable {
- const body = { name, cpuLimit, memoryLimit, gpuLimit, jvmMemorySize, unitType };
+ const body = { name, cpuLimit, memoryLimit, gpuLimit, jvmMemorySize, unitType, numNodes };
return this.http.post(
`${AppSettings.getApiEndpoint()}/${COMPUTING_UNIT_CREATE_URL}`,
From 632740cbea3d48bb3c4a42e5476b3e85de9055f8 Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Mon, 26 May 2025 10:25:53 -0700
Subject: [PATCH 05/29] Update
workflow-computing-unit-manager-service-account.yaml
---
...kflow-computing-unit-manager-service-account.yaml | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
diff --git a/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-service-account.yaml b/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-service-account.yaml
index 44dce0cf877..ef698226c23 100644
--- a/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-service-account.yaml
+++ b/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-service-account.yaml
@@ -29,11 +29,17 @@ metadata:
namespace: {{ .Values.workflowComputingUnitPool.namespace }}
rules:
- apiGroups: [""]
- resources: ["pods"]
+ resources: ["pods", "services"] # added services
verbs: ["get", "list", "watch", "create", "delete"]
- - apiGroups: ["metrics.k8s.io"] # Added metrics permissions
+
+ - apiGroups: ["metrics.k8s.io"]
resources: ["pods"]
- verbs: ["list", "get"] # Added metrics permissions
+ verbs: ["get", "list"]
+
+ - apiGroups: ["apps"]
+ resources: ["statefulsets"]
+ verbs: ["get", "list", "watch", "create", "delete"]
+
---
apiVersion: rbac.authorization.k8s.io/v1
From 092938d23e0f4fd33173d58ea67651dccaacbfd4 Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Mon, 26 May 2025 10:54:39 -0700
Subject: [PATCH 06/29] Update workflow-computing-unit-manager-deployment.yaml
---
.../workflow-computing-unit-manager-deployment.yaml | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-deployment.yaml b/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-deployment.yaml
index 2226bc67168..0762fb25dd2 100644
--- a/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-deployment.yaml
+++ b/deployment/k8s/texera-helmchart/templates/workflow-computing-unit-manager-deployment.yaml
@@ -45,8 +45,10 @@ spec:
value: {{ .Values.workflowComputingUnitPool.namespace }}
- name: KUBERNETES_COMPUTE_UNIT_SERVICE_NAME
value: {{ .Values.workflowComputingUnitPool.name }}-svc
- - name: KUBERNETES_IMAGE_NAME
- value: {{ .Values.workflowComputingUnitPool.imageName }}
+ - name: KUBERNETES_MASTER_IMAGE_NAME
+ value: {{ .Values.workflowComputingUnitPool.masterImageName }}
+ - name: KUBERNETES_WORKER_IMAGE_NAME
+ value: {{ .Values.workflowComputingUnitPool.workerImageName }}
# TexeraDB Access
- name: STORAGE_JDBC_URL
value: jdbc:postgresql://{{ .Release.Name }}-postgresql:5432/texera_db?currentSchema=texera_db,public
From 9c6b74d19f24a84b4414045136b3750d4bdd0f8f Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Mon, 26 May 2025 11:28:32 -0700
Subject: [PATCH 07/29] Add R runtime and UDF sources to worker image; tidy cluster status comments
---
.../ComputingUnitManagingResource.scala | 7 ++-
deployment/computing-unit-worker.dockerfile | 46 +++++++++++++++----
2 files changed, 41 insertions(+), 12 deletions(-)
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
index 77a62fe8503..d3e53d2fade 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
@@ -311,13 +311,12 @@ class ComputingUnitManagingResource {
val pod = KubernetesClient.getPodByName(podName)
val status = if(isCluster){
- // master pod (Option[Pod]) + all worker pods
val phases = (pod.toSeq ++ KubernetesClient.getClusterPodsById(cuid))
- .map(_.getStatus.getPhase) // Seq[String] like Seq("Running", "Running", ...)
+ .map(_.getStatus.getPhase)
phases.distinct match {
- case Seq(singlePhase) => singlePhase // all identical → return it
- case _ => "Unknown" // mixed or empty
+ case Seq(singlePhase) => singlePhase // all identical
+ case _ => "Unknown" // mixed phases, or empty (no pods found)
}
}else{
pod.map(_.getStatus.getPhase).getOrElse("Unknown")
diff --git a/deployment/computing-unit-worker.dockerfile b/deployment/computing-unit-worker.dockerfile
index 5d744610eaf..075682fea15 100644
--- a/deployment/computing-unit-worker.dockerfile
+++ b/deployment/computing-unit-worker.dockerfile
@@ -43,23 +43,51 @@ FROM eclipse-temurin:11-jre-jammy AS runtime
WORKDIR /core/amber
+COPY --from=build /core/amber/r-requirements.txt /tmp/r-requirements.txt
COPY --from=build /core/amber/requirements.txt /tmp/requirements.txt
COPY --from=build /core/amber/operator-requirements.txt /tmp/operator-requirements.txt
-# Install Python runtime and dependencies
+# Install Python & R runtime dependencies
RUN apt-get update && apt-get install -y \
python3-pip \
python3-dev \
libpq-dev \
+ gfortran \
+ curl \
+ build-essential \
+ libreadline-dev \
+ libncurses-dev \
+ libssl-dev \
+ libxml2-dev \
+ xorg-dev \
+ libbz2-dev \
+ liblzma-dev \
+ libpcre++-dev \
+ libpango1.0-dev \
+ libcurl4-openssl-dev \
+ unzip \
&& apt-get clean
-RUN pip3 install --upgrade pip setuptools wheel
-RUN pip3 install python-lsp-server python-lsp-server[websockets]
-
-# Install requirements with a fallback for wordcloud
-RUN pip3 install -r /tmp/requirements.txt
-RUN pip3 install --no-cache-dir --find-links https://pypi.org/simple/ -r /tmp/operator-requirements.txt || \
- pip3 install --no-cache-dir wordcloud==1.9.2
+# Install R and needed libraries
+ENV R_VERSION=4.3.3
+RUN curl -O https://cran.r-project.org/src/base/R-4/R-${R_VERSION}.tar.gz && \
+ tar -xf R-${R_VERSION}.tar.gz && \
+ cd R-${R_VERSION} && \
+ ./configure --prefix=/usr/local \
+ --enable-R-shlib \
+ --with-blas \
+ --with-lapack && \
+ make -j 4 && \
+ make install && \
+ cd .. && \
+ rm -rf R-${R_VERSION}* && R --version && pip3 install --upgrade pip setuptools wheel && \
+ pip3 install -r /tmp/requirements.txt && \
+ pip3 install -r /tmp/operator-requirements.txt && \
+ pip3 install -r /tmp/r-requirements.txt
+RUN Rscript -e "options(repos = c(CRAN = 'https://cran.r-project.org')); \
+ install.packages(c('coro', 'arrow', 'dplyr'), \
+ Ncpus = parallel::detectCores())"
+ENV LD_LIBRARY_PATH=/usr/local/lib/R/lib:$LD_LIBRARY_PATH
# Copy the built texera binary from the build phase
COPY --from=build /core/amber/target/texera-0.1-SNAPSHOT /core/amber
@@ -67,6 +95,8 @@ COPY --from=build /core/amber/target/texera-0.1-SNAPSHOT /core/amber
COPY --from=build /core/amber/src/main/resources /core/amber/src/main/resources
COPY --from=build /core/workflow-core/src/main/resources /core/workflow-core/src/main/resources
COPY --from=build /core/file-service/src/main/resources /core/file-service/src/main/resources
+# Copy code for python & R UDF
+COPY --from=build /core/amber/src/main/python /core/amber/src/main/python
CMD ["bin/computing-unit-worker"]
From 0be9bd93d331ffeacba5eac16e71054a86643787 Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Mon, 26 May 2025 13:58:46 -0700
Subject: [PATCH 08/29] Add PVC-backed shared disk (diskLimit) for cluster computing units
---
.../src/main/resources/kubernetes.conf | 3 +
.../ics/texera/service/KubernetesConfig.scala | 1 +
.../ComputingUnitManagingResource.scala | 6 +-
.../service/util/KubernetesClient.scala | 83 +++++++++--
core/gui/src/app/app.module.ts | 132 +++++++++---------
.../computing-unit-selection.component.html | 59 +++++---
.../computing-unit-selection.component.ts | 4 +-
...orkflow-computing-unit-managing.service.ts | 4 +-
8 files changed, 189 insertions(+), 103 deletions(-)
diff --git a/core/computing-unit-managing-service/src/main/resources/kubernetes.conf b/core/computing-unit-managing-service/src/main/resources/kubernetes.conf
index c5f336e9186..c8b48ad8cfd 100644
--- a/core/computing-unit-managing-service/src/main/resources/kubernetes.conf
+++ b/core/computing-unit-managing-service/src/main/resources/kubernetes.conf
@@ -28,6 +28,9 @@ kubernetes {
worker-image-name = "bobbai/texera-workflow-computing-unit:dev"
worker-image-name = ${?KUBERNETES_WORKER_IMAGE_NAME}
+ storage-class-name = "standard"
+ storage-class-name = ${?KUBERNETES_STORAGE_CLASS_NAME}
+
image-pull-policy = "Always"
image-pull-policy = ${?KUBERNETES_IMAGE_PULL_POLICY}
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala
index 9cd69e1f663..e2ea126186b 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/KubernetesConfig.scala
@@ -31,6 +31,7 @@ object KubernetesConfig {
val computeUnitMasterImageName: String = conf.getString("kubernetes.master-image-name")
val computeUnitWorkerImageName: String = conf.getString("kubernetes.worker-image-name")
val computingUnitImagePullPolicy: String = conf.getString("kubernetes.image-pull-policy")
+ val computingUnitStorageClassName: String = conf.getString("kubernetes.storage-class-name")
val computeUnitPortNumber: Int = conf.getInt("kubernetes.port-num")
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
index d3e53d2fade..1dbaec51223 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
@@ -94,7 +94,8 @@ object ComputingUnitManagingResource {
memoryLimit: String,
gpuLimit: String,
jvmMemorySize: String,
- numNodes: Int
+ numNodes: Int,
+ diskLimit: String
)
case class WorkflowComputingUnitResourceLimit(
@@ -252,6 +253,7 @@ class ComputingUnitManagingResource {
cuid,
param.cpuLimit,
param.memoryLimit,
+ param.diskLimit,
param.numNodes,
computingUnitEnvironmentVariables ++ Map(
EnvironmentalVariable.ENV_USER_JWT_TOKEN -> userToken,
@@ -259,7 +261,7 @@ class ComputingUnitManagingResource {
)
)
}else{
- // Create the pod with the generated CUID
+ // Create the pod with the generated CUID (ignoring disk limit for now)
KubernetesClient.createPod(
cuid,
param.cpuLimit,
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
index d6db81a93c3..93905ec2c9c 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
@@ -40,6 +40,8 @@ object KubernetesClient {
def generatePodName(cuid: Int): String = s"$podNamePrefix-$cuid"
+  /** Name of the PersistentVolumeClaim for a computing unit: its pod name suffixed with "-pvc". */
+  def generateVolumeName(cuid: Int): String = s"${generatePodName(cuid)}-pvc"
+
def generateClusterMasterServiceName(cuid:Int) = s"${generatePodName(cuid)}-master"
def generateStatefulSetName(cuid: Int): String = s"${generatePodName(cuid)}-workers"
@@ -95,10 +97,45 @@ object KubernetesClient {
// ---------------------------------------------------------------------------
// Cluster lifecycle helpers
// ---------------------------------------------------------------------------
+
+  /**
+    * Ensures the PersistentVolumeClaim backing a computing unit's disk exists and returns a
+    * Volume that references it.
+    *
+    * The claim is named by generateVolumeName(cuid), labelled with type "computing-unit" and
+    * the cuid, requests `diskLimit` of storage with ReadWriteOnce access, and uses the storage
+    * class from KubernetesConfig.computingUnitStorageClassName. If a claim with that name is
+    * already present in the namespace it is reused unchanged instead of being created again.
+    *
+    * NOTE(review): ReadWriteOnce limits mounting to a single node, while createCluster hands
+    * this volume to both the master pod and the worker StatefulSet. Confirm that all pods land
+    * on one node, or that the storage class supports ReadWriteMany, before relying on sharing.
+    *
+    * @param cuid      computing unit id; determines the claim name and its "cuid" label
+    * @param diskLimit requested storage size, passed verbatim to `new Quantity(...)`
+    * @return a Volume, named after the claim, that callers can mount into a pod
+    */
+  def createVolume(cuid: Int, diskLimit: String): Volume = {
+    val pvcName = generateVolumeName(cuid)
+    val claims = client.persistentVolumeClaims().inNamespace(namespace)
+
+    // withName(...).get() yields null when the claim is absent. Create only in that case so a
+    // repeated call for the same cuid does not fail on a name conflict.
+    if (Option(claims.withName(pvcName).get()).isEmpty) {
+      val pvc = new PersistentVolumeClaimBuilder()
+        .withNewMetadata()
+        .withName(pvcName)
+        .withNamespace(namespace)
+        .addToLabels("type", "computing-unit")
+        .addToLabels("cuid", cuid.toString)
+        .endMetadata()
+        .withNewSpec()
+        .withAccessModes("ReadWriteOnce")
+        .withNewResources()
+        .addToRequests("storage", new Quantity(diskLimit))
+        .endResources()
+        .withStorageClassName(KubernetesConfig.computingUnitStorageClassName)
+        .endSpec()
+        .build()
+
+      claims.create(pvc)
+    }
+
+    // Return a Volume that points to the PVC so callers can mount it
+    new VolumeBuilder()
+      .withName(pvcName)
+      .withNewPersistentVolumeClaim()
+      .withClaimName(pvcName)
+      .endPersistentVolumeClaim()
+      .build()
+  }
+
+
def createCluster(
cuid: Int,
cpuLimit: String,
memoryLimit: String,
+ diskLimit: String,
numNodes: Int,
envVars: Map[String, Any]
): Pod = {
@@ -107,10 +144,10 @@ object KubernetesClient {
"CLUSTERING_ENABLED" -> "true",
"CLUSTERING_MASTER_IP_ADDRESS" -> masterIp
)
-
- val master = createPod(cuid, cpuLimit, memoryLimit, gpuLimit = "0", enrichedEnv)
+ val volume = createVolume(cuid, diskLimit)
+ val master = createPod(cuid, cpuLimit, memoryLimit, gpuLimit = "0", enrichedEnv, Some(volume))
createClusterMasterService(cuid)
- createStatefulSet(cuid, cpuLimit, memoryLimit, numNodes, enrichedEnv)
+ createStatefulSet(cuid, cpuLimit, memoryLimit, numNodes, enrichedEnv, volume)
master // return master pod
}
@@ -118,6 +155,7 @@ object KubernetesClient {
deletePod(cuid)
deleteClusterMasterService(cuid)
deleteStatefulSet(cuid)
+ deleteVolume(cuid)
}
// ---------------------------------------------------------------------------
@@ -150,7 +188,8 @@ object KubernetesClient {
cpuLimit: String,
memoryLimit: String,
numNodes: Int,
- envVars: Map[String, Any]
+ envVars: Map[String, Any],
+ volume: Volume
): StatefulSet = {
val envList = envVars.map { case (k, v) =>
new EnvVarBuilder().withName(k).withValue(v.toString).build()
@@ -165,6 +204,7 @@ object KubernetesClient {
.withName("computing-unit-worker")
.withImage(KubernetesConfig.computeUnitWorkerImageName)
.withImagePullPolicy(KubernetesConfig.computingUnitImagePullPolicy)
+ .addNewVolumeMount().withName(volume.getName).withMountPath("/core/amber/user-resources").endVolumeMount()
.addNewPort().withContainerPort(KubernetesConfig.computeUnitPortNumber).endPort()
.withEnv(envList)
.withResources(resources)
@@ -208,7 +248,8 @@ object KubernetesClient {
cpuLimit: String,
memoryLimit: String,
gpuLimit: String,
- envVars: Map[String, Any]
+ envVars: Map[String, Any],
+ attachVolume: Option[Volume] = None
): Pod = {
val podName = generatePodName(cuid)
if (getPodByName(podName).isDefined) {
@@ -247,6 +288,22 @@ object KubernetesClient {
.addToLabels("name", podName)
.addToLabels("role", "master")
+ // -------------- CONTAINER -------------
+ val containerB = new ContainerBuilder()
+ .withName("computing-unit-master")
+ .withImage(KubernetesConfig.computeUnitMasterImageName)
+ .withImagePullPolicy(KubernetesConfig.computingUnitImagePullPolicy)
+ .addNewPort().withContainerPort(KubernetesConfig.computeUnitPortNumber).endPort()
+ .withEnv(envList)
+ .withResources(resourceBuilder.build())
+
+ // mount the PVC at /core/amber/user-resources if a volume was provided
+ attachVolume.foreach { v =>
+ containerB.addNewVolumeMount().withName(v.getName).withMountPath("/core/amber/user-resources").endVolumeMount()
+ }
+
+ val container = containerB.build()
+
// Start building the pod spec
val specBuilder = podBuilder
.endMetadata()
@@ -259,16 +316,7 @@ object KubernetesClient {
// Complete the pod spec
val pod = specBuilder
- .addNewContainer()
- .withName("computing-unit-master")
- .withImage(KubernetesConfig.computeUnitMasterImageName)
- .withImagePullPolicy(KubernetesConfig.computingUnitImagePullPolicy)
- .addNewPort()
- .withContainerPort(KubernetesConfig.computeUnitPortNumber)
- .endPort()
- .withEnv(envList)
- .withResources(resourceBuilder.build())
- .endContainer()
+ .withContainers(container)
.withHostname(podName)
.withSubdomain(KubernetesConfig.computeUnitServiceName)
.endSpec()
@@ -281,6 +329,11 @@ object KubernetesClient {
client.pods().inNamespace(namespace).withName(generatePodName(cuid)).delete()
}
+
+  /** Deletes the PersistentVolumeClaim named by generateVolumeName(cuid) from the namespace. */
+  private def deleteVolume(cuid: Int): Unit =
+    client
+      .persistentVolumeClaims()
+      .inNamespace(namespace)
+      .withName(generateVolumeName(cuid))
+      .delete()
+
private def deleteClusterMasterService(cuid: Int): Unit =
client.services().inNamespace(namespace).withName(generateClusterMasterServiceName(cuid)).delete()
diff --git a/core/gui/src/app/app.module.ts b/core/gui/src/app/app.module.ts
index fe57e86673a..ff638ff2549 100644
--- a/core/gui/src/app/app.module.ts
+++ b/core/gui/src/app/app.module.ts
@@ -167,6 +167,7 @@ import { NzDividerModule } from "ng-zorro-antd/divider";
import { NzProgressModule } from "ng-zorro-antd/progress";
import { ComputingUnitSelectionComponent } from "./workspace/component/power-button/computing-unit-selection.component";
import { NzSliderModule } from "ng-zorro-antd/slider";
+import {NzInputNumberModule} from "ng-zorro-antd/input-number";
registerLocaleData(en);
@@ -258,71 +259,72 @@ registerLocaleData(en);
HubSearchResultComponent,
ComputingUnitSelectionComponent,
],
- imports: [
- BrowserModule,
- AppRoutingModule,
- HttpClientModule,
- JwtModule.forRoot({
- config: {
- tokenGetter: AuthService.getAccessToken,
- skipWhenExpired: false,
- throwNoTokenError: false,
- disallowedRoutes: ["forum/api/users"],
- },
- }),
- BrowserAnimationsModule,
- RouterModule.forRoot([]),
- FormsModule,
- ReactiveFormsModule,
- FormlyModule.forRoot(TEXERA_FORMLY_CONFIG),
- FormlyNgZorroAntdModule,
- OverlayModule,
- NzDatePickerModule,
- NzDropDownModule,
- NzButtonModule,
- NzAutocompleteModule,
- NzIconModule,
- NzFormModule,
- NzListModule,
- NzInputModule,
- NzPopoverModule,
- NzCollapseModule,
- NzToolTipModule,
- NzTableModule,
- NzSelectModule,
- NzSpaceModule,
- NzBadgeModule,
- NzUploadModule,
- NgxJsonViewerModule,
- NzMessageModule,
- NzModalModule,
- NzCardModule,
- NzTagModule,
- NzPopconfirmModule,
- NzAvatarModule,
- NzTabsModule,
- NzPaginationModule,
- NzCommentModule,
- ColorPickerModule,
- NzSwitchModule,
- NzLayoutModule,
- NzSliderModule,
- MarkdownModule.forRoot(),
- DragDropModule,
- NzAlertModule,
- NzResizableModule,
- NzSpinModule,
- NgxFileDropModule,
- NzTreeModule,
- NzTreeViewModule,
- NzNoAnimationModule,
- TreeModule,
- SocialLoginModule,
- GoogleSigninButtonModule,
- NzEmptyModule,
- NzDividerModule,
- NzProgressModule,
- ],
+ imports: [
+ BrowserModule,
+ AppRoutingModule,
+ HttpClientModule,
+ JwtModule.forRoot({
+ config: {
+ tokenGetter: AuthService.getAccessToken,
+ skipWhenExpired: false,
+ throwNoTokenError: false,
+ disallowedRoutes: ["forum/api/users"],
+ },
+ }),
+ BrowserAnimationsModule,
+ RouterModule.forRoot([]),
+ FormsModule,
+ ReactiveFormsModule,
+ FormlyModule.forRoot(TEXERA_FORMLY_CONFIG),
+ FormlyNgZorroAntdModule,
+ OverlayModule,
+ NzDatePickerModule,
+ NzDropDownModule,
+ NzButtonModule,
+ NzAutocompleteModule,
+ NzIconModule,
+ NzFormModule,
+ NzListModule,
+ NzInputModule,
+ NzPopoverModule,
+ NzCollapseModule,
+ NzToolTipModule,
+ NzTableModule,
+ NzSelectModule,
+ NzSpaceModule,
+ NzBadgeModule,
+ NzUploadModule,
+ NgxJsonViewerModule,
+ NzMessageModule,
+ NzModalModule,
+ NzCardModule,
+ NzTagModule,
+ NzPopconfirmModule,
+ NzAvatarModule,
+ NzTabsModule,
+ NzPaginationModule,
+ NzCommentModule,
+ ColorPickerModule,
+ NzSwitchModule,
+ NzLayoutModule,
+ NzSliderModule,
+ MarkdownModule.forRoot(),
+ DragDropModule,
+ NzAlertModule,
+ NzResizableModule,
+ NzSpinModule,
+ NgxFileDropModule,
+ NzTreeModule,
+ NzTreeViewModule,
+ NzNoAnimationModule,
+ TreeModule,
+ SocialLoginModule,
+ GoogleSigninButtonModule,
+ NzEmptyModule,
+ NzDividerModule,
+ NzProgressModule,
+ NzInputNumberModule,
+ ],
providers: [
provideNzI18n(en_US),
AuthGuardService,
diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
index aeab2677769..25d03cabae3 100644
--- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
+++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
@@ -208,27 +208,48 @@
-
-
Select #Worker Nodes
-
-
-
-
+
+ Cluster Mode
+
-
0"
- class="cluster-warning">
-
-
-
+
+
+
+
+ # Worker Nodes
+
+
+
+
+
+
+ Disk Size
+ Gi
+
+
+
+
+
+
+
+
{
diff --git a/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts b/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts
index 90322a98a9f..52bd1f43e83 100644
--- a/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts
+++ b/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts
@@ -42,6 +42,7 @@ export class WorkflowComputingUnitManagingService {
* @param memoryLimit The memory resource limit for the computing unit.
* @param gpuLimit The gpu resource limit for the computing unit.
* @param jvmMemorySize The JVM memory size (e.g. "1G", "2G")
+ * @param diskLimit The disk size limit for the computing unit (defaults to "auto").
* @param unitType
* @param numNodes
* @returns An Observable of the created WorkflowComputingUnit.
@@ -52,10 +53,11 @@ export class WorkflowComputingUnitManagingService {
memoryLimit: string,
gpuLimit: string = "0",
jvmMemorySize: string = "1G",
+ diskLimit: string = "auto",
numNodes: number = 0,
unitType: string = "k8s_pod",
): Observable
{
- const body = { name, cpuLimit, memoryLimit, gpuLimit, jvmMemorySize, unitType, numNodes };
+ const body = { name, cpuLimit, memoryLimit, gpuLimit, jvmMemorySize, unitType, numNodes, diskLimit };
return this.http.post(
`${AppSettings.getApiEndpoint()}/${COMPUTING_UNIT_CREATE_URL}`,
From a1cf6f2ea997663805d727f67bc5de99dd526c67 Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Mon, 26 May 2025 15:43:19 -0700
Subject: [PATCH 09/29] Treat numNodes > 1 as a cluster and rework the cluster creation form
---
.../ComputingUnitManagingResource.scala | 4 +-
.../service/util/KubernetesClient.scala | 2 +-
.../computing-unit-selection.component.html | 97 ++++++++++---------
.../computing-unit-selection.component.scss | 3 +-
.../computing-unit-selection.component.ts | 7 +-
5 files changed, 61 insertions(+), 52 deletions(-)
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
index 1dbaec51223..46eb68f8aa7 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
@@ -233,7 +233,7 @@ class ComputingUnitManagingResource {
val computingUnit = new WorkflowComputingUnit()
val userToken = JwtAuth.jwtToken(jwtClaims(user.user, dayToMin(TOKEN_EXPIRE_TIME_IN_DAYS)))
computingUnit.setUid(user.getUid)
- val name = if(param.numNodes > 0){
+ val name = if(param.numNodes > 1){
param.name + "-cluster"
}else{
param.name
@@ -248,7 +248,7 @@ class ComputingUnitManagingResource {
val cuid = ctx.lastID().intValue()
val insertedUnit = wcDao.fetchOneByCuid(cuid)
- val pod = if(param.numNodes > 0){
+ val pod = if(param.numNodes > 1){
KubernetesClient.createCluster(
cuid,
param.cpuLimit,
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
index 93905ec2c9c..18faa0f1bac 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
@@ -147,7 +147,7 @@ object KubernetesClient {
val volume = createVolume(cuid, diskLimit)
val master = createPod(cuid, cpuLimit, memoryLimit, gpuLimit = "0", enrichedEnv, Some(volume))
createClusterMasterService(cuid)
- createStatefulSet(cuid, cpuLimit, memoryLimit, numNodes, enrichedEnv, volume)
+ createStatefulSet(cuid, cpuLimit, memoryLimit, numNodes - 1, enrichedEnv, volume)
master // return master pod
}
diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
index 25d03cabae3..a348a97ec28 100644
--- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
+++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
@@ -173,7 +173,18 @@
-
Computing Unit Name
+
+
Computing Unit Name
+
+ Create Cluster
+
+
+
+
+
+
+
+
+
+ # of Nodes
+
+
+
+
+
+
+ Shared Disk Size
+
+
+
+
+
+
+
+ Per-node Spec:
+
+
Select Memory Size
-
- Cluster Mode
-
-
-
-
-
-
-
- # Worker Nodes
-
-
-
-
-
-
- Disk Size
- Gi
-
-
-
-
-
-
-
-
-
Select #GPU(s)
+
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
-
-
-
-
- # of Nodes
-
-
-
+
+
+
+
+ # of Nodes
+
+
+
-
-
- Shared Disk Size
-
-
-
-
-
+
+
+ Shared Disk Size
+
+
+
+
+
-
- Per-node Spec:
-
+
+ Per-node Spec:
+
Select RAM Size
@@ -238,38 +242,45 @@
-
-
- Select #GPU(s)
-
-
-
-
-
-
-
+
+
+ Select #GPU(s)
+
+
+
+
+
+
+
-
- JVM Memory Size: {{selectedJvmMemorySize}}
-
-
-
+
+ JVM Memory Size: {{selectedJvmMemorySize}}
+
+
+
@@ -333,7 +344,9 @@
-
+
1;
}
diff --git a/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts b/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts
index 503b928f95f..208d3f689e4 100644
--- a/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts
+++ b/core/gui/src/app/workspace/service/workflow-computing-unit/workflow-computing-unit-managing.service.ts
@@ -122,7 +122,18 @@ export class WorkflowComputingUnitManagingService {
diskLimit: string,
numNodes: number
): Observable {
- return this.createComputingUnit(name, cpuLimit, memoryLimit, gpuLimit, jvmMemorySize, shmSize, "", diskLimit, numNodes, "kubernetes");
+ return this.createComputingUnit(
+ name,
+ cpuLimit,
+ memoryLimit,
+ gpuLimit,
+ jvmMemorySize,
+ shmSize,
+ "",
+ diskLimit,
+ numNodes,
+ "kubernetes"
+ );
}
/**
@@ -133,7 +144,7 @@ export class WorkflowComputingUnitManagingService {
* @returns An Observable of the created WorkflowComputingUnit.
*/
public createLocalComputingUnit(name: string, uri: string): Observable {
- return this.createComputingUnit(name, "NaN", "NaN", undefined, "NaN", undefined, uri,"NaN", 1, "local");
+ return this.createComputingUnit(name, "NaN", "NaN", undefined, "NaN", undefined, uri, "NaN", 1, "local");
}
/**
From 6e613242b50f9e36c8ef4f37400806af25758193 Mon Sep 17 00:00:00 2001
From: Shengquan Ni
Date: Fri, 30 May 2025 10:04:32 -0700
Subject: [PATCH 17/29] Update computing-unit-selection.component.html
---
.../power-button/computing-unit-selection.component.html | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
index 588d69d2f41..1dbdd102931 100644
--- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
+++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
@@ -352,7 +352,7 @@
nzMessage="This will start a cluster with {{ selectedNumNodes }} nodes, allocating a total
of {{ parseResourceNumber(selectedCpu) * selectedNumNodes }}{{parseResourceUnit(selectedCpu)}} CPU,
{{ parseResourceNumber(selectedMemory) * selectedNumNodes }}{{parseResourceUnit(selectedMemory)}} Memory,
- and a {{ selectedDiskSize }}GiB disk shared by all nodes."
+ and a {{ selectedDiskSize }}GiB disk shared by all nodes. Advanced configurations (e.g. GPU, Shared Memory, etc.) will be ignored."
nzShowIcon>
From 24cec03bf49a86547521e4f9c4b3b2bc1217f42d Mon Sep 17 00:00:00 2001
From: Shengquan Ni
Date: Fri, 30 May 2025 10:09:21 -0700
Subject: [PATCH 18/29] Update computing-unit-selection.component.html
---
.../power-button/computing-unit-selection.component.html | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
index 1dbdd102931..c927e18d696 100644
--- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
+++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
@@ -368,7 +368,7 @@
Date: Fri, 30 May 2025 17:37:48 -0700
Subject: [PATCH 19/29] Update ComputingUnitManagingResource.scala
---
.../service/resource/ComputingUnitManagingResource.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
index 19e59fc20a3..911fed2dd28 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
@@ -281,9 +281,9 @@ class ComputingUnitManagingResource {
s"Memory quantity '${param.memoryLimit}' is not allowed. " +
s"Valid options: ${memoryLimitOptions.mkString(", ")}"
)
- if (!gpuLimitOptions.contains(param.gpuLimit))
+ // gpuLimit is optional: validate the wrapped value. Comparing the Option itself against the
+ // allowed values (assuming gpuLimitOptions holds plain strings) never matches, which would
+ // reject every request that supplies a GPU limit.
+ if (param.gpuLimit.exists(limit => !gpuLimitOptions.contains(limit)))
throw new ForbiddenException(
- s"GPU quantity '${param.gpuLimit}' is not allowed. " +
+ s"GPU quantity '${param.gpuLimit.get}' is not allowed. " +
s"Valid options: ${gpuLimitOptions.mkString(", ")}"
)
From 5e25b55927cbca83a631b159b5aa9b20b3b930f4 Mon Sep 17 00:00:00 2001
From: Shengquan Ni
Date: Fri, 30 May 2025 17:38:41 -0700
Subject: [PATCH 20/29] Update computing-unit-selection.component.html
---
.../power-button/computing-unit-selection.component.html | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
index c927e18d696..9735137620a 100644
--- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
+++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
@@ -201,7 +201,7 @@
-
Shared Disk Size
+
Total Disk Size
Date: Fri, 30 May 2025 18:52:42 -0700
Subject: [PATCH 21/29] Update computing-unit-selection.component.html
---
.../power-button/computing-unit-selection.component.html | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
index 9735137620a..7c8de3ff625 100644
--- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
+++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
@@ -267,7 +267,7 @@
- Shared Memory Size
+ Shared Memory for IPC
Date: Tue, 3 Jun 2025 22:11:22 -0700
Subject: [PATCH 22/29] Update ComputingUnitManagingResource.scala
---
.../service/resource/ComputingUnitManagingResource.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
index 911fed2dd28..97c0af40657 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
@@ -151,7 +151,8 @@ class ComputingUnitManagingResource {
.parse(unit.getResource)
.as[JsObject]
.value("numNodes")
- .as[Int] > 1
+ .asOpt[Int]
+ .getOrElse(1) > 1 // for backward compatibility
}
private def getSupportedComputingUnitTypes: List[String] = {
From 72d37e58e821aff5eee0eec76acbc4f9f6721897 Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Tue, 3 Jun 2025 22:23:31 -0700
Subject: [PATCH 23/29] Update ComputingUnitManagingResource.scala
---
.../service/resource/ComputingUnitManagingResource.scala | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
index 97c0af40657..9780b0a924a 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
@@ -149,8 +149,9 @@ class ComputingUnitManagingResource {
private def isCluster(unit: WorkflowComputingUnit): Boolean = {
Json
.parse(unit.getResource)
- .as[JsObject]
- .value("numNodes")
+ .asOpt[JsObject]
+ .getOrElse(JsObject.empty)
+ .\("numNodes")
.asOpt[Int]
.getOrElse(1) > 1 // for backward compatibility
}
From a3eff4b6311f8484dfd1fd46ed46747f835a76f1 Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Tue, 3 Jun 2025 22:29:37 -0700
Subject: [PATCH 24/29] Update ComputingUnitManagingResource.scala
---
.../texera/service/resource/ComputingUnitManagingResource.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
index 9780b0a924a..b95f9f518cf 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
@@ -151,7 +151,7 @@ class ComputingUnitManagingResource {
.parse(unit.getResource)
.asOpt[JsObject]
.getOrElse(JsObject.empty)
- .\("numNodes")
+ .\("numNodes") // lookup operation
.asOpt[Int]
.getOrElse(1) > 1 // for backward compatibility
}
From 0d2618d341fa588ce682133b6ade370ef70bd551 Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Wed, 4 Jun 2025 13:24:31 -0700
Subject: [PATCH 25/29] Update KubernetesClient.scala
---
.../uci/ics/texera/service/util/KubernetesClient.scala | 8 --------
1 file changed, 8 deletions(-)
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
index 97f2ee64108..bf32a15a749 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
@@ -101,9 +101,6 @@ object KubernetesClient {
.getOrElse(Map.empty[String, String])
}
- // ---------------------------------------------------------------------------
- // Cluster lifecycle helpers
- // ---------------------------------------------------------------------------
def createVolume(cuid: Int, diskLimit: String): Volume = {
val pvcName = generateVolumeName(cuid)
@@ -164,10 +161,6 @@ object KubernetesClient {
deleteVolume(cuid)
}
- // ---------------------------------------------------------------------------
- // Kubernetes resource creators
- // ---------------------------------------------------------------------------
-
private def createClusterMasterService(cuid: Int): Service = {
val serviceName = generateClusterMasterServiceName(cuid)
val service = new ServiceBuilder()
@@ -305,7 +298,6 @@ object KubernetesClient {
.addToLabels("name", podName)
.addToLabels("role", "master")
- // -------------- CONTAINER -------------
val containerBuilder = new ContainerBuilder()
.withName("computing-unit-master")
.withImage(KubernetesConfig.computeUnitMasterImageName)
From ae032b8145d9b438fc666b9b7e06c93d646eb906 Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Wed, 4 Jun 2025 13:37:02 -0700
Subject: [PATCH 26/29] wip
---
.../service/util/KubernetesClient.scala | 1 -
.../computing-unit-selection.component.html | 131 ++++++++++--------
.../computing-unit-selection.component.scss | 4 +
3 files changed, 74 insertions(+), 62 deletions(-)
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
index bf32a15a749..8fa5c705a50 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
@@ -101,7 +101,6 @@ object KubernetesClient {
.getOrElse(Map.empty[String, String])
}
-
def createVolume(cuid: Int, diskLimit: String): Volume = {
val pvcName = generateVolumeName(cuid)
diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
index 7c8de3ff625..79b7ddb97f2 100644
--- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
+++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
@@ -242,68 +242,77 @@
-
-
- Select #GPU(s)
-
-
-
-
-
-
-
+
+
+
+
+ Select #GPU(s)
+
+
+
+
+
+
+
-
-
-
- Shared Memory for IPC
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+ Shared Memory for IPC
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
JVM Memory Size: {{selectedJvmMemorySize}}
Date: Sun, 29 Jun 2025 14:29:28 -0700
Subject: [PATCH 27/29] address conflicts
---
.../uci/ics/amber/engine/common/AmberRuntime.scala | 10 +++++-----
.../edu/uci/ics/texera/web/ComputingUnitMaster.scala | 11 ++++-------
.../edu/uci/ics/texera/web/ComputingUnitWorker.scala | 5 +++--
.../edu/uci/ics/amber/config/ApplicationConfig.scala | 5 +++++
4 files changed, 17 insertions(+), 14 deletions(-)
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
index 0a9fa45da8e..0770b6162d1 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
@@ -21,7 +21,7 @@ package edu.uci.ics.amber.engine.common
import akka.actor.{ActorSystem, Address, Cancellable, DeadLetter, Props}
import akka.serialization.{Serialization, SerializationExtension}
-import edu.uci.ics.amber.config.AkkaConfig
+import edu.uci.ics.amber.config.{AkkaConfig, ApplicationConfig}
import com.typesafe.config.{Config, ConfigFactory}
import edu.uci.ics.amber.clustering.ClusterListener
import edu.uci.ics.amber.engine.architecture.messaginglayer.DeadLetterMonitorActor
@@ -64,8 +64,8 @@ object AmberRuntime {
var masterIpAddress = "localhost"
var masterPort = 2552
if (clusterMode) {
- masterIpAddress = AmberConfig.masterIpAddress
- masterPort = AmberConfig.masterPort
+ masterIpAddress = ApplicationConfig.masterIpAddress
+ masterPort = ApplicationConfig.masterPort
}
val masterConfig = ConfigFactory
@@ -90,8 +90,8 @@ object AmberRuntime {
var masterPort = 2552
var nodeIp = "localhost"
if (clusterMode) {
- masterIpAddress = AmberConfig.masterIpAddress
- masterPort = AmberConfig.masterPort
+ masterIpAddress = ApplicationConfig.masterIpAddress
+ masterPort = ApplicationConfig.masterPort
nodeIp = "hostname -i".!!.trim // only supported by linux/unix
}
val workerConfig = ConfigFactory
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
index b6bc0127d0d..09c61e5fe9b 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
@@ -23,18 +23,15 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.config.{ApplicationConfig, StorageConfig}
import edu.uci.ics.amber.core.storage.DocumentFactory
+import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig
-import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
- COMPLETED,
- FAILED
-}
+import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{COMPLETED, FAILED}
import edu.uci.ics.amber.engine.common.AmberRuntime.scheduleRecurringCallThroughActorSystem
import edu.uci.ics.amber.engine.common.Utils.maptoStatusCode
import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
import edu.uci.ics.amber.engine.common.{AmberRuntime, Utils}
-import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity
import edu.uci.ics.amber.util.JSONUtils.objectMapper
import edu.uci.ics.amber.util.ObjectMapperUtils
import edu.uci.ics.texera.auth.SessionUser
@@ -42,8 +39,8 @@ import edu.uci.ics.texera.config.UserSystemConfig
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions
import edu.uci.ics.texera.web.auth.JwtAuth.setupJwtAuth
-import edu.uci.ics.texera.web.resource.{WebsocketPayloadSizeTuner, WorkflowWebsocketResource}
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
+import edu.uci.ics.texera.web.resource.{WebsocketPayloadSizeTuner, WorkflowWebsocketResource}
import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService
import io.dropwizard.Configuration
import io.dropwizard.setup.{Bootstrap, Environment}
@@ -74,7 +71,7 @@ object ComputingUnitMaster {
def main(args: Array[String]): Unit = {
// start actor system master node
- AmberRuntime.startActorMaster(AmberConfig.amberClusterEnabled)
+ AmberRuntime.startActorMaster(ApplicationConfig.amberClusterEnabled)
// start web server
new ComputingUnitMaster().run(
"server",
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala
index 12182352c38..d90c5c50f73 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala
@@ -19,13 +19,14 @@
package edu.uci.ics.texera.web
-import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime}
+import edu.uci.ics.amber.config.ApplicationConfig
+import edu.uci.ics.amber.engine.common.AmberRuntime
object ComputingUnitWorker {
def main(args: Array[String]): Unit = {
// start actor system worker node
- AmberRuntime.startActorWorker(AmberConfig.amberClusterEnabled)
+ AmberRuntime.startActorWorker(ApplicationConfig.amberClusterEnabled)
}
}
diff --git a/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala b/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala
index fd007da281a..22a67133cd5 100644
--- a/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala
+++ b/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala
@@ -57,6 +57,11 @@ object ApplicationConfig {
val creditPollingIntervalInMs: Int =
getConfSource.getInt("flow-control.credit-poll-interval-in-ms")
+ // clustering
+ val amberClusterEnabled: Boolean = getConfSource.getBoolean("clustering.enabled")
+ val masterIpAddress: String = getConfSource.getString("clustering.master-ip-address")
+ val masterPort: Int = getConfSource.getInt("clustering.master-port")
+
// Network buffering
val defaultDataTransferBatchSize: Int =
getConfSource.getInt("network-buffering.default-data-transfer-batch-size")
From 9e08d9423fdc0c1b58e379a6052bb71f00295caf Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Sun, 29 Jun 2025 14:34:28 -0700
Subject: [PATCH 28/29] reformat
---
.../scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala | 5 ++++-
.../service/resource/ComputingUnitManagingResource.scala | 1 -
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
index 09c61e5fe9b..81a93f94ff2 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
@@ -26,7 +26,10 @@ import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig
-import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{COMPLETED, FAILED}
+import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
+ COMPLETED,
+ FAILED
+}
import edu.uci.ics.amber.engine.common.AmberRuntime.scheduleRecurringCallThroughActorSystem
import edu.uci.ics.amber.engine.common.Utils.maptoStatusCode
import edu.uci.ics.amber.engine.common.client.AmberClient
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
index 4a2332bfe7a..ae3ddf378d7 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
@@ -25,7 +25,6 @@ import edu.uci.ics.texera.auth.{JwtAuth, SessionUser}
import edu.uci.ics.texera.config.{ComputingUnitConfig, KubernetesConfig}
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.dao.SqlServer.withTransaction
-import edu.uci.ics.texera.dao.jooq.generated.enums.WorkflowComputingUnitTypeEnum
import edu.uci.ics.texera.dao.jooq.generated.tables.daos.WorkflowComputingUnitDao
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowComputingUnit
import edu.uci.ics.texera.dao.jooq.generated.enums.WorkflowComputingUnitTypeEnum
From e4616284444425e5d4b8e39e2be5f29804e87ef9 Mon Sep 17 00:00:00 2001
From: Shengquan Ni <121989255@qq.com>
Date: Sun, 29 Jun 2025 14:42:12 -0700
Subject: [PATCH 29/29] add r support to worker container
---
deployment/computing-unit-worker.dockerfile | 49 +++++++++++++++++----
1 file changed, 40 insertions(+), 9 deletions(-)
diff --git a/deployment/computing-unit-worker.dockerfile b/deployment/computing-unit-worker.dockerfile
index 0c936eaf94a..323fe3ec2da 100644
--- a/deployment/computing-unit-worker.dockerfile
+++ b/deployment/computing-unit-worker.dockerfile
@@ -43,29 +43,60 @@ FROM eclipse-temurin:11-jre-jammy AS runtime
WORKDIR /core/amber
+COPY --from=build /core/amber/r-requirements.txt /tmp/r-requirements.txt
COPY --from=build /core/amber/requirements.txt /tmp/requirements.txt
COPY --from=build /core/amber/operator-requirements.txt /tmp/operator-requirements.txt
-# Install Python runtime and dependencies
+# Install Python & R runtime dependencies
RUN apt-get update && apt-get install -y \
python3-pip \
python3-dev \
libpq-dev \
+ gfortran \
+ curl \
+ build-essential \
+ libreadline-dev \
+ libncurses-dev \
+ libssl-dev \
+ libxml2-dev \
+ xorg-dev \
+ libbz2-dev \
+ liblzma-dev \
+ libpcre++-dev \
+ libpango1.0-dev \
+ libcurl4-openssl-dev \
+ unzip \
&& apt-get clean
-RUN pip3 install --upgrade pip setuptools wheel
-RUN pip3 install python-lsp-server python-lsp-server[websockets]
-
-# Install requirements with a fallback for wordcloud
-RUN pip3 install -r /tmp/requirements.txt
-RUN pip3 install --no-cache-dir --find-links https://pypi.org/simple/ -r /tmp/operator-requirements.txt || \
- pip3 install --no-cache-dir wordcloud==1.9.2
+# Build R from source, pip-install all requirement files; curl -f and R warn=2 make download/install failures fail the build
+ENV R_VERSION=4.3.3
+RUN curl -fsSLO https://cran.r-project.org/src/base/R-4/R-${R_VERSION}.tar.gz && \
+ tar -xf R-${R_VERSION}.tar.gz && \
+ cd R-${R_VERSION} && \
+ ./configure --prefix=/usr/local \
+ --enable-R-shlib \
+ --with-blas \
+ --with-lapack && \
+ make -j 4 && \
+ make install && \
+ cd .. && \
+ rm -rf R-${R_VERSION}* && R --version && pip3 install --no-cache-dir --upgrade pip setuptools wheel && \
+ pip3 install --no-cache-dir -r /tmp/requirements.txt && \
+ pip3 install --no-cache-dir -r /tmp/operator-requirements.txt && \
+ pip3 install --no-cache-dir -r /tmp/r-requirements.txt
+RUN Rscript -e "options(repos = c(CRAN = 'https://cran.r-project.org'), warn = 2); \
+ install.packages(c('coro', 'arrow', 'dplyr'), \
+ Ncpus = parallel::detectCores())"
+ENV LD_LIBRARY_PATH=/usr/local/lib/R/lib:$LD_LIBRARY_PATH
# Copy the built texera binary from the build phase
+COPY --from=build /.git /.git
COPY --from=build /core/amber/target/texera-0.1-SNAPSHOT /core/amber
# Copy resources directories under /core from build phase
-COPY --from=build /core/amber/src/main/resources /core/amber/src/main/resources
COPY --from=build /core/config/src/main/resources /core/config/src/main/resources
+COPY --from=build /core/amber/src/main/resources /core/amber/src/main/resources
+# Copy code for python & R UDF
+COPY --from=build /core/amber/src/main/python /core/amber/src/main/python
CMD ["bin/computing-unit-worker"]