From 3a6f8b878501cc6961a8388813f33bbeb5ebae34 Mon Sep 17 00:00:00 2001
From: Alejandro Abdelnur
Date: Thu, 16 Jan 2014 17:10:20 +0000
Subject: [PATCH] MAPREDUCE-5724. JobHistoryServer does not start if HDFS is
 not running. (tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1558852 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../v2/jobhistory/JHAdminConfig.java          |   7 +
 .../hadoop-mapreduce-client-hs/pom.xml        |  10 ++
 .../mapreduce/v2/hs/HistoryFileManager.java   | 141 ++++++++++++++----
 .../v2/hs/TestHistoryFileManager.java         | 139 +++++++++++++++++
 5 files changed, 269 insertions(+), 31 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 35a538e3132..14dc7013fe2 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -272,6 +272,9 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5689. MRAppMaster does not preempt reducers when scheduled maps
     cannot be fulfilled. (lohit via kasha)
 
+    MAPREDUCE-5724. JobHistoryServer does not start if HDFS is not running.
+    (tucu)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
index ee7dae93cac..2cc233688b8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
@@ -77,6 +77,13 @@ public class JHAdminConfig {
   public static final String MR_HISTORY_DONE_DIR = MR_HISTORY_PREFIX
       + "done-dir";
 
+  /**
+   * Maximum time the history server will wait for the FileSystem holding
+   * history files to become available. The default value is -1: wait forever.
+   */
+  public static final String MR_HISTORY_MAX_START_WAIT_TIME =
+      MR_HISTORY_PREFIX + "maximum-start-wait-time-millis";
+  public static final long DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME = -1;
   /**
    * Path where history files should be stored after a job finished and
    * before they are pulled into the job history server.
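[Editorial note: a minimal sketch of bounding the JobHistoryServer startup wait with the key introduced above. It assumes MR_HISTORY_PREFIX resolves to "mapreduce.jobhistory." as defined elsewhere in JHAdminConfig; the class name and main method are hypothetical illustration, not part of this patch.]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;

    public class JhsStartWaitExample {  // hypothetical name, illustration only
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Wait at most 5 minutes for HDFS instead of the default -1 (forever).
        conf.setLong(JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
            5 * 60 * 1000L);
        // Assuming the usual prefix, this is the same as setting
        // mapreduce.jobhistory.maximum-start-wait-time-millis in mapred-site.xml.
        System.out.println(conf.getLong(
            JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
            JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME));
      }
    }
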
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
index 31587782f43..3ac191fb86b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
@@ -33,6 +33,10 @@
   </properties>
 
   <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-common</artifactId>
@@ -53,6 +57,12 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index d0861002408..f53f18896d9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.hs;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -69,6 +70,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
 
 /**
  * This class provides a way to interact with history files in a thread safe
@@ -464,7 +467,8 @@ public class HistoryFileManager extends AbstractService {
 
   private JobACLsManager aclsMgr;
 
-  private Configuration conf;
+  @VisibleForTesting
+  Configuration conf;
 
   private String serialNumberFormat;
 
@@ -491,36 +495,10 @@ public class HistoryFileManager extends AbstractService {
         + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
         + "d");
 
-    String doneDirPrefix = null;
-    doneDirPrefix = JobHistoryUtils
-        .getConfiguredHistoryServerDoneDirPrefix(conf);
-    try {
-      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
-          new Path(doneDirPrefix));
-      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
-      doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
-      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
-          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
-    } catch (IOException e) {
-      throw new YarnRuntimeException("Error creating done directory: ["
-          + doneDirPrefixPath + "]", e);
-    }
-
-    String intermediateDoneDirPrefix = null;
-    intermediateDoneDirPrefix = JobHistoryUtils
-        .getConfiguredHistoryIntermediateDoneDirPrefix(conf);
-    try {
-      intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
-          new Path(intermediateDoneDirPrefix));
-      intermediateDoneDirFc = FileContext.getFileContext(
-          intermediateDoneDirPath.toUri(), conf);
-      mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
-          JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
-    } catch (IOException e) {
-      LOG.info("error creating done directory on dfs " + e);
-      throw new YarnRuntimeException("Error creating intermediate done directory: ["
-          + intermediateDoneDirPath + "]", e);
-    }
+    long maxFSWaitTime = conf.getLong(
+        JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
+        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
+    createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime);
 
     this.aclsMgr = new JobACLsManager(conf);
 
@@ -544,6 +522,107 @@ public class HistoryFileManager extends AbstractService {
     super.serviceInit(conf);
   }
 
+  @VisibleForTesting
+  void createHistoryDirs(Clock clock, long intervalCheckMillis,
+      long timeOutMillis) throws IOException {
+    long start = clock.getTime();
+    boolean done = false;
+    int counter = 0;
+    while (!done &&
+        ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) {
+      done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, ~30 seconds
+      try {
+        Thread.sleep(intervalCheckMillis);
+      } catch (InterruptedException ex) {
+        throw new YarnRuntimeException(ex);
+      }
+    }
+    if (!done) {
+      throw new YarnRuntimeException("Timed out '" + timeOutMillis +
+          "ms' waiting for FileSystem to become available");
+    }
+  }
+
+  /**
+   * DistributedFileSystem returns a RemoteException whose message contains
+   * SafeModeException, so inspecting the message is the only way to tell
+   * that the failure was caused by safe mode.
+   */
+  private boolean isBecauseSafeMode(Throwable ex) {
+    return ex.toString().contains("SafeModeException");
+  }
+
+  /**
+   * Returns TRUE if the history dirs were created, FALSE if they could not
+   * be created because the FileSystem is not reachable or is in safe mode,
+   * and throws an exception otherwise.
+   */
+  @VisibleForTesting
+  boolean tryCreatingHistoryDirs(boolean logWait) throws IOException {
+    boolean succeeded = true;
+    String doneDirPrefix = JobHistoryUtils.
+        getConfiguredHistoryServerDoneDirPrefix(conf);
+    try {
+      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(doneDirPrefix));
+      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
+      doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
+      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
+          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
+    } catch (ConnectException ex) {
+      if (logWait) {
+        LOG.info("Waiting for FileSystem at " +
+            doneDirPrefixPath.toUri().getAuthority() + " to be available");
+      }
+      succeeded = false;
+    } catch (IOException e) {
+      if (isBecauseSafeMode(e)) {
+        succeeded = false;
+        if (logWait) {
+          LOG.info("Waiting for FileSystem at " +
+              doneDirPrefixPath.toUri().getAuthority() +
+              " to be out of safe mode");
+        }
+      } else {
+        throw new YarnRuntimeException("Error creating done directory: ["
+            + doneDirPrefixPath + "]", e);
+      }
+    }
+    if (succeeded) {
+      String intermediateDoneDirPrefix = JobHistoryUtils.
+          getConfiguredHistoryIntermediateDoneDirPrefix(conf);
+      try {
+        intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
+            new Path(intermediateDoneDirPrefix));
+        intermediateDoneDirFc = FileContext.getFileContext(
+            intermediateDoneDirPath.toUri(), conf);
+        mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
+            JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
+      } catch (ConnectException ex) {
+        succeeded = false;
+        if (logWait) {
+          LOG.info("Waiting for FileSystem at " +
+              intermediateDoneDirPath.toUri().getAuthority() +
+              " to be available");
+        }
+      } catch (IOException e) {
+        if (isBecauseSafeMode(e)) {
+          succeeded = false;
+          if (logWait) {
+            LOG.info("Waiting for FileSystem at " +
+                intermediateDoneDirPath.toUri().getAuthority() +
+                " to be out of safe mode");
+          }
+        } else {
+          throw new YarnRuntimeException(
+              "Error creating intermediate done directory: ["
+                  + intermediateDoneDirPath + "]", e);
+        }
+      }
+    }
+    return succeeded;
+  }
+
   @Override
   public void serviceStop() throws Exception {
     ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
new file mode 100644
index 00000000000..de617202da5
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+
+public class TestHistoryFileManager {
+  private static MiniDFSCluster dfsCluster = null;
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    dfsCluster = new MiniDFSCluster.Builder(conf).build();
+  }
+
+  @AfterClass
+  public static void cleanUpClass() throws Exception {
+    dfsCluster.shutdown();
+  }
+
+  private void testTryCreateHistoryDirs(Configuration conf, boolean expected)
+      throws Exception {
+    conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, "/" + UUID.randomUUID());
+    conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "/" + UUID.randomUUID());
+    HistoryFileManager hfm = new HistoryFileManager();
+    hfm.conf = conf;
+    Assert.assertEquals(expected, hfm.tryCreatingHistoryDirs(false));
+  }
+
+  @Test
+  public void testCreateDirsWithoutFileSystem() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:1");
+    testTryCreateHistoryDirs(conf, false);
+  }
+
+  @Test
+  public void testCreateDirsWithFileSystem() throws Exception {
+    dfsCluster.getFileSystem().setSafeMode(
+        HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+    Assert.assertFalse(dfsCluster.getFileSystem().isInSafeMode());
+    testTryCreateHistoryDirs(dfsCluster.getConfiguration(0), true);
+  }
+
+  @Test
+  public void testCreateDirsWithFileSystemInSafeMode() throws Exception {
+    dfsCluster.getFileSystem().setSafeMode(
+        HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+    Assert.assertTrue(dfsCluster.getFileSystem().isInSafeMode());
+    testTryCreateHistoryDirs(dfsCluster.getConfiguration(0), false);
+  }
+
+  private void testCreateHistoryDirs(Configuration conf, Clock clock)
+      throws Exception {
+    conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, "/" + UUID.randomUUID());
+    conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "/" + UUID.randomUUID());
+    HistoryFileManager hfm = new HistoryFileManager();
+    hfm.conf = conf;
+    hfm.createHistoryDirs(clock, 500, 2000);
+  }
+
+  @Test
+  public void testCreateDirsWithFileSystemBecomingAvailBeforeTimeout()
+      throws Exception {
+    dfsCluster.getFileSystem().setSafeMode(
+        HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+    Assert.assertTrue(dfsCluster.getFileSystem().isInSafeMode());
+    new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(500);
+          dfsCluster.getFileSystem().setSafeMode(
+              HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+          Assert.assertFalse(dfsCluster.getFileSystem().isInSafeMode());
+        } catch (Exception ex) {
+          Assert.fail(ex.toString());
+        }
+      }
+    }.start();
+    testCreateHistoryDirs(dfsCluster.getConfiguration(0), new SystemClock());
+  }
+
+  @Test(expected = YarnRuntimeException.class)
+  public void testCreateDirsWithFileSystemNotBecomingAvailBeforeTimeout()
+      throws Exception {
+    dfsCluster.getFileSystem().setSafeMode(
+        HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+    Assert.assertTrue(dfsCluster.getFileSystem().isInSafeMode());
+    final ControlledClock clock = new ControlledClock(new SystemClock());
+    clock.setTime(1);
+    new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(500);
+          clock.setTime(3000);
+        } catch (Exception ex) {
+          Assert.fail(ex.toString());
+        }
+      }
+    }.start();
+    testCreateHistoryDirs(dfsCluster.getConfiguration(0), clock);
+  }
+
+}
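
[Editorial note: the tests above drive createHistoryDirs' timeout with a ControlledClock rather than wall-clock time. Below is a minimal, self-contained sketch of that poll-until-ready pattern with an injectable clock; every name in it is hypothetical illustration, not a Hadoop API. Unlike the patch, the sketch skips the sleep once an attempt succeeds.]

    import java.util.concurrent.Callable;

    public class PollUntilReady {  // hypothetical, for illustration
      /** Injectable time source, in the spirit of o.a.h.yarn.util.Clock. */
      interface Clock { long getTime(); }

      /**
       * Retries an idempotent setup step until it reports success or the
       * injected clock says the optional timeout (-1 means wait forever)
       * has elapsed, modeled on HistoryFileManager.createHistoryDirs.
       */
      static void waitFor(Callable<Boolean> attempt, Clock clock,
          long intervalMillis, long timeoutMillis) throws Exception {
        long start = clock.getTime();
        boolean done = false;
        while (!done &&
            (timeoutMillis == -1 || clock.getTime() - start < timeoutMillis)) {
          done = attempt.call();          // false means "not ready yet, retry"
          if (!done) {
            Thread.sleep(intervalMillis); // real sleep; the clock only gates the loop
          }
        }
        if (!done) {
          throw new IllegalStateException(
              "Timed out after " + timeoutMillis + "ms");
        }
      }

      public static void main(String[] args) throws Exception {
        // A test can substitute a fake clock and advance it manually, the way
        // TestHistoryFileManager uses ControlledClock to force the timeout path.
        waitFor(() -> true, System::currentTimeMillis, 500, 2000);
      }
    }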