YARN-7034. DefaultLinuxContainerRuntime and DockerLinuxContainerRuntime sends client environment variables to container-executor. Contributed by Miklos Szegedi.
This commit is contained in:
parent
c3c8b0ffab
commit
e67c8347c4
@ -80,7 +80,6 @@ public void prepareContainer(ContainerRuntimeContext ctx)
|
|||||||
@Override
|
@Override
|
||||||
public void launchContainer(ContainerRuntimeContext ctx)
|
public void launchContainer(ContainerRuntimeContext ctx)
|
||||||
throws ContainerExecutionException {
|
throws ContainerExecutionException {
|
||||||
Container container = ctx.getContainer();
|
|
||||||
PrivilegedOperation launchOp = new PrivilegedOperation(
|
PrivilegedOperation launchOp = new PrivilegedOperation(
|
||||||
PrivilegedOperation.OperationType.LAUNCH_CONTAINER);
|
PrivilegedOperation.OperationType.LAUNCH_CONTAINER);
|
||||||
|
|
||||||
@ -116,8 +115,7 @@ public void launchContainer(ContainerRuntimeContext ctx)
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
privilegedOperationExecutor.executePrivilegedOperation(prefixCommands,
|
privilegedOperationExecutor.executePrivilegedOperation(prefixCommands,
|
||||||
launchOp, null, container.getLaunchContext().getEnvironment(),
|
launchOp, null, null, false, false);
|
||||||
false, false);
|
|
||||||
} catch (PrivilegedOperationException e) {
|
} catch (PrivilegedOperationException e) {
|
||||||
LOG.warn("Launch container failed. Exception: ", e);
|
LOG.warn("Launch container failed. Exception: ", e);
|
||||||
|
|
||||||
@ -129,7 +127,6 @@ public void launchContainer(ContainerRuntimeContext ctx)
|
|||||||
@Override
|
@Override
|
||||||
public void signalContainer(ContainerRuntimeContext ctx)
|
public void signalContainer(ContainerRuntimeContext ctx)
|
||||||
throws ContainerExecutionException {
|
throws ContainerExecutionException {
|
||||||
Container container = ctx.getContainer();
|
|
||||||
PrivilegedOperation signalOp = new PrivilegedOperation(
|
PrivilegedOperation signalOp = new PrivilegedOperation(
|
||||||
PrivilegedOperation.OperationType.SIGNAL_CONTAINER);
|
PrivilegedOperation.OperationType.SIGNAL_CONTAINER);
|
||||||
|
|
||||||
@ -148,8 +145,7 @@ public void signalContainer(ContainerRuntimeContext ctx)
|
|||||||
.getInstance(conf);
|
.getInstance(conf);
|
||||||
|
|
||||||
executor.executePrivilegedOperation(null,
|
executor.executePrivilegedOperation(null,
|
||||||
signalOp, null, container.getLaunchContext().getEnvironment(),
|
signalOp, null, null, false, false);
|
||||||
false, true);
|
|
||||||
} catch (PrivilegedOperationException e) {
|
} catch (PrivilegedOperationException e) {
|
||||||
//Don't log the failure here. Some kinds of signaling failures are
|
//Don't log the failure here. Some kinds of signaling failures are
|
||||||
// acceptable. Let the calling executor decide what to do.
|
// acceptable. Let the calling executor decide what to do.
|
||||||
|
@ -581,8 +581,7 @@ public void launchContainer(ContainerRuntimeContext ctx)
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
privilegedOperationExecutor.executePrivilegedOperation(null,
|
privilegedOperationExecutor.executePrivilegedOperation(null,
|
||||||
launchOp, null, container.getLaunchContext().getEnvironment(),
|
launchOp, null, null, false, false);
|
||||||
false, false);
|
|
||||||
} catch (PrivilegedOperationException e) {
|
} catch (PrivilegedOperationException e) {
|
||||||
LOG.warn("Launch container failed. Exception: ", e);
|
LOG.warn("Launch container failed. Exception: ", e);
|
||||||
LOG.info("Docker command used: " + runCommand.getCommandWithArguments());
|
LOG.info("Docker command used: " + runCommand.getCommandWithArguments());
|
||||||
@ -595,7 +594,6 @@ public void launchContainer(ContainerRuntimeContext ctx)
|
|||||||
@Override
|
@Override
|
||||||
public void signalContainer(ContainerRuntimeContext ctx)
|
public void signalContainer(ContainerRuntimeContext ctx)
|
||||||
throws ContainerExecutionException {
|
throws ContainerExecutionException {
|
||||||
Container container = ctx.getContainer();
|
|
||||||
ContainerExecutor.Signal signal = ctx.getExecutionAttribute(SIGNAL);
|
ContainerExecutor.Signal signal = ctx.getExecutionAttribute(SIGNAL);
|
||||||
|
|
||||||
PrivilegedOperation privOp = null;
|
PrivilegedOperation privOp = null;
|
||||||
@ -626,8 +624,7 @@ public void signalContainer(ContainerRuntimeContext ctx)
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
privilegedOperationExecutor.executePrivilegedOperation(null,
|
privilegedOperationExecutor.executePrivilegedOperation(null,
|
||||||
privOp, null, container.getLaunchContext().getEnvironment(),
|
privOp, null, null, false, false);
|
||||||
false, false);
|
|
||||||
} catch (PrivilegedOperationException e) {
|
} catch (PrivilegedOperationException e) {
|
||||||
throw new ContainerExecutionException("Signal container failed", e
|
throw new ContainerExecutionException("Signal container failed", e
|
||||||
.getExitCode(), e.getOutput(), e.getErrorOutput());
|
.getExitCode(), e.getOutput(), e.getErrorOutput());
|
||||||
@ -655,7 +652,7 @@ public String[] getIpAndHost(Container container) {
|
|||||||
privOp.appendArgs(commandFile);
|
privOp.appendArgs(commandFile);
|
||||||
String output = privilegedOperationExecutor
|
String output = privilegedOperationExecutor
|
||||||
.executePrivilegedOperation(null, privOp, null,
|
.executePrivilegedOperation(null, privOp, null,
|
||||||
container.getLaunchContext().getEnvironment(), true, false);
|
null, true, false);
|
||||||
LOG.info("Docker inspect output for " + containerId + ": " + output);
|
LOG.info("Docker inspect output for " + containerId + ": " + output);
|
||||||
int index = output.lastIndexOf(',');
|
int index = output.lastIndexOf(',');
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
|
@ -72,10 +72,16 @@
|
|||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
|
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
|
||||||
|
import static org.mockito.Matchers.anyListOf;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
public class TestLinuxContainerExecutorWithMocks {
|
public class TestLinuxContainerExecutorWithMocks {
|
||||||
|
|
||||||
@ -91,6 +97,8 @@ public class TestLinuxContainerExecutorWithMocks {
|
|||||||
|
|
||||||
private String tmpMockExecutor;
|
private String tmpMockExecutor;
|
||||||
private LinuxContainerExecutor mockExec = null;
|
private LinuxContainerExecutor mockExec = null;
|
||||||
|
private LinuxContainerExecutor mockExecMockRuntime = null;
|
||||||
|
private PrivilegedOperationExecutor mockPrivilegedExec;
|
||||||
private final File mockParamFile = new File("./params.txt");
|
private final File mockParamFile = new File("./params.txt");
|
||||||
private LocalDirsHandlerService dirsHandler;
|
private LocalDirsHandlerService dirsHandler;
|
||||||
|
|
||||||
@ -140,15 +148,21 @@ public void setup() throws IOException, ContainerExecutionException {
|
|||||||
|
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
LinuxContainerRuntime linuxContainerRuntime;
|
LinuxContainerRuntime linuxContainerRuntime;
|
||||||
|
LinuxContainerRuntime mockLinuxContainerRuntime;
|
||||||
|
|
||||||
setupMockExecutor(MOCK_EXECUTOR, conf);
|
setupMockExecutor(MOCK_EXECUTOR, conf);
|
||||||
linuxContainerRuntime = new DefaultLinuxContainerRuntime(
|
linuxContainerRuntime = new DefaultLinuxContainerRuntime(
|
||||||
PrivilegedOperationExecutor.getInstance(conf));
|
PrivilegedOperationExecutor.getInstance(conf));
|
||||||
|
mockPrivilegedExec = Mockito.mock(PrivilegedOperationExecutor.class);
|
||||||
|
mockLinuxContainerRuntime = new DefaultLinuxContainerRuntime(
|
||||||
|
mockPrivilegedExec);
|
||||||
dirsHandler = new LocalDirsHandlerService();
|
dirsHandler = new LocalDirsHandlerService();
|
||||||
dirsHandler.init(conf);
|
dirsHandler.init(conf);
|
||||||
linuxContainerRuntime.initialize(conf);
|
linuxContainerRuntime.initialize(conf);
|
||||||
mockExec = new LinuxContainerExecutor(linuxContainerRuntime);
|
mockExec = new LinuxContainerExecutor(linuxContainerRuntime);
|
||||||
mockExec.setConf(conf);
|
mockExec.setConf(conf);
|
||||||
|
mockExecMockRuntime = new LinuxContainerExecutor(mockLinuxContainerRuntime);
|
||||||
|
mockExecMockRuntime.setConf(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
@ -605,4 +619,54 @@ protected PrivilegedOperationExecutor getPrivilegedOperationExecutor() {
|
|||||||
e.getMessage().contains("exit code"));
|
e.getMessage().contains("exit code"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerLaunchEnvironment()
|
||||||
|
throws IOException, ConfigurationException,
|
||||||
|
PrivilegedOperationException {
|
||||||
|
String appSubmitter = "nobody";
|
||||||
|
String appId = "APP_ID";
|
||||||
|
String containerId = "CONTAINER_ID";
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
ContainerId cId = mock(ContainerId.class);
|
||||||
|
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||||
|
HashMap<String, String> env = new HashMap<String,String>();
|
||||||
|
env.put("FROM_CLIENT", "1");
|
||||||
|
|
||||||
|
when(container.getContainerId()).thenReturn(cId);
|
||||||
|
when(container.getLaunchContext()).thenReturn(context);
|
||||||
|
|
||||||
|
when(cId.toString()).thenReturn(containerId);
|
||||||
|
|
||||||
|
when(context.getEnvironment()).thenReturn(env);
|
||||||
|
|
||||||
|
Path scriptPath = new Path("file:///bin/echo");
|
||||||
|
Path tokensPath = new Path("file:///dev/null");
|
||||||
|
Path workDir = new Path("/tmp");
|
||||||
|
Path pidFile = new Path(workDir, "pid.txt");
|
||||||
|
|
||||||
|
mockExecMockRuntime.activateContainer(cId, pidFile);
|
||||||
|
mockExecMockRuntime.launchContainer(new ContainerStartContext.Builder()
|
||||||
|
.setContainer(container)
|
||||||
|
.setNmPrivateContainerScriptPath(scriptPath)
|
||||||
|
.setNmPrivateTokensPath(tokensPath)
|
||||||
|
.setUser(appSubmitter)
|
||||||
|
.setAppId(appId)
|
||||||
|
.setContainerWorkDir(workDir)
|
||||||
|
.setLocalDirs(dirsHandler.getLocalDirs())
|
||||||
|
.setLogDirs(dirsHandler.getLogDirs())
|
||||||
|
.setFilecacheDirs(new ArrayList<String>())
|
||||||
|
.setUserLocalDirs(new ArrayList<String>())
|
||||||
|
.setContainerLocalDirs(new ArrayList<String>())
|
||||||
|
.setContainerLogDirs(new ArrayList<String>())
|
||||||
|
.build());
|
||||||
|
ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass(
|
||||||
|
PrivilegedOperation.class);
|
||||||
|
// Verify that
|
||||||
|
verify(mockPrivilegedExec, times(1))
|
||||||
|
.executePrivilegedOperation(anyListOf(
|
||||||
|
String.class), opCaptor.capture(), any(
|
||||||
|
File.class), eq((Map<String, String>)null),
|
||||||
|
eq(false), eq(false));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -116,6 +116,7 @@ public void setup() {
|
|||||||
cId = mock(ContainerId.class);
|
cId = mock(ContainerId.class);
|
||||||
context = mock(ContainerLaunchContext.class);
|
context = mock(ContainerLaunchContext.class);
|
||||||
env = new HashMap<String, String>();
|
env = new HashMap<String, String>();
|
||||||
|
env.put("FROM_CLIENT", "1");
|
||||||
image = "busybox:latest";
|
image = "busybox:latest";
|
||||||
|
|
||||||
env.put(DockerLinuxContainerRuntime.ENV_DOCKER_CONTAINER_IMAGE, image);
|
env.put(DockerLinuxContainerRuntime.ENV_DOCKER_CONTAINER_IMAGE, image);
|
||||||
@ -204,7 +205,8 @@ private PrivilegedOperation capturePrivilegedOperation()
|
|||||||
// warning annotation on the entire method
|
// warning annotation on the entire method
|
||||||
verify(mockExecutor, times(1))
|
verify(mockExecutor, times(1))
|
||||||
.executePrivilegedOperation(anyList(), opCaptor.capture(), any(
|
.executePrivilegedOperation(anyList(), opCaptor.capture(), any(
|
||||||
File.class), any(Map.class), eq(false), eq(false));
|
File.class), eq((Map<String, String>)null),
|
||||||
|
eq(false), eq(false));
|
||||||
|
|
||||||
//verification completed. we need to isolate specific invications.
|
//verification completed. we need to isolate specific invications.
|
||||||
// hence, reset mock here
|
// hence, reset mock here
|
||||||
|
Loading…
x
Reference in New Issue
Block a user