YARN-5430. Return container's ip and host from NM ContainerStatus call. Contributed by Jian He.
This commit is contained in:
parent
ee3358402a
commit
49a97a76f5
|
@ -25,6 +25,8 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@code ContainerStatus} represents the current status of a
|
* {@code ContainerStatus} represents the current status of a
|
||||||
* {@code Container}.
|
* {@code Container}.
|
||||||
|
@ -151,4 +153,28 @@ public abstract class ContainerStatus {
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract void setCapability(Resource capability);
|
public abstract void setCapability(Resource capability);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all the IP addresses with which the container run.
|
||||||
|
* @return The IP address where the container runs.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract List<String> getIPs();
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public abstract void setIPs(List<String> ips);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the hostname where the container runs.
|
||||||
|
* @return The hostname where the container runs.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract String getHost();
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public abstract void setHost(String host);
|
||||||
}
|
}
|
||||||
|
|
|
@ -529,6 +529,7 @@ message ContainerStatusProto {
|
||||||
optional int32 exit_status = 4 [default = -1000];
|
optional int32 exit_status = 4 [default = -1000];
|
||||||
optional ResourceProto capability = 5;
|
optional ResourceProto capability = 5;
|
||||||
optional ExecutionTypeProto executionType = 6 [default = GUARANTEED];
|
optional ExecutionTypeProto executionType = 6 [default = GUARANTEED];
|
||||||
|
repeated StringStringMapProto container_attributes = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum ContainerExitStatusProto {
|
enum ContainerExitStatusProto {
|
||||||
|
|
|
@ -21,11 +21,13 @@ package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
|
||||||
|
@ -33,7 +35,11 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProtoOrBuilder;
|
||||||
|
|
||||||
import com.google.protobuf.TextFormat;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
|
@ -43,6 +49,9 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
||||||
boolean viaProto = false;
|
boolean viaProto = false;
|
||||||
|
|
||||||
private ContainerId containerId = null;
|
private ContainerId containerId = null;
|
||||||
|
private static final String HOST = "HOST";
|
||||||
|
private static final String IPS = "IPS";
|
||||||
|
private Map<String, String> containerAttributes = new HashMap<>();
|
||||||
|
|
||||||
|
|
||||||
public ContainerStatusPBImpl() {
|
public ContainerStatusPBImpl() {
|
||||||
|
@ -94,6 +103,9 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
||||||
if (containerId != null) {
|
if (containerId != null) {
|
||||||
builder.setContainerId(convertToProtoFormat(this.containerId));
|
builder.setContainerId(convertToProtoFormat(this.containerId));
|
||||||
}
|
}
|
||||||
|
if (containerAttributes != null && !containerAttributes.isEmpty()) {
|
||||||
|
addContainerAttributesToProto();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void mergeLocalToProto() {
|
private synchronized void mergeLocalToProto() {
|
||||||
|
@ -111,6 +123,57 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
||||||
viaProto = false;
|
viaProto = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void addContainerAttributesToProto() {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.clearContainerAttributes();
|
||||||
|
if (containerAttributes == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Iterable<YarnProtos.StringStringMapProto> iterable =
|
||||||
|
new Iterable<YarnProtos.StringStringMapProto>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<YarnProtos.StringStringMapProto> iterator() {
|
||||||
|
return new Iterator<YarnProtos.StringStringMapProto>() {
|
||||||
|
|
||||||
|
private Iterator<String> keyIter =
|
||||||
|
containerAttributes.keySet().iterator();
|
||||||
|
|
||||||
|
@Override public void remove() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public YarnProtos.StringStringMapProto next() {
|
||||||
|
String key = keyIter.next();
|
||||||
|
String value = containerAttributes.get(key);
|
||||||
|
|
||||||
|
if (value == null) {
|
||||||
|
value = "";
|
||||||
|
}
|
||||||
|
|
||||||
|
return YarnProtos.StringStringMapProto.newBuilder().setKey(key)
|
||||||
|
.setValue((value)).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean hasNext() {
|
||||||
|
return keyIter.hasNext();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
builder.addAllContainerAttributes(iterable);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initContainerAttributes() {
|
||||||
|
ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
List<YarnProtos.StringStringMapProto> list = p.getContainerAttributesList();
|
||||||
|
for (YarnProtos.StringStringMapProto c : list) {
|
||||||
|
if (!containerAttributes.containsKey(c.getKey())) {
|
||||||
|
this.containerAttributes.put(c.getKey(), c.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized ExecutionType getExecutionType() {
|
public synchronized ExecutionType getExecutionType() {
|
||||||
ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
|
ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
@ -211,6 +274,44 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
||||||
builder.setCapability(convertToProtoFormat(capability));
|
builder.setCapability(convertToProtoFormat(capability));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized List<String> getIPs() {
|
||||||
|
if (!containerAttributes.containsKey(IPS)) {
|
||||||
|
initContainerAttributes();
|
||||||
|
}
|
||||||
|
String ips = containerAttributes.get((IPS));
|
||||||
|
return ips == null ? null : Arrays.asList(ips.split(","));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void setIPs(List<String> ips) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (ips == null) {
|
||||||
|
containerAttributes.remove(IPS);
|
||||||
|
addContainerAttributesToProto();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
containerAttributes.put(IPS, StringUtils.join(",", ips));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized String getHost() {
|
||||||
|
if (containerAttributes.get(HOST) == null) {
|
||||||
|
initContainerAttributes();
|
||||||
|
}
|
||||||
|
return containerAttributes.get(HOST);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void setHost(String host) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (host == null) {
|
||||||
|
containerAttributes.remove(HOST);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
containerAttributes.put(HOST, host);
|
||||||
|
}
|
||||||
|
|
||||||
private ContainerStateProto convertToProtoFormat(ContainerState e) {
|
private ContainerStateProto convertToProtoFormat(ContainerState e) {
|
||||||
return ProtoUtils.convertToProtoFormat(e);
|
return ProtoUtils.convertToProtoFormat(e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,9 +35,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
|
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
|
||||||
|
@ -161,4 +163,19 @@ public class TestProtocolRecords {
|
||||||
Assert.assertEquals(321,
|
Assert.assertEquals(321,
|
||||||
pb.getNodeStatus().getQueuedContainersStatus().getWaitQueueLength());
|
pb.getNodeStatus().getQueuedContainersStatus().getWaitQueueLength());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerStatus() {
|
||||||
|
ContainerStatus status = Records.newRecord(ContainerStatus.class);
|
||||||
|
List<String> ips = Arrays.asList("127.0.0.1", "139.5.25.2");
|
||||||
|
status.setIPs(ips);
|
||||||
|
status.setHost("locahost123");
|
||||||
|
ContainerStatusPBImpl pb =
|
||||||
|
new ContainerStatusPBImpl(((ContainerStatusPBImpl) status).getProto());
|
||||||
|
Assert.assertEquals(ips, pb.getIPs());
|
||||||
|
Assert.assertEquals("locahost123", pb.getHost());
|
||||||
|
|
||||||
|
status.setIPs(null);
|
||||||
|
Assert.assertNull(status.getIPs());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,8 @@ import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -626,6 +628,26 @@ public abstract class ContainerExecutor implements Configurable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LinuxContainerExecutor overrides this method and behaves differently.
|
||||||
|
public String[] getIpAndHost(Container container) {
|
||||||
|
return getLocalIpAndHost(container);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ipAndHost[0] contains ip.
|
||||||
|
// ipAndHost[1] contains hostname.
|
||||||
|
public static String[] getLocalIpAndHost(Container container) {
|
||||||
|
String[] ipAndHost = new String[2];
|
||||||
|
try {
|
||||||
|
InetAddress address = InetAddress.getLocalHost();
|
||||||
|
ipAndHost[0] = address.getHostAddress();
|
||||||
|
ipAndHost[1] = address.getHostName();
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
LOG.error("Unable to get Local hostname and ip for " + container
|
||||||
|
.getContainerId(), e);
|
||||||
|
}
|
||||||
|
return ipAndHost;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mark the container as inactive. For inactive containers this
|
* Mark the container as inactive. For inactive containers this
|
||||||
* method has no effect.
|
* method has no effect.
|
||||||
|
|
|
@ -523,6 +523,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getIpAndHost(Container container) {
|
||||||
|
return linuxContainerRuntime.getIpAndHost(container);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int reacquireContainer(ContainerReacquisitionContext ctx)
|
public int reacquireContainer(ContainerReacquisitionContext ctx)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
|
|
@ -67,6 +67,8 @@ public interface Container extends EventHandler<ContainerEvent> {
|
||||||
|
|
||||||
void setLogDir(String logDir);
|
void setLogDir(String logDir);
|
||||||
|
|
||||||
|
void setIpAndHost(String[] ipAndHost);
|
||||||
|
|
||||||
String toString();
|
String toString();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -114,6 +115,8 @@ public class ContainerImpl implements Container {
|
||||||
private int remainingRetryAttempts;
|
private int remainingRetryAttempts;
|
||||||
private String workDir;
|
private String workDir;
|
||||||
private String logDir;
|
private String logDir;
|
||||||
|
private String host;
|
||||||
|
private String ips;
|
||||||
|
|
||||||
/** The NM-wide configuration - not specific to this container */
|
/** The NM-wide configuration - not specific to this container */
|
||||||
private final Configuration daemonConf;
|
private final Configuration daemonConf;
|
||||||
|
@ -501,9 +504,12 @@ public class ContainerImpl implements Container {
|
||||||
public ContainerStatus cloneAndGetContainerStatus() {
|
public ContainerStatus cloneAndGetContainerStatus() {
|
||||||
this.readLock.lock();
|
this.readLock.lock();
|
||||||
try {
|
try {
|
||||||
return BuilderUtils.newContainerStatus(this.containerId,
|
ContainerStatus status = BuilderUtils.newContainerStatus(this.containerId,
|
||||||
getCurrentState(), diagnostics.toString(), exitCode, getResource(),
|
getCurrentState(), diagnostics.toString(), exitCode, getResource(),
|
||||||
this.containerTokenIdentifier.getExecutionType());
|
this.containerTokenIdentifier.getExecutionType());
|
||||||
|
status.setIPs(ips == null ? null : Arrays.asList(ips.split(",")));
|
||||||
|
status.setHost(host);
|
||||||
|
return status;
|
||||||
} finally {
|
} finally {
|
||||||
this.readLock.unlock();
|
this.readLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -560,6 +566,12 @@ public class ContainerImpl implements Container {
|
||||||
this.workDir = workDir;
|
this.workDir = workDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setIpAndHost(String[] ipAndHost) {
|
||||||
|
this.ips = ipAndHost[0];
|
||||||
|
this.host = ipAndHost[1];
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getLogDir() {
|
public String getLogDir() {
|
||||||
return logDir;
|
return logDir;
|
||||||
|
|
|
@ -157,13 +157,15 @@ public class PrivilegedOperationExecutor {
|
||||||
}
|
}
|
||||||
} catch (ExitCodeException e) {
|
} catch (ExitCodeException e) {
|
||||||
if (operation.isFailureLoggingEnabled()) {
|
if (operation.isFailureLoggingEnabled()) {
|
||||||
|
|
||||||
StringBuilder logBuilder = new StringBuilder("Shell execution returned "
|
StringBuilder logBuilder = new StringBuilder("Shell execution returned "
|
||||||
+ "exit code: ")
|
+ "exit code: ")
|
||||||
.append(exec.getExitCode())
|
.append(exec.getExitCode())
|
||||||
.append(". Privileged Execution Operation Output: ")
|
.append(". Privileged Execution Operation Stderr: ")
|
||||||
.append(System.lineSeparator()).append(exec.getOutput());
|
.append(System.lineSeparator())
|
||||||
|
.append(e.getMessage())
|
||||||
|
.append(System.lineSeparator())
|
||||||
|
.append("Stdout: " + exec.getOutput())
|
||||||
|
.append(System.lineSeparator());
|
||||||
logBuilder.append("Full command array for failed execution: ")
|
logBuilder.append("Full command array for failed execution: ")
|
||||||
.append(System.lineSeparator());
|
.append(System.lineSeparator());
|
||||||
logBuilder.append(Arrays.toString(fullCommandArray));
|
logBuilder.append(Arrays.toString(fullCommandArray));
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
|
||||||
|
@ -162,4 +163,9 @@ public class DefaultLinuxContainerRuntime implements LinuxContainerRuntime {
|
||||||
throws ContainerExecutionException {
|
throws ContainerExecutionException {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getIpAndHost(Container container) {
|
||||||
|
return ContainerExecutor.getLocalIpAndHost(container);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -116,4 +116,10 @@ public class DelegatingLinuxContainerRuntime implements LinuxContainerRuntime {
|
||||||
|
|
||||||
runtime.reapContainer(ctx);
|
runtime.reapContainer(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getIpAndHost(Container container) {
|
||||||
|
LinuxContainerRuntime runtime = pickContainerRuntime(container);
|
||||||
|
return runtime.getIpAndHost(container);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileg
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerClient;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerClient;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerInspectCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStopCommand;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStopCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
|
||||||
|
@ -593,4 +594,41 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
|
||||||
public void reapContainer(ContainerRuntimeContext ctx)
|
public void reapContainer(ContainerRuntimeContext ctx)
|
||||||
throws ContainerExecutionException {
|
throws ContainerExecutionException {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// ipAndHost[0] contains comma separated list of IPs
|
||||||
|
// ipAndHost[1] contains the hostname.
|
||||||
|
@Override
|
||||||
|
public String[] getIpAndHost(Container container) {
|
||||||
|
String containerId = container.getContainerId().toString();
|
||||||
|
DockerInspectCommand inspectCommand =
|
||||||
|
new DockerInspectCommand(containerId).getIpAndHost();
|
||||||
|
try {
|
||||||
|
String commandFile = dockerClient.writeCommandToTempFile(inspectCommand,
|
||||||
|
containerId);
|
||||||
|
PrivilegedOperation privOp = new PrivilegedOperation(
|
||||||
|
PrivilegedOperation.OperationType.RUN_DOCKER_CMD);
|
||||||
|
privOp.appendArgs(commandFile);
|
||||||
|
String output = privilegedOperationExecutor
|
||||||
|
.executePrivilegedOperation(null, privOp, null,
|
||||||
|
container.getLaunchContext().getEnvironment(), true, false);
|
||||||
|
LOG.info("Docker inspect output for " + containerId + ": " + output);
|
||||||
|
int index = output.lastIndexOf(',');
|
||||||
|
if (index == -1) {
|
||||||
|
LOG.error("Incorrect format for ip and host");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String ips = output.substring(0, index).trim();
|
||||||
|
String host = output.substring(index+1).trim();
|
||||||
|
String[] ipAndHost = new String[2];
|
||||||
|
ipAndHost[0] = ips;
|
||||||
|
ipAndHost[1] = host;
|
||||||
|
return ipAndHost;
|
||||||
|
} catch (ContainerExecutionException e) {
|
||||||
|
LOG.error("Error when writing command to temp file", e);
|
||||||
|
} catch (PrivilegedOperationException e) {
|
||||||
|
LOG.error("Error when executing command.", e);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,4 +38,14 @@ public class DockerInspectCommand extends DockerCommand {
|
||||||
super.addCommandArguments(containerName);
|
super.addCommandArguments(containerName);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public DockerInspectCommand getIpAndHost() {
|
||||||
|
// Be sure to not use space in the argument, otherwise the
|
||||||
|
// extract_values_delim method in container-executor binary
|
||||||
|
// cannot parse the arguments correctly.
|
||||||
|
super.addCommandArguments("--format='{{range(.NetworkSettings.Networks)}}"
|
||||||
|
+ "{{.IPAddress}},{{end}}{{.Config.Hostname}}'");
|
||||||
|
super.addCommandArguments(containerName);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -18,11 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -31,19 +28,22 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
|
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
|
||||||
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
|
|
||||||
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
||||||
|
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import java.util.Arrays;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
public class ContainersMonitorImpl extends AbstractService implements
|
public class ContainersMonitorImpl extends AbstractService implements
|
||||||
ContainersMonitor {
|
ContainersMonitor {
|
||||||
|
@ -437,6 +437,17 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||||
containerMetricsUnregisterDelayMs);
|
containerMetricsUnregisterDelayMs);
|
||||||
usageMetrics.recordProcessId(pId);
|
usageMetrics.recordProcessId(pId);
|
||||||
}
|
}
|
||||||
|
Container container = context.getContainers().get(containerId);
|
||||||
|
String[] ipAndHost = containerExecutor.getIpAndHost(container);
|
||||||
|
if (ipAndHost != null && ipAndHost[0] != null
|
||||||
|
&& ipAndHost[1] != null) {
|
||||||
|
container.setIpAndHost(ipAndHost);
|
||||||
|
LOG.info(containerId + "'s ip = " + ipAndHost[0]
|
||||||
|
+ ", and hostname = " + ipAndHost[1]);
|
||||||
|
} else {
|
||||||
|
LOG.info("Can not get both ip and hostname: " + Arrays
|
||||||
|
.toString(ipAndHost));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// End of initializing any uninitialized processTrees
|
// End of initializing any uninitialized processTrees
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An abstraction for various container runtime implementations. Examples
|
* An abstraction for various container runtime implementations. Examples
|
||||||
|
@ -73,4 +74,9 @@ public interface ContainerRuntime {
|
||||||
*/
|
*/
|
||||||
void reapContainer(ContainerRuntimeContext ctx)
|
void reapContainer(ContainerRuntimeContext ctx)
|
||||||
throws ContainerExecutionException;
|
throws ContainerExecutionException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the host and ip of the container
|
||||||
|
*/
|
||||||
|
String[] getIpAndHost(Container container);
|
||||||
}
|
}
|
|
@ -412,6 +412,13 @@ int change_user(uid_t user, gid_t group) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char* check_docker_binary(char *docker_binary) {
|
||||||
|
if (docker_binary == NULL) {
|
||||||
|
return "docker";
|
||||||
|
}
|
||||||
|
return docker_binary;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility function to concatenate argB to argA using the concat_pattern.
|
* Utility function to concatenate argB to argA using the concat_pattern.
|
||||||
*/
|
*/
|
||||||
|
@ -1100,6 +1107,8 @@ char* parse_docker_command_file(const char* command_file) {
|
||||||
int run_docker(const char *command_file) {
|
int run_docker(const char *command_file) {
|
||||||
char* docker_command = parse_docker_command_file(command_file);
|
char* docker_command = parse_docker_command_file(command_file);
|
||||||
char* docker_binary = get_value(DOCKER_BINARY_KEY, &executor_cfg);
|
char* docker_binary = get_value(DOCKER_BINARY_KEY, &executor_cfg);
|
||||||
|
docker_binary = check_docker_binary(docker_binary);
|
||||||
|
|
||||||
char* docker_command_with_binary = calloc(sizeof(char), EXECUTOR_PATH_MAX);
|
char* docker_command_with_binary = calloc(sizeof(char), EXECUTOR_PATH_MAX);
|
||||||
snprintf(docker_command_with_binary, EXECUTOR_PATH_MAX, "%s %s", docker_binary, docker_command);
|
snprintf(docker_command_with_binary, EXECUTOR_PATH_MAX, "%s %s", docker_binary, docker_command);
|
||||||
char **args = extract_values_delim(docker_command_with_binary, " ");
|
char **args = extract_values_delim(docker_command_with_binary, " ");
|
||||||
|
@ -1265,9 +1274,7 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
|
||||||
|
|
||||||
char *docker_command = parse_docker_command_file(command_file);
|
char *docker_command = parse_docker_command_file(command_file);
|
||||||
char *docker_binary = get_value(DOCKER_BINARY_KEY, &executor_cfg);
|
char *docker_binary = get_value(DOCKER_BINARY_KEY, &executor_cfg);
|
||||||
if (docker_binary == NULL) {
|
docker_binary = check_docker_binary(docker_binary);
|
||||||
docker_binary = "docker";
|
|
||||||
}
|
|
||||||
|
|
||||||
fprintf(LOGFILE, "Creating script paths...\n");
|
fprintf(LOGFILE, "Creating script paths...\n");
|
||||||
exit_code = create_script_paths(
|
exit_code = create_script_paths(
|
||||||
|
|
|
@ -47,4 +47,13 @@ public class TestDockerInspectCommand {
|
||||||
assertEquals("inspect --format='{{.State.Status}}' foo",
|
assertEquals("inspect --format='{{.State.Status}}' foo",
|
||||||
dockerInspectCommand.getCommandWithArguments());
|
dockerInspectCommand.getCommandWithArguments());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetIpAndHost() throws Exception {
|
||||||
|
dockerInspectCommand.getIpAndHost();
|
||||||
|
assertEquals(
|
||||||
|
"inspect --format='{{range(.NetworkSettings.Networks)}}{{.IPAddress}}"
|
||||||
|
+ ",{{end}}{{.Config.Hostname}}' foo",
|
||||||
|
dockerInspectCommand.getCommandWithArguments());
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -172,4 +172,9 @@ public class MockContainer implements Container {
|
||||||
@Override
|
@Override
|
||||||
public void setLogDir(String logDir) {
|
public void setLogDir(String logDir) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setIpAndHost(String[] ipAndHost) {
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue