From 84bc2fe4021be32e0ff8ba395359337904149034 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Fri, 15 Aug 2014 20:17:45 +0000 Subject: [PATCH] =?UTF-8?q?MAPREDUCE-6032.=20Made=20MR=20jobs=20write=20jo?= =?UTF-8?q?b=20history=20files=20on=20the=20default=20FS=20when=20the=20cu?= =?UTF-8?q?rrent=20context=E2=80=99s=20FS=20is=20different.=20Contributed?= =?UTF-8?q?=20by=20Benjamin=20Zhitomirsky.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1618269 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop-mapreduce-client-app/pom.xml | 6 ++ .../jobhistory/JobHistoryEventHandler.java | 12 ++- .../TestJobHistoryEventHandler.java | 97 ++++++++++++++++++- .../v2/jobhistory/JobHistoryUtils.java | 75 +++++++++++++- .../v2/hs/TestHistoryFileManager.java | 70 ++++++++++++- 6 files changed, 247 insertions(+), 16 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 74a1fc163d6..a48c6c18701 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -224,6 +224,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5999. Fix dead link in InputFormat javadoc (Akira AJISAKA via aw) + MAPREDUCE-6032. Made MR jobs write job history files on the default FS when + the current context’s FS is different. (Benjamin Zhitomirsky via zjshen) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml index 4d1800f15c1..2e597d11d66 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml @@ -73,6 +73,12 @@ org.apache.hadoop hadoop-mapreduce-client-shuffle + + org.apache.hadoop + hadoop-hdfs + test-jar + test + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 8fd7b471600..c566740aa7a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -28,13 +28,13 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -74,7 +74,9 @@ public class JobHistoryEventHandler extends AbstractService private int eventCounter; - //TODO Does the FS object need to be different ? 
+  // These file systems may differ from the one in the job configuration;
+  // see org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils
+  // #ensurePathInDefaultFileSystem
   private FileSystem stagingDirFS; // log Dir FileSystem
   private FileSystem doneDirFS; // done Dir FileSystem
@@ -141,7 +143,7 @@ public class JobHistoryEventHandler extends AbstractService
     //Check for the existence of the history staging dir. Maybe create it. 
     try {
       stagingDirPath =
-          FileSystem.get(conf).makeQualified(new Path(stagingDirStr));
+          FileContext.getFileContext(conf).makeQualified(new Path(stagingDirStr));
       stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf);
       mkdir(stagingDirFS, stagingDirPath, new FsPermission(
           JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
@@ -154,7 +156,7 @@ public class JobHistoryEventHandler extends AbstractService
     //Check for the existence of intermediate done dir.
     Path doneDirPath = null;
     try {
-      doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirStr));
+      doneDirPath = FileContext.getFileContext(conf).makeQualified(new Path(doneDirStr));
       doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
       // This directory will be in a common location, or this may be a cluster
       // meant for a single user. Creating based on the conf. Should ideally be
@@ -194,7 +196,7 @@ public class JobHistoryEventHandler extends AbstractService
     //Check/create user directory under intermediate done dir.
     try {
       doneDirPrefixPath =
-          FileSystem.get(conf).makeQualified(new Path(userDoneDirStr));
+          FileContext.getFileContext(conf).makeQualified(new Path(userDoneDirStr));
       mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(
           JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
     } catch (IOException e) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 167df3c8e29..d8a0cc7af52 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.never;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 
 import org.junit.Assert;
@@ -35,8 +36,13 @@ import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -52,6 +58,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.junit.After;
+import org.junit.AfterClass; +import static org.junit.Assert.assertFalse; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; @@ -60,6 +70,26 @@ public class TestJobHistoryEventHandler { private static final Log LOG = LogFactory .getLog(TestJobHistoryEventHandler.class); + private static MiniDFSCluster dfsCluster = null; + private static String coreSitePath; + + @BeforeClass + public static void setUpClass() throws Exception { + coreSitePath = "." + File.separator + "target" + File.separator + + "test-classes" + File.separator + "core-site.xml"; + Configuration conf = new HdfsConfiguration(); + dfsCluster = new MiniDFSCluster.Builder(conf).build(); + } + + @AfterClass + public static void cleanUpClass() throws Exception { + dfsCluster.shutdown(); + } + + @After + public void cleanTest() throws Exception { + new File(coreSitePath).delete(); + } @Test (timeout=50000) public void testFirstFlushOnCompletionEvent() throws Exception { @@ -325,6 +355,50 @@ public class TestJobHistoryEventHandler { } } + @Test (timeout=50000) + public void testDefaultFsIsUsedForHistory() throws Exception { + // Create default configuration pointing to the minicluster + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + dfsCluster.getURI().toString()); + FileOutputStream os = new FileOutputStream(coreSitePath); + conf.writeXml(os); + os.close(); + + // simulate execution under a non-default namenode + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + "file:///"); + + TestParams t = new TestParams(); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.dfsWorkDir); + + JHEvenHandlerForTest realJheh = + new JHEvenHandlerForTest(t.mockAppContext, 0, false); + JHEvenHandlerForTest jheh = spy(realJheh); + jheh.init(conf); + + try { + jheh.start(); + handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent( + t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000))); + + handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent( + TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(), + new Counters(), new Counters()))); + + // If we got here then event handler worked but we don't know with which + // file system. 
Now verify that the history files were written to the minicluster
+      FileSystem dfsFileSystem = dfsCluster.getFileSystem();
+      assertTrue("Minicluster contains some history files",
+          dfsFileSystem.globStatus(new Path(t.dfsWorkDir + "/*")).length != 0);
+      FileSystem localFileSystem = LocalFileSystem.get(conf);
+      assertFalse("No history directory on non-default file system",
+          localFileSystem.exists(new Path(t.dfsWorkDir)));
+    } finally {
+      jheh.stop();
+    }
+  }
+
   private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
     jheh.handle(event);
   }
@@ -372,6 +446,7 @@ public class TestJobHistoryEventHandler {
   private class TestParams {
     boolean isLastAMRetry;
     String workDir = setupTestWorkDir();
+    String dfsWorkDir = "/" + this.getClass().getCanonicalName();
     ApplicationId appId = ApplicationId.newInstance(200, 1);
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(appId, 1);
@@ -451,10 +526,16 @@ public class TestJobHistoryEventHandler {
 class JHEvenHandlerForTest extends JobHistoryEventHandler {
 
   private EventWriter eventWriter;
+  private boolean mockHistoryProcessing = true;
   public JHEvenHandlerForTest(AppContext context, int startCount) {
     super(context, startCount);
   }
 
+  public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHistoryProcessing) {
+    super(context, startCount);
+    this.mockHistoryProcessing = mockHistoryProcessing;
+  }
+
   @Override
   protected void serviceStart() {
   }
@@ -462,7 +543,12 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
   @Override
   protected EventWriter createEventWriter(Path historyFilePath)
       throws IOException {
-    this.eventWriter = mock(EventWriter.class);
+    if (mockHistoryProcessing) {
+      this.eventWriter = mock(EventWriter.class);
+    }
+    else {
+      this.eventWriter = super.createEventWriter(historyFilePath);
+    }
     return this.eventWriter;
   }
 
@@ -475,8 +561,13 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
   }
 
   @Override
-  protected void processDoneFiles(JobId jobId){
-    // do nothing
+  protected void processDoneFiles(JobId jobId) throws IOException {
+    if (!mockHistoryProcessing) {
+      super.processDoneFiles(jobId);
+    }
+    else {
+      // do nothing
+    }
   }
 }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
index d80fe40958d..167ee20a22e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
@@ -22,20 +22,24 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Calendar;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -117,6 +121,7 @@ public class JobHistoryUtils {
   public static final String TIMESTAMP_DIR_REGEX = "\\d{4}" + "\\" + Path.SEPARATOR + "\\d{2}" + "\\" + Path.SEPARATOR + "\\d{2}";
   public static final Pattern TIMESTAMP_DIR_PATTERN = Pattern.compile(TIMESTAMP_DIR_REGEX);
   private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
+  private static final Log LOG = LogFactory.getLog(JobHistoryUtils.class);
 
   private static final PathFilter CONF_FILTER = new PathFilter() {
     @Override
@@ -183,7 +188,7 @@ public class JobHistoryUtils {
     Path stagingPath = MRApps.getStagingAreaDir(conf, user);
     Path path = new Path(stagingPath, jobId);
     String logDir = path.toString();
-    return logDir;
+    return ensurePathInDefaultFileSystem(logDir, conf);
   }
 
   /**
@@ -200,7 +205,7 @@ public class JobHistoryUtils {
           MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
           + "/history/done_intermediate";
     }
-    return doneDirPrefix;
+    return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
   }
 
   /**
@@ -216,7 +221,69 @@ public class JobHistoryUtils {
           MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
           + "/history/done";
     }
-    return doneDirPrefix;
+    return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
+  }
+
+  /**
+   * Get the default file context for the cluster, used to keep the history
+   * done/staging locations consistent across different contexts.
+   *
+   * @return the default file context, or null if none is configured
+   */
+  private static FileContext getDefaultFileContext() {
+    // If FS_DEFAULT_NAME_KEY was set solely by core-default.xml, then we
+    // ignore it. This prevents defaulting history paths to the file system
+    // specified by core-default.xml, which would not make sense in any case.
+    // A test case that relies on this behavior must provide its own core-site.xml.
+    FileContext fc = null;
+    Configuration defaultConf = new Configuration();
+    String[] sources;
+    sources = defaultConf.getPropertySources(
+        CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
+    if (sources != null &&
+        (!Arrays.asList(sources).contains("core-default.xml") ||
+        sources.length > 1)) {
+      try {
+        fc = FileContext.getFileContext(defaultConf);
+        LOG.info("Default file system [" +
+            fc.getDefaultFileSystem().getUri() + "]");
+      } catch (UnsupportedFileSystemException e) {
+        LOG.error("Unable to create default file context [" +
+            defaultConf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) +
+            "]",
+            e);
+      }
+    }
+    else {
+      LOG.info("Default file system is set solely " +
+          "by core-default.xml, therefore ignoring it");
+    }
+
+    return fc;
+  }
+
+  /**
+   * Ensure that the path belongs to the cluster's default file system unless
+   * 1. it is already fully qualified,
+   * 2. the current job configuration uses the default file system, or
+   * 3. the code runs from a test case that provides no core-site.xml.
+   *
+   * @param sourcePath source path
+   * @param conf the job configuration
+   * @return fully qualified path (if necessary) on the default file system
+   */
+  private static String ensurePathInDefaultFileSystem(String sourcePath, Configuration conf) {
+    Path path = new Path(sourcePath);
+    FileContext fc = getDefaultFileContext();
+    if (fc == null ||
+        fc.getDefaultFileSystem().getUri().toString().equals(
+            conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "")) ||
+        path.toUri().getAuthority() != null ||
+        path.toUri().getScheme() != null) {
+      return sourcePath;
+    }
+
+    return fc.makeQualified(path).toString();
   }
 
   /**
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
index 34cb74b5a8b..1e07062d69c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
@@ -19,42 +19,74 @@
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.UUID;
 import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.test.CoreTestDriver;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
-
-import java.util.UUID;
+import org.junit.rules.TestName;
 
 public class TestHistoryFileManager {
   private static MiniDFSCluster dfsCluster = null;
+  private static MiniDFSCluster dfsCluster2 = null;
+  private static String coreSitePath;
+
+  @Rule
+  public TestName name = new TestName();
 
   @BeforeClass
   public static void setUpClass() throws Exception {
+    coreSitePath = "." 
+ File.separator + "target" + File.separator + + "test-classes" + File.separator + "core-site.xml"; Configuration conf = new HdfsConfiguration(); + Configuration conf2 = new HdfsConfiguration(); dfsCluster = new MiniDFSCluster.Builder(conf).build(); + conf2.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, + conf.get(MiniDFSCluster.HDFS_MINIDFS_BASEDIR) + "_2"); + dfsCluster2 = new MiniDFSCluster.Builder(conf2).build(); } @AfterClass public static void cleanUpClass() throws Exception { dfsCluster.shutdown(); + dfsCluster2.shutdown(); + } + + @After + public void cleanTest() throws Exception { + new File(coreSitePath).delete(); + } + + private String getDoneDirNameForTest() { + return "/" + name.getMethodName(); + } + + private String getIntermediateDoneDirNameForTest() { + return "/intermediate_" + name.getMethodName(); } private void testTryCreateHistoryDirs(Configuration conf, boolean expected) throws Exception { - conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, "/" + UUID.randomUUID()); - conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "/" + UUID.randomUUID()); + conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, getDoneDirNameForTest()); + conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, getIntermediateDoneDirNameForTest()); HistoryFileManager hfm = new HistoryFileManager(); hfm.conf = conf; Assert.assertEquals(expected, hfm.tryCreatingHistoryDirs(false)); @@ -75,6 +107,36 @@ public class TestHistoryFileManager { testTryCreateHistoryDirs(dfsCluster.getConfiguration(0), true); } + @Test + public void testCreateDirsWithAdditionalFileSystem() throws Exception { + dfsCluster.getFileSystem().setSafeMode( + HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); + dfsCluster2.getFileSystem().setSafeMode( + HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); + Assert.assertFalse(dfsCluster.getFileSystem().isInSafeMode()); + Assert.assertFalse(dfsCluster2.getFileSystem().isInSafeMode()); + + // Set default configuration to the first cluster + Configuration conf = new Configuration(false); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + dfsCluster.getURI().toString()); + FileOutputStream os = new FileOutputStream(coreSitePath); + conf.writeXml(os); + os.close(); + + testTryCreateHistoryDirs(dfsCluster2.getConfiguration(0), true); + + // Directories should be created only in the default file system (dfsCluster) + Assert.assertTrue(dfsCluster.getFileSystem() + .exists(new Path(getDoneDirNameForTest()))); + Assert.assertTrue(dfsCluster.getFileSystem() + .exists(new Path(getIntermediateDoneDirNameForTest()))); + Assert.assertFalse(dfsCluster2.getFileSystem() + .exists(new Path(getDoneDirNameForTest()))); + Assert.assertFalse(dfsCluster2.getFileSystem() + .exists(new Path(getIntermediateDoneDirNameForTest()))); + } + @Test public void testCreateDirsWithFileSystemInSafeMode() throws Exception { dfsCluster.getFileSystem().setSafeMode(
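
---

For context on the behavior this patch depends on: below is a minimal, self-contained sketch of FileContext#makeQualified, which the new JobHistoryUtils#ensurePathInDefaultFileSystem uses to pin unqualified history paths to the cluster's default file system. This sketch is not part of the patch; the fs.defaultFS values, host names, and class name are illustrative placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;

// Hypothetical demo class, not part of this change.
public class QualifyPathSketch {
  public static void main(String[] args) throws Exception {
    // Use the local FS as the default so the sketch runs without a cluster;
    // on a real cluster, core-site.xml would point fs.defaultFS at HDFS,
    // e.g. hdfs://namenode:8020 (placeholder host).
    Configuration conf = new Configuration();
    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
    FileContext fc = FileContext.getFileContext(conf);

    // An unqualified path picks up the default FS scheme when qualified:
    // prints file:/tmp/hadoop-yarn/staging/history/done here, and would print
    // hdfs://namenode:8020/tmp/hadoop-yarn/staging/history/done on HDFS.
    System.out.println(fc.makeQualified(
        new Path("/tmp/hadoop-yarn/staging/history/done")));

    // A path that already carries a scheme or authority is left alone, which
    // is why ensurePathInDefaultFileSystem checks path.toUri().getScheme()
    // and getAuthority() before qualifying against the default FS.
    Path alreadyQualified = new Path("hdfs://othernn:8020/history/done");
    System.out.println(alreadyQualified.toUri().getScheme()); // prints "hdfs"
  }
}

With a core-site.xml whose fs.defaultFS points at HDFS, the same qualification pins the staging/done paths to the cluster file system, which is the mismatch MAPREDUCE-6032 fixes when the job's FileContext differs from the default FS.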