From a7b27f051cbdc01f1e5fa02f551bceb68ec15de9 Mon Sep 17 00:00:00 2001
From: okumin
Date: Fri, 28 Feb 2025 17:08:34 +0900
Subject: [PATCH 1/3] Add test cases

---
 .../org/apache/tez/mapreduce/TestMRRJobs.java | 69 ++++++++++++++++++-
 .../org/apache/tez/test/MiniTezCluster.java   | 13 ++--
 2 files changed, 73 insertions(+), 9 deletions(-)

diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
index c00ea36cbc..07a4bb00a3 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
@@ -21,6 +21,9 @@
 import java.io.File;
 import java.io.IOException;
 
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,6 +91,7 @@ public static void setup() throws IOException {
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
     conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
     conf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500);
+    //conf.setLong(TezConfiguration.TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS, 500);
     mrrTezCluster.init(conf);
     mrrTezCluster.start();
   }
@@ -106,6 +110,32 @@ public static void tearDown() {
     }
   }
 
+  private void assertStagingDir() throws IOException, InterruptedException {
+    // Wait for the clean-up process to be invoked
+    while (true) {
+      int numAllocatedCores = mrrTezCluster.getResourceManager().getResourceScheduler().getRootQueueMetrics()
+          .getAllocatedVirtualCores();
+      LOG.info("Number of cores in use: {}", numAllocatedCores);
+      if (numAllocatedCores == 0) {
+        break;
+      }
+      Thread.sleep(100L);
+    }
+
+    String userName = UserGroupInformation.getCurrentUser().getUserName();
+    Path userStagingDir = new Path(String.format("%s/%s/.staging", mrrTezCluster.getStagingPath(), userName));
+
+    Assert.assertTrue(remoteFs.exists(userStagingDir));
+
+    RemoteIterator<LocatedFileStatus> directoryTree = remoteFs.listFiles(userStagingDir, true);
+    int numFiles = 0;
+    while (directoryTree.hasNext()) {
+      numFiles += 1;
+      LOG.info("Path in the staging dir: {}", directoryTree.next().getPath());
+    }
+    Assert.assertEquals(0, numFiles);
+  }
+
   @Test (timeout = 60000)
   public void testMRRSleepJob() throws IOException, InterruptedException,
       ClassNotFoundException {
@@ -140,6 +170,7 @@ public void testMRRSleepJob() throws IOException, InterruptedException,
     Assert.assertTrue("Tracking URL was " + trackingUrl +
         " but didn't Match Job ID " + jobId ,
         trackingUrl.contains(jobId.substring(jobId.indexOf("_"))));
+    assertStagingDir();
 
     // FIXME once counters and task progress can be obtained properly
     // TODO use dag client to test counters and task progress?
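The assertStagingDir() helper above treats the RM reporting zero allocated vcores as the signal that the AM container has exited and its shutdown cleanup has run. A bounded variant of that wait is sketched below; it is not part of the patch, the method name and timeout are arbitrary, and it reuses the mrrTezCluster field and the JUnit Assert import the test already has:

    // Sketch only: the same polling loop as assertStagingDir(), but with an explicit
    // deadline so a cleanup regression fails with a clear message instead of tripping
    // the surrounding @Test(timeout = ...) limit.
    private void waitForAmRelease(long timeoutMillis) throws InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMillis;
      while (System.currentTimeMillis() < deadline) {
        int numAllocatedCores = mrrTezCluster.getResourceManager().getResourceScheduler()
            .getRootQueueMetrics().getAllocatedVirtualCores();
        if (numAllocatedCores == 0) {
          return;
        }
        Thread.sleep(100L);
      }
      Assert.fail("AM did not release its resources within " + timeoutMillis + " ms");
    }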
@@ -190,7 +221,7 @@ public void testRandomWriter() throws IOException, InterruptedException, } } Assert.assertEquals("Number of part files is wrong!", 3, count); - + assertStagingDir(); } @@ -223,6 +254,7 @@ public void testFailingJob() throws IOException, InterruptedException, boolean succeeded = job.waitForCompletion(true); Assert.assertFalse(succeeded); Assert.assertEquals(JobStatus.State.FAILED, job.getJobState()); + assertStagingDir(); // FIXME once counters and task progress can be obtained properly // TODO verify failed task diagnostics @@ -257,11 +289,44 @@ public void testFailingAttempt() throws IOException, InterruptedException, boolean succeeded = job.waitForCompletion(true); Assert.assertTrue(succeeded); Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + assertStagingDir(); // FIXME once counters and task progress can be obtained properly // TODO verify failed task diagnostics } + @Test (timeout = 60000) + public void testFailingSubmission() throws IOException, InterruptedException, + ClassNotFoundException { + + LOG.info("\n\n\nStarting testFailingSubmission()."); + + if (!(new File(MiniTezCluster.APPJAR)).exists()) { + LOG.info("MRAppJar " + MiniTezCluster.APPJAR + + " not found. Not running test."); + return; + } + + Configuration sleepConf = new Configuration(mrrTezCluster.getConfig()); + + MRRSleepJob sleepJob = new MRRSleepJob(); + sleepJob.setConf(sleepConf); + + Job job = sleepJob.createJob(1, 1, 1, 1, 1, + 1, 1, 1, 1, 1); + + job.setJarByClass(MRRSleepJob.class); + job.setMaxMapAttempts(1); // speed up failures + job.getConfiguration().set(JobContext.QUEUE_NAME, "non-existent-queue"); + + try { + job.submit(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("to unknown queue: non-existent-queue")); + } + assertStagingDir(); + } + @Test (timeout = 60000) public void testMRRSleepJobWithCompression() throws IOException, InterruptedException, ClassNotFoundException { @@ -298,6 +363,7 @@ public void testMRRSleepJobWithCompression() throws IOException, Assert.assertTrue("Tracking URL was " + trackingUrl + " but didn't Match Job ID " + jobId , trackingUrl.contains(jobId.substring(jobId.indexOf("_")))); + assertStagingDir(); // FIXME once counters and task progress can be obtained properly // TODO use dag client to test counters and task progress? 
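The new testFailingSubmission() expects job.submit() to throw because the queue does not exist; if submission ever succeeded unexpectedly, the try/catch above would pass silently. A stricter variant is sketched here purely as a review-time suggestion, reusing the job, Assert and assertStagingDir() names from the test; it is not part of the patch:

    // Sketch: fail explicitly when no exception is thrown, keep the message check otherwise.
    try {
      job.submit();
      Assert.fail("Submission to a non-existent queue should have been rejected");
    } catch (Exception e) {
      Assert.assertTrue(e.getMessage(),
          e.getMessage().contains("to unknown queue: non-existent-queue"));
    }
    assertStagingDir();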
@@ -354,6 +420,7 @@ public Void run() throws Exception { Assert.assertTrue("Tracking URL was " + trackingUrl + " but didn't Match Job ID " + jobId , trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/")); + assertStagingDir(); return null; } }); diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java index 9af1e604b2..b48f6b8582 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java +++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java @@ -72,7 +72,7 @@ public class MiniTezCluster extends MiniYARNCluster { private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml"; - private Path confFilePath; + private Path stagingPath; private long maxTimeToWaitForAppsOnShutdown; @@ -150,8 +150,7 @@ public void serviceInit(Configuration conf) throws Exception { conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000); try { - Path stagingPath = FileContext.getFileContext(conf).makeQualified( - new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR))); + stagingPath = FileContext.getFileContext(conf).makeQualified(new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR))); /* * Re-configure the staging path on Windows if the file system is localFs. * We need to use a absolute path that contains the drive letter. The unit @@ -211,7 +210,7 @@ public void serviceStart() throws Exception { File workDir = super.getTestWorkDir(); Configuration conf = super.getConfig(); - confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG); + Path confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG); File confFile = new File(confFilePath.toString()); try { confFile.createNewFile(); @@ -222,7 +221,6 @@ public void serviceStart() throws Exception { e.printStackTrace(); throw new RuntimeException(e); } - confFilePath = new Path(confFile.getAbsolutePath()); conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, workDir.getAbsolutePath(), System.getProperty("java.class.path")); LOG.info("Setting yarn-site.xml via YARN-APP-CP at: " @@ -311,8 +309,7 @@ public boolean apply(ApplicationReport appReport) { } } - public Path getConfigFilePath() { - return confFilePath; + public Path getStagingPath() { + return stagingPath; } - } From 64bdfb16d51b750f14c49bf45201835864e20060 Mon Sep 17 00:00:00 2001 From: okumin Date: Fri, 28 Feb 2025 18:08:13 +0900 Subject: [PATCH 2/3] TEZ-4604: tez-mapreduce does not delete files under staging directory --- .../apache/tez/dag/api/TezConfiguration.java | 10 ++++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 17 ++++++++++------- .../apache/tez/mapreduce/client/YARNRunner.java | 1 + 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 8862f4b7d6..81530a344a 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -143,6 +143,16 @@ public TezConfiguration(boolean loadDefaults) { "staging.scratch-data.auto-delete"; public static final boolean TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT = true; + /** + * Boolean value. If true then Tez will try to delete the entire TEZ_AM_STAGING_DIR. Otherwise, Tez will delete + * only a subdirectory created by Tez and a client needs to clean up the parent directories. This is typically used + * by tez-mapreduce. 
+ */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type = "boolean") + public static final String TEZ_AM_STAGING_BASE_DIR_CLEANUP = TEZ_AM_PREFIX + "staging.base.dir.cleanup"; + public static final boolean TEZ_AM_STAGING_BASE_DIR_CLEANUP_DEFAULT = false; + /** * String value. Specifies the name of the shuffle auxiliary service. */ diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 45b5266ff2..8b3d1cf30b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -304,6 +304,7 @@ public class DAGAppMaster extends AbstractService { private boolean recoveryEnabled; private Path recoveryDataDir; private Path currentRecoveryDataDir; + private Path tezBaseStagingDir; private Path tezSystemStagingDir; private FileSystem recoveryFS; @@ -490,6 +491,7 @@ protected void serviceInit(final Configuration conf) throws Exception { } else { dispatcher.enableExitOnDispatchException(); } + this.tezBaseStagingDir = TezCommonUtils.getTezBaseStagingPath(conf); String strAppId = this.appAttemptID.getApplicationId().toString(); this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId); @@ -2201,19 +2203,20 @@ public void serviceStop() throws Exception { if (deleteTezScratchData && this.taskSchedulerManager != null && this.taskSchedulerManager.hasUnregistered()) { // Delete tez scratch data dir - if (this.tezSystemStagingDir != null) { + boolean cleanupBaseStagingDir = this.amConf.getBoolean(TezConfiguration.TEZ_AM_STAGING_BASE_DIR_CLEANUP, + TezConfiguration.TEZ_AM_STAGING_BASE_DIR_CLEANUP_DEFAULT); + Path directory = cleanupBaseStagingDir ? this.tezBaseStagingDir : this.tezSystemStagingDir; + if (directory != null) { try { this.appMasterUgi.doAs(new PrivilegedExceptionAction() { @Override public Void run() throws Exception { - FileSystem fs = tezSystemStagingDir.getFileSystem(amConf); - boolean deletedStagingDir = fs.delete(tezSystemStagingDir, true); + FileSystem fs = directory.getFileSystem(amConf); + boolean deletedStagingDir = fs.delete(directory, true); if (!deletedStagingDir) { - LOG.warn("Failed to delete tez scratch data dir, path=" - + tezSystemStagingDir); + LOG.warn("Failed to delete tez scratch data dir, path={}", directory); } else { - LOG.info("Completed deletion of tez scratch data dir, path=" - + tezSystemStagingDir); + LOG.info("Completed deletion of tez scratch data dir, path={}", directory); } return null; } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java index 7aed4a04a8..e734f43e9e 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java @@ -623,6 +623,7 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) try { dagAMConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, jobSubmitDir); + dagAMConf.setBoolean(TezConfiguration.TEZ_AM_STAGING_BASE_DIR_CLEANUP, true); // Set Tez parameters based on MR parameters. 
String queueName = jobConf.get(JobContext.QUEUE_NAME, From a1650eb3e7e35b228dcd2412f13f002de28d656b Mon Sep 17 00:00:00 2001 From: okumin Date: Fri, 21 Mar 2025 21:22:10 +0900 Subject: [PATCH 3/3] Add a test case for recovery --- .../apache/tez/mapreduce/TestMRRRecovery.java | 281 ++++++++++++++++++ 1 file changed, 281 insertions(+) create mode 100644 tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRRecovery.java diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRRecovery.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRRecovery.java new file mode 100644 index 0000000000..733334fb9e --- /dev/null +++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRRecovery.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.mapreduce; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.app.RecoveryParser; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; +import org.apache.tez.dag.history.recovery.RecoveryService; +import org.apache.tez.mapreduce.examples.MRRSleepJob; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; +import org.apache.tez.test.MiniTezCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestMRRRecovery { + private static final Logger LOG = LoggerFactory.getLogger(TestMRRRecovery.class); + + private static MiniTezCluster mrrTezCluster; + private static MiniDFSCluster dfsCluster; + private static YarnClient yarnClient; + + private static FileSystem remoteFs; + + private static final String TEST_ROOT_DIR = "target" + 
Path.SEPARATOR + TestMRRJobs.class.getName() + "-tmpDir";
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      Configuration conf = new Configuration();
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (mrrTezCluster == null) {
+      mrrTezCluster = new MiniTezCluster(TestMRRJobs.class.getName(), 1,
+          1, 1);
+      Configuration conf = new Configuration();
+      conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      conf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
+      conf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500);
+      conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+      conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, true);
+      mrrTezCluster.init(conf);
+      mrrTezCluster.start();
+      yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(mrrTezCluster.getConfig());
+      yarnClient.start();
+    }
+
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+    if (yarnClient != null) {
+      yarnClient.close();
+    }
+  }
+
+  private ApplicationId getApplicationId(Job job) throws Exception {
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance();
+    request.setName(job.getJobName());
+    List<ApplicationReport> apps = yarnClient.getApplications(request);
+    Assert.assertEquals(1, apps.size());
+    return apps.get(0).getApplicationId();
+  }
+
+  private Path getJobStagingDir(Job job) throws Exception {
+    String userName = UserGroupInformation.getCurrentUser().getUserName();
+    return new Path(String.format("%s/%s/.staging/%s", mrrTezCluster.getStagingPath(), userName, job.getJobID()));
+  }
+
+  private void assertStagingDir(Job job) throws Exception {
+    // Wait for the clean-up process to be invoked
+    while (true) {
+      int numAllocatedCores = mrrTezCluster.getResourceManager().getResourceScheduler().getRootQueueMetrics()
+          .getAllocatedVirtualCores();
+      LOG.info("Number of cores in use: {}", numAllocatedCores);
+      if (numAllocatedCores == 0) {
+        break;
+      }
+      Thread.sleep(100L);
+    }
+
+    Assert.assertFalse(remoteFs.exists(getJobStagingDir(job)));
+  }
+
+  private void runJobAndKill(Job job) throws Exception {
+    job.submit();
+
+    ApplicationId applicationId = getApplicationId(job);
+    Assert.assertTrue(remoteFs.exists(getJobStagingDir(job)));
+    TimeUnit.SECONDS.sleep(10);
+    List<ApplicationAttemptReport> attempts1 = yarnClient.getApplicationAttempts(applicationId);
+    Assert.assertEquals(1, attempts1.size());
+    Assert.assertTrue(remoteFs.exists(getJobStagingDir(job)));
+    yarnClient.failApplicationAttempt(attempts1.get(0).getApplicationAttemptId());
+  }
+
+  @Test(timeout = 120000)
+  public void testSucceed() throws Exception {
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
+        1, 1, 1, 1, 1);
+    job.setJobName("TestMRRRecovery-testSucceed");
+    job.setJarByClass(MRRSleepJob.class);
+    job.submit();
+    Assert.assertTrue(job.waitForCompletion(true));
+
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + assertStagingDir(job); + + ApplicationId appId = getApplicationId(job); + List attempts = yarnClient.getApplicationAttempts(appId); + Assert.assertEquals(1, attempts.size()); + } + + @Test(timeout = 120000) + public void testAMRecoveryWhileMapperRunning() throws Exception { + Configuration sleepConf = new Configuration(mrrTezCluster.getConfig()); + + MRRSleepJob sleepJob = new MRRSleepJob(); + sleepJob.setConf(sleepConf); + Job job = sleepJob.createJob(1, 1, 1, 1, 30000, + 1, 1, 1, 1, 1); + job.setJobName("TestMRRRecovery-testAMRecoveryWhileMapperRunning"); + job.setJarByClass(MRRSleepJob.class); + runJobAndKill(job); + + TimeUnit.SECONDS.sleep(10); + List attempts2 = yarnClient.getApplicationAttempts(getApplicationId(job)); + Assert.assertEquals(2, attempts2.size()); + Assert.assertTrue(remoteFs.exists(getJobStagingDir(job))); + List historyEvents1 = readRecoveryLog(job); + assertEquals(0, findTaskAttemptFinishedEvent(historyEvents1, 0).size()); + assertEquals(0, findTaskAttemptFinishedEvent(historyEvents1, 1).size()); + assertEquals(0, findTaskAttemptFinishedEvent(historyEvents1, 2).size()); + + Assert.assertTrue(job.waitForCompletion(true)); + Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + assertStagingDir(job); + + ApplicationId appId = getApplicationId(job); + List attempts = yarnClient.getApplicationAttempts(appId); + Assert.assertEquals(2, attempts.size()); + } + + @Test(timeout = 120000) + public void testAMRecoveryWhileReducerRunning() throws Exception { + Configuration sleepConf = new Configuration(mrrTezCluster.getConfig()); + + MRRSleepJob sleepJob = new MRRSleepJob(); + sleepJob.setConf(sleepConf); + Job job = sleepJob.createJob(1, 1, 1, 1, 1, + 1, 30000, 1, 1, 1); + job.setJobName("TestMRRRecovery-testAMRecoveryWhileReducerRunning"); + job.setJarByClass(MRRSleepJob.class); + runJobAndKill(job); + + TimeUnit.SECONDS.sleep(10); + List attempts2 = yarnClient.getApplicationAttempts(getApplicationId(job)); + Assert.assertEquals(2, attempts2.size()); + Assert.assertTrue(remoteFs.exists(getJobStagingDir(job))); + List historyEvents1 = readRecoveryLog(job); + assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0).size()); + assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 1).size()); + assertEquals(0, findTaskAttemptFinishedEvent(historyEvents1, 2).size()); + + Assert.assertTrue(job.waitForCompletion(true)); + Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + assertStagingDir(job); + + ApplicationId appId = getApplicationId(job); + List attempts = yarnClient.getApplicationAttempts(appId); + Assert.assertEquals(2, attempts.size()); + } + + private List readRecoveryLog(Job job) throws Exception { + ApplicationId appId = getApplicationId(job); + Configuration tezConf = mrrTezCluster.getConfig(); + Path tezSystemStagingDir = new Path(new Path(getJobStagingDir(job), TezCommonUtils.TEZ_SYSTEM_SUB_DIR), + appId.toString()); + Path recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf); + FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf); + List historyEvents = new ArrayList<>(); + int attemptId = 1; + Path currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, attemptId); + Path recoveryFilePath = + new Path(currentAttemptRecoveryDataDir, appId.toString().replace("application", "dag") + + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX); + if (fs.exists(recoveryFilePath)) { + LOG.info("Read 
recovery file:" + recoveryFilePath); + historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(fs.open(recoveryFilePath))); + } + printHistoryEvents(historyEvents, attemptId); + return historyEvents; + } + + private void printHistoryEvents(List historyEvents, int attemptId) { + LOG.info("RecoveryLogs from attempt:" + attemptId); + for(HistoryEvent historyEvent : historyEvents) { + LOG.info("Parsed event from recovery stream" + + ", eventType=" + historyEvent.getEventType() + + ", event=" + historyEvent); + } + LOG.info(""); + } + + private List findTaskAttemptFinishedEvent( + List historyEvents, int vertexId) { + List resultEvents = new ArrayList<>(); + for (HistoryEvent historyEvent : historyEvents) { + if (historyEvent.getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) { + TaskAttemptFinishedEvent taFinishedEvent = (TaskAttemptFinishedEvent) historyEvent; + if (taFinishedEvent.getState() == TaskAttemptState.KILLED) { + continue; + } + if (taFinishedEvent.getVertexID().getId() == vertexId && taFinishedEvent.getTaskID().getId() == 0) { + resultEvents.add(taFinishedEvent); + } + } + } + return resultEvents; + } +}