diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
index 4d56a4ee4f7..57bd3b51253 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
@@ -171,7 +171,7 @@ import org.apache.hadoop.util.ShutdownHookManager;
@InterfaceAudience.Public
@InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
-public final class FileContext {
+public class FileContext {
public static final Log LOG = LogFactory.getLog(FileContext.class);
/**
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e00f5ec751a..9d5dbd106f2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -577,6 +577,9 @@ Release 2.6.0 - UNRELEASED
queue to which the apps were submitted is changed across RM restart.
(Craig Welch & Chang Li via jianhe)
+ YARN-2566. DefaultContainerExecutor should pick a working directory randomly.
+ (Zhihai Xu via kasha)
+
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d60cbc2bdf4..056b9633e2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -840,10 +840,10 @@ public class YarnConfiguration extends Configuration {
public static final String NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE =
NM_DISK_HEALTH_CHECK_PREFIX + "max-disk-utilization-per-disk-percentage";
/**
- * By default, 100% of the disk can be used before it is marked as offline.
+ * By default, 90% of the disk can be used before it is marked as offline.
*/
public static final float DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE =
- 100.0F;
+ 90.0F;
/**
* The minimum space that must be available on a local dir for it to be used.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 5cb1988e1a1..54b4f635306 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -948,7 +948,7 @@
for full disk. This applies to yarn-nodemanager.local-dirs and
yarn.nodemanager.log-dirs.
yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage
- 100.0
+ 90.0
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index a7af1c59b6e..1d5f5b69db5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -95,8 +96,8 @@ public class DefaultContainerExecutor extends ContainerExecutor {
createAppDirs(localDirs, user, appId);
createAppLogDirs(appId, logDirs);
- // TODO: Why pick first app dir. The same in LCE why not random?
- Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
+ // randomly choose the local directory
+ Path appStorageDir = getWorkingDir(localDirs, user, appId);
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
@@ -454,6 +455,10 @@ public class DefaultContainerExecutor extends ContainerExecutor {
return getApplicationDir(new Path(localDirs.get(0)), user, appId);
}
+ private long getDiskFreeSpace(Path base) throws IOException {
+ return lfs.getFsStatus(base).getRemaining();
+ }
+
private Path getApplicationDir(Path base, String user, String appId) {
return new Path(getAppcacheDir(base, user), appId);
}
@@ -480,6 +485,56 @@ public class DefaultContainerExecutor extends ContainerExecutor {
}
}
+ private Path getWorkingDir(List localDirs, String user,
+ String appId) throws IOException {
+ Path appStorageDir = null;
+ long totalAvailable = 0L;
+ long[] availableOnDisk = new long[localDirs.size()];
+ int i = 0;
+ // randomly choose the app directory
+ // the chance of picking a directory is proportional to
+ // the available space on the directory.
+ // firstly calculate the sum of all available space on these directories
+ for (String localDir : localDirs) {
+ Path curBase = getApplicationDir(new Path(localDir),
+ user, appId);
+ long space = 0L;
+ try {
+ space = getDiskFreeSpace(curBase);
+ } catch (IOException e) {
+ LOG.warn("Unable to get Free Space for " + curBase.toString(), e);
+ }
+ availableOnDisk[i++] = space;
+ totalAvailable += space;
+ }
+
+ // throw an IOException if totalAvailable is 0.
+ if (totalAvailable <= 0L) {
+ throw new IOException("Not able to find a working directory for "
+ + user);
+ }
+
+ // make probability to pick a directory proportional to
+ // the available space on the directory.
+ Random r = new Random();
+ long randomPosition = Math.abs(r.nextLong()) % totalAvailable;
+ int dir = 0;
+ // skip zero available space directory,
+ // because totalAvailable is greater than 0 and randomPosition
+ // is less than totalAvailable, we can find a valid directory
+ // with nonzero available space.
+ while (availableOnDisk[dir] == 0L) {
+ dir++;
+ }
+ while (randomPosition > availableOnDisk[dir]) {
+ randomPosition -= availableOnDisk[dir++];
+ }
+ appStorageDir = getApplicationDir(new Path(localDirs.get(dir)),
+ user, appId);
+
+ return appStorageDir;
+ }
+
/**
* Initialize the local directories for a particular user.
* .mkdir
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java
index 9c86c71c833..7cdbd1df54c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.junit.Assert.assertTrue;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
@@ -41,14 +42,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Random;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
@@ -57,20 +50,30 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.junit.After;
-import org.junit.Assert;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -296,6 +299,102 @@ public class TestDefaultContainerExecutor {
}
}
+ @Test(timeout = 30000)
+ public void testStartLocalizer()
+ throws IOException, InterruptedException {
+ InetSocketAddress localizationServerAddress;
+ final Path firstDir = new Path(BASE_TMP_PATH, "localDir1");
+ List localDirs = new ArrayList();
+ final Path secondDir = new Path(BASE_TMP_PATH, "localDir2");
+ List logDirs = new ArrayList();
+ final Path logDir = new Path(BASE_TMP_PATH, "logDir");
+ final Path tokenDir = new Path(BASE_TMP_PATH, "tokenDir");
+ FsPermission perms = new FsPermission((short)0770);
+
+ Configuration conf = new Configuration();
+ localizationServerAddress = conf.getSocketAddr(
+ YarnConfiguration.NM_BIND_HOST,
+ YarnConfiguration.NM_LOCALIZER_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
+
+ final FileContext mockLfs = spy(FileContext.getLocalFSFileContext(conf));
+ final FileContext.Util mockUtil = spy(mockLfs.util());
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock)
+ throws Throwable {
+ return mockUtil;
+ }
+ }).when(mockLfs).util();
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock)
+ throws Throwable {
+ Path dest = (Path) invocationOnMock.getArguments()[1];
+ if (dest.toString().contains(firstDir.toString())) {
+ // throw an Exception when copy token to the first local dir
+ // to simulate no space on the first drive
+ throw new IOException("No space on this drive " +
+ dest.toString());
+ } else {
+ // copy token to the second local dir
+ DataOutputStream tokenOut = null;
+ try {
+ Credentials credentials = new Credentials();
+ tokenOut = mockLfs.create(dest,
+ EnumSet.of(CREATE, OVERWRITE));
+ credentials.writeTokenStorageToStream(tokenOut);
+ } finally {
+ if (tokenOut != null) {
+ tokenOut.close();
+ }
+ }
+ }
+ return null;
+ }
+ }).when(mockUtil).copy(any(Path.class), any(Path.class));
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock)
+ throws Throwable {
+ Path p = (Path) invocationOnMock.getArguments()[0];
+ // let second local directory return more free space than
+ // first local directory
+ if (p.toString().contains(firstDir.toString())) {
+ return new FsStatus(2000, 2000, 0);
+ } else {
+ return new FsStatus(1000, 0, 1000);
+ }
+ }
+ }).when(mockLfs).getFsStatus(any(Path.class));
+
+ DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor(
+ mockLfs));
+ mockExec.setConf(conf);
+ localDirs.add(mockLfs.makeQualified(firstDir).toString());
+ localDirs.add(mockLfs.makeQualified(secondDir).toString());
+ logDirs.add(mockLfs.makeQualified(logDir).toString());
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS,
+ localDirs.toArray(new String[localDirs.size()]));
+ conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString());
+ mockLfs.mkdir(tokenDir, perms, true);
+ Path nmPrivateCTokensPath = new Path(tokenDir, "test.tokens");
+ String appSubmitter = "nobody";
+ String appId = "APP_ID";
+ String locId = "LOC_ID";
+ try {
+ mockExec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
+ appSubmitter, appId, locId, localDirs, logDirs);
+ } catch (IOException e) {
+ Assert.fail("StartLocalizer failed to copy token file " + e);
+ } finally {
+ mockExec.deleteAsUser(appSubmitter, firstDir);
+ mockExec.deleteAsUser(appSubmitter, secondDir);
+ mockExec.deleteAsUser(appSubmitter, logDir);
+ deleteTmpFiles();
+ }
+ }
// @Test
// public void testInit() throws IOException, InterruptedException {
// Configuration conf = new Configuration();