YARN-8263. DockerClient still touches hadoop.tmp.dir. Contributed by Craig Condit

This commit is contained in:
Jason Lowe 2018-08-02 10:43:48 -05:00
parent 5033d7da8f
commit 7526815e32
9 changed files with 30 additions and 95 deletions

View File

@ -944,12 +944,12 @@ public class LinuxContainerExecutor extends ContainerExecutor {
PrivilegedOperationExecutor privOpExecutor =
PrivilegedOperationExecutor.getInstance(super.getConf());
if (DockerCommandExecutor.isRemovable(
DockerCommandExecutor.getContainerStatus(containerId,
super.getConf(), privOpExecutor, nmContext))) {
DockerCommandExecutor.getContainerStatus(containerId, privOpExecutor,
nmContext))) {
LOG.info("Removing Docker container : " + containerId);
DockerRmCommand dockerRmCommand = new DockerRmCommand(containerId);
DockerCommandExecutor.executeDockerCommand(dockerRmCommand, containerId,
null, super.getConf(), privOpExecutor, false, nmContext);
null, privOpExecutor, false, nmContext);
}
} catch (ContainerExecutionException e) {
LOG.warn("Unable to remove docker container: " + containerId);

View File

@ -298,7 +298,7 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
throws ContainerExecutionException {
this.nmContext = nmContext;
this.conf = conf;
dockerClient = new DockerClient(conf);
dockerClient = new DockerClient();
allowedNetworks.clear();
defaultROMounts.clear();
defaultRWMounts.clear();
@ -973,7 +973,7 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
String containerIdStr = containerId.toString();
// Check to see if the container already exists for relaunch
DockerCommandExecutor.DockerContainerStatus containerStatus =
DockerCommandExecutor.getContainerStatus(containerIdStr, conf,
DockerCommandExecutor.getContainerStatus(containerIdStr,
privilegedOperationExecutor, nmContext);
if (containerStatus != null &&
DockerCommandExecutor.isStartable(containerStatus)) {
@ -1219,13 +1219,13 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
private void handleContainerStop(String containerId, Map<String, String> env)
throws ContainerExecutionException {
DockerCommandExecutor.DockerContainerStatus containerStatus =
DockerCommandExecutor.getContainerStatus(containerId, conf,
DockerCommandExecutor.getContainerStatus(containerId,
privilegedOperationExecutor, nmContext);
if (DockerCommandExecutor.isStoppable(containerStatus)) {
DockerStopCommand dockerStopCommand = new DockerStopCommand(
containerId).setGracePeriod(dockerStopGracePeriod);
DockerCommandExecutor.executeDockerCommand(dockerStopCommand, containerId,
env, conf, privilegedOperationExecutor, false, nmContext);
env, privilegedOperationExecutor, false, nmContext);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
@ -1247,14 +1247,13 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
if (isContainerRequestedAsPrivileged(container)) {
String containerId = container.getContainerId().toString();
DockerCommandExecutor.DockerContainerStatus containerStatus =
DockerCommandExecutor.getContainerStatus(containerId, conf,
DockerCommandExecutor.getContainerStatus(containerId,
privilegedOperationExecutor, nmContext);
if (DockerCommandExecutor.isKillable(containerStatus)) {
DockerKillCommand dockerKillCommand =
new DockerKillCommand(containerId).setSignal(signal.name());
DockerCommandExecutor.executeDockerCommand(dockerKillCommand,
containerId, env, conf, privilegedOperationExecutor, false,
nmContext);
containerId, env, privilegedOperationExecutor, false, nmContext);
} else {
LOG.debug(
"Container status is {}, skipping kill - {}",
@ -1292,12 +1291,12 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
+ containerId);
} else {
DockerCommandExecutor.DockerContainerStatus containerStatus =
DockerCommandExecutor.getContainerStatus(containerId, conf,
DockerCommandExecutor.getContainerStatus(containerId,
privilegedOperationExecutor, nmContext);
if (DockerCommandExecutor.isRemovable(containerStatus)) {
DockerRmCommand dockerRmCommand = new DockerRmCommand(containerId);
DockerCommandExecutor.executeDockerCommand(dockerRmCommand, containerId,
env, conf, privilegedOperationExecutor, false, nmContext);
env, privilegedOperationExecutor, false, nmContext);
}
}
}

View File

@ -22,7 +22,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -50,58 +49,6 @@ public final class DockerClient {
private static final String TMP_FILE_PREFIX = "docker.";
private static final String TMP_FILE_SUFFIX = ".cmd";
private static final String TMP_ENV_FILE_SUFFIX = ".env";
private final String tmpDirPath;
public DockerClient(Configuration conf) throws ContainerExecutionException {
String tmpDirBase = conf.get("hadoop.tmp.dir");
if (tmpDirBase == null) {
throw new ContainerExecutionException("hadoop.tmp.dir not set!");
}
tmpDirPath = tmpDirBase + "/nm-docker-cmds";
File tmpDir = new File(tmpDirPath);
if (!(tmpDir.exists() || tmpDir.mkdirs())) {
LOG.warn("Unable to create directory: " + tmpDirPath);
throw new ContainerExecutionException("Unable to create directory: " +
tmpDirPath);
}
}
public String writeCommandToTempFile(DockerCommand cmd, String filePrefix)
throws ContainerExecutionException {
try {
File dockerCommandFile = File.createTempFile(TMP_FILE_PREFIX + filePrefix,
TMP_FILE_SUFFIX, new
File(tmpDirPath));
try (
Writer writer = new OutputStreamWriter(
new FileOutputStream(dockerCommandFile), "UTF-8");
PrintWriter printWriter = new PrintWriter(writer);
) {
printWriter.println("[docker-command-execution]");
for (Map.Entry<String, List<String>> entry :
cmd.getDockerCommandWithArguments().entrySet()) {
if (entry.getKey().contains("=")) {
throw new ContainerExecutionException(
"'=' found in entry for docker command file, key = " + entry
.getKey() + "; value = " + entry.getValue());
}
if (entry.getValue().contains("\n")) {
throw new ContainerExecutionException(
"'\\n' found in entry for docker command file, key = " + entry
.getKey() + "; value = " + entry.getValue());
}
printWriter.println(" " + entry.getKey() + "=" + StringUtils
.join(",", entry.getValue()));
}
return dockerCommandFile.getAbsolutePath();
}
} catch (IOException e) {
LOG.warn("Unable to write docker command to temporary file!");
throw new ContainerExecutionException(e);
}
}
private String writeEnvFile(DockerRunCommand cmd, String filePrefix,
File cmdDir) throws IOException {

View File

@ -22,7 +22,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -117,16 +116,15 @@ public abstract class DockerCommand {
* @param dockerCommand Specific command to be run by docker.
* @param containerName
* @param env
* @param conf
* @param nmContext
* @return Returns the PrivilegedOperation object to be used.
* @throws ContainerExecutionException
*/
public PrivilegedOperation preparePrivilegedOperation(
DockerCommand dockerCommand, String containerName, Map<String,
String> env, Configuration conf, Context nmContext)
String> env, Context nmContext)
throws ContainerExecutionException {
DockerClient dockerClient = new DockerClient(conf);
DockerClient dockerClient = new DockerClient();
String commandFile =
dockerClient.writeCommandToTempFile(dockerCommand,
ContainerId.fromString(containerName),

View File

@ -16,7 +16,6 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
@ -68,19 +67,18 @@ public final class DockerCommandExecutor {
* @param dockerCommand the docker command to run.
* @param containerId the id of the container.
* @param env environment for the container.
* @param conf the hadoop configuration.
* @param privilegedOperationExecutor the privileged operations executor.
* @param disableFailureLogging disable logging for known rc failures.
* @return the output of the operation.
* @throws ContainerExecutionException if the operation fails.
*/
public static String executeDockerCommand(DockerCommand dockerCommand,
String containerId, Map<String, String> env, Configuration conf,
String containerId, Map<String, String> env,
PrivilegedOperationExecutor privilegedOperationExecutor,
boolean disableFailureLogging, Context nmContext)
throws ContainerExecutionException {
PrivilegedOperation dockerOp = dockerCommand.preparePrivilegedOperation(
dockerCommand, containerId, env, conf, nmContext);
dockerCommand, containerId, env, nmContext);
if (disableFailureLogging) {
dockerOp.disableFailureLogging();
@ -108,18 +106,16 @@ public final class DockerCommandExecutor {
* an exception and the nonexistent status is returned.
*
* @param containerId the id of the container.
* @param conf the hadoop configuration.
* @param privilegedOperationExecutor the privileged operations executor.
* @return a {@link DockerContainerStatus} representing the current status.
*/
public static DockerContainerStatus getContainerStatus(String containerId,
Configuration conf,
PrivilegedOperationExecutor privilegedOperationExecutor,
Context nmContext) {
try {
DockerContainerStatus dockerContainerStatus;
String currentContainerStatus =
executeStatusCommand(containerId, conf,
executeStatusCommand(containerId,
privilegedOperationExecutor, nmContext);
if (currentContainerStatus == null) {
dockerContainerStatus = DockerContainerStatus.UNKNOWN;
@ -170,13 +166,11 @@ public final class DockerCommandExecutor {
* status.
*
* @param containerId the id of the container.
* @param conf the hadoop configuration.
* @param privilegedOperationExecutor the privileged operations executor.
* @return the current container status.
* @throws ContainerExecutionException if the docker operation fails to run.
*/
private static String executeStatusCommand(String containerId,
Configuration conf,
PrivilegedOperationExecutor privilegedOperationExecutor,
Context nmContext)
throws ContainerExecutionException {
@ -184,8 +178,7 @@ public final class DockerCommandExecutor {
new DockerInspectCommand(containerId).getContainerStatus();
try {
return DockerCommandExecutor.executeDockerCommand(dockerInspectCommand,
containerId, null, conf, privilegedOperationExecutor, true,
nmContext);
containerId, null, privilegedOperationExecutor, true, nmContext);
} catch (ContainerExecutionException e) {
throw new ContainerExecutionException(e);
}

View File

@ -20,7 +20,6 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
@ -58,7 +57,7 @@ public class DockerInspectCommand extends DockerCommand {
@Override
public PrivilegedOperation preparePrivilegedOperation(
DockerCommand dockerCommand, String containerName, Map<String,
String> env, Configuration conf, Context nmContext) {
String> env, Context nmContext) {
PrivilegedOperation dockerOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.INSPECT_DOCKER_CONTAINER);
dockerOp.appendArgs(commandArguments, containerName);

View File

@ -16,7 +16,6 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
@ -37,7 +36,7 @@ public class DockerRmCommand extends DockerCommand {
@Override
public PrivilegedOperation preparePrivilegedOperation(
DockerCommand dockerCommand, String containerName, Map<String,
String> env, Configuration conf, Context nmContext) {
String> env, Context nmContext) {
PrivilegedOperation dockerOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.REMOVE_DOCKER_CONTAINER);
dockerOp.appendArgs(containerName);

View File

@ -68,7 +68,7 @@ public class TestDockerClient {
doReturn(conf).when(mockContext).getConf();
doReturn(dirsHandler).when(mockContext).getLocalDirsHandler();
DockerClient dockerClient = new DockerClient(conf);
DockerClient dockerClient = new DockerClient();
dirsHandler.init(conf);
dirsHandler.start();
String tmpPath = dockerClient.writeCommandToTempFile(dockerCmd, cid,

View File

@ -138,7 +138,7 @@ public class TestDockerCommandExecutor {
DockerStopCommand dockerStopCommand =
new DockerStopCommand(MOCK_CONTAINER_ID);
DockerCommandExecutor.executeDockerCommand(dockerStopCommand,
cId.toString(), env, configuration, mockExecutor, false, nmContext);
cId.toString(), env, mockExecutor, false, nmContext);
List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
.capturePrivilegedOperations(mockExecutor, 1, true);
assertEquals(1, ops.size());
@ -150,7 +150,7 @@ public class TestDockerCommandExecutor {
public void testExecuteDockerRm() throws Exception {
DockerRmCommand dockerCommand = new DockerRmCommand(MOCK_CONTAINER_ID);
DockerCommandExecutor.executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID,
env, configuration, mockExecutor, false, nmContext);
env, mockExecutor, false, nmContext);
List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
.capturePrivilegedOperations(mockExecutor, 1, true);
PrivilegedOperation privOp = ops.get(0);
@ -167,7 +167,7 @@ public class TestDockerCommandExecutor {
public void testExecuteDockerStop() throws Exception {
DockerStopCommand dockerCommand = new DockerStopCommand(MOCK_CONTAINER_ID);
DockerCommandExecutor.executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID,
env, configuration, mockExecutor, false, nmContext);
env, mockExecutor, false, nmContext);
List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
.capturePrivilegedOperations(mockExecutor, 1, true);
List<String> dockerCommands = getValidatedDockerCommands(ops);
@ -185,7 +185,7 @@ public class TestDockerCommandExecutor {
DockerInspectCommand dockerCommand =
new DockerInspectCommand(MOCK_CONTAINER_ID).getContainerStatus();
DockerCommandExecutor.executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID,
env, configuration, mockExecutor, false, nmContext);
env, mockExecutor, false, nmContext);
List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
.capturePrivilegedOperations(mockExecutor, 1, true);
PrivilegedOperation privOp = ops.get(0);
@ -204,7 +204,7 @@ public class TestDockerCommandExecutor {
DockerPullCommand dockerCommand =
new DockerPullCommand(MOCK_IMAGE_NAME);
DockerCommandExecutor.executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID,
env, configuration, mockExecutor, false, nmContext);
env, mockExecutor, false, nmContext);
List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
.capturePrivilegedOperations(mockExecutor, 1, true);
List<String> dockerCommands = getValidatedDockerCommands(ops);
@ -222,7 +222,7 @@ public class TestDockerCommandExecutor {
DockerLoadCommand dockerCommand =
new DockerLoadCommand(MOCK_LOCAL_IMAGE_NAME);
DockerCommandExecutor.executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID,
env, configuration, mockExecutor, false, nmContext);
env, mockExecutor, false, nmContext);
List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
.capturePrivilegedOperations(mockExecutor, 1, true);
List<String> dockerCommands = getValidatedDockerCommands(ops);
@ -244,7 +244,7 @@ public class TestDockerCommandExecutor {
any(PrivilegedOperation.class), eq(null), any(), eq(true), eq(false)))
.thenReturn(status.getName());
assertEquals(status, DockerCommandExecutor.getContainerStatus(
MOCK_CONTAINER_ID, configuration, mockExecutor, nmContext));
MOCK_CONTAINER_ID, mockExecutor, nmContext));
}
}
@ -254,7 +254,7 @@ public class TestDockerCommandExecutor {
new DockerKillCommand(MOCK_CONTAINER_ID)
.setSignal(ContainerExecutor.Signal.QUIT.name());
DockerCommandExecutor.executeDockerCommand(dockerKillCommand,
MOCK_CONTAINER_ID, env, configuration, mockExecutor, false, nmContext);
MOCK_CONTAINER_ID, env, mockExecutor, false, nmContext);
List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
.capturePrivilegedOperations(mockExecutor, 1, true);
List<String> dockerCommands = getValidatedDockerCommands(ops);
@ -275,7 +275,7 @@ public class TestDockerCommandExecutor {
new DockerKillCommand(MOCK_CONTAINER_ID)
.setSignal(ContainerExecutor.Signal.KILL.name());
DockerCommandExecutor.executeDockerCommand(dockerKillCommand,
MOCK_CONTAINER_ID, env, configuration, mockExecutor, false, nmContext);
MOCK_CONTAINER_ID, env, mockExecutor, false, nmContext);
List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
.capturePrivilegedOperations(mockExecutor, 1, true);
List<String> dockerCommands = getValidatedDockerCommands(ops);
@ -296,7 +296,7 @@ public class TestDockerCommandExecutor {
new DockerKillCommand(MOCK_CONTAINER_ID)
.setSignal(ContainerExecutor.Signal.TERM.name());
DockerCommandExecutor.executeDockerCommand(dockerKillCommand,
MOCK_CONTAINER_ID, env, configuration, mockExecutor, false, nmContext);
MOCK_CONTAINER_ID, env, mockExecutor, false, nmContext);
List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
.capturePrivilegedOperations(mockExecutor, 1, true);
List<String> dockerCommands = getValidatedDockerCommands(ops);