YARN-5430. Return container's ip and host from NM ContainerStatus call. Contributed by Jian He.

This commit is contained in:
Varun Vasudev 2016-08-26 16:30:18 +05:30
parent 27c3b86252
commit bfb4d95059
18 changed files with 304 additions and 17 deletions

View File

@ -25,6 +25,8 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
import java.util.List;
/**
* {@code ContainerStatus} represents the current status of a
* {@code Container}.
@ -151,4 +153,28 @@ public abstract class ContainerStatus {
@Private
@Unstable
public abstract void setCapability(Resource capability);
/**
* Get all the IP addresses with which the container run.
* @return The IP address where the container runs.
*/
@Public
@Unstable
public abstract List<String> getIPs();
@Private
@Unstable
public abstract void setIPs(List<String> ips);
/**
* Get the hostname where the container runs.
* @return The hostname where the container runs.
*/
@Public
@Unstable
public abstract String getHost();
@Private
@Unstable
public abstract void setHost(String host);
}

View File

@ -524,6 +524,7 @@ message ContainerStatusProto {
optional int32 exit_status = 4 [default = -1000];
optional ResourceProto capability = 5;
optional ExecutionTypeProto executionType = 6 [default = GUARANTEED];
repeated StringStringMapProto container_attributes = 7;
}
enum ContainerExitStatusProto {

View File

@ -21,11 +21,13 @@ package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
@ -33,7 +35,11 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProtoOrBuilder;
import com.google.protobuf.TextFormat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@Private
@Unstable
@ -43,8 +49,11 @@ public class ContainerStatusPBImpl extends ContainerStatus {
boolean viaProto = false;
private ContainerId containerId = null;
private static final String HOST = "HOST";
private static final String IPS = "IPS";
private Map<String, String> containerAttributes = new HashMap<>();
public ContainerStatusPBImpl() {
builder = ContainerStatusProto.newBuilder();
}
@ -94,6 +103,9 @@ public class ContainerStatusPBImpl extends ContainerStatus {
if (containerId != null) {
builder.setContainerId(convertToProtoFormat(this.containerId));
}
if (containerAttributes != null && !containerAttributes.isEmpty()) {
addContainerAttributesToProto();
}
}
private synchronized void mergeLocalToProto() {
@ -111,6 +123,57 @@ public class ContainerStatusPBImpl extends ContainerStatus {
viaProto = false;
}
private void addContainerAttributesToProto() {
maybeInitBuilder();
builder.clearContainerAttributes();
if (containerAttributes == null) {
return;
}
Iterable<YarnProtos.StringStringMapProto> iterable =
new Iterable<YarnProtos.StringStringMapProto>() {
@Override
public Iterator<YarnProtos.StringStringMapProto> iterator() {
return new Iterator<YarnProtos.StringStringMapProto>() {
private Iterator<String> keyIter =
containerAttributes.keySet().iterator();
@Override public void remove() {
throw new UnsupportedOperationException();
}
@Override public YarnProtos.StringStringMapProto next() {
String key = keyIter.next();
String value = containerAttributes.get(key);
if (value == null) {
value = "";
}
return YarnProtos.StringStringMapProto.newBuilder().setKey(key)
.setValue((value)).build();
}
@Override public boolean hasNext() {
return keyIter.hasNext();
}
};
}
};
builder.addAllContainerAttributes(iterable);
}
private void initContainerAttributes() {
ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
List<YarnProtos.StringStringMapProto> list = p.getContainerAttributesList();
for (YarnProtos.StringStringMapProto c : list) {
if (!containerAttributes.containsKey(c.getKey())) {
this.containerAttributes.put(c.getKey(), c.getValue());
}
}
}
@Override
public synchronized ExecutionType getExecutionType() {
ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
@ -211,6 +274,44 @@ public class ContainerStatusPBImpl extends ContainerStatus {
builder.setCapability(convertToProtoFormat(capability));
}
@Override
public synchronized List<String> getIPs() {
if (!containerAttributes.containsKey(IPS)) {
initContainerAttributes();
}
String ips = containerAttributes.get((IPS));
return ips == null ? null : Arrays.asList(ips.split(","));
}
@Override
public synchronized void setIPs(List<String> ips) {
maybeInitBuilder();
if (ips == null) {
containerAttributes.remove(IPS);
addContainerAttributesToProto();
return;
}
containerAttributes.put(IPS, StringUtils.join(",", ips));
}
@Override
public synchronized String getHost() {
if (containerAttributes.get(HOST) == null) {
initContainerAttributes();
}
return containerAttributes.get(HOST);
}
@Override
public synchronized void setHost(String host) {
maybeInitBuilder();
if (host == null) {
containerAttributes.remove(HOST);
return;
}
containerAttributes.put(HOST, host);
}
private ContainerStateProto convertToProtoFormat(ContainerState e) {
return ProtoUtils.convertToProtoFormat(e);
}

View File

@ -35,9 +35,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
@ -161,4 +163,19 @@ public class TestProtocolRecords {
Assert.assertEquals(321,
pb.getNodeStatus().getQueuedContainersStatus().getWaitQueueLength());
}
@Test
public void testContainerStatus() {
ContainerStatus status = Records.newRecord(ContainerStatus.class);
List<String> ips = Arrays.asList("127.0.0.1", "139.5.25.2");
status.setIPs(ips);
status.setHost("locahost123");
ContainerStatusPBImpl pb =
new ContainerStatusPBImpl(((ContainerStatusPBImpl) status).getProto());
Assert.assertEquals(ips, pb.getIPs());
Assert.assertEquals("locahost123", pb.getHost());
status.setIPs(null);
Assert.assertNull(status.getIPs());
}
}

View File

@ -22,6 +22,8 @@ import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@ -626,6 +628,26 @@ public abstract class ContainerExecutor implements Configurable {
}
}
// LinuxContainerExecutor overrides this method and behaves differently.
public String[] getIpAndHost(Container container) {
return getLocalIpAndHost(container);
}
// ipAndHost[0] contains ip.
// ipAndHost[1] contains hostname.
public static String[] getLocalIpAndHost(Container container) {
String[] ipAndHost = new String[2];
try {
InetAddress address = InetAddress.getLocalHost();
ipAndHost[0] = address.getHostAddress();
ipAndHost[1] = address.getHostName();
} catch (UnknownHostException e) {
LOG.error("Unable to get Local hostname and ip for " + container
.getContainerId(), e);
}
return ipAndHost;
}
/**
* Mark the container as inactive. For inactive containers this
* method has no effect.

View File

@ -523,6 +523,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
return 0;
}
@Override
public String[] getIpAndHost(Container container) {
return linuxContainerRuntime.getIpAndHost(container);
}
@Override
public int reacquireContainer(ContainerReacquisitionContext ctx)
throws IOException, InterruptedException {

View File

@ -68,6 +68,8 @@ public interface Container extends EventHandler<ContainerEvent> {
void setLogDir(String logDir);
void setIpAndHost(String[] ipAndHost);
String toString();
Priority getPriority();

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@ -115,6 +116,8 @@ public class ContainerImpl implements Container {
private int remainingRetryAttempts;
private String workDir;
private String logDir;
private String host;
private String ips;
/** The NM-wide configuration - not specific to this container */
private final Configuration daemonConf;
@ -507,9 +510,12 @@ public class ContainerImpl implements Container {
public ContainerStatus cloneAndGetContainerStatus() {
this.readLock.lock();
try {
return BuilderUtils.newContainerStatus(this.containerId,
ContainerStatus status = BuilderUtils.newContainerStatus(this.containerId,
getCurrentState(), diagnostics.toString(), exitCode, getResource(),
this.containerTokenIdentifier.getExecutionType());
status.setIPs(ips == null ? null : Arrays.asList(ips.split(",")));
status.setHost(host);
return status;
} finally {
this.readLock.unlock();
}
@ -566,6 +572,12 @@ public class ContainerImpl implements Container {
this.workDir = workDir;
}
@Override
public void setIpAndHost(String[] ipAndHost) {
this.ips = ipAndHost[0];
this.host = ipAndHost[1];
}
@Override
public String getLogDir() {
return logDir;

View File

@ -157,13 +157,15 @@ public class PrivilegedOperationExecutor {
}
} catch (ExitCodeException e) {
if (operation.isFailureLoggingEnabled()) {
StringBuilder logBuilder = new StringBuilder("Shell execution returned "
+ "exit code: ")
.append(exec.getExitCode())
.append(". Privileged Execution Operation Output: ")
.append(System.lineSeparator()).append(exec.getOutput());
.append(". Privileged Execution Operation Stderr: ")
.append(System.lineSeparator())
.append(e.getMessage())
.append(System.lineSeparator())
.append("Stdout: " + exec.getOutput())
.append(System.lineSeparator());
logBuilder.append("Full command array for failed execution: ")
.append(System.lineSeparator());
logBuilder.append(Arrays.toString(fullCommandArray));

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
@ -162,4 +163,9 @@ public class DefaultLinuxContainerRuntime implements LinuxContainerRuntime {
throws ContainerExecutionException {
}
@Override
public String[] getIpAndHost(Container container) {
return ContainerExecutor.getLocalIpAndHost(container);
}
}

View File

@ -116,4 +116,10 @@ public class DelegatingLinuxContainerRuntime implements LinuxContainerRuntime {
runtime.reapContainer(ctx);
}
@Override
public String[] getIpAndHost(Container container) {
LinuxContainerRuntime runtime = pickContainerRuntime(container);
return runtime.getIpAndHost(container);
}
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileg
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerClient;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerInspectCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStopCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
@ -593,4 +594,41 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
public void reapContainer(ContainerRuntimeContext ctx)
throws ContainerExecutionException {
}
// ipAndHost[0] contains comma separated list of IPs
// ipAndHost[1] contains the hostname.
@Override
public String[] getIpAndHost(Container container) {
String containerId = container.getContainerId().toString();
DockerInspectCommand inspectCommand =
new DockerInspectCommand(containerId).getIpAndHost();
try {
String commandFile = dockerClient.writeCommandToTempFile(inspectCommand,
containerId);
PrivilegedOperation privOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.RUN_DOCKER_CMD);
privOp.appendArgs(commandFile);
String output = privilegedOperationExecutor
.executePrivilegedOperation(null, privOp, null,
container.getLaunchContext().getEnvironment(), true, false);
LOG.info("Docker inspect output for " + containerId + ": " + output);
int index = output.lastIndexOf(',');
if (index == -1) {
LOG.error("Incorrect format for ip and host");
return null;
}
String ips = output.substring(0, index).trim();
String host = output.substring(index+1).trim();
String[] ipAndHost = new String[2];
ipAndHost[0] = ips;
ipAndHost[1] = host;
return ipAndHost;
} catch (ContainerExecutionException e) {
LOG.error("Error when writing command to temp file", e);
} catch (PrivilegedOperationException e) {
LOG.error("Error when executing command.", e);
}
return null;
}
}

View File

@ -38,4 +38,14 @@ public class DockerInspectCommand extends DockerCommand {
super.addCommandArguments(containerName);
return this;
}
public DockerInspectCommand getIpAndHost() {
// Be sure to not use space in the argument, otherwise the
// extract_values_delim method in container-executor binary
// cannot parse the arguments correctly.
super.addCommandArguments("--format='{{range(.NetworkSettings.Networks)}}"
+ "{{.IPAddress}},{{end}}{{.Config.Hostname}}'");
super.addCommandArguments(containerName);
return this;
}
}

View File

@ -18,10 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -45,8 +43,10 @@ import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
public class ContainersMonitorImpl extends AbstractService implements
ContainersMonitor {
@ -447,6 +447,17 @@ public class ContainersMonitorImpl extends AbstractService implements
containerMetricsUnregisterDelayMs);
usageMetrics.recordProcessId(pId);
}
Container container = context.getContainers().get(containerId);
String[] ipAndHost = containerExecutor.getIpAndHost(container);
if (ipAndHost != null && ipAndHost[0] != null
&& ipAndHost[1] != null) {
container.setIpAndHost(ipAndHost);
LOG.info(containerId + "'s ip = " + ipAndHost[0]
+ ", and hostname = " + ipAndHost[1]);
} else {
LOG.info("Can not get both ip and hostname: " + Arrays
.toString(ipAndHost));
}
}
}
// End of initializing any uninitialized processTrees

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
/**
* An abstraction for various container runtime implementations. Examples
@ -73,4 +74,9 @@ public interface ContainerRuntime {
*/
void reapContainer(ContainerRuntimeContext ctx)
throws ContainerExecutionException;
/**
* Return the host and ip of the container
*/
String[] getIpAndHost(Container container);
}

View File

@ -419,6 +419,13 @@ int change_user(uid_t user, gid_t group) {
return 0;
}
char* check_docker_binary(char *docker_binary) {
if (docker_binary == NULL) {
return "docker";
}
return docker_binary;
}
/**
* Utility function to concatenate argB to argA using the concat_pattern.
*/
@ -1123,6 +1130,8 @@ char* parse_docker_command_file(const char* command_file) {
int run_docker(const char *command_file) {
char* docker_command = parse_docker_command_file(command_file);
char* docker_binary = get_value(DOCKER_BINARY_KEY, &executor_cfg);
docker_binary = check_docker_binary(docker_binary);
char* docker_command_with_binary = calloc(sizeof(char), EXECUTOR_PATH_MAX);
snprintf(docker_command_with_binary, EXECUTOR_PATH_MAX, "%s %s", docker_binary, docker_command);
char **args = extract_values_delim(docker_command_with_binary, " ");
@ -1288,9 +1297,7 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
char *docker_command = parse_docker_command_file(command_file);
char *docker_binary = get_value(DOCKER_BINARY_KEY, &executor_cfg);
if (docker_binary == NULL) {
docker_binary = "docker";
}
docker_binary = check_docker_binary(docker_binary);
fprintf(LOGFILE, "Creating script paths...\n");
exit_code = create_script_paths(

View File

@ -47,4 +47,13 @@ public class TestDockerInspectCommand {
assertEquals("inspect --format='{{.State.Status}}' foo",
dockerInspectCommand.getCommandWithArguments());
}
@Test
public void testGetIpAndHost() throws Exception {
dockerInspectCommand.getIpAndHost();
assertEquals(
"inspect --format='{{range(.NetworkSettings.Networks)}}{{.IPAddress}}"
+ ",{{end}}{{.Config.Hostname}}' foo",
dockerInspectCommand.getCommandWithArguments());
}
}

View File

@ -174,7 +174,13 @@ public class MockContainer implements Container {
public void setLogDir(String logDir) {
}
@Override
public Priority getPriority() {
return Priority.UNDEFINED;
}
@Override
public void setIpAndHost(String[] ipAndHost) {
}
}