YARN-2566. DefaultContainerExecutor should pick a working directory randomly. (Zhihai Xu via kasha)
(cherry picked from commit cc93e7e683fa74eb1a7aa2b357a36667bd21086a) Conflicts: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
This commit is contained in:
@ -171,7 +171,7 @@
@InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
public final class FileContext {
public class FileContext {
public static final Log LOG = LogFactory.getLog(FileContext.class);
@ -577,6 +577,9 @@ Release 2.6.0 - UNRELEASED
queue to which the apps were submitted is changed across RM restart.
(Craig Welch & Chang Li via jianhe)
YARN-2566. DefaultContainerExecutor should pick a working directory randomly.
(Zhihai Xu via kasha)
Release 2.5.1 - 2014-09-05
@ -840,10 +840,10 @@ public class YarnConfiguration extends Configuration {
NM_DISK_HEALTH_CHECK_PREFIX + "max-disk-utilization-per-disk-percentage";
* By default, 100% of the disk can be used before it is marked as offline.
* By default, 90% of the disk can be used before it is marked as offline.
* The minimum space that must be available on a local dir for it to be used.
@ -948,7 +948,7 @@
for full disk. This applies to yarn-nodemanager.local-dirs and
@ -31,6 +31,7 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -95,8 +96,8 @@ public synchronized void startLocalizer(Path nmPrivateContainerTokensPath,
createAppDirs(localDirs, user, appId);
createAppLogDirs(appId, logDirs);
// TODO: Why pick first app dir. The same in LCE why not random?
Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
// randomly choose the local directory
Path appStorageDir = getWorkingDir(localDirs, user, appId);
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
@ -454,6 +455,10 @@ private Path getFirstApplicationDir(List<String> localDirs, String user,
return getApplicationDir(new Path(localDirs.get(0)), user, appId);
private long getDiskFreeSpace(Path base) throws IOException {
return lfs.getFsStatus(base).getRemaining();
private Path getApplicationDir(Path base, String user, String appId) {
return new Path(getAppcacheDir(base, user), appId);
@ -480,6 +485,56 @@ private void createDir(Path dirPath, FsPermission perms,
private Path getWorkingDir(List<String> localDirs, String user,
String appId) throws IOException {
Path appStorageDir = null;
long totalAvailable = 0L;
long[] availableOnDisk = new long[localDirs.size()];
int i = 0;
// randomly choose the app directory
// the chance of picking a directory is proportional to
// the available space on the directory.
// firstly calculate the sum of all available space on these directories
for (String localDir : localDirs) {
Path curBase = getApplicationDir(new Path(localDir),
user, appId);
long space = 0L;
try {
space = getDiskFreeSpace(curBase);
} catch (IOException e) {
LOG.warn("Unable to get Free Space for " + curBase.toString(), e);
availableOnDisk[i++] = space;
totalAvailable += space;
// throw an IOException if totalAvailable is 0.
if (totalAvailable <= 0L) {
throw new IOException("Not able to find a working directory for "
+ user);
// make probability to pick a directory proportional to
// the available space on the directory.
Random r = new Random();
long randomPosition = Math.abs(r.nextLong()) % totalAvailable;
int dir = 0;
// skip zero available space directory,
// because totalAvailable is greater than 0 and randomPosition
// is less than totalAvailable, we can find a valid directory
// with nonzero available space.
while (availableOnDisk[dir] == 0L) {
while (randomPosition > availableOnDisk[dir]) {
randomPosition -= availableOnDisk[dir++];
appStorageDir = getApplicationDir(new Path(localDirs.get(dir)),
user, appId);
return appStorageDir;
* Initialize the local directories for a particular user.
* <ul>.mkdir
@ -27,6 +27,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.junit.Assert.assertTrue;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
@ -41,14 +42,6 @@
import java.util.List;
import java.util.Random;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
@ -57,20 +50,30 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.After;
import org.junit.Assert;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -296,6 +299,102 @@ public Object answer(InvocationOnMock invocationOnMock)
@Test(timeout = 30000)
public void testStartLocalizer()
throws IOException, InterruptedException {
InetSocketAddress localizationServerAddress;
final Path firstDir = new Path(BASE_TMP_PATH, "localDir1");
List<String> localDirs = new ArrayList<String>();
final Path secondDir = new Path(BASE_TMP_PATH, "localDir2");
List<String> logDirs = new ArrayList<String>();
final Path logDir = new Path(BASE_TMP_PATH, "logDir");
final Path tokenDir = new Path(BASE_TMP_PATH, "tokenDir");
FsPermission perms = new FsPermission((short)0770);
Configuration conf = new Configuration();
localizationServerAddress = conf.getSocketAddr(
final FileContext mockLfs = spy(FileContext.getLocalFSFileContext(conf));
final FileContext.Util mockUtil = spy(mockLfs.util());
doAnswer(new Answer() {
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
return mockUtil;
doAnswer(new Answer() {
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
Path dest = (Path) invocationOnMock.getArguments()[1];
if (dest.toString().contains(firstDir.toString())) {
// throw an Exception when copy token to the first local dir
// to simulate no space on the first drive
throw new IOException("No space on this drive " +
} else {
// copy token to the second local dir
DataOutputStream tokenOut = null;
try {
Credentials credentials = new Credentials();
tokenOut = mockLfs.create(dest,
} finally {
if (tokenOut != null) {
return null;
}).when(mockUtil).copy(any(Path.class), any(Path.class));
doAnswer(new Answer() {
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
Path p = (Path) invocationOnMock.getArguments()[0];
// let second local directory return more free space than
// first local directory
if (p.toString().contains(firstDir.toString())) {
return new FsStatus(2000, 2000, 0);
} else {
return new FsStatus(1000, 0, 1000);
DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor(
localDirs.toArray(new String[localDirs.size()]));
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString());
mockLfs.mkdir(tokenDir, perms, true);
Path nmPrivateCTokensPath = new Path(tokenDir, "test.tokens");
String appSubmitter = "nobody";
String appId = "APP_ID";
String locId = "LOC_ID";
try {
mockExec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
appSubmitter, appId, locId, localDirs, logDirs);
} catch (IOException e) {
Assert.fail("StartLocalizer failed to copy token file " + e);
} finally {
mockExec.deleteAsUser(appSubmitter, firstDir);
mockExec.deleteAsUser(appSubmitter, secondDir);
mockExec.deleteAsUser(appSubmitter, logDir);
// @Test
// public void testInit() throws IOException, InterruptedException {
// Configuration conf = new Configuration();
Reference in New Issue
Block a user