diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 73090535922..40078fff033 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.nio.file.Paths; import java.security.NoSuchAlgorithmException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -34,6 +35,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; @@ -549,9 +552,27 @@ public class MRAppMaster extends CompositeService { }); } - protected boolean keepJobFiles(JobConf conf) { - return (conf.getKeepTaskFilesPattern() != null || conf - .getKeepFailedTaskFiles()); + private boolean isJobNamePatternMatch(JobConf conf, String jobTempDir) { + // Matched staging files should be preserved after job is finished. + if (conf.getKeepTaskFilesPattern() != null && jobTempDir != null) { + String jobFileName = Paths.get(jobTempDir).getFileName().toString(); + Pattern pattern = Pattern.compile(conf.getKeepTaskFilesPattern()); + Matcher matcher = pattern.matcher(jobFileName); + return matcher.find(); + } else { + return false; + } + } + + private boolean isKeepFailedTaskFiles(JobConf conf) { + // TODO: Decide which failed task files that should + // be kept are in application log directory. + return conf.getKeepFailedTaskFiles(); + } + + protected boolean keepJobFiles(JobConf conf, String jobTempDir) { + return isJobNamePatternMatch(conf, jobTempDir) + || isKeepFailedTaskFiles(conf); } /** @@ -574,10 +595,10 @@ public class MRAppMaster extends CompositeService { */ public void cleanupStagingDir() throws IOException { /* make sure we clean the staging files */ - String jobTempDir = null; + String jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR); FileSystem fs = getFileSystem(getConfig()); try { - if (!keepJobFiles(new JobConf(getConfig()))) { + if (!keepJobFiles(new JobConf(getConfig()), jobTempDir)) { jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR); if (jobTempDir == null) { LOG.warn("Job Staging directory is null"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index fc64996a8e6..00b15f892b7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -76,6 +77,11 @@ import org.junit.Test; private final static RecordFactory recordFactory = RecordFactoryProvider. getRecordFactory(null); + @After + public void tearDown() { + conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, false); + } + @Test public void testDeletionofStagingOnUnregistrationFailure() throws IOException { @@ -245,6 +251,128 @@ import org.junit.Test; verify(fs).delete(stagingJobPath, true); } + @Test + public void testByPreserveFailedStaging() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + // TODO: Decide which failed task files that should + // be kept are in application log directory. + // Currently all files are not deleted from staging dir. + conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, true); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); + //Staging Dir exists + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path stagingDir = MRApps.getStagingAreaDir(conf, user); + when(fs.exists(stagingDir)).thenReturn(true); + ApplicationId appId + = ApplicationId.newInstance(System.currentTimeMillis(), 0); + ApplicationAttemptId attemptId + = ApplicationAttemptId.newInstance(appId, 1); + JobId jobid = recordFactory.newRecordInstance(JobId.class); + jobid.setAppId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.FAILED, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + appMaster.init(conf); + appMaster.start(); + appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry()); + verify(fs, times(0)).delete(stagingJobPath, true); + } + + @Test + public void testPreservePatternMatchedStaging() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + // The staging files that are matched to the pattern + // should not be deleted + conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "JobDir"); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); + //Staging Dir exists + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path stagingDir = MRApps.getStagingAreaDir(conf, user); + when(fs.exists(stagingDir)).thenReturn(true); + ApplicationId appId + = ApplicationId.newInstance(System.currentTimeMillis(), 0); + ApplicationAttemptId attemptId + = ApplicationAttemptId.newInstance(appId, 1); + JobId jobid = recordFactory.newRecordInstance(JobId.class); + jobid.setAppId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + appMaster.init(conf); + appMaster.start(); + appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry()); + verify(fs, times(0)).delete(stagingJobPath, true); + } + + @Test + public void testNotPreserveNotPatternMatchedStaging() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "NotMatching"); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); + //Staging Dir exists + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path stagingDir = MRApps.getStagingAreaDir(conf, user); + when(fs.exists(stagingDir)).thenReturn(true); + ApplicationId appId + = ApplicationId.newInstance(System.currentTimeMillis(), 0); + ApplicationAttemptId attemptId + = ApplicationAttemptId.newInstance(appId, 1); + JobId jobid = recordFactory.newRecordInstance(JobId.class); + jobid.setAppId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + appMaster.init(conf); + appMaster.start(); + appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry()); + //Staging dir should be deleted because it is not matched with + //PRESERVE_FILES_PATTERN + verify(fs, times(1)).delete(stagingJobPath, true); + } + + @Test + public void testPreservePatternMatchedAndFailedStaging() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + // When RESERVE_FILES_PATTERN and PRESERVE_FAILED_TASK_FILES are set, + // files in staging dir are always kept. + conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "JobDir"); + conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, true); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); + //Staging Dir exists + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path stagingDir = MRApps.getStagingAreaDir(conf, user); + when(fs.exists(stagingDir)).thenReturn(true); + ApplicationId appId + = ApplicationId.newInstance(System.currentTimeMillis(), 0); + ApplicationAttemptId attemptId + = ApplicationAttemptId.newInstance(appId, 1); + JobId jobid = recordFactory.newRecordInstance(JobId.class); + jobid.setAppId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + appMaster.init(conf); + appMaster.start(); + appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry()); + verify(fs, times(0)).delete(stagingJobPath, true); + } + private class TestMRApp extends MRAppMaster { ContainerAllocator allocator; boolean testIsLastAMRetry = false;