MAPREDUCE-5724. JobHistoryServer does not start if HDFS is not running. (tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1558853 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f5e4562408
commit
0c84c76954
|
@ -126,6 +126,9 @@ Release 2.4.0 - UNRELEASED
|
|||
MAPREDUCE-5689. MRAppMaster does not preempt reducers when scheduled maps
|
||||
cannot be fulfilled. (lohit via kasha)
|
||||
|
||||
MAPREDUCE-5724. JobHistoryServer does not start if HDFS is not running.
|
||||
(tucu)
|
||||
|
||||
Release 2.3.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -77,6 +77,13 @@ public class JHAdminConfig {
|
|||
public static final String MR_HISTORY_DONE_DIR =
|
||||
MR_HISTORY_PREFIX + "done-dir";
|
||||
|
||||
/**
|
||||
* Maximum time the History server will wait for the FileSystem for History
|
||||
* files to become available. Default value is -1, forever.
|
||||
*/
|
||||
public static final String MR_HISTORY_MAX_START_WAIT_TIME =
|
||||
MR_HISTORY_PREFIX + "maximum-start-wait-time-millis";
|
||||
public static final long DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME = -1;
|
||||
/**
|
||||
* Path where history files should be stored after a job finished and before
|
||||
* they are pulled into the job history server.
|
||||
|
|
|
@ -33,6 +33,10 @@
|
|||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-common</artifactId>
|
||||
|
@ -53,6 +57,12 @@
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.hs;
|
|||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -69,6 +70,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
|
||||
/**
|
||||
* This class provides a way to interact with history files in a thread safe
|
||||
|
@ -464,7 +467,8 @@ public class HistoryFileManager extends AbstractService {
|
|||
|
||||
private JobACLsManager aclsMgr;
|
||||
|
||||
private Configuration conf;
|
||||
@VisibleForTesting
|
||||
Configuration conf;
|
||||
|
||||
private String serialNumberFormat;
|
||||
|
||||
|
@ -491,36 +495,10 @@ public class HistoryFileManager extends AbstractService {
|
|||
+ (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
|
||||
+ "d");
|
||||
|
||||
String doneDirPrefix = null;
|
||||
doneDirPrefix = JobHistoryUtils
|
||||
.getConfiguredHistoryServerDoneDirPrefix(conf);
|
||||
try {
|
||||
doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
|
||||
new Path(doneDirPrefix));
|
||||
doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
|
||||
doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
|
||||
mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
|
||||
JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
|
||||
} catch (IOException e) {
|
||||
throw new YarnRuntimeException("Error creating done directory: ["
|
||||
+ doneDirPrefixPath + "]", e);
|
||||
}
|
||||
|
||||
String intermediateDoneDirPrefix = null;
|
||||
intermediateDoneDirPrefix = JobHistoryUtils
|
||||
.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
|
||||
try {
|
||||
intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
|
||||
new Path(intermediateDoneDirPrefix));
|
||||
intermediateDoneDirFc = FileContext.getFileContext(
|
||||
intermediateDoneDirPath.toUri(), conf);
|
||||
mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
|
||||
JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
|
||||
} catch (IOException e) {
|
||||
LOG.info("error creating done directory on dfs " + e);
|
||||
throw new YarnRuntimeException("Error creating intermediate done directory: ["
|
||||
+ intermediateDoneDirPath + "]", e);
|
||||
}
|
||||
long maxFSWaitTime = conf.getLong(
|
||||
JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
|
||||
JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
|
||||
createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime);
|
||||
|
||||
this.aclsMgr = new JobACLsManager(conf);
|
||||
|
||||
|
@ -544,6 +522,107 @@ public class HistoryFileManager extends AbstractService {
|
|||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void createHistoryDirs(Clock clock, long intervalCheckMillis,
|
||||
long timeOutMillis) throws IOException {
|
||||
long start = clock.getTime();
|
||||
boolean done = false;
|
||||
int counter = 0;
|
||||
while (!done &&
|
||||
((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) {
|
||||
done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec
|
||||
try {
|
||||
Thread.sleep(intervalCheckMillis);
|
||||
} catch (InterruptedException ex) {
|
||||
throw new YarnRuntimeException(ex);
|
||||
}
|
||||
}
|
||||
if (!done) {
|
||||
throw new YarnRuntimeException("Timed out '" + timeOutMillis+
|
||||
"ms' waiting for FileSystem to become available");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* DistributedFileSystem returns a RemoteException with a message stating
|
||||
* SafeModeException in it. So this is only way to check it is because of
|
||||
* being in safe mode.
|
||||
*/
|
||||
private boolean isBecauseSafeMode(Throwable ex) {
|
||||
return ex.toString().contains("SafeModeException");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns TRUE if the history dirs were created, FALSE if they could not
|
||||
* be created because the FileSystem is not reachable or in safe mode and
|
||||
* throws and exception otherwise.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
boolean tryCreatingHistoryDirs(boolean logWait) throws IOException {
|
||||
boolean succeeded = true;
|
||||
String doneDirPrefix = JobHistoryUtils.
|
||||
getConfiguredHistoryServerDoneDirPrefix(conf);
|
||||
try {
|
||||
doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
|
||||
new Path(doneDirPrefix));
|
||||
doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
|
||||
doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
|
||||
mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
|
||||
JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
|
||||
} catch (ConnectException ex) {
|
||||
if (logWait) {
|
||||
LOG.info("Waiting for FileSystem at " +
|
||||
doneDirPrefixPath.toUri().getAuthority() + "to be available");
|
||||
}
|
||||
succeeded = false;
|
||||
} catch (IOException e) {
|
||||
if (isBecauseSafeMode(e)) {
|
||||
succeeded = false;
|
||||
if (logWait) {
|
||||
LOG.info("Waiting for FileSystem at " +
|
||||
doneDirPrefixPath.toUri().getAuthority() +
|
||||
"to be out of safe mode");
|
||||
}
|
||||
} else {
|
||||
throw new YarnRuntimeException("Error creating done directory: ["
|
||||
+ doneDirPrefixPath + "]", e);
|
||||
}
|
||||
}
|
||||
if (succeeded) {
|
||||
String intermediateDoneDirPrefix = JobHistoryUtils.
|
||||
getConfiguredHistoryIntermediateDoneDirPrefix(conf);
|
||||
try {
|
||||
intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
|
||||
new Path(intermediateDoneDirPrefix));
|
||||
intermediateDoneDirFc = FileContext.getFileContext(
|
||||
intermediateDoneDirPath.toUri(), conf);
|
||||
mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
|
||||
JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
|
||||
} catch (ConnectException ex) {
|
||||
succeeded = false;
|
||||
if (logWait) {
|
||||
LOG.info("Waiting for FileSystem at " +
|
||||
intermediateDoneDirPath.toUri().getAuthority() +
|
||||
"to be available");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (isBecauseSafeMode(e)) {
|
||||
succeeded = false;
|
||||
if (logWait) {
|
||||
LOG.info("Waiting for FileSystem at " +
|
||||
intermediateDoneDirPath.toUri().getAuthority() +
|
||||
"to be out of safe mode");
|
||||
}
|
||||
} else {
|
||||
throw new YarnRuntimeException(
|
||||
"Error creating intermediate done directory: ["
|
||||
+ intermediateDoneDirPath + "]", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return succeeded;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceStop() throws Exception {
|
||||
ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs;
|
||||
|
||||
|
||||
import junit.framework.Assert;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
public class TestHistoryFileManager {
|
||||
private static MiniDFSCluster dfsCluster = null;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpClass() throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
dfsCluster = new MiniDFSCluster.Builder(conf).build();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void cleanUpClass() throws Exception {
|
||||
dfsCluster.shutdown();
|
||||
}
|
||||
|
||||
private void testTryCreateHistoryDirs(Configuration conf, boolean expected)
|
||||
throws Exception {
|
||||
conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, "/" + UUID.randomUUID());
|
||||
conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "/" + UUID.randomUUID());
|
||||
HistoryFileManager hfm = new HistoryFileManager();
|
||||
hfm.conf = conf;
|
||||
Assert.assertEquals(expected, hfm.tryCreatingHistoryDirs(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDirsWithoutFileSystem() throws Exception {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:1");
|
||||
testTryCreateHistoryDirs(conf, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDirsWithFileSystem() throws Exception {
|
||||
dfsCluster.getFileSystem().setSafeMode(
|
||||
HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
|
||||
Assert.assertFalse(dfsCluster.getFileSystem().isInSafeMode());
|
||||
testTryCreateHistoryDirs(dfsCluster.getConfiguration(0), true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDirsWithFileSystemInSafeMode() throws Exception {
|
||||
dfsCluster.getFileSystem().setSafeMode(
|
||||
HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
|
||||
Assert.assertTrue(dfsCluster.getFileSystem().isInSafeMode());
|
||||
testTryCreateHistoryDirs(dfsCluster.getConfiguration(0), false);
|
||||
}
|
||||
|
||||
private void testCreateHistoryDirs(Configuration conf, Clock clock)
|
||||
throws Exception {
|
||||
conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, "/" + UUID.randomUUID());
|
||||
conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "/" + UUID.randomUUID());
|
||||
HistoryFileManager hfm = new HistoryFileManager();
|
||||
hfm.conf = conf;
|
||||
hfm.createHistoryDirs(clock, 500, 2000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDirsWithFileSystemBecomingAvailBeforeTimeout()
|
||||
throws Exception {
|
||||
dfsCluster.getFileSystem().setSafeMode(
|
||||
HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
|
||||
Assert.assertTrue(dfsCluster.getFileSystem().isInSafeMode());
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
dfsCluster.getFileSystem().setSafeMode(
|
||||
HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
|
||||
Assert.assertTrue(dfsCluster.getFileSystem().isInSafeMode());
|
||||
} catch (Exception ex) {
|
||||
Assert.fail(ex.toString());
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
testCreateHistoryDirs(dfsCluster.getConfiguration(0), new SystemClock());
|
||||
}
|
||||
|
||||
@Test(expected = YarnRuntimeException.class)
|
||||
public void testCreateDirsWithFileSystemNotBecomingAvailBeforeTimeout()
|
||||
throws Exception {
|
||||
dfsCluster.getFileSystem().setSafeMode(
|
||||
HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
|
||||
Assert.assertTrue(dfsCluster.getFileSystem().isInSafeMode());
|
||||
final ControlledClock clock = new ControlledClock(new SystemClock());
|
||||
clock.setTime(1);
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
clock.setTime(3000);
|
||||
} catch (Exception ex) {
|
||||
Assert.fail(ex.toString());
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
testCreateHistoryDirs(dfsCluster.getConfiguration(0), clock);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue