YARN-2701. Potential race condition in startLocalizer when using LinuxContainerExecutor. Contributed by Xuan Gong
This commit is contained in:
parent
7aab5fa1bd
commit
2839365f23
|
@ -699,6 +699,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2588. Standby RM fails to transitionToActive if previous
|
YARN-2588. Standby RM fails to transitionToActive if previous
|
||||||
transitionToActive failed with ZK exception. (Rohith Sharmaks via jianhe)
|
transitionToActive failed with ZK exception. (Rohith Sharmaks via jianhe)
|
||||||
|
|
||||||
|
YARN-2701. Potential race condition in startLocalizer when using
|
||||||
|
LinuxContainerExecutor. (Xuan Gong via jianhe)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager;
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Optional;
|
import com.google.common.base.Optional;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -220,7 +221,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
if (javaLibPath != null) {
|
if (javaLibPath != null) {
|
||||||
command.add("-Djava.library.path=" + javaLibPath);
|
command.add("-Djava.library.path=" + javaLibPath);
|
||||||
}
|
}
|
||||||
ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr, localDirs);
|
buildMainArgs(command, user, appId, locId, nmAddr, localDirs);
|
||||||
String[] commandArray = command.toArray(new String[command.size()]);
|
String[] commandArray = command.toArray(new String[command.size()]);
|
||||||
ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
|
ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -241,6 +242,13 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void buildMainArgs(List<String> command, String user, String appId,
|
||||||
|
String locId, InetSocketAddress nmAddr, List<String> localDirs) {
|
||||||
|
ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
|
||||||
|
localDirs);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int launchContainer(Container container,
|
public int launchContainer(Container container,
|
||||||
Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath,
|
Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath,
|
||||||
|
|
|
@ -451,59 +451,39 @@ char *get_tmp_directory(const char *work_dir) {
|
||||||
* with the desired permissions.
|
* with the desired permissions.
|
||||||
*/
|
*/
|
||||||
int mkdirs(const char* path, mode_t perm) {
|
int mkdirs(const char* path, mode_t perm) {
|
||||||
struct stat sb;
|
char *buffer = strdup(path);
|
||||||
char * npath;
|
char *token;
|
||||||
char * p;
|
int cwd = open("/", O_RDONLY);
|
||||||
if (stat(path, &sb) == 0) {
|
if (cwd == -1) {
|
||||||
if (S_ISDIR (sb.st_mode)) {
|
fprintf(LOGFILE, "Can't open / in %s - %s\n", path, strerror(errno));
|
||||||
return 0;
|
free(buffer);
|
||||||
} else {
|
|
||||||
fprintf(LOGFILE, "Path %s is file not dir\n", path);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
npath = strdup(path);
|
|
||||||
if (npath == NULL) {
|
|
||||||
fprintf(LOGFILE, "Not enough memory to copy path string");
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
/* Skip leading slashes. */
|
for(token = strtok(buffer, "/"); token != NULL; token = strtok(NULL, "/")) {
|
||||||
p = npath;
|
if (mkdirat(cwd, token, perm) != 0) {
|
||||||
while (*p == '/') {
|
if (errno != EEXIST) {
|
||||||
p++;
|
fprintf(LOGFILE, "Can't create directory %s in %s - %s\n",
|
||||||
}
|
token, path, strerror(errno));
|
||||||
|
close(cwd);
|
||||||
while (NULL != (p = strchr(p, '/'))) {
|
free(buffer);
|
||||||
*p = '\0';
|
return -1;
|
||||||
if (stat(npath, &sb) != 0) {
|
|
||||||
if (mkdir(npath, perm) != 0) {
|
|
||||||
fprintf(LOGFILE, "Can't create directory %s in %s - %s\n", npath,
|
|
||||||
path, strerror(errno));
|
|
||||||
free(npath);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
} else if (!S_ISDIR (sb.st_mode)) {
|
}
|
||||||
fprintf(LOGFILE, "Path %s is file not dir\n", npath);
|
int new_dir = openat(cwd, token, O_RDONLY);
|
||||||
free(npath);
|
close(cwd);
|
||||||
|
cwd = new_dir;
|
||||||
|
if (cwd == -1) {
|
||||||
|
fprintf(LOGFILE, "Can't open %s in %s - %s\n", token, path,
|
||||||
|
strerror(errno));
|
||||||
|
free(buffer);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
*p++ = '/'; /* restore slash */
|
|
||||||
while (*p == '/')
|
|
||||||
p++;
|
|
||||||
}
|
}
|
||||||
|
free(buffer);
|
||||||
/* Create the final directory component. */
|
close(cwd);
|
||||||
if (mkdir(npath, perm) != 0) {
|
|
||||||
fprintf(LOGFILE, "Can't create directory %s - %s\n", npath,
|
|
||||||
strerror(errno));
|
|
||||||
free(npath);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
free(npath);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Function to prepare the container directories.
|
* Function to prepare the container directories.
|
||||||
* It creates the container work and log directories.
|
* It creates the container work and log directories.
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager;
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
|
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
@ -29,8 +31,11 @@ import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -52,6 +57,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
|
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -108,10 +115,12 @@ public class TestLinuxContainerExecutor {
|
||||||
private LinuxContainerExecutor exec = null;
|
private LinuxContainerExecutor exec = null;
|
||||||
private String appSubmitter = null;
|
private String appSubmitter = null;
|
||||||
private LocalDirsHandlerService dirsHandler;
|
private LocalDirsHandlerService dirsHandler;
|
||||||
|
private Configuration conf;
|
||||||
|
private FileContext files;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
FileContext files = FileContext.getLocalFSFileContext();
|
files = FileContext.getLocalFSFileContext();
|
||||||
Path workSpacePath = new Path(workSpace.getAbsolutePath());
|
Path workSpacePath = new Path(workSpace.getAbsolutePath());
|
||||||
files.mkdir(workSpacePath, null, true);
|
files.mkdir(workSpacePath, null, true);
|
||||||
FileUtil.chmod(workSpace.getAbsolutePath(), "777");
|
FileUtil.chmod(workSpace.getAbsolutePath(), "777");
|
||||||
|
@ -123,7 +132,11 @@ public class TestLinuxContainerExecutor {
|
||||||
new FsPermission("777"), false);
|
new FsPermission("777"), false);
|
||||||
String exec_path = System.getProperty("container-executor.path");
|
String exec_path = System.getProperty("container-executor.path");
|
||||||
if(exec_path != null && !exec_path.isEmpty()) {
|
if(exec_path != null && !exec_path.isEmpty()) {
|
||||||
Configuration conf = new Configuration(false);
|
conf = new Configuration(false);
|
||||||
|
conf.setClass("fs.AbstractFileSystem.file.impl",
|
||||||
|
org.apache.hadoop.fs.local.LocalFs.class,
|
||||||
|
org.apache.hadoop.fs.AbstractFileSystem.class);
|
||||||
|
conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, "xuan");
|
||||||
LOG.info("Setting "+YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
|
LOG.info("Setting "+YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
|
||||||
+"="+exec_path);
|
+"="+exec_path);
|
||||||
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, exec_path);
|
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, exec_path);
|
||||||
|
@ -212,6 +225,59 @@ public class TestLinuxContainerExecutor {
|
||||||
dirsHandler.getLogDirs());
|
dirsHandler.getLogDirs());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerLocalizer() throws Exception {
|
||||||
|
if (!shouldRun()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
List<String> localDirs = dirsHandler.getLocalDirs();
|
||||||
|
List<String> logDirs = dirsHandler.getLogDirs();
|
||||||
|
for (String localDir : localDirs) {
|
||||||
|
Path userDir =
|
||||||
|
new Path(localDir, ContainerLocalizer.USERCACHE);
|
||||||
|
files.mkdir(userDir, new FsPermission("777"), false);
|
||||||
|
// $local/filecache
|
||||||
|
Path fileDir =
|
||||||
|
new Path(localDir, ContainerLocalizer.FILECACHE);
|
||||||
|
files.mkdir(fileDir, new FsPermission("777"), false);
|
||||||
|
}
|
||||||
|
String locId = "container_01_01";
|
||||||
|
Path nmPrivateContainerTokensPath =
|
||||||
|
dirsHandler.getLocalPathForWrite(
|
||||||
|
ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
|
||||||
|
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
|
||||||
|
locId));
|
||||||
|
files.create(nmPrivateContainerTokensPath, EnumSet.of(CREATE, OVERWRITE));
|
||||||
|
Configuration config = new YarnConfiguration(conf);
|
||||||
|
InetSocketAddress nmAddr = config.getSocketAddr(
|
||||||
|
YarnConfiguration.NM_BIND_HOST,
|
||||||
|
YarnConfiguration.NM_LOCALIZER_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
|
||||||
|
String appId = "application_01_01";
|
||||||
|
exec = new LinuxContainerExecutor() {
|
||||||
|
@Override
|
||||||
|
public void buildMainArgs(List<String> command, String user, String appId,
|
||||||
|
String locId, InetSocketAddress nmAddr, List<String> localDirs) {
|
||||||
|
MockContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
|
||||||
|
localDirs);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
exec.setConf(conf);
|
||||||
|
|
||||||
|
exec.startLocalizer(nmPrivateContainerTokensPath, nmAddr, appSubmitter,
|
||||||
|
appId, locId, localDirs, logDirs);
|
||||||
|
|
||||||
|
String locId2 = "container_01_02";
|
||||||
|
Path nmPrivateContainerTokensPath2 =
|
||||||
|
dirsHandler
|
||||||
|
.getLocalPathForWrite(ResourceLocalizationService.NM_PRIVATE_DIR
|
||||||
|
+ Path.SEPARATOR
|
||||||
|
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId2));
|
||||||
|
files.create(nmPrivateContainerTokensPath2, EnumSet.of(CREATE, OVERWRITE));
|
||||||
|
exec.startLocalizer(nmPrivateContainerTokensPath2, nmAddr, appSubmitter,
|
||||||
|
appId, locId2, localDirs, logDirs);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testContainerLaunch() throws IOException {
|
public void testContainerLaunch() throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue