YARN-2566. DefaultContainerExecutor should pick a working directory randomly. (Zhihai Xu via kasha)
(cherry picked from commit cc93e7e683
)
This commit is contained in:
parent
a1116b56a4
commit
88455173e8
|
@ -171,7 +171,7 @@ import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
|
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
|
@InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
|
||||||
public final class FileContext {
|
public class FileContext {
|
||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(FileContext.class);
|
public static final Log LOG = LogFactory.getLog(FileContext.class);
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -613,6 +613,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
queue to which the apps were submitted is changed across RM restart.
|
queue to which the apps were submitted is changed across RM restart.
|
||||||
(Craig Welch & Chang Li via jianhe)
|
(Craig Welch & Chang Li via jianhe)
|
||||||
|
|
||||||
|
YARN-2566. DefaultContainerExecutor should pick a working directory randomly.
|
||||||
|
(Zhihai Xu via kasha)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -840,10 +840,10 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE =
|
public static final String NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE =
|
||||||
NM_DISK_HEALTH_CHECK_PREFIX + "max-disk-utilization-per-disk-percentage";
|
NM_DISK_HEALTH_CHECK_PREFIX + "max-disk-utilization-per-disk-percentage";
|
||||||
/**
|
/**
|
||||||
* By default, 100% of the disk can be used before it is marked as offline.
|
* By default, 90% of the disk can be used before it is marked as offline.
|
||||||
*/
|
*/
|
||||||
public static final float DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE =
|
public static final float DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE =
|
||||||
100.0F;
|
90.0F;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The minimum space that must be available on a local dir for it to be used.
|
* The minimum space that must be available on a local dir for it to be used.
|
||||||
|
|
|
@ -948,7 +948,7 @@
|
||||||
for full disk. This applies to yarn-nodemanager.local-dirs and
|
for full disk. This applies to yarn-nodemanager.local-dirs and
|
||||||
yarn.nodemanager.log-dirs.</description>
|
yarn.nodemanager.log-dirs.</description>
|
||||||
<name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
|
<name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
|
||||||
<value>100.0</value>
|
<value>90.0</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -103,8 +104,8 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
createAppDirs(localDirs, user, appId);
|
createAppDirs(localDirs, user, appId);
|
||||||
createAppLogDirs(appId, logDirs, user);
|
createAppLogDirs(appId, logDirs, user);
|
||||||
|
|
||||||
// TODO: Why pick first app dir. The same in LCE why not random?
|
// randomly choose the local directory
|
||||||
Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
|
Path appStorageDir = getWorkingDir(localDirs, user, appId);
|
||||||
|
|
||||||
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
|
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
|
||||||
Path tokenDst = new Path(appStorageDir, tokenFn);
|
Path tokenDst = new Path(appStorageDir, tokenFn);
|
||||||
|
@ -466,6 +467,10 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
return getApplicationDir(new Path(localDirs.get(0)), user, appId);
|
return getApplicationDir(new Path(localDirs.get(0)), user, appId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private long getDiskFreeSpace(Path base) throws IOException {
|
||||||
|
return lfs.getFsStatus(base).getRemaining();
|
||||||
|
}
|
||||||
|
|
||||||
private Path getApplicationDir(Path base, String user, String appId) {
|
private Path getApplicationDir(Path base, String user, String appId) {
|
||||||
return new Path(getAppcacheDir(base, user), appId);
|
return new Path(getAppcacheDir(base, user), appId);
|
||||||
}
|
}
|
||||||
|
@ -484,6 +489,56 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
ContainerLocalizer.FILECACHE);
|
ContainerLocalizer.FILECACHE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Path getWorkingDir(List<String> localDirs, String user,
|
||||||
|
String appId) throws IOException {
|
||||||
|
Path appStorageDir = null;
|
||||||
|
long totalAvailable = 0L;
|
||||||
|
long[] availableOnDisk = new long[localDirs.size()];
|
||||||
|
int i = 0;
|
||||||
|
// randomly choose the app directory
|
||||||
|
// the chance of picking a directory is proportional to
|
||||||
|
// the available space on the directory.
|
||||||
|
// firstly calculate the sum of all available space on these directories
|
||||||
|
for (String localDir : localDirs) {
|
||||||
|
Path curBase = getApplicationDir(new Path(localDir),
|
||||||
|
user, appId);
|
||||||
|
long space = 0L;
|
||||||
|
try {
|
||||||
|
space = getDiskFreeSpace(curBase);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unable to get Free Space for " + curBase.toString(), e);
|
||||||
|
}
|
||||||
|
availableOnDisk[i++] = space;
|
||||||
|
totalAvailable += space;
|
||||||
|
}
|
||||||
|
|
||||||
|
// throw an IOException if totalAvailable is 0.
|
||||||
|
if (totalAvailable <= 0L) {
|
||||||
|
throw new IOException("Not able to find a working directory for "
|
||||||
|
+ user);
|
||||||
|
}
|
||||||
|
|
||||||
|
// make probability to pick a directory proportional to
|
||||||
|
// the available space on the directory.
|
||||||
|
Random r = new Random();
|
||||||
|
long randomPosition = Math.abs(r.nextLong()) % totalAvailable;
|
||||||
|
int dir = 0;
|
||||||
|
// skip zero available space directory,
|
||||||
|
// because totalAvailable is greater than 0 and randomPosition
|
||||||
|
// is less than totalAvailable, we can find a valid directory
|
||||||
|
// with nonzero available space.
|
||||||
|
while (availableOnDisk[dir] == 0L) {
|
||||||
|
dir++;
|
||||||
|
}
|
||||||
|
while (randomPosition > availableOnDisk[dir]) {
|
||||||
|
randomPosition -= availableOnDisk[dir++];
|
||||||
|
}
|
||||||
|
appStorageDir = getApplicationDir(new Path(localDirs.get(dir)),
|
||||||
|
user, appId);
|
||||||
|
|
||||||
|
return appStorageDir;
|
||||||
|
}
|
||||||
|
|
||||||
protected void createDir(Path dirPath, FsPermission perms,
|
protected void createDir(Path dirPath, FsPermission perms,
|
||||||
boolean createParent, String user) throws IOException {
|
boolean createParent, String user) throws IOException {
|
||||||
lfs.mkdir(dirPath, perms, createParent);
|
lfs.mkdir(dirPath, perms, createParent);
|
||||||
|
|
|
@ -27,6 +27,7 @@ import static org.mockito.Mockito.any;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.DataOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.FileReader;
|
import java.io.FileReader;
|
||||||
|
@ -41,14 +42,6 @@ import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.AbstractFileSystem;
|
import org.apache.hadoop.fs.AbstractFileSystem;
|
||||||
|
@ -57,20 +50,30 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.FsStatus;
|
||||||
import org.apache.hadoop.fs.Options.CreateOpts;
|
import org.apache.hadoop.fs.Options.CreateOpts;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
@ -296,6 +299,102 @@ public class TestDefaultContainerExecutor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testStartLocalizer()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
InetSocketAddress localizationServerAddress;
|
||||||
|
final Path firstDir = new Path(BASE_TMP_PATH, "localDir1");
|
||||||
|
List<String> localDirs = new ArrayList<String>();
|
||||||
|
final Path secondDir = new Path(BASE_TMP_PATH, "localDir2");
|
||||||
|
List<String> logDirs = new ArrayList<String>();
|
||||||
|
final Path logDir = new Path(BASE_TMP_PATH, "logDir");
|
||||||
|
final Path tokenDir = new Path(BASE_TMP_PATH, "tokenDir");
|
||||||
|
FsPermission perms = new FsPermission((short)0770);
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
localizationServerAddress = conf.getSocketAddr(
|
||||||
|
YarnConfiguration.NM_BIND_HOST,
|
||||||
|
YarnConfiguration.NM_LOCALIZER_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
|
||||||
|
|
||||||
|
final FileContext mockLfs = spy(FileContext.getLocalFSFileContext(conf));
|
||||||
|
final FileContext.Util mockUtil = spy(mockLfs.util());
|
||||||
|
doAnswer(new Answer() {
|
||||||
|
@Override
|
||||||
|
public Object answer(InvocationOnMock invocationOnMock)
|
||||||
|
throws Throwable {
|
||||||
|
return mockUtil;
|
||||||
|
}
|
||||||
|
}).when(mockLfs).util();
|
||||||
|
doAnswer(new Answer() {
|
||||||
|
@Override
|
||||||
|
public Object answer(InvocationOnMock invocationOnMock)
|
||||||
|
throws Throwable {
|
||||||
|
Path dest = (Path) invocationOnMock.getArguments()[1];
|
||||||
|
if (dest.toString().contains(firstDir.toString())) {
|
||||||
|
// throw an Exception when copy token to the first local dir
|
||||||
|
// to simulate no space on the first drive
|
||||||
|
throw new IOException("No space on this drive " +
|
||||||
|
dest.toString());
|
||||||
|
} else {
|
||||||
|
// copy token to the second local dir
|
||||||
|
DataOutputStream tokenOut = null;
|
||||||
|
try {
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
tokenOut = mockLfs.create(dest,
|
||||||
|
EnumSet.of(CREATE, OVERWRITE));
|
||||||
|
credentials.writeTokenStorageToStream(tokenOut);
|
||||||
|
} finally {
|
||||||
|
if (tokenOut != null) {
|
||||||
|
tokenOut.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(mockUtil).copy(any(Path.class), any(Path.class));
|
||||||
|
doAnswer(new Answer() {
|
||||||
|
@Override
|
||||||
|
public Object answer(InvocationOnMock invocationOnMock)
|
||||||
|
throws Throwable {
|
||||||
|
Path p = (Path) invocationOnMock.getArguments()[0];
|
||||||
|
// let second local directory return more free space than
|
||||||
|
// first local directory
|
||||||
|
if (p.toString().contains(firstDir.toString())) {
|
||||||
|
return new FsStatus(2000, 2000, 0);
|
||||||
|
} else {
|
||||||
|
return new FsStatus(1000, 0, 1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).when(mockLfs).getFsStatus(any(Path.class));
|
||||||
|
|
||||||
|
DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor(
|
||||||
|
mockLfs));
|
||||||
|
mockExec.setConf(conf);
|
||||||
|
localDirs.add(mockLfs.makeQualified(firstDir).toString());
|
||||||
|
localDirs.add(mockLfs.makeQualified(secondDir).toString());
|
||||||
|
logDirs.add(mockLfs.makeQualified(logDir).toString());
|
||||||
|
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS,
|
||||||
|
localDirs.toArray(new String[localDirs.size()]));
|
||||||
|
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString());
|
||||||
|
mockLfs.mkdir(tokenDir, perms, true);
|
||||||
|
Path nmPrivateCTokensPath = new Path(tokenDir, "test.tokens");
|
||||||
|
String appSubmitter = "nobody";
|
||||||
|
String appId = "APP_ID";
|
||||||
|
String locId = "LOC_ID";
|
||||||
|
try {
|
||||||
|
mockExec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
|
||||||
|
appSubmitter, appId, locId, localDirs, logDirs);
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.fail("StartLocalizer failed to copy token file " + e);
|
||||||
|
} finally {
|
||||||
|
mockExec.deleteAsUser(appSubmitter, firstDir);
|
||||||
|
mockExec.deleteAsUser(appSubmitter, secondDir);
|
||||||
|
mockExec.deleteAsUser(appSubmitter, logDir);
|
||||||
|
deleteTmpFiles();
|
||||||
|
}
|
||||||
|
}
|
||||||
// @Test
|
// @Test
|
||||||
// public void testInit() throws IOException, InterruptedException {
|
// public void testInit() throws IOException, InterruptedException {
|
||||||
// Configuration conf = new Configuration();
|
// Configuration conf = new Configuration();
|
||||||
|
|
Loading…
Reference in New Issue