MAPREDUCE-6607. Enable regex pattern matching when mapreduce.task.files.preserve.filepattern is set. Contributed by Kai Sasaki.

This commit is contained in:
Akira Ajisaka 2016-05-23 07:25:52 +09:00
parent 1fb74e569a
commit a59f30272d
2 changed files with 154 additions and 5 deletions

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@ -34,6 +35,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
@ -549,9 +552,27 @@ public class MRAppMaster extends CompositeService {
});
}
protected boolean keepJobFiles(JobConf conf) {
return (conf.getKeepTaskFilesPattern() != null || conf
.getKeepFailedTaskFiles());
private boolean isJobNamePatternMatch(JobConf conf, String jobTempDir) {
// Matched staging files should be preserved after job is finished.
if (conf.getKeepTaskFilesPattern() != null && jobTempDir != null) {
String jobFileName = Paths.get(jobTempDir).getFileName().toString();
Pattern pattern = Pattern.compile(conf.getKeepTaskFilesPattern());
Matcher matcher = pattern.matcher(jobFileName);
return matcher.find();
} else {
return false;
}
}
private boolean isKeepFailedTaskFiles(JobConf conf) {
// TODO: Decide which failed task files that should
// be kept are in application log directory.
return conf.getKeepFailedTaskFiles();
}
protected boolean keepJobFiles(JobConf conf, String jobTempDir) {
return isJobNamePatternMatch(conf, jobTempDir)
|| isKeepFailedTaskFiles(conf);
}
/**
@ -574,10 +595,10 @@ public class MRAppMaster extends CompositeService {
*/
public void cleanupStagingDir() throws IOException {
/* make sure we clean the staging files */
String jobTempDir = null;
String jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
FileSystem fs = getFileSystem(getConfig());
try {
if (!keepJobFiles(new JobConf(getConfig()))) {
if (!keepJobFiles(new JobConf(getConfig()), jobTempDir)) {
jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
if (jobTempDir == null) {
LOG.warn("Job Staging directory is null");

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@ -76,6 +77,11 @@ import org.junit.Test;
private final static RecordFactory recordFactory = RecordFactoryProvider.
getRecordFactory(null);
@After
public void tearDown() {
conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, false);
}
@Test
public void testDeletionofStagingOnUnregistrationFailure()
throws IOException {
@ -245,6 +251,128 @@ import org.junit.Test;
verify(fs).delete(stagingJobPath, true);
}
@Test
public void testByPreserveFailedStaging() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
// TODO: Decide which failed task files that should
// be kept are in application log directory.
// Currently all files are not deleted from staging dir.
conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, true);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
//Staging Dir exists
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationId appId
= ApplicationId.newInstance(System.currentTimeMillis(), 0);
ApplicationAttemptId attemptId
= ApplicationAttemptId.newInstance(appId, 1);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
JobStateInternal.FAILED, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
appMaster.init(conf);
appMaster.start();
appMaster.shutDownJob();
//test whether notifyIsLastAMRetry called
Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry());
verify(fs, times(0)).delete(stagingJobPath, true);
}
@Test
public void testPreservePatternMatchedStaging() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
// The staging files that are matched to the pattern
// should not be deleted
conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "JobDir");
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
//Staging Dir exists
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationId appId
= ApplicationId.newInstance(System.currentTimeMillis(), 0);
ApplicationAttemptId attemptId
= ApplicationAttemptId.newInstance(appId, 1);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
appMaster.init(conf);
appMaster.start();
appMaster.shutDownJob();
//test whether notifyIsLastAMRetry called
Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry());
verify(fs, times(0)).delete(stagingJobPath, true);
}
@Test
public void testNotPreserveNotPatternMatchedStaging() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "NotMatching");
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
//Staging Dir exists
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationId appId
= ApplicationId.newInstance(System.currentTimeMillis(), 0);
ApplicationAttemptId attemptId
= ApplicationAttemptId.newInstance(appId, 1);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
appMaster.init(conf);
appMaster.start();
appMaster.shutDownJob();
//test whether notifyIsLastAMRetry called
Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry());
//Staging dir should be deleted because it is not matched with
//PRESERVE_FILES_PATTERN
verify(fs, times(1)).delete(stagingJobPath, true);
}
@Test
public void testPreservePatternMatchedAndFailedStaging() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
// When RESERVE_FILES_PATTERN and PRESERVE_FAILED_TASK_FILES are set,
// files in staging dir are always kept.
conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "JobDir");
conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, true);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
//Staging Dir exists
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationId appId
= ApplicationId.newInstance(System.currentTimeMillis(), 0);
ApplicationAttemptId attemptId
= ApplicationAttemptId.newInstance(appId, 1);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
appMaster.init(conf);
appMaster.start();
appMaster.shutDownJob();
//test whether notifyIsLastAMRetry called
Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry());
verify(fs, times(0)).delete(stagingJobPath, true);
}
private class TestMRApp extends MRAppMaster {
ContainerAllocator allocator;
boolean testIsLastAMRetry = false;