YARN-4253. Standardize on using PrivilegedOperationExecutor for all invocations of container-executor in LinuxContainerExecutor. Contributed by Sidharta Seethana.

This commit is contained in:
Varun Vasudev 2015-10-14 14:56:48 +05:30
parent 2a98724342
commit 8d59293089
2 changed files with 144 additions and 134 deletions

View File

@ -931,6 +931,9 @@ Release 2.8.0 - UNRELEASED
YARN-4017. container-executor overuses PATH_MAX. (Sidharta Seethana via vvasudev)
YARN-4253. Standardize on using PrivilegedOperationExecutor for all
invocations of container-executor in LinuxContainerExecutor. (Sidharta Seethana via vvasudev)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -26,8 +26,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -78,7 +76,6 @@ public class LinuxContainerExecutor extends ContainerExecutor {
private String nonsecureLocalUser;
private Pattern nonsecureLocalUserPattern;
private String containerExecutorExe;
private LCEResourcesHandler resourcesHandler;
private boolean containerSchedPriorityIsSet = false;
private int containerSchedPriorityAdjustment = 0;
@ -97,28 +94,28 @@ public class LinuxContainerExecutor extends ContainerExecutor {
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
containerExecutorExe = getContainerExecutorExecutablePath(conf);
resourcesHandler = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
resourcesHandler.setConf(conf);
if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) != null) {
containerSchedPriorityIsSet = true;
containerSchedPriorityAdjustment = conf
.getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY)
!= null) {
containerSchedPriorityIsSet = true;
containerSchedPriorityAdjustment = conf
.getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
}
nonsecureLocalUser = conf.get(
YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY,
YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER);
nonsecureLocalUserPattern = Pattern.compile(
conf.get(YarnConfiguration.NM_NONSECURE_MODE_USER_PATTERN_KEY,
YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_USER_PATTERN));
YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_USER_PATTERN));
containerLimitUsers = conf.getBoolean(
YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS,
YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LIMIT_USERS);
YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS,
YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LIMIT_USERS);
if (!containerLimitUsers) {
LOG.warn(YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS +
": impersonation without authentication enabled");
@ -128,14 +125,14 @@ public class LinuxContainerExecutor extends ContainerExecutor {
void verifyUsernamePattern(String user) {
if (!UserGroupInformation.isSecurityEnabled() &&
!nonsecureLocalUserPattern.matcher(user).matches()) {
throw new IllegalArgumentException("Invalid user name '" + user + "'," +
" it must match '" + nonsecureLocalUserPattern.pattern() + "'");
}
throw new IllegalArgumentException("Invalid user name '" + user + "'," +
" it must match '" + nonsecureLocalUserPattern.pattern() + "'");
}
}
String getRunAsUser(String user) {
if (UserGroupInformation.isSecurityEnabled() ||
!containerLimitUsers) {
!containerLimitUsers) {
return user;
} else {
return nonsecureLocalUser;
@ -147,10 +144,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
File hadoopBin = new File(yarnHomeEnvVar, "bin");
String defaultPath =
new File(hadoopBin, "container-executor").getAbsolutePath();
new File(hadoopBin, "container-executor").getAbsolutePath();
return null == conf
? defaultPath
: conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, defaultPath);
? defaultPath
: conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH,
defaultPath);
}
protected void addSchedPriorityCommand(List<String> command) {
@ -160,31 +158,29 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
}
@Override
public void init() throws IOException {
// Send command to executor which will just start up,
@Override
public void init() throws IOException {
Configuration conf = super.getConf();
// Send command to executor which will just start up,
// verify configuration/permissions and exit
List<String> command = new ArrayList<String>(
Arrays.asList(containerExecutorExe,
"--checksetup"));
String[] commandArray = command.toArray(new String[command.size()]);
ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
if (LOG.isDebugEnabled()) {
LOG.debug("checkLinuxExecutorSetup: " + Arrays.toString(commandArray));
}
try {
shExec.execute();
} catch (ExitCodeException e) {
int exitCode = shExec.getExitCode();
PrivilegedOperation checkSetupOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.CHECK_SETUP, (String) null);
PrivilegedOperationExecutor privilegedOperationExecutor =
PrivilegedOperationExecutor.getInstance(conf);
privilegedOperationExecutor.executePrivilegedOperation(checkSetupOp,
false);
} catch (PrivilegedOperationException e) {
int exitCode = e.getExitCode();
LOG.warn("Exit code from container executor initialization is : "
+ exitCode, e);
logOutput(shExec.getOutput());
throw new IOException("Linux container executor not configured properly"
+ " (error=" + exitCode + ")", e);
}
Configuration conf = super.getConf();
try {
resourceHandlerChain = ResourceHandlerModule
.getConfiguredResourceHandlerChain(conf);
@ -193,7 +189,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
} catch (ResourceHandlerException e) {
LOG.error("Failed to bootstrap configured resource subsystems! ", e);
throw new IOException("Failed to bootstrap configured resource subsystems!");
throw new IOException(
"Failed to bootstrap configured resource subsystems!");
}
try {
@ -204,7 +201,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
this.linuxContainerRuntime = runtime;
}
} catch (ContainerExecutionException e) {
throw new IOException("Failed to initialize linux container runtime(s)!");
throw new IOException("Failed to initialize linux container runtime(s)!");
}
resourcesHandler.init(this);
@ -221,50 +218,59 @@ public class LinuxContainerExecutor extends ContainerExecutor {
LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
verifyUsernamePattern(user);
String runAsUser = getRunAsUser(user);
List<String> command = new ArrayList<String>();
addSchedPriorityCommand(command);
command.addAll(Arrays.asList(containerExecutorExe,
runAsUser,
user,
Integer.toString(PrivilegedOperation.RunAsUserCommand.INITIALIZE_CONTAINER.getValue()),
appId,
nmPrivateContainerTokensPath.toUri().getPath().toString(),
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
localDirs),
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
logDirs)));
PrivilegedOperation initializeContainerOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.INITIALIZE_CONTAINER, (String) null);
List<String> prefixCommands = new ArrayList<>();
addSchedPriorityCommand(prefixCommands);
initializeContainerOp.appendArgs(
runAsUser,
user,
Integer.toString(
PrivilegedOperation.RunAsUserCommand.INITIALIZE_CONTAINER
.getValue()),
appId,
nmPrivateContainerTokensPath.toUri().getPath().toString(),
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
localDirs),
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
logDirs));
File jvm = // use same jvm as parent
new File(new File(System.getProperty("java.home"), "bin"), "java");
command.add(jvm.toString());
command.add("-classpath");
command.add(System.getProperty("java.class.path"));
new File(new File(System.getProperty("java.home"), "bin"), "java");
initializeContainerOp.appendArgs(jvm.toString());
initializeContainerOp.appendArgs("-classpath");
initializeContainerOp.appendArgs(System.getProperty("java.class.path"));
String javaLibPath = System.getProperty("java.library.path");
if (javaLibPath != null) {
command.add("-Djava.library.path=" + javaLibPath);
}
command.addAll(ContainerLocalizer.getJavaOpts(getConf()));
buildMainArgs(command, user, appId, locId, nmAddr, localDirs);
String[] commandArray = command.toArray(new String[command.size()]);
ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
if (LOG.isDebugEnabled()) {
LOG.debug("initApplication: " + Arrays.toString(commandArray));
initializeContainerOp.appendArgs("-Djava.library.path=" + javaLibPath);
}
initializeContainerOp.appendArgs(ContainerLocalizer.getJavaOpts(getConf()));
List<String> localizerArgs = new ArrayList<>();
buildMainArgs(localizerArgs, user, appId, locId, nmAddr, localDirs);
initializeContainerOp.appendArgs(localizerArgs);
try {
shExec.execute();
if (LOG.isDebugEnabled()) {
logOutput(shExec.getOutput());
}
} catch (ExitCodeException e) {
int exitCode = shExec.getExitCode();
Configuration conf = super.getConf();
PrivilegedOperationExecutor privilegedOperationExecutor =
PrivilegedOperationExecutor.getInstance(conf);
privilegedOperationExecutor.executePrivilegedOperation(prefixCommands,
initializeContainerOp, null, null, false);
} catch (PrivilegedOperationException e) {
int exitCode = e.getExitCode();
LOG.warn("Exit code from container " + locId + " startLocalizer is : "
+ exitCode, e);
logOutput(shExec.getOutput());
throw new IOException("Application " + appId + " initialization failed" +
" (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
" (exitCode=" + exitCode + ") with output: " + e.getOutput(), e);
}
}
@ -292,7 +298,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
ContainerId containerId = container.getContainerId();
String containerIdStr = ConverterUtils.toString(containerId);
resourcesHandler.preExecute(containerId,
container.getResource());
String resourcesOptions = resourcesHandler.getResourcesOption(
@ -307,21 +313,21 @@ public class LinuxContainerExecutor extends ContainerExecutor {
if (ops != null) {
List<PrivilegedOperation> resourceOps = new ArrayList<>();
resourceOps.add(new PrivilegedOperation
(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
resourceOps.add(new PrivilegedOperation(
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
resourcesOptions));
for (PrivilegedOperation op : ops) {
switch (op.getOperationType()) {
case ADD_PID_TO_CGROUP:
resourceOps.add(op);
break;
case TC_MODIFY_STATE:
tcCommandFile = op.getArguments().get(0);
break;
default:
LOG.warn("PrivilegedOperation type unsupported in launch: "
+ op.getOperationType());
case ADD_PID_TO_CGROUP:
resourceOps.add(op);
break;
case TC_MODIFY_STATE:
tcCommandFile = op.getArguments().get(0);
break;
default:
LOG.warn("PrivilegedOperation type unsupported in launch: "
+ op.getOperationType());
}
}
@ -333,7 +339,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
resourcesOptions = operation.getArguments().get(0);
} catch (PrivilegedOperationException e) {
LOG.error("Failed to squash cgroup operations!", e);
throw new ResourceHandlerException("Failed to squash cgroup operations!");
throw new ResourceHandlerException(
"Failed to squash cgroup operations!");
}
}
}
@ -346,7 +353,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
try {
Path pidFilePath = getPidFilePath(containerId);
if (pidFilePath != null) {
List<String> prefixCommands= new ArrayList<>();
List<String> prefixCommands = new ArrayList<>();
ContainerRuntimeContext.Builder builder = new ContainerRuntimeContext
.Builder(container);
@ -376,7 +383,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
linuxContainerRuntime.launchContainer(builder.build());
} else {
LOG.info("Container was marked as inactive. Returning terminated error");
LOG.info(
"Container was marked as inactive. Returning terminated error");
return ExitCode.TERMINATED.getExitCode();
}
} catch (ContainerExecutionException e) {
@ -388,7 +396,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
&& exitCode != ExitCode.TERMINATED.getExitCode()) {
LOG.warn("Exception from container-launch with container ID: "
+ containerId + " and exit code: " + exitCode , e);
+ containerId + " and exit code: " + exitCode, e);
StringBuilder builder = new StringBuilder();
builder.append("Exception from container-launch.\n");
@ -481,7 +489,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
linuxContainerRuntime.signalContainer(runtimeContext);
} catch (ContainerExecutionException e) {
int retCode = e.getExitCode();
if (retCode == PrivilegedOperation.ResultCode.INVALID_CONTAINER_PID.getValue()) {
if (retCode == PrivilegedOperation.ResultCode.INVALID_CONTAINER_PID
.getValue()) {
return false;
}
LOG.warn("Error in signalling container " + pid + " with " + signal
@ -501,17 +510,20 @@ public class LinuxContainerExecutor extends ContainerExecutor {
List<Path> baseDirs = ctx.getBasedirs();
verifyUsernamePattern(user);
String runAsUser = getRunAsUser(user);
String runAsUser = getRunAsUser(user);
String dirString = dir == null ? "" : dir.toUri().getPath();
List<String> command = new ArrayList<String>(
Arrays.asList(containerExecutorExe,
runAsUser,
user,
Integer.toString(PrivilegedOperation.
RunAsUserCommand.DELETE_AS_USER.getValue()),
dirString));
PrivilegedOperation deleteAsUserOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.DELETE_AS_USER, (String) null);
deleteAsUserOp.appendArgs(
runAsUser,
user,
Integer.toString(PrivilegedOperation.
RunAsUserCommand.DELETE_AS_USER.getValue()),
dirString);
List<String> pathsToDelete = new ArrayList<String>();
if (baseDirs == null || baseDirs.size() == 0) {
LOG.info("Deleting absolute path : " + dir);
@ -521,28 +533,24 @@ public class LinuxContainerExecutor extends ContainerExecutor {
Path del = dir == null ? baseDir : new Path(baseDir, dir);
LOG.info("Deleting path : " + del);
pathsToDelete.add(del.toString());
command.add(baseDir.toUri().getPath());
deleteAsUserOp.appendArgs(baseDir.toUri().getPath());
}
}
String[] commandArray = command.toArray(new String[command.size()]);
ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
if (LOG.isDebugEnabled()) {
LOG.debug("deleteAsUser: " + Arrays.toString(commandArray));
}
try {
shExec.execute();
if (LOG.isDebugEnabled()) {
logOutput(shExec.getOutput());
}
} catch (IOException e) {
int exitCode = shExec.getExitCode();
Configuration conf = super.getConf();
PrivilegedOperationExecutor privilegedOperationExecutor =
PrivilegedOperationExecutor.getInstance(conf);
privilegedOperationExecutor.executePrivilegedOperation(deleteAsUserOp,
false);
} catch (PrivilegedOperationException e) {
int exitCode = e.getExitCode();
LOG.error("DeleteAsUser for " + StringUtils.join(" ", pathsToDelete)
+ " returned with exit code: " + exitCode, e);
LOG.error("Output from LinuxContainerExecutor's deleteAsUser follows:");
logOutput(shExec.getOutput());
}
}
@Override
public boolean isContainerAlive(ContainerLivenessContext ctx)
throws IOException {
@ -560,26 +568,25 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
public void mountCgroups(List<String> cgroupKVs, String hierarchy)
throws IOException {
List<String> command = new ArrayList<String>(
Arrays.asList(containerExecutorExe, "--mount-cgroups", hierarchy));
command.addAll(cgroupKVs);
String[] commandArray = command.toArray(new String[command.size()]);
ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
if (LOG.isDebugEnabled()) {
LOG.debug("mountCgroups: " + Arrays.toString(commandArray));
}
throws IOException {
try {
shExec.execute();
} catch (IOException e) {
int ret_code = shExec.getExitCode();
LOG.warn("Exception in LinuxContainerExecutor mountCgroups ", e);
logOutput(shExec.getOutput());
throw new IOException("Problem mounting cgroups " + cgroupKVs +
"; exit code = " + ret_code + " and output: " + shExec.getOutput(), e);
PrivilegedOperation mountCGroupsOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.MOUNT_CGROUPS, hierarchy);
Configuration conf = super.getConf();
mountCGroupsOp.appendArgs(cgroupKVs);
PrivilegedOperationExecutor privilegedOperationExecutor =
PrivilegedOperationExecutor.getInstance(conf);
privilegedOperationExecutor.executePrivilegedOperation(mountCGroupsOp,
false);
} catch (PrivilegedOperationException e) {
int exitCode = e.getExitCode();
LOG.warn("Exception in LinuxContainerExecutor mountCgroups ", e);
throw new IOException("Problem mounting cgroups " + cgroupKVs +
"; exit code = " + exitCode + " and output: " + e.getOutput(),
e);
}
}
}
}