YARN-7034. DefaultLinuxContainerRuntime and DockerLinuxContainerRuntime sends client environment variables to container-executor. Contributed by Miklos Szegedi.

This commit is contained in:
Junping Du 2017-09-21 14:01:16 -07:00
parent a92ef030a2
commit e5e1851d80
5 changed files with 73 additions and 16 deletions

View File

@ -204,7 +204,7 @@ public class PrivilegedOperationExecutor {
public String executePrivilegedOperation(PrivilegedOperation operation,
boolean grabOutput) throws PrivilegedOperationException {
return executePrivilegedOperation(null, operation, null, null, grabOutput,
true);
false);
}
//Utility functions for squashing together operations in supported ways

View File

@ -80,7 +80,6 @@ public class DefaultLinuxContainerRuntime implements LinuxContainerRuntime {
@Override
public void launchContainer(ContainerRuntimeContext ctx)
throws ContainerExecutionException {
Container container = ctx.getContainer();
PrivilegedOperation launchOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.LAUNCH_CONTAINER);
@ -116,8 +115,7 @@ public class DefaultLinuxContainerRuntime implements LinuxContainerRuntime {
try {
privilegedOperationExecutor.executePrivilegedOperation(prefixCommands,
launchOp, null, container.getLaunchContext().getEnvironment(),
false, false);
launchOp, null, null, false, false);
} catch (PrivilegedOperationException e) {
LOG.warn("Launch container failed. Exception: ", e);
@ -129,7 +127,6 @@ public class DefaultLinuxContainerRuntime implements LinuxContainerRuntime {
@Override
public void signalContainer(ContainerRuntimeContext ctx)
throws ContainerExecutionException {
Container container = ctx.getContainer();
PrivilegedOperation signalOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.SIGNAL_CONTAINER);
@ -148,8 +145,7 @@ public class DefaultLinuxContainerRuntime implements LinuxContainerRuntime {
.getInstance(conf);
executor.executePrivilegedOperation(null,
signalOp, null, container.getLaunchContext().getEnvironment(),
false, true);
signalOp, null, null, false, false);
} catch (PrivilegedOperationException e) {
//Don't log the failure here. Some kinds of signaling failures are
// acceptable. Let the calling executor decide what to do.

View File

@ -549,8 +549,7 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
try {
privilegedOperationExecutor.executePrivilegedOperation(null,
launchOp, null, container.getLaunchContext().getEnvironment(),
false, false);
launchOp, null, null, false, false);
} catch (PrivilegedOperationException e) {
LOG.warn("Launch container failed. Exception: ", e);
LOG.info("Docker command used: " + runCommand.getCommandWithArguments());
@ -563,7 +562,6 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
@Override
public void signalContainer(ContainerRuntimeContext ctx)
throws ContainerExecutionException {
Container container = ctx.getContainer();
ContainerExecutor.Signal signal = ctx.getExecutionAttribute(SIGNAL);
PrivilegedOperation privOp = null;
@ -594,8 +592,7 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
try {
privilegedOperationExecutor.executePrivilegedOperation(null,
privOp, null, container.getLaunchContext().getEnvironment(),
false, false);
privOp, null, null, false, false);
} catch (PrivilegedOperationException e) {
throw new ContainerExecutionException("Signal container failed", e
.getExitCode(), e.getOutput(), e.getErrorOutput());
@ -623,7 +620,7 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
privOp.appendArgs(commandFile);
String output = privilegedOperationExecutor
.executePrivilegedOperation(null, privOp, null,
container.getLaunchContext().getEnvironment(), true, false);
null, true, false);
LOG.info("Docker inspect output for " + containerId + ": " + output);
int index = output.lastIndexOf(',');
if (index == -1) {

View File

@ -72,10 +72,16 @@ import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class TestLinuxContainerExecutorWithMocks {
@ -91,6 +97,8 @@ public class TestLinuxContainerExecutorWithMocks {
private String tmpMockExecutor;
private LinuxContainerExecutor mockExec = null;
private LinuxContainerExecutor mockExecMockRuntime = null;
private PrivilegedOperationExecutor mockPrivilegedExec;
private final File mockParamFile = new File("./params.txt");
private LocalDirsHandlerService dirsHandler;
@ -140,22 +148,28 @@ public class TestLinuxContainerExecutorWithMocks {
Configuration conf = new YarnConfiguration();
LinuxContainerRuntime linuxContainerRuntime;
LinuxContainerRuntime mockLinuxContainerRuntime;
setupMockExecutor(MOCK_EXECUTOR, conf);
linuxContainerRuntime = new DefaultLinuxContainerRuntime(
PrivilegedOperationExecutor.getInstance(conf));
mockPrivilegedExec = Mockito.mock(PrivilegedOperationExecutor.class);
mockLinuxContainerRuntime = new DefaultLinuxContainerRuntime(
mockPrivilegedExec);
dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
linuxContainerRuntime.initialize(conf);
mockExec = new LinuxContainerExecutor(linuxContainerRuntime);
mockExec.setConf(conf);
mockExecMockRuntime = new LinuxContainerExecutor(mockLinuxContainerRuntime);
mockExecMockRuntime.setConf(conf);
}
@After
public void tearDown() {
deleteMockParamFile();
}
@Test
public void testContainerLaunch()
throws IOException, ConfigurationException {
@ -168,7 +182,7 @@ public class TestLinuxContainerExecutorWithMocks {
ContainerId cId = mock(ContainerId.class);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String,String>();
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
@ -605,4 +619,53 @@ public class TestLinuxContainerExecutorWithMocks {
e.getMessage().contains("exit code"));
}
}
@Test
public void testContainerLaunchEnvironment()
throws IOException, ConfigurationException,
PrivilegedOperationException {
String appSubmitter = "nobody";
String appId = "APP_ID";
String containerId = "CONTAINER_ID";
Container container = mock(Container.class);
ContainerId cId = mock(ContainerId.class);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String, String>();
env.put("FROM_CLIENT", "1");
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
when(cId.toString()).thenReturn(containerId);
when(context.getEnvironment()).thenReturn(env);
Path scriptPath = new Path("file:///bin/echo");
Path tokensPath = new Path("file:///dev/null");
Path workDir = new Path("/tmp");
Path pidFile = new Path(workDir, "pid.txt");
mockExecMockRuntime.activateContainer(cId, pidFile);
mockExecMockRuntime.launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.setFilecacheDirs(new ArrayList<>())
.setUserLocalDirs(new ArrayList<>())
.setContainerLocalDirs(new ArrayList<>())
.setContainerLogDirs(new ArrayList<>())
.build());
ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass(
PrivilegedOperation.class);
// Verify that
verify(mockPrivilegedExec, times(1))
.executePrivilegedOperation(anyListOf(
String.class), opCaptor.capture(), any(
File.class), eq(null), eq(false), eq(false));
}
}

View File

@ -116,6 +116,7 @@ public class TestDockerContainerRuntime {
cId = mock(ContainerId.class);
context = mock(ContainerLaunchContext.class);
env = new HashMap<String, String>();
env.put("FROM_CLIENT", "1");
image = "busybox:latest";
env.put(DockerLinuxContainerRuntime.ENV_DOCKER_CONTAINER_IMAGE, image);
@ -204,7 +205,7 @@ public class TestDockerContainerRuntime {
// warning annotation on the entire method
verify(mockExecutor, times(1))
.executePrivilegedOperation(anyList(), opCaptor.capture(), any(
File.class), any(Map.class), eq(false), eq(false));
File.class), eq(null), eq(false), eq(false));
//verification completed. we need to isolate specific invications.
// hence, reset mock here