From bb303d929db73c1524bc4ade2997967e29e51138 Mon Sep 17 00:00:00 2001 From: Lukasz Celeban Date: Wed, 27 Nov 2019 16:19:55 +0100 Subject: [PATCH 1/7] Add concept of global resources which can be shared across applications --- .../common/config/AkkeeperResource.scala | 6 ++++ .../akkeeper/common/config/Configs.scala | 17 ++++++++++ .../akkeeper/launcher/LaunchArguments.scala | 4 ++- .../akkeeper/launcher/yarn/YarnLauncher.scala | 11 ++++++- .../yarn/YarnLocalResourceManager.scala | 33 ++++++++++++------- .../deploy/yarn/YarnApplicationMaster.scala | 10 ++++++ 6 files changed, 67 insertions(+), 14 deletions(-) create mode 100644 akkeeper-common/src/main/scala/akkeeper/common/config/AkkeeperResource.scala diff --git a/akkeeper-common/src/main/scala/akkeeper/common/config/AkkeeperResource.scala b/akkeeper-common/src/main/scala/akkeeper/common/config/AkkeeperResource.scala new file mode 100644 index 0000000..5ac34f2 --- /dev/null +++ b/akkeeper-common/src/main/scala/akkeeper/common/config/AkkeeperResource.scala @@ -0,0 +1,6 @@ +package akkeeper.common.config + +import java.net.URI + +final case class AkkeeperResource(uri: URI, localPath: String, archive: Boolean = false) + diff --git a/akkeeper-common/src/main/scala/akkeeper/common/config/Configs.scala b/akkeeper-common/src/main/scala/akkeeper/common/config/Configs.scala index 161a65d..4a4dacd 100644 --- a/akkeeper-common/src/main/scala/akkeeper/common/config/Configs.scala +++ b/akkeeper-common/src/main/scala/akkeeper/common/config/Configs.scala @@ -15,6 +15,7 @@ */ package akkeeper.common.config +import java.net.URI import java.time.{Duration => JavaDuration} import java.util.concurrent.TimeUnit @@ -34,6 +35,14 @@ private[akkeeper] final class AkkeeperConfig(akkeeperConfig: Config) { Seq.empty } } + + lazy val globalResources: Seq[AkkeeperResource] = { + if (akkeeperConfig.hasPath("globalResources")) { + akkeeperConfig.getConfigList("globalResources").asScala.map(akkeeperResourceFromConfig) + } else { + Seq.empty + } + } } private[akkeeper] final class AkkeeperAkkaConfig(akkeeperAkkaConfig: Config) { @@ -120,4 +129,12 @@ object ConfigUtils { environment = config.getMapOfStrings("environment") ) } + + private[config] def akkeeperResourceFromConfig(config: Config): AkkeeperResource = { + AkkeeperResource( + new URI(config.getString("uri")), + config.getString("localPath"), + config.getBoolean("archive") + ) + } } diff --git a/akkeeper-launcher/src/main/scala/akkeeper/launcher/LaunchArguments.scala b/akkeeper-launcher/src/main/scala/akkeeper/launcher/LaunchArguments.scala index 0e8b406..d0f44bc 100644 --- a/akkeeper-launcher/src/main/scala/akkeeper/launcher/LaunchArguments.scala +++ b/akkeeper-launcher/src/main/scala/akkeeper/launcher/LaunchArguments.scala @@ -17,13 +17,15 @@ package akkeeper.launcher import java.net.URI +import akkeeper.common.config.AkkeeperResource +import akkeeper.launcher.LaunchArguments._ import com.typesafe.config.Config -import LaunchArguments._ final case class LaunchArguments(akkeeperJarPath: URI = new URI("."), userJar: URI = new URI("."), otherJars: Seq[URI] = Seq.empty, resources: Seq[URI] = Seq.empty, + globalResources: Seq[AkkeeperResource] = Seq.empty, masterJvmArgs: Seq[String] = Seq.empty, userConfig: Option[Config] = None, pollInterval: Long = DefaultPollInterval, diff --git a/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala b/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala index 34712c6..8fb0d58 100644 --- a/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala +++ b/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala @@ -154,6 +154,13 @@ final class YarnLauncher(yarnConf: YarnConfiguration, config.withValue("akkeeper.yarn.staging-directory", ConfigValueFactory.fromAnyRef(stagingDir)) } + private def addGlobalResourcesToUserConfig(config: Config, resources: Seq[AkkeeperResource]): Config = { + val configValue = resources.map { r => + Map("uri" -> r.uri.toString, "localPath" -> r.localPath, "archive" -> r.archive).asJava + } + config.withValue("akkeeper.globalResources", ConfigValueFactory.fromAnyRef(configValue.asJava)) + } + private def launchWithClient(yarnClient: YarnLauncherClient, config: Config, args: LaunchArguments): Future[LaunchResult] = { @@ -176,7 +183,9 @@ final class YarnLauncher(yarnConf: YarnConfiguration, val baseStagingDir = config.yarn.stagingDirectory.getOrElse(YarnUtils.defaultStagingDirectory(yarnConf)) val stagingDir = YarnUtils.appStagingDirectory(yarnConf, Some(baseStagingDir), appId.toString) - val updatedUserConfig = args.userConfig.map(addStagingDirToUserConfig(_, baseStagingDir)) + val updatedUserConfig = args.userConfig + .map(addStagingDirToUserConfig(_, baseStagingDir)) + .map(addGlobalResourcesToUserConfig(_, args.globalResources)) val localResources = buildLocalResources(stagingDir, args, updatedUserConfig) val cmd = buildCmd(appId, config, args) logger.debug(s"Akkeeper Master command: ${cmd.mkString(" ")}") diff --git a/akkeeper-yarn/src/main/scala/akkeeper/yarn/YarnLocalResourceManager.scala b/akkeeper-yarn/src/main/scala/akkeeper/yarn/YarnLocalResourceManager.scala index 64aea92..6d4fc62 100644 --- a/akkeeper-yarn/src/main/scala/akkeeper/yarn/YarnLocalResourceManager.scala +++ b/akkeeper-yarn/src/main/scala/akkeeper/yarn/YarnLocalResourceManager.scala @@ -37,16 +37,19 @@ private[akkeeper] final class YarnLocalResourceManager(conf: Configuration, } } - private def create(fs: FileSystem, status: FileStatus): LocalResource = { + private def create(fs: FileSystem, + status: FileStatus, + localResourceType: LocalResourceType, + localResourceVisibility: LocalResourceVisibility): LocalResource = { LocalResource.newInstance( - ConverterUtils.getYarnUrlFromURI(fs.makeQualified(status.getPath).toUri), - LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, + ConverterUtils.getYarnUrlFromURI(fs. makeQualified(status.getPath).toUri), + localResourceType, localResourceVisibility, status.getLen, status.getModificationTime ) } - private def copyResourceToStagingDir(dstFs: FileSystem, srcStream: InputStream, - dstPath: String): Path = { + private def copyResourceToStagingDir(srcStream: InputStream, dstPath: String): Path = { + val dstFs = stagingDirPath.getFileSystem(conf) val dst = new Path(stagingDirPath, dstPath) withStream(dstFs.create(dst)) { out => IOUtils.copy(srcStream, out) @@ -55,11 +58,19 @@ private[akkeeper] final class YarnLocalResourceManager(conf: Configuration, dst } + def getExistingResource( + dstPath: Path, + localResourceType: LocalResourceType = LocalResourceType.FILE, + localResourceVisibility: LocalResourceVisibility = LocalResourceVisibility.APPLICATION + ): LocalResource = { + val dstFs = dstPath.getFileSystem(conf) + val dstStatus = dstFs.getFileStatus(dstPath) + create(dstFs, dstStatus, localResourceType, localResourceVisibility) + } + def createLocalResource(srcStream: InputStream, dstPath: String): LocalResource = { - val dstFs = stagingDirPath.getFileSystem(conf) - val dst = copyResourceToStagingDir(dstFs, srcStream, dstPath) - val dstStatus = dstFs.getFileStatus(dst) - create(dstFs, dstStatus) + val dst = copyResourceToStagingDir(srcStream, dstPath) + getExistingResource(dst) } def createLocalResource(srcPath: String, dstPath: String): LocalResource = { @@ -71,9 +82,7 @@ private[akkeeper] final class YarnLocalResourceManager(conf: Configuration, } def getExistingLocalResource(dstPath: Path): LocalResource = { - val fs = dstPath.getFileSystem(conf) - val dstStatus = fs.getFileStatus(new Path(stagingDirPath, dstPath)) - create(fs, dstStatus) + getExistingResource(new Path(stagingDirPath, dstPath)) } def getExistingLocalResource(dstPath: String): LocalResource = { diff --git a/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala b/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala index 89fa87c..59077d0 100644 --- a/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala +++ b/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala @@ -119,9 +119,19 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi // Retrieve a content of the resources/ directory. addExistingResources(LocalResourceNames.ResourcesDirName) + localResources ++= getAkkeeperGlobalResources() localResources.toMap } + private def getAkkeeperGlobalResources(): Map[String, LocalResource] = { + config.config.akkeeper.globalResources.map { r => + val resourceType = if (r.archive) LocalResourceType.ARCHIVE else LocalResourceType.FILE + val resource = localResourceManager.getExistingResource( + new Path(r.uri), resourceType, LocalResourceVisibility.PRIVATE) + (r.localPath, resource) + }.toMap + } + private def buildActorLaunchContextResource(containerDefinition: ContainerDefinition, instanceId: InstanceId): LocalResource = { import spray.json._ From 2bc46c5056465c35ebb7350191c8fc79f64334a0 Mon Sep 17 00:00:00 2001 From: Lukasz Celeban Date: Thu, 19 Dec 2019 13:00:43 +0100 Subject: [PATCH 2/7] rename getExistingResource and getExistingLocalResource --- .../akkeeper/yarn/YarnLocalResourceManager.scala | 14 +++++++------- .../yarn/YarnLocalResourceManagerSpec.scala | 2 +- .../deploy/yarn/YarnApplicationMaster.scala | 12 ++++++------ 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/akkeeper-yarn/src/main/scala/akkeeper/yarn/YarnLocalResourceManager.scala b/akkeeper-yarn/src/main/scala/akkeeper/yarn/YarnLocalResourceManager.scala index 6d4fc62..50b8822 100644 --- a/akkeeper-yarn/src/main/scala/akkeeper/yarn/YarnLocalResourceManager.scala +++ b/akkeeper-yarn/src/main/scala/akkeeper/yarn/YarnLocalResourceManager.scala @@ -42,7 +42,7 @@ private[akkeeper] final class YarnLocalResourceManager(conf: Configuration, localResourceType: LocalResourceType, localResourceVisibility: LocalResourceVisibility): LocalResource = { LocalResource.newInstance( - ConverterUtils.getYarnUrlFromURI(fs. makeQualified(status.getPath).toUri), + ConverterUtils.getYarnUrlFromURI(fs.makeQualified(status.getPath).toUri), localResourceType, localResourceVisibility, status.getLen, status.getModificationTime ) @@ -58,7 +58,7 @@ private[akkeeper] final class YarnLocalResourceManager(conf: Configuration, dst } - def getExistingResource( + def getLocalResource( dstPath: Path, localResourceType: LocalResourceType = LocalResourceType.FILE, localResourceVisibility: LocalResourceVisibility = LocalResourceVisibility.APPLICATION @@ -70,7 +70,7 @@ private[akkeeper] final class YarnLocalResourceManager(conf: Configuration, def createLocalResource(srcStream: InputStream, dstPath: String): LocalResource = { val dst = copyResourceToStagingDir(srcStream, dstPath) - getExistingResource(dst) + getLocalResource(dst) } def createLocalResource(srcPath: String, dstPath: String): LocalResource = { @@ -81,11 +81,11 @@ private[akkeeper] final class YarnLocalResourceManager(conf: Configuration, } } - def getExistingLocalResource(dstPath: Path): LocalResource = { - getExistingResource(new Path(stagingDirPath, dstPath)) + def getLocalResourceFromStagingDir(dstPath: Path): LocalResource = { + getLocalResource(new Path(stagingDirPath, dstPath)) } - def getExistingLocalResource(dstPath: String): LocalResource = { - getExistingLocalResource(new Path(dstPath)) + def getLocalResourceFromStagingDir(dstPath: String): LocalResource = { + getLocalResourceFromStagingDir(new Path(dstPath)) } } diff --git a/akkeeper-yarn/src/test/scala/akkeeper/yarn/YarnLocalResourceManagerSpec.scala b/akkeeper-yarn/src/test/scala/akkeeper/yarn/YarnLocalResourceManagerSpec.scala index 8c1f0d2..bff5dfd 100644 --- a/akkeeper-yarn/src/test/scala/akkeeper/yarn/YarnLocalResourceManagerSpec.scala +++ b/akkeeper-yarn/src/test/scala/akkeeper/yarn/YarnLocalResourceManagerSpec.scala @@ -101,7 +101,7 @@ class YarnLocalResourceManagerSpec extends FlatSpec with Matchers with BeforeAnd val expectedPath = new Path(stagingDir, expectedFileName).toString manager.createLocalResource(resource, expectedFileName) - val actualResult = manager.getExistingLocalResource(expectedFileName) + val actualResult = manager.getLocalResourceFromStagingDir(expectedFileName) validateLocalResource(actualResult, expectedPath) validateResourcePayload("/application-container-test.conf", expectedPath) } diff --git a/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala b/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala index 59077d0..fa10342 100644 --- a/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala +++ b/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala @@ -76,7 +76,7 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi // Distribute the user configuration. try { - val instanceConfigResource = localResourceManager.getExistingLocalResource( + val instanceConfigResource = localResourceManager.getLocalResourceFromStagingDir( LocalResourceNames.UserConfigName) localResources.put(LocalResourceNames.UserConfigName, instanceConfigResource) } catch { @@ -86,18 +86,18 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi // Retrieve the Akkeeper Assembly jar. val akkeeperJarResource = localResourceManager - .getExistingLocalResource(LocalResourceNames.AkkeeperJarName) + .getLocalResourceFromStagingDir(LocalResourceNames.AkkeeperJarName) localResources.put(LocalResourceNames.AkkeeperJarName, akkeeperJarResource) // Retrieve the user jar. val userJarResource = localResourceManager - .getExistingLocalResource(LocalResourceNames.UserJarName) + .getLocalResourceFromStagingDir(LocalResourceNames.UserJarName) localResources.put(LocalResourceNames.UserJarName, userJarResource) // Retrieve the keytab if present. config.principal.foreach(_ => { val keytabResource = localResourceManager - .getExistingLocalResource(LocalResourceNames.KeytabName) + .getLocalResourceFromStagingDir(LocalResourceNames.KeytabName) localResources.put(LocalResourceNames.KeytabName, keytabResource) }) @@ -107,7 +107,7 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi val resources = fs.listStatus(new Path(stagingDirectory, directory)) resources.foreach(status => { val fileName = directory + "/" + status.getPath.getName - val resource = localResourceManager.getExistingLocalResource(fileName) + val resource = localResourceManager.getLocalResourceFromStagingDir(fileName) localResources.put(fileName, resource) }) } catch { @@ -126,7 +126,7 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi private def getAkkeeperGlobalResources(): Map[String, LocalResource] = { config.config.akkeeper.globalResources.map { r => val resourceType = if (r.archive) LocalResourceType.ARCHIVE else LocalResourceType.FILE - val resource = localResourceManager.getExistingResource( + val resource = localResourceManager.getLocalResource( new Path(r.uri), resourceType, LocalResourceVisibility.PRIVATE) (r.localPath, resource) }.toMap From 48aa9c4bb0b8645c83b37d1d2e1194cd0592866e Mon Sep 17 00:00:00 2001 From: Michal Zak Date: Thu, 19 Dec 2019 15:48:20 -0800 Subject: [PATCH 3/7] Introduce launcher config --- .../akkeeper/launcher/yarn/YarnLauncher.scala | 54 ++++++++++--------- .../akkeeper/yarn/LocalResourceNames.scala | 1 + .../scala/akkeeper/master/MasterRunner.scala | 8 ++- 3 files changed, 35 insertions(+), 28 deletions(-) diff --git a/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala b/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala index 8fb0d58..b03ab0d 100644 --- a/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala +++ b/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala @@ -24,7 +24,7 @@ import akkeeper.common.config._ import akkeeper.launcher._ import akkeeper.yarn._ import akkeeper.yarn.client.YarnLauncherClient -import com.typesafe.config.{Config, ConfigRenderOptions, ConfigValueFactory} +import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions, ConfigValueFactory} import org.apache.commons.io.FilenameUtils import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration @@ -94,40 +94,43 @@ final class YarnLauncher(yarnConf: YarnConfiguration, private def buildLocalResources(stagingDir: String, args: LaunchArguments, - userConfig: Option[Config]): util.HashMap[String, LocalResource] = { - val resourceManger = new YarnLocalResourceManager(yarnConf, stagingDir) + launcherConfig: Config): util.HashMap[String, LocalResource] = { + val resourceManager = new YarnLocalResourceManager(yarnConf, stagingDir) val localResources = new util.HashMap[String, LocalResource]() - val akkeeperJar = resourceManger.createLocalResource(args.akkeeperJarPath.toString, + def uploadConfig(localResourceName: String, config: Config): Unit = { + val userConfigString = config.root().render(ConfigRenderOptions.concise()) + val configResource = resourceManager.createLocalResource( + new ByteArrayInputStream(userConfigString.getBytes("UTF-8")), localResourceName) + localResources.put(localResourceName, configResource) + } + + val akkeeperJar = resourceManager.createLocalResource(args.akkeeperJarPath.toString, LocalResourceNames.AkkeeperJarName) localResources.put(LocalResourceNames.AkkeeperJarName, akkeeperJar) // Add a user jar to the staging directory. No need to include it // into master local resources. - resourceManger.createLocalResource(args.userJar.toString, + resourceManager.createLocalResource(args.userJar.toString, LocalResourceNames.UserJarName) // Just upload the third-party jars. No need to include them // into master local resources. - uploadGenericFiles(args.otherJars, LocalResourceNames.ExtraJarsDirName, resourceManger) + uploadGenericFiles(args.otherJars, LocalResourceNames.ExtraJarsDirName, resourceManager) // Distribute resources. - uploadGenericFiles(args.resources, LocalResourceNames.ResourcesDirName, resourceManger) + uploadGenericFiles(args.resources, LocalResourceNames.ResourcesDirName, resourceManager) args.principal.foreach(_ => { // Distribute keytab. - val keytabResource = resourceManger.createLocalResource(args.keytab.get.toString, + val keytabResource = resourceManager.createLocalResource(args.keytab.get.toString, LocalResourceNames.KeytabName) localResources.put(LocalResourceNames.KeytabName, keytabResource) }) - userConfig.foreach(config => { - val userConfigString = config.root().render(ConfigRenderOptions.concise()) - val configResource = resourceManger.createLocalResource( - new ByteArrayInputStream(userConfigString.getBytes("UTF-8")), - LocalResourceNames.UserConfigName) - localResources.put(LocalResourceNames.UserConfigName, configResource) - }) + args.userConfig.foreach(uploadConfig(LocalResourceNames.UserConfigName, _)) + uploadConfig(LocalResourceNames.LauncherConfigName, launcherConfig) + localResources } @@ -150,15 +153,16 @@ final class YarnLauncher(yarnConf: YarnConfiguration, YarnUtils.buildCmd(mainClass, jvmArgs = jvmArgs, appArgs = appArgs) } - private def addStagingDirToUserConfig(config: Config, stagingDir: String): Config = { - config.withValue("akkeeper.yarn.staging-directory", ConfigValueFactory.fromAnyRef(stagingDir)) - } - - private def addGlobalResourcesToUserConfig(config: Config, resources: Seq[AkkeeperResource]): Config = { - val configValue = resources.map { r => + private def buildLauncherConfig(stagingDir: String, resources: Seq[AkkeeperResource]): Config = { + val globalResourcesConfigValue = resources.map { r => Map("uri" -> r.uri.toString, "localPath" -> r.localPath, "archive" -> r.archive).asJava } - config.withValue("akkeeper.globalResources", ConfigValueFactory.fromAnyRef(configValue.asJava)) + ConfigFactory.parseMap( + Map( + "akkeeper.yarn.staging-directory" -> stagingDir, + "akkeeper.globalResources" -> globalResourcesConfigValue.asJava + ).asJava + ) } private def launchWithClient(yarnClient: YarnLauncherClient, @@ -183,10 +187,8 @@ final class YarnLauncher(yarnConf: YarnConfiguration, val baseStagingDir = config.yarn.stagingDirectory.getOrElse(YarnUtils.defaultStagingDirectory(yarnConf)) val stagingDir = YarnUtils.appStagingDirectory(yarnConf, Some(baseStagingDir), appId.toString) - val updatedUserConfig = args.userConfig - .map(addStagingDirToUserConfig(_, baseStagingDir)) - .map(addGlobalResourcesToUserConfig(_, args.globalResources)) - val localResources = buildLocalResources(stagingDir, args, updatedUserConfig) + val launcherConfig = buildLauncherConfig(baseStagingDir, args.globalResources) + val localResources = buildLocalResources(stagingDir, args, launcherConfig) val cmd = buildCmd(appId, config, args) logger.debug(s"Akkeeper Master command: ${cmd.mkString(" ")}") diff --git a/akkeeper-yarn/src/main/scala/akkeeper/yarn/LocalResourceNames.scala b/akkeeper-yarn/src/main/scala/akkeeper/yarn/LocalResourceNames.scala index ad1ddaa..9561eed 100644 --- a/akkeeper-yarn/src/main/scala/akkeeper/yarn/LocalResourceNames.scala +++ b/akkeeper-yarn/src/main/scala/akkeeper/yarn/LocalResourceNames.scala @@ -19,6 +19,7 @@ private[akkeeper] object LocalResourceNames { val AkkeeperJarName = "akkeeper.jar" val UserJarName = "user.jar" val UserConfigName = "user_config.conf" + val LauncherConfigName = "launcher_config.conf" val ActorLaunchContextsName = "actors.json" val KeytabName = "akkeeper.keytab" diff --git a/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala b/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala index b226257..7da147c 100644 --- a/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala +++ b/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala @@ -15,6 +15,8 @@ */ package akkeeper.master +import java.io.File + import akka.actor._ import akka.cluster.Cluster import akka.http.scaladsl.Http @@ -121,8 +123,10 @@ private[master] class YarnMasterRunner extends MasterRunner { def run(masterArgs: MasterArguments): Unit = { val config = masterArgs.config - .map(c => ConfigFactory.parseFile(c).withFallback(ConfigFactory.load())) - .getOrElse(ConfigFactory.load()) + .map(c => ConfigFactory.parseFile(c)) + .getOrElse(ConfigFactory.empty()) + .withFallback(ConfigFactory.parseFile(new File(LocalResourceNames.LauncherConfigName))) + .withFallback(ConfigFactory.load()) // Create and start the Kerberos ticket renewer if necessary. val ticketRenewer = masterArgs.principal.map(principal => { From a724f0ad4c2fc8770d6f8cade493b3faed03d8bb Mon Sep 17 00:00:00 2001 From: Lukasz Celeban Date: Mon, 23 Dec 2019 19:50:45 +0100 Subject: [PATCH 4/7] Rename config properties --- .../src/main/scala/akkeeper/common/config/Configs.scala | 6 +++--- .../main/scala/akkeeper/launcher/yarn/YarnLauncher.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/akkeeper-common/src/main/scala/akkeeper/common/config/Configs.scala b/akkeeper-common/src/main/scala/akkeeper/common/config/Configs.scala index 4a4dacd..5c6f0a0 100644 --- a/akkeeper-common/src/main/scala/akkeeper/common/config/Configs.scala +++ b/akkeeper-common/src/main/scala/akkeeper/common/config/Configs.scala @@ -37,8 +37,8 @@ private[akkeeper] final class AkkeeperConfig(akkeeperConfig: Config) { } lazy val globalResources: Seq[AkkeeperResource] = { - if (akkeeperConfig.hasPath("globalResources")) { - akkeeperConfig.getConfigList("globalResources").asScala.map(akkeeperResourceFromConfig) + if (akkeeperConfig.hasPath("global-resources")) { + akkeeperConfig.getConfigList("global-resources").asScala.map(akkeeperResourceFromConfig) } else { Seq.empty } @@ -133,7 +133,7 @@ object ConfigUtils { private[config] def akkeeperResourceFromConfig(config: Config): AkkeeperResource = { AkkeeperResource( new URI(config.getString("uri")), - config.getString("localPath"), + config.getString("local-path"), config.getBoolean("archive") ) } diff --git a/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala b/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala index b03ab0d..2d89b62 100644 --- a/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala +++ b/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala @@ -155,12 +155,12 @@ final class YarnLauncher(yarnConf: YarnConfiguration, private def buildLauncherConfig(stagingDir: String, resources: Seq[AkkeeperResource]): Config = { val globalResourcesConfigValue = resources.map { r => - Map("uri" -> r.uri.toString, "localPath" -> r.localPath, "archive" -> r.archive).asJava + Map("uri" -> r.uri.toString, "local-path" -> r.localPath, "archive" -> r.archive).asJava } ConfigFactory.parseMap( Map( "akkeeper.yarn.staging-directory" -> stagingDir, - "akkeeper.globalResources" -> globalResourcesConfigValue.asJava + "akkeeper.global-resources" -> globalResourcesConfigValue.asJava ).asJava ) } From 0ca720a23b472fed5007b29bd010bf656e11b6c0 Mon Sep 17 00:00:00 2001 From: Lukasz Celeban Date: Mon, 23 Dec 2019 19:55:31 +0100 Subject: [PATCH 5/7] Rename createLocalResource to uploadLocalResource --- .../scala/akkeeper/launcher/yarn/YarnLauncher.scala | 10 +++++----- .../scala/akkeeper/yarn/YarnLocalResourceManager.scala | 6 +++--- .../akkeeper/yarn/YarnLocalResourceManagerSpec.scala | 10 +++++----- .../akkeeper/deploy/yarn/YarnApplicationMaster.scala | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala b/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala index 2d89b62..0215cdc 100644 --- a/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala +++ b/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala @@ -88,7 +88,7 @@ final class YarnLauncher(yarnConf: YarnConfiguration, files.foreach(file => { val fileName = FilenameUtils.getName(file.getPath) val localPath = dstLocalDir + "/" + fileName - resourceManger.createLocalResource(file.toString, localPath) + resourceManger.uploadLocalResource(file.toString, localPath) }) } @@ -100,18 +100,18 @@ final class YarnLauncher(yarnConf: YarnConfiguration, def uploadConfig(localResourceName: String, config: Config): Unit = { val userConfigString = config.root().render(ConfigRenderOptions.concise()) - val configResource = resourceManager.createLocalResource( + val configResource = resourceManager.uploadLocalResource( new ByteArrayInputStream(userConfigString.getBytes("UTF-8")), localResourceName) localResources.put(localResourceName, configResource) } - val akkeeperJar = resourceManager.createLocalResource(args.akkeeperJarPath.toString, + val akkeeperJar = resourceManager.uploadLocalResource(args.akkeeperJarPath.toString, LocalResourceNames.AkkeeperJarName) localResources.put(LocalResourceNames.AkkeeperJarName, akkeeperJar) // Add a user jar to the staging directory. No need to include it // into master local resources. - resourceManager.createLocalResource(args.userJar.toString, + resourceManager.uploadLocalResource(args.userJar.toString, LocalResourceNames.UserJarName) // Just upload the third-party jars. No need to include them @@ -123,7 +123,7 @@ final class YarnLauncher(yarnConf: YarnConfiguration, args.principal.foreach(_ => { // Distribute keytab. - val keytabResource = resourceManager.createLocalResource(args.keytab.get.toString, + val keytabResource = resourceManager.uploadLocalResource(args.keytab.get.toString, LocalResourceNames.KeytabName) localResources.put(LocalResourceNames.KeytabName, keytabResource) }) diff --git a/akkeeper-yarn/src/main/scala/akkeeper/yarn/YarnLocalResourceManager.scala b/akkeeper-yarn/src/main/scala/akkeeper/yarn/YarnLocalResourceManager.scala index 50b8822..dcd0008 100644 --- a/akkeeper-yarn/src/main/scala/akkeeper/yarn/YarnLocalResourceManager.scala +++ b/akkeeper-yarn/src/main/scala/akkeeper/yarn/YarnLocalResourceManager.scala @@ -68,16 +68,16 @@ private[akkeeper] final class YarnLocalResourceManager(conf: Configuration, create(dstFs, dstStatus, localResourceType, localResourceVisibility) } - def createLocalResource(srcStream: InputStream, dstPath: String): LocalResource = { + def uploadLocalResource(srcStream: InputStream, dstPath: String): LocalResource = { val dst = copyResourceToStagingDir(srcStream, dstPath) getLocalResource(dst) } - def createLocalResource(srcPath: String, dstPath: String): LocalResource = { + def uploadLocalResource(srcPath: String, dstPath: String): LocalResource = { val path = new Path(srcPath) val srcFs = path.getFileSystem(conf) withStream(srcFs.open(path)) { srcStream => - createLocalResource(srcStream, dstPath) + uploadLocalResource(srcStream, dstPath) } } diff --git a/akkeeper-yarn/src/test/scala/akkeeper/yarn/YarnLocalResourceManagerSpec.scala b/akkeeper-yarn/src/test/scala/akkeeper/yarn/YarnLocalResourceManagerSpec.scala index bff5dfd..5f514e3 100644 --- a/akkeeper-yarn/src/test/scala/akkeeper/yarn/YarnLocalResourceManagerSpec.scala +++ b/akkeeper-yarn/src/test/scala/akkeeper/yarn/YarnLocalResourceManagerSpec.scala @@ -63,7 +63,7 @@ class YarnLocalResourceManagerSpec extends FlatSpec with Matchers with BeforeAnd val expectedFileName = UUID.randomUUID().toString val expectedPath = new Path(stagingDir, expectedFileName).toString - val actualResult = manager.createLocalResource(resource, expectedFileName) + val actualResult = manager.uploadLocalResource(resource, expectedFileName) validateLocalResource(actualResult, expectedPath) validateResourcePayload("/application-container-test.conf", expectedPath) } @@ -74,7 +74,7 @@ class YarnLocalResourceManagerSpec extends FlatSpec with Matchers with BeforeAnd val expectedFileName = UUID.randomUUID().toString val expectedPath = new Path(stagingDir, expectedFileName).toString - val actualResult = manager.createLocalResource(resource, expectedFileName) + val actualResult = manager.uploadLocalResource(resource, expectedFileName) validateLocalResource(actualResult, expectedPath) validateResourcePayload("/application-container-test.conf", expectedPath) resource.close() @@ -86,10 +86,10 @@ class YarnLocalResourceManagerSpec extends FlatSpec with Matchers with BeforeAnd val expectedFileName = UUID.randomUUID().toString val expectedPath = new Path(stagingDir, expectedFileName).toString - manager.createLocalResource(resource, expectedFileName) + manager.uploadLocalResource(resource, expectedFileName) val newExpectedFileName = UUID.randomUUID().toString val newExpectedPath = new Path(stagingDir, newExpectedFileName).toString - val actualResult = manager.createLocalResource(expectedPath, newExpectedFileName) + val actualResult = manager.uploadLocalResource(expectedPath, newExpectedFileName) validateLocalResource(actualResult, newExpectedPath) validateResourcePayload("/application-container-test.conf", newExpectedPath) } @@ -100,7 +100,7 @@ class YarnLocalResourceManagerSpec extends FlatSpec with Matchers with BeforeAnd val expectedFileName = UUID.randomUUID().toString val expectedPath = new Path(stagingDir, expectedFileName).toString - manager.createLocalResource(resource, expectedFileName) + manager.uploadLocalResource(resource, expectedFileName) val actualResult = manager.getLocalResourceFromStagingDir(expectedFileName) validateLocalResource(actualResult, expectedPath) validateResourcePayload("/application-container-test.conf", expectedPath) diff --git a/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala b/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala index fa10342..e4dd687 100644 --- a/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala +++ b/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala @@ -137,7 +137,7 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi import spray.json._ import akkeeper.api.ContainerDefinitionJsonProtocol._ val jsonStr = containerDefinition.actors.toJson.compactPrint - localResourceManager.createLocalResource(new ByteArrayInputStream(jsonStr.getBytes("UTF-8")), + localResourceManager.uploadLocalResource(new ByteArrayInputStream(jsonStr.getBytes("UTF-8")), s"actors_$instanceId.json") } From c9210aa4738b9050a0695aa849595bc21a8e2623 Mon Sep 17 00:00:00 2001 From: Lukasz Celeban Date: Mon, 23 Dec 2019 19:56:52 +0100 Subject: [PATCH 6/7] Rename launcher_config.conf to application.conf --- .../src/main/scala/akkeeper/yarn/LocalResourceNames.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akkeeper-yarn/src/main/scala/akkeeper/yarn/LocalResourceNames.scala b/akkeeper-yarn/src/main/scala/akkeeper/yarn/LocalResourceNames.scala index 9561eed..51fc01a 100644 --- a/akkeeper-yarn/src/main/scala/akkeeper/yarn/LocalResourceNames.scala +++ b/akkeeper-yarn/src/main/scala/akkeeper/yarn/LocalResourceNames.scala @@ -19,7 +19,7 @@ private[akkeeper] object LocalResourceNames { val AkkeeperJarName = "akkeeper.jar" val UserJarName = "user.jar" val UserConfigName = "user_config.conf" - val LauncherConfigName = "launcher_config.conf" + val LauncherConfigName = "application.conf" val ActorLaunchContextsName = "actors.json" val KeytabName = "akkeeper.keytab" From 33114613cbae9dddf0530989c4d4a1f17c341ef5 Mon Sep 17 00:00:00 2001 From: Lukasz Celeban Date: Tue, 24 Dec 2019 13:27:17 +0100 Subject: [PATCH 7/7] rename LauncherConfigName to ApplicationConfigName --- .../src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala | 2 +- .../src/main/scala/akkeeper/yarn/LocalResourceNames.scala | 2 +- akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala b/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala index 0215cdc..018897c 100644 --- a/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala +++ b/akkeeper-launcher/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala @@ -129,7 +129,7 @@ final class YarnLauncher(yarnConf: YarnConfiguration, }) args.userConfig.foreach(uploadConfig(LocalResourceNames.UserConfigName, _)) - uploadConfig(LocalResourceNames.LauncherConfigName, launcherConfig) + uploadConfig(LocalResourceNames.ApplicationConfigName, launcherConfig) localResources } diff --git a/akkeeper-yarn/src/main/scala/akkeeper/yarn/LocalResourceNames.scala b/akkeeper-yarn/src/main/scala/akkeeper/yarn/LocalResourceNames.scala index 51fc01a..4906c49 100644 --- a/akkeeper-yarn/src/main/scala/akkeeper/yarn/LocalResourceNames.scala +++ b/akkeeper-yarn/src/main/scala/akkeeper/yarn/LocalResourceNames.scala @@ -19,7 +19,7 @@ private[akkeeper] object LocalResourceNames { val AkkeeperJarName = "akkeeper.jar" val UserJarName = "user.jar" val UserConfigName = "user_config.conf" - val LauncherConfigName = "application.conf" + val ApplicationConfigName = "application.conf" val ActorLaunchContextsName = "actors.json" val KeytabName = "akkeeper.keytab" diff --git a/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala b/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala index 7da147c..859f5fd 100644 --- a/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala +++ b/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala @@ -125,7 +125,7 @@ private[master] class YarnMasterRunner extends MasterRunner { val config = masterArgs.config .map(c => ConfigFactory.parseFile(c)) .getOrElse(ConfigFactory.empty()) - .withFallback(ConfigFactory.parseFile(new File(LocalResourceNames.LauncherConfigName))) + .withFallback(ConfigFactory.parseFile(new File(LocalResourceNames.ApplicationConfigName))) .withFallback(ConfigFactory.load()) // Create and start the Kerberos ticket renewer if necessary.