diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 74a1fc163d6..a48c6c18701 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -224,6 +224,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5999. Fix dead link in InputFormat javadoc (Akira AJISAKA via aw)
+ MAPREDUCE-6032. Made MR jobs write job history files on the default FS when
+ the current context's FS is different. (Benjamin Zhitomirsky via zjshen)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml
index 4d1800f15c1..2e597d11d66 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml
@@ -73,6 +73,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-shuffle</artifactId>
</dependency>
+ <dependency>
+   <groupId>org.apache.hadoop</groupId>
+   <artifactId>hadoop-hdfs</artifactId>
+   <type>test-jar</type>
+   <scope>test</scope>
+ </dependency>
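
Note on the dependency above: MiniDFSCluster, which the new tests rely on, ships in the hadoop-hdfs test-jar rather than in the main hadoop-hdfs artifact, hence the test-scoped test-jar dependency. A minimal standalone sketch of its lifecycle (the class name DemoMiniCluster is made up for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class DemoMiniCluster {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // Spins up an in-process NameNode and DataNode for testing.
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
        try {
          System.out.println("Cluster URI: " + cluster.getURI());
        } finally {
          cluster.shutdown();
        }
      }
    }
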
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 8fd7b471600..c566740aa7a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -28,13 +28,13 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -74,7 +74,9 @@ public class JobHistoryEventHandler extends AbstractService
private int eventCounter;
- //TODO Does the FS object need to be different ?
+ // These file systems may differ from the one in the job configuration.
+ // See org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils
+ // #ensurePathInDefaultFileSystem
private FileSystem stagingDirFS; // log Dir FileSystem
private FileSystem doneDirFS; // done Dir FileSystem
@@ -141,7 +143,7 @@ public class JobHistoryEventHandler extends AbstractService
//Check for the existence of the history staging dir. Maybe create it.
try {
stagingDirPath =
- FileSystem.get(conf).makeQualified(new Path(stagingDirStr));
+ FileContext.getFileContext(conf).makeQualified(new Path(stagingDirStr));
stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf);
mkdir(stagingDirFS, stagingDirPath, new FsPermission(
JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
@@ -154,7 +156,7 @@ public class JobHistoryEventHandler extends AbstractService
//Check for the existence of intermediate done dir.
Path doneDirPath = null;
try {
- doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirStr));
+ doneDirPath =
+ FileContext.getFileContext(conf).makeQualified(new Path(doneDirStr));
doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
// This directory will be in a common location, or this may be a cluster
// meant for a single user. Creating based on the conf. Should ideally be
@@ -194,7 +196,7 @@ public class JobHistoryEventHandler extends AbstractService
//Check/create user directory under intermediate done dir.
try {
doneDirPrefixPath =
- FileSystem.get(conf).makeQualified(new Path(userDoneDirStr));
+ FileContext.getFileContext(conf).makeQualified(new Path(userDoneDirStr));
mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(
JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
} catch (IOException e) {
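
Why the handler switches from FileSystem.get(conf).makeQualified() to FileContext here: JobHistoryUtils may now return a path that is already qualified against a file system other than the job's default, and FileSystem.makeQualified() rejects such paths (its checkPath() throws "Wrong FS"), whereas FileContext.makeQualified() only fills in a missing scheme and authority. A standalone sketch of the difference (the hdfs://nn:8020 URI is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class QualifyDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Path foreign = new Path("hdfs://nn:8020/history/done");
        // FileContext leaves an existing scheme/authority untouched:
        System.out.println(
            FileContext.getFileContext(conf).makeQualified(foreign));
        // FileSystem validates the scheme against itself and throws
        // IllegalArgumentException ("Wrong FS") for the hdfs:// path:
        System.out.println(FileSystem.get(conf).makeQualified(foreign));
      }
    }
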
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 167df3c8e29..d8a0cc7af52 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
import static org.mockito.Mockito.never;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import org.junit.Assert;
@@ -35,8 +36,13 @@ import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -52,6 +58,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.assertFalse;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -60,6 +70,26 @@ public class TestJobHistoryEventHandler {
private static final Log LOG = LogFactory
.getLog(TestJobHistoryEventHandler.class);
+ private static MiniDFSCluster dfsCluster = null;
+ private static String coreSitePath;
+
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ coreSitePath = "." + File.separator + "target" + File.separator +
+ "test-classes" + File.separator + "core-site.xml";
+ Configuration conf = new HdfsConfiguration();
+ dfsCluster = new MiniDFSCluster.Builder(conf).build();
+ }
+
+ @AfterClass
+ public static void cleanUpClass() throws Exception {
+ dfsCluster.shutdown();
+ }
+
+ @After
+ public void cleanTest() throws Exception {
+ new File(coreSitePath).delete();
+ }
@Test (timeout=50000)
public void testFirstFlushOnCompletionEvent() throws Exception {
@@ -325,6 +355,50 @@ public class TestJobHistoryEventHandler {
}
}
+ @Test (timeout=50000)
+ public void testDefaultFsIsUsedForHistory() throws Exception {
+ // Create default configuration pointing to the minicluster
+ Configuration conf = new Configuration();
+ conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+ dfsCluster.getURI().toString());
+ FileOutputStream os = new FileOutputStream(coreSitePath);
+ conf.writeXml(os);
+ os.close();
+
+ // Simulate execution under a non-default NameNode
+ conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+ "file:///");
+
+ TestParams t = new TestParams();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.dfsWorkDir);
+
+ JHEvenHandlerForTest realJheh =
+ new JHEvenHandlerForTest(t.mockAppContext, 0, false);
+ JHEvenHandlerForTest jheh = spy(realJheh);
+ jheh.init(conf);
+
+ try {
+ jheh.start();
+ handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+
+ handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
+ TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
+ new Counters(), new Counters())));
+
+ // The event handler finished without errors, but that alone doesn't say
+ // which file system it used. Check that the history files landed on the
+ // minicluster rather than on the job's default file system.
+ FileSystem dfsFileSystem = dfsCluster.getFileSystem();
+ assertTrue("Minicluster contains some history files",
+ dfsFileSystem.globStatus(new Path(t.dfsWorkDir + "/*")).length != 0);
+ FileSystem localFileSystem = LocalFileSystem.get(conf);
+ assertFalse("No history directory on non-default file system",
+ localFileSystem.exists(new Path(t.dfsWorkDir)));
+ } finally {
+ jheh.stop();
+ }
+ }
+
private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
jheh.handle(event);
}
@@ -372,6 +446,7 @@ public class TestJobHistoryEventHandler {
private class TestParams {
boolean isLastAMRetry;
String workDir = setupTestWorkDir();
+ String dfsWorkDir = "/" + this.getClass().getCanonicalName();
ApplicationId appId = ApplicationId.newInstance(200, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
@@ -451,10 +526,16 @@ public class TestJobHistoryEventHandler {
class JHEvenHandlerForTest extends JobHistoryEventHandler {
private EventWriter eventWriter;
+ private boolean mockHistoryProcessing = true;
public JHEvenHandlerForTest(AppContext context, int startCount) {
super(context, startCount);
}
+ public JHEvenHandlerForTest(AppContext context, int startCount,
+ boolean mockHistoryProcessing) {
+ super(context, startCount);
+ this.mockHistoryProcessing = mockHistoryProcessing;
+ }
+
@Override
protected void serviceStart() {
}
@@ -462,7 +543,12 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
@Override
protected EventWriter createEventWriter(Path historyFilePath)
throws IOException {
- this.eventWriter = mock(EventWriter.class);
+ if (mockHistoryProcessing) {
+ this.eventWriter = mock(EventWriter.class);
+ } else {
+ this.eventWriter = super.createEventWriter(historyFilePath);
+ }
return this.eventWriter;
}
@@ -475,8 +561,13 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
}
@Override
- protected void processDoneFiles(JobId jobId){
- // do nothing
+ protected void processDoneFiles(JobId jobId) throws IOException {
+ if (!mockHistoryProcessing) {
+ super.processDoneFiles(jobId);
+ }
+ // else do nothing: done-file handling is mocked out
}
}
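
A note on the test setup above: writing core-site.xml into target/test-classes puts it on the test classpath, so the new Configuration() created inside JobHistoryUtils picks up fs.defaultFS from "core-site.xml" rather than from the core-default.xml fallback, which is exactly the condition getDefaultFileContext() checks for. A standalone sketch of the same trick (the hdfs://localhost:8020 value is hypothetical):

    import java.io.FileOutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

    public class WriteCoreSite {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
            "hdfs://localhost:8020");
        // Lands on the test classpath; subsequent new Configuration()
        // instances will load it as core-site.xml.
        FileOutputStream os =
            new FileOutputStream("target/test-classes/core-site.xml");
        try {
          conf.writeXml(os);
        } finally {
          os.close();
        }
      }
    }
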
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
index d80fe40958d..167ee20a22e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
@@ -22,20 +22,24 @@ import java.io.File;
import java.io.IOException;
import java.util.Calendar;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -117,6 +121,7 @@ public class JobHistoryUtils {
public static final String TIMESTAMP_DIR_REGEX = "\\d{4}" + "\\" + Path.SEPARATOR + "\\d{2}" + "\\" + Path.SEPARATOR + "\\d{2}";
public static final Pattern TIMESTAMP_DIR_PATTERN = Pattern.compile(TIMESTAMP_DIR_REGEX);
private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
+ private static final Log LOG = LogFactory.getLog(JobHistoryUtils.class);
private static final PathFilter CONF_FILTER = new PathFilter() {
@Override
@@ -183,7 +188,7 @@ public class JobHistoryUtils {
Path stagingPath = MRApps.getStagingAreaDir(conf, user);
Path path = new Path(stagingPath, jobId);
String logDir = path.toString();
- return logDir;
+ return ensurePathInDefaultFileSystem(logDir, conf);
}
/**
@@ -200,7 +205,7 @@ public class JobHistoryUtils {
MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
+ "/history/done_intermediate";
}
- return doneDirPrefix;
+ return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
}
/**
@@ -216,7 +221,69 @@ public class JobHistoryUtils {
MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
+ "/history/done";
}
- return doneDirPrefix;
+ return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
+ }
+
+ /**
+ * Get the default file context for the cluster. It is used to keep the
+ * history done/staging locations consistent even when jobs run with a
+ * different default file system in their configuration.
+ *
+ * @return the default file context, or null if fs.defaultFS is set solely
+ * by core-default.xml or the file system is unsupported
+ */
+ private static FileContext getDefaultFileContext() {
+ // If FS_DEFAULT_NAME_KEY was set solely by core-default.xml, ignore it:
+ // defaulting history paths to the file system named there would not make
+ // sense in any case. A test case that wants to exercise this code path
+ // must therefore provide its own core-site.xml.
+ FileContext fc = null;
+ Configuration defaultConf = new Configuration();
+ String[] sources = defaultConf.getPropertySources(
+ CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
+ if (sources != null &&
+ (!Arrays.asList(sources).contains("core-default.xml") ||
+ sources.length > 1)) {
+ try {
+ fc = FileContext.getFileContext(defaultConf);
+ LOG.info("Default file system [" +
+ fc.getDefaultFileSystem().getUri() + "]");
+ } catch (UnsupportedFileSystemException e) {
+ LOG.error("Unable to create default file context [" +
+ defaultConf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) +
+ "]",
+ e);
+ }
+ } else {
+ LOG.info("Default file system is set solely by " +
+ "core-default.xml, therefore ignoring it");
+ }
+
+ return fc;
+ }
+
+ /**
+ * Ensure that the given path belongs to the cluster's default file system,
+ * unless
+ * 1. it is already fully qualified,
+ * 2. the current job configuration already uses the default file system, or
+ * 3. we are running from a test case without a core-site.xml.
+ *
+ * @param sourcePath source path
+ * @param conf the job configuration
+ * @return the path, fully qualified in the default file system if necessary
+ */
+ private static String ensurePathInDefaultFileSystem(String sourcePath,
+ Configuration conf) {
+ Path path = new Path(sourcePath);
+ FileContext fc = getDefaultFileContext();
+ if (fc == null ||
+ fc.getDefaultFileSystem().getUri().toString().equals(
+ conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "")) ||
+ path.toUri().getAuthority() != null ||
+ path.toUri().getScheme() != null) {
+ return sourcePath;
+ }
+
+ return fc.makeQualified(path).toString();
}
/**
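
The core-default.xml check in getDefaultFileContext() leans on Configuration.getPropertySources(), which reports the resources that contributed a key's effective value. A standalone sketch (the printed output depends on what is on the classpath):

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

    public class SourcesDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        String[] sources = conf.getPropertySources(
            CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
        // Typically [core-default.xml] when nothing overrides the key; a
        // core-site.xml override adds another source, so the
        // "sources.length > 1" branch in getDefaultFileContext() is taken.
        System.out.println(Arrays.toString(sources));
      }
    }
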
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
index 34cb74b5a8b..1e07062d69c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
@@ -19,42 +19,74 @@
package org.apache.hadoop.mapreduce.v2.hs;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.UUID;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
-
-import java.util.UUID;
+import org.junit.rules.TestName;
public class TestHistoryFileManager {
private static MiniDFSCluster dfsCluster = null;
+ private static MiniDFSCluster dfsCluster2 = null;
+ private static String coreSitePath;
+
+ @Rule
+ public TestName name = new TestName();
@BeforeClass
public static void setUpClass() throws Exception {
+ coreSitePath = "." + File.separator + "target" + File.separator +
+ "test-classes" + File.separator + "core-site.xml";
Configuration conf = new HdfsConfiguration();
+ Configuration conf2 = new HdfsConfiguration();
dfsCluster = new MiniDFSCluster.Builder(conf).build();
+ conf2.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
+ conf.get(MiniDFSCluster.HDFS_MINIDFS_BASEDIR) + "_2");
+ dfsCluster2 = new MiniDFSCluster.Builder(conf2).build();
}
@AfterClass
public static void cleanUpClass() throws Exception {
dfsCluster.shutdown();
+ dfsCluster2.shutdown();
+ }
+
+ @After
+ public void cleanTest() throws Exception {
+ new File(coreSitePath).delete();
+ }
+
+ private String getDoneDirNameForTest() {
+ return "/" + name.getMethodName();
+ }
+
+ private String getIntermediateDoneDirNameForTest() {
+ return "/intermediate_" + name.getMethodName();
}
private void testTryCreateHistoryDirs(Configuration conf, boolean expected)
throws Exception {
- conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, "/" + UUID.randomUUID());
- conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "/" + UUID.randomUUID());
+ conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, getDoneDirNameForTest());
+ conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
+ getIntermediateDoneDirNameForTest());
HistoryFileManager hfm = new HistoryFileManager();
hfm.conf = conf;
Assert.assertEquals(expected, hfm.tryCreatingHistoryDirs(false));
@@ -75,6 +107,36 @@ public class TestHistoryFileManager {
testTryCreateHistoryDirs(dfsCluster.getConfiguration(0), true);
}
+ @Test
+ public void testCreateDirsWithAdditionalFileSystem() throws Exception {
+ dfsCluster.getFileSystem().setSafeMode(
+ HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+ dfsCluster2.getFileSystem().setSafeMode(
+ HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+ Assert.assertFalse(dfsCluster.getFileSystem().isInSafeMode());
+ Assert.assertFalse(dfsCluster2.getFileSystem().isInSafeMode());
+
+ // Set default configuration to the first cluster
+ Configuration conf = new Configuration(false);
+ conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+ dfsCluster.getURI().toString());
+ FileOutputStream os = new FileOutputStream(coreSitePath);
+ conf.writeXml(os);
+ os.close();
+
+ testTryCreateHistoryDirs(dfsCluster2.getConfiguration(0), true);
+
+ // Directories should be created only in the default file system (dfsCluster)
+ Assert.assertTrue(dfsCluster.getFileSystem()
+ .exists(new Path(getDoneDirNameForTest())));
+ Assert.assertTrue(dfsCluster.getFileSystem()
+ .exists(new Path(getIntermediateDoneDirNameForTest())));
+ Assert.assertFalse(dfsCluster2.getFileSystem()
+ .exists(new Path(getDoneDirNameForTest())));
+ Assert.assertFalse(dfsCluster2.getFileSystem()
+ .exists(new Path(getIntermediateDoneDirNameForTest())));
+ }
+
@Test
public void testCreateDirsWithFileSystemInSafeMode() throws Exception {
dfsCluster.getFileSystem().setSafeMode(