MAPREDUCE-6032. Made MR jobs write job history files on the default FS when the current context’s FS is different. Contributed by Benjamin Zhitomirsky.

svn merge --ignore-ancestry -c 1618269 ../../trunk/
svn merge --ignore-ancestry -c 1618270 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1618271 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhijie Shen 2014-08-15 20:33:55 +00:00
parent 24102a4e6c
commit a7a79a198b
6 changed files with 247 additions and 16 deletions

View File

@ -78,6 +78,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5878. some standard JDK APIs are not part of system classes
defaults (Sangjin Lee via jlowe)
MAPREDUCE-6032. Made MR jobs write job history files on the default FS when
the current context's FS is different. (Benjamin Zhitomirsky via zjshen)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -73,6 +73,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-shuffle</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -28,13 +28,13 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@ -74,7 +74,9 @@ public class JobHistoryEventHandler extends AbstractService
private int eventCounter;
//TODO Does the FS object need to be different ?
// Those file systems may differ from the job configuration
// See org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils
// #ensurePathInDefaultFileSystem
private FileSystem stagingDirFS; // log Dir FileSystem
private FileSystem doneDirFS; // done Dir FileSystem
@ -141,7 +143,7 @@ public class JobHistoryEventHandler extends AbstractService
//Check for the existence of the history staging dir. Maybe create it.
try {
stagingDirPath =
FileSystem.get(conf).makeQualified(new Path(stagingDirStr));
FileContext.getFileContext(conf).makeQualified(new Path(stagingDirStr));
stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf);
mkdir(stagingDirFS, stagingDirPath, new FsPermission(
JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
@ -154,7 +156,7 @@ public class JobHistoryEventHandler extends AbstractService
//Check for the existence of intermediate done dir.
Path doneDirPath = null;
try {
doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirStr));
doneDirPath = FileContext.getFileContext(conf).makeQualified(new Path(doneDirStr));
doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
// This directory will be in a common location, or this may be a cluster
// meant for a single user. Creating based on the conf. Should ideally be
@ -194,7 +196,7 @@ public class JobHistoryEventHandler extends AbstractService
//Check/create user directory under intermediate done dir.
try {
doneDirPrefixPath =
FileSystem.get(conf).makeQualified(new Path(userDoneDirStr));
FileContext.getFileContext(conf).makeQualified(new Path(userDoneDirStr));
mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(
JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
} catch (IOException e) {

View File

@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
import static org.mockito.Mockito.never;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.junit.Assert;
@ -35,8 +36,13 @@ import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
@ -52,6 +58,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.After;
import org.junit.AfterClass;
import static org.junit.Assert.assertFalse;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@ -60,6 +70,26 @@ public class TestJobHistoryEventHandler {
private static final Log LOG = LogFactory
.getLog(TestJobHistoryEventHandler.class);
private static MiniDFSCluster dfsCluster = null;
private static String coreSitePath;
/**
 * Records where the tests write their core-site.xml (under
 * target/test-classes) and starts the mini DFS cluster held in the static
 * {@code dfsCluster} field.
 */
@BeforeClass
public static void setUpClass() throws Exception {
  final String sep = File.separator;
  coreSitePath = "." + sep + "target" + sep + "test-classes" + sep
      + "core-site.xml";
  Configuration hdfsConf = new HdfsConfiguration();
  dfsCluster = new MiniDFSCluster.Builder(hdfsConf).build();
}
/**
 * Shuts down the mini DFS cluster started in {@code setUpClass()}.
 */
@AfterClass
public static void cleanUpClass() throws Exception {
  // JUnit runs @AfterClass even when @BeforeClass threw, in which case
  // dfsCluster is still null. Guard so the teardown does not pile a
  // NullPointerException on top of the original setup failure.
  if (dfsCluster != null) {
    dfsCluster.shutdown();
  }
}
/**
 * Removes the core-site.xml a test may have written at
 * {@code coreSitePath}.
 */
@After
public void cleanTest() throws Exception {
  File coreSiteFile = new File(coreSitePath);
  // Best effort: the result is ignored, the file may not exist.
  coreSiteFile.delete();
}
@Test (timeout=50000)
public void testFirstFlushOnCompletionEvent() throws Exception {
@ -325,6 +355,50 @@ public class TestJobHistoryEventHandler {
}
}
/**
 * Verifies that history files are written to the default file system
 * declared in core-site.xml (the mini DFS cluster) even though the job
 * configuration itself points at the local file system.
 */
@Test (timeout=50000)
public void testDefaultFsIsUsedForHistory() throws Exception {
  // Create default configuration pointing to the minicluster
  Configuration conf = new Configuration();
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      dfsCluster.getURI().toString());
  FileOutputStream os = new FileOutputStream(coreSitePath);
  try {
    conf.writeXml(os);
  } finally {
    // Always release the file handle, even if writeXml fails; otherwise
    // the @After cleanup may be unable to delete core-site.xml.
    os.close();
  }

  // simulate execution under a non-default namenode
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      "file:///");

  TestParams t = new TestParams();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.dfsWorkDir);

  JHEvenHandlerForTest realJheh =
      new JHEvenHandlerForTest(t.mockAppContext, 0, false);
  JHEvenHandlerForTest jheh = spy(realJheh);
  jheh.init(conf);

  try {
    jheh.start();
    handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));

    handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
        TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
        new Counters(), new Counters())));

    // If we got here then event handler worked but we don't know with which
    // file system. Now we check that history stuff was written to minicluster
    FileSystem dfsFileSystem = dfsCluster.getFileSystem();
    assertTrue("Minicluster contains some history files",
        dfsFileSystem.globStatus(new Path(t.dfsWorkDir + "/*")).length != 0);
    // conf's default FS is file:/// at this point, so FileSystem.get(conf)
    // is the file system the job context would have used.
    FileSystem localFileSystem = FileSystem.get(conf);
    assertFalse("No history directory on non-default file system",
        localFileSystem.exists(new Path(t.dfsWorkDir)));
  } finally {
    jheh.stop();
  }
}
/** Hands the given event to the handler by calling its handle() method. */
private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
jheh.handle(event);
}
@ -372,6 +446,7 @@ public class TestJobHistoryEventHandler {
private class TestParams {
boolean isLastAMRetry;
String workDir = setupTestWorkDir();
String dfsWorkDir = "/" + this.getClass().getCanonicalName();
ApplicationId appId = ApplicationId.newInstance(200, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
@ -451,10 +526,16 @@ public class TestJobHistoryEventHandler {
class JHEvenHandlerForTest extends JobHistoryEventHandler {
private EventWriter eventWriter;
private boolean mockHistoryProcessing = true;
/**
 * Creates a test handler with history processing mocked.
 *
 * @param context the application context passed to the parent handler
 * @param startCount the start count passed to the parent handler
 */
public JHEvenHandlerForTest(AppContext context, int startCount) {
  this(context, startCount, true);
}
/**
 * Creates a test handler, choosing whether history processing is mocked.
 *
 * @param context the application context passed to the parent handler
 * @param startCount the start count passed to the parent handler
 * @param mockHistoryProcessing when true the event writer is a mock and
 *        done-file processing is skipped; when false the parent's real
 *        implementations are used
 */
public JHEvenHandlerForTest(
    AppContext context,
    int startCount,
    boolean mockHistoryProcessing) {
  super(context, startCount);
  this.mockHistoryProcessing = mockHistoryProcessing;
}
// Intentionally a no-op in this test subclass.
// NOTE(review): presumably this keeps the parent's serviceStart() logic
// (not visible here) from running during tests - confirm.
@Override
protected void serviceStart() {
}
@ -462,7 +543,12 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
/**
 * Supplies the event writer for the given history file: a Mockito mock
 * when history processing is mocked, otherwise the writer built by the
 * parent class. The writer is also kept in {@code eventWriter}.
 *
 * @param historyFilePath path of the history file to write
 * @return the writer that was created
 * @throws IOException if the parent fails to create the real writer
 */
@Override
protected EventWriter createEventWriter(Path historyFilePath)
    throws IOException {
  this.eventWriter = mockHistoryProcessing
      ? mock(EventWriter.class)
      : super.createEventWriter(historyFilePath);
  return this.eventWriter;
}
@ -475,9 +561,14 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
}
@Override
protected void processDoneFiles(JobId jobId){
protected void processDoneFiles(JobId jobId) throws IOException {
if (!mockHistoryProcessing) {
super.processDoneFiles(jobId);
}
else {
// do nothing
}
}
}
/**

View File

@ -22,20 +22,24 @@ import java.io.File;
import java.io.IOException;
import java.util.Calendar;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
@ -117,6 +121,7 @@ public class JobHistoryUtils {
public static final String TIMESTAMP_DIR_REGEX = "\\d{4}" + "\\" + Path.SEPARATOR + "\\d{2}" + "\\" + Path.SEPARATOR + "\\d{2}";
public static final Pattern TIMESTAMP_DIR_PATTERN = Pattern.compile(TIMESTAMP_DIR_REGEX);
private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
private static final Log LOG = LogFactory.getLog(JobHistoryUtils.class);
private static final PathFilter CONF_FILTER = new PathFilter() {
@Override
@ -183,7 +188,7 @@ public class JobHistoryUtils {
Path stagingPath = MRApps.getStagingAreaDir(conf, user);
Path path = new Path(stagingPath, jobId);
String logDir = path.toString();
return logDir;
return ensurePathInDefaultFileSystem(logDir, conf);
}
/**
@ -200,7 +205,7 @@ public class JobHistoryUtils {
MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
+ "/history/done_intermediate";
}
return doneDirPrefix;
return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
}
/**
@ -216,7 +221,69 @@ public class JobHistoryUtils {
MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
+ "/history/done";
}
return doneDirPrefix;
return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
}
/**
 * Builds the file context of the cluster's default file system from a
 * freshly loaded {@link Configuration}, independent of any job
 * configuration. It is used to keep the history done/staging locations
 * consistent across different contexts.
 *
 * @return the default file context, or {@code null} when
 *         FS_DEFAULT_NAME_KEY has no source other than core-default.xml
 *         or when the file context cannot be created
 */
private static FileContext getDefaultFileContext() {
  Configuration defaultConf = new Configuration();
  String[] sources = defaultConf.getPropertySources(
      CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);

  // A value that comes only from core-default.xml is ignored: defaulting
  // history paths to the file system named there would not make sense.
  // A test case that wants this functionality must create a core-site.xml.
  boolean setOutsideCoreDefault = sources != null
      && (sources.length > 1
          || !Arrays.asList(sources).contains("core-default.xml"));
  if (!setOutsideCoreDefault) {
    LOG.info("Default file system is set solely " +
        "by core-default.xml therefore - ignoring");
    return null;
  }

  FileContext fc = null;
  try {
    fc = FileContext.getFileContext(defaultConf);
    LOG.info("Default file system [" +
        fc.getDefaultFileSystem().getUri() + "]");
  } catch (UnsupportedFileSystemException e) {
    LOG.error("Unable to create default file context [" +
        defaultConf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) +
        "]",
        e);
  }
  return fc;
}
/**
 * Ensure that path belongs to cluster's default file system unless
 * 1. it is already fully qualified.
 * 2. current job configuration uses default file system
 * 3. running from a test case without core-site.xml
 *
 * @param sourcePath source path
 * @param conf the job configuration
 * @return {@code sourcePath} unchanged in the three cases above, otherwise
 *         the path qualified against the default file system
 */
private static String ensurePathInDefaultFileSystem(String sourcePath, Configuration conf) {
  Path path = new Path(sourcePath);

  // Check the cheap condition first: a path that already carries a scheme
  // or authority is returned as is, so there is no need to load a fresh
  // Configuration and build (and log) the default FileContext for it.
  if (path.toUri().getScheme() != null ||
      path.toUri().getAuthority() != null) {
    return sourcePath;
  }

  FileContext fc = getDefaultFileContext();
  if (fc == null) {
    // No usable default file system (e.g. only core-default.xml present).
    return sourcePath;
  }

  String defaultFsUri = fc.getDefaultFileSystem().getUri().toString();
  if (defaultFsUri.equals(
      conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, ""))) {
    // The job configuration already targets the default file system.
    return sourcePath;
  }

  return fc.makeQualified(path).toString();
}
/**

View File

@ -19,42 +19,74 @@
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.File;
import java.io.FileOutputStream;
import java.util.UUID;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.test.CoreTestDriver;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import java.util.UUID;
import org.junit.rules.TestName;
public class TestHistoryFileManager {
private static MiniDFSCluster dfsCluster = null;
private static MiniDFSCluster dfsCluster2 = null;
private static String coreSitePath;
@Rule
public TestName name = new TestName();
/**
 * Records where the tests write their core-site.xml (under
 * target/test-classes) and starts two mini DFS clusters. The second
 * cluster uses the first configuration's base directory value with a
 * "_2" suffix.
 */
@BeforeClass
public static void setUpClass() throws Exception {
  final String sep = File.separator;
  coreSitePath = "." + sep + "target" + sep + "test-classes" + sep
      + "core-site.xml";

  Configuration firstConf = new HdfsConfiguration();
  Configuration secondConf = new HdfsConfiguration();
  dfsCluster = new MiniDFSCluster.Builder(firstConf).build();

  // Read the base directory only after the first cluster has been built,
  // exactly as the statement order requires.
  String secondBaseDir =
      firstConf.get(MiniDFSCluster.HDFS_MINIDFS_BASEDIR) + "_2";
  secondConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, secondBaseDir);
  dfsCluster2 = new MiniDFSCluster.Builder(secondConf).build();
}
/**
 * Shuts down both mini DFS clusters started in {@code setUpClass()}.
 */
@AfterClass
public static void cleanUpClass() throws Exception {
  // JUnit runs @AfterClass even when @BeforeClass threw, so either field
  // may still be null. The finally block makes sure the second cluster is
  // shut down even if stopping the first one fails.
  try {
    if (dfsCluster != null) {
      dfsCluster.shutdown();
    }
  } finally {
    if (dfsCluster2 != null) {
      dfsCluster2.shutdown();
    }
  }
}
/**
 * Removes the core-site.xml a test may have written at
 * {@code coreSitePath}.
 */
@After
public void cleanTest() throws Exception {
  File coreSiteFile = new File(coreSitePath);
  // Best effort: the result is ignored, the file may not exist.
  coreSiteFile.delete();
}
/** Returns the done directory for the running test: "/" + test method name. */
private String getDoneDirNameForTest() {
  final String methodName = name.getMethodName();
  return "/" + methodName;
}
/**
 * Returns the intermediate done directory for the running test:
 * "/intermediate_" + test method name.
 */
private String getIntermediateDoneDirNameForTest() {
  final String methodName = name.getMethodName();
  return "/intermediate_" + methodName;
}
private void testTryCreateHistoryDirs(Configuration conf, boolean expected)
throws Exception {
conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, "/" + UUID.randomUUID());
conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "/" + UUID.randomUUID());
conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, getDoneDirNameForTest());
conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, getIntermediateDoneDirNameForTest());
HistoryFileManager hfm = new HistoryFileManager();
hfm.conf = conf;
Assert.assertEquals(expected, hfm.tryCreatingHistoryDirs(false));
@ -75,6 +107,36 @@ public class TestHistoryFileManager {
testTryCreateHistoryDirs(dfsCluster.getConfiguration(0), true);
}
/**
 * Verifies that the history directories are created only in the default
 * file system declared in core-site.xml (dfsCluster), even when the
 * history file manager is configured with another cluster (dfsCluster2).
 */
@Test
public void testCreateDirsWithAdditionalFileSystem() throws Exception {
  dfsCluster.getFileSystem().setSafeMode(
      HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
  dfsCluster2.getFileSystem().setSafeMode(
      HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
  Assert.assertFalse(dfsCluster.getFileSystem().isInSafeMode());
  Assert.assertFalse(dfsCluster2.getFileSystem().isInSafeMode());

  // Set default configuration to the first cluster
  Configuration conf = new Configuration(false);
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      dfsCluster.getURI().toString());
  FileOutputStream os = new FileOutputStream(coreSitePath);
  try {
    conf.writeXml(os);
  } finally {
    // Always release the file handle, even if writeXml fails; otherwise
    // the @After cleanup may be unable to delete core-site.xml.
    os.close();
  }

  testTryCreateHistoryDirs(dfsCluster2.getConfiguration(0), true);

  // Directories should be created only in the default file system (dfsCluster)
  Assert.assertTrue(dfsCluster.getFileSystem()
      .exists(new Path(getDoneDirNameForTest())));
  Assert.assertTrue(dfsCluster.getFileSystem()
      .exists(new Path(getIntermediateDoneDirNameForTest())));
  Assert.assertFalse(dfsCluster2.getFileSystem()
      .exists(new Path(getDoneDirNameForTest())));
  Assert.assertFalse(dfsCluster2.getFileSystem()
      .exists(new Path(getIntermediateDoneDirNameForTest())));
}
@Test
public void testCreateDirsWithFileSystemInSafeMode() throws Exception {
dfsCluster.getFileSystem().setSafeMode(