YARN-5168. Added exposed port information for Docker container.
Contributed by Xun Liu
This commit is contained in:
parent
2499435d9d
commit
f82922dcfa
|
@ -3642,7 +3642,7 @@ public class TestRMContainerAllocator {
|
|||
: RMContainerAllocator.PRIORITY_MAP;
|
||||
Container container = Container.newInstance(containerId,
|
||||
NodeId.newInstance(nodeName, 1234), nodeName + ":5678",
|
||||
Resource.newInstance(1024, 1), priority, null);
|
||||
Resource.newInstance(1024, 1), priority, null);
|
||||
containersToAllocate.add(container);
|
||||
return containerId;
|
||||
}
|
||||
|
|
|
@ -167,8 +167,10 @@ public class NodeInfo {
|
|||
list2.add(ContainerStatus.newInstance(cId, ContainerState.RUNNING, "",
|
||||
ContainerExitStatus.SUCCESS));
|
||||
}
|
||||
list.add(new UpdatedContainerInfo(new ArrayList<ContainerStatus>(),
|
||||
list2));
|
||||
List<Map.Entry<ApplicationId, ContainerStatus>> needUpdateContainers =
|
||||
new ArrayList<Map.Entry<ApplicationId, ContainerStatus>>();
|
||||
list.add(new UpdatedContainerInfo(new ArrayList<ContainerStatus>(),
|
||||
list2, needUpdateContainers));
|
||||
return list;
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
|||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
|
@ -125,7 +127,20 @@ public abstract class Container implements Comparable<Container> {
|
|||
@Private
|
||||
@Unstable
|
||||
public abstract void setNodeHttpAddress(String nodeHttpAddress);
|
||||
|
||||
|
||||
/**
|
||||
* Get the exposed ports of the node on which the container is allocated.
|
||||
* @return exposed ports of the node on which the container is allocated
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract Map<String, List<Map<String, String>>> getExposedPorts();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setExposedPorts(
|
||||
Map<String, List<Map<String, String>>> ports);
|
||||
|
||||
/**
|
||||
* Get the <code>Resource</code> allocated to the container.
|
||||
* @return <code>Resource</code> allocated to the container
|
||||
|
|
|
@ -23,6 +23,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* {@code ContainerReport} is a report of an container.
|
||||
* <p>
|
||||
|
@ -77,6 +80,7 @@ public abstract class ContainerReport {
|
|||
report.setContainerState(containerState);
|
||||
report.setNodeHttpAddress(nodeHttpAddress);
|
||||
report.setExecutionType(executionType);
|
||||
|
||||
return report;
|
||||
}
|
||||
|
||||
|
@ -211,8 +215,22 @@ public abstract class ContainerReport {
|
|||
public abstract void setContainerExitStatus(int containerExitStatus);
|
||||
|
||||
/**
|
||||
* Get the Node Http address of the container
|
||||
* Get exposed ports of the container.
|
||||
*
|
||||
* @return the node exposed ports of the container
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract String getExposedPorts();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setExposedPorts(
|
||||
Map<String, List<Map<String, String>>> ports);
|
||||
|
||||
/**
|
||||
* Get the Node Http address of the container.
|
||||
*
|
||||
* @return the node http address of the container
|
||||
*/
|
||||
@Public
|
||||
|
|
|
@ -223,4 +223,22 @@ public abstract class ContainerStatus {
|
|||
throw new UnsupportedOperationException(
|
||||
"subclass must implement this method");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get exposed ports of the container.
|
||||
* @return List of exposed ports
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public String getExposedPorts() {
|
||||
throw new UnsupportedOperationException(
|
||||
"subclass must implement this method");
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public void setExposedPorts(String ports) {
|
||||
throw new UnsupportedOperationException(
|
||||
"subclass must implement this method");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -155,6 +155,7 @@ message ContainerProto {
|
|||
optional int64 allocation_request_id = 8 [default = -1];
|
||||
optional int32 version = 9 [default = 0];
|
||||
repeated string allocation_tags = 10;
|
||||
optional string exposed_ports = 11;
|
||||
}
|
||||
|
||||
message ContainerReportProto {
|
||||
|
@ -170,6 +171,7 @@ message ContainerReportProto {
|
|||
optional ContainerStateProto container_state = 10;
|
||||
optional string node_http_address = 11;
|
||||
optional ExecutionTypeProto executionType = 12 [default = GUARANTEED];
|
||||
optional string exposed_ports = 13;
|
||||
}
|
||||
|
||||
enum YarnApplicationStateProto {
|
||||
|
|
|
@ -159,7 +159,8 @@ public class TestDSAppMaster {
|
|||
|
||||
private Container generateContainer(ContainerId cid) {
|
||||
return Container.newInstance(cid, NodeId.newInstance("host", 5000),
|
||||
"host:80", Resource.newInstance(1024, 1), Priority.newInstance(0), null);
|
||||
"host:80", Resource.newInstance(1024, 1), Priority.newInstance(0),
|
||||
null);
|
||||
}
|
||||
|
||||
private ContainerStatus
|
||||
|
|
|
@ -21,6 +21,8 @@ import io.swagger.annotations.ApiModel;
|
|||
import io.swagger.annotations.ApiModelProperty;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
|
@ -52,6 +54,7 @@ public class Container extends BaseResource {
|
|||
private Resource resource = null;
|
||||
private Artifact artifact = null;
|
||||
private Boolean privilegedContainer = null;
|
||||
private Map<String, List<Map<String, String>>> exposedPorts = null;
|
||||
|
||||
/**
|
||||
* Unique container id of a running service, e.g.
|
||||
|
@ -244,6 +247,17 @@ public class Container extends BaseResource {
|
|||
this.privilegedContainer = privilegedContainer;
|
||||
}
|
||||
|
||||
@ApiModelProperty(example = "null",
|
||||
value = "Ports exposed for this container.")
|
||||
@JsonProperty("exposed_ports")
|
||||
public Map<String, List<Map<String, String>>> getExposedPorts() {
|
||||
return exposedPorts;
|
||||
}
|
||||
|
||||
public void setExposedPorts(Map<String, List<Map<String, String>>> ports) {
|
||||
this.exposedPorts = ports;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(java.lang.Object o) {
|
||||
if (this == o) {
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.yarn.service.component.instance;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -65,6 +67,8 @@ import java.io.IOException;
|
|||
import java.text.MessageFormat;
|
||||
import java.util.Date;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -784,6 +788,15 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|||
doRegistryUpdate = false;
|
||||
}
|
||||
}
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
try {
|
||||
Map<String, List<Map<String, String>>> ports = null;
|
||||
ports = mapper.readValue(status.getExposedPorts(),
|
||||
new TypeReference<Map<String, List<Map<String, String>>>>(){});
|
||||
container.setExposedPorts(ports);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to process container ports mapping: {}", e);
|
||||
}
|
||||
setContainerStatus(status.getContainerId(), status);
|
||||
if (containerRec != null && timelineServiceEnabled && doRegistryUpdate) {
|
||||
serviceTimelinePublisher.componentInstanceIPHostUpdated(containerRec);
|
||||
|
|
|
@ -69,6 +69,8 @@ public final class ServiceTimelineMetricsConstants {
|
|||
*/
|
||||
public static final String IP = "IP";
|
||||
|
||||
public static final String EXPOSED_PORTS = "EXPOSED_PORTS";
|
||||
|
||||
public static final String HOSTNAME = "HOSTNAME";
|
||||
|
||||
public static final String BARE_HOST = "BARE_HOST";
|
||||
|
|
|
@ -208,6 +208,8 @@ public class ServiceTimelinePublisher extends CompositeService {
|
|||
// create info keys
|
||||
Map<String, Object> entityInfos = new HashMap<String, Object>();
|
||||
entityInfos.put(ServiceTimelineMetricsConstants.IP, container.getIp());
|
||||
entityInfos.put(ServiceTimelineMetricsConstants.EXPOSED_PORTS,
|
||||
container.getExposedPorts());
|
||||
entityInfos.put(ServiceTimelineMetricsConstants.HOSTNAME,
|
||||
container.getHostname());
|
||||
entityInfos.put(ServiceTimelineMetricsConstants.STATE,
|
||||
|
|
|
@ -984,6 +984,9 @@ public class ApplicationCLI extends YarnCLI {
|
|||
containerReportStr.print("\tNodeHttpAddress : ");
|
||||
containerReportStr.println(containerReport.getNodeHttpAddress() == null
|
||||
? "N/A" : containerReport.getNodeHttpAddress());
|
||||
containerReportStr.print("\tExposedPorts : ");
|
||||
containerReportStr.println(containerReport.getExposedPorts() == null
|
||||
? "N/A" : containerReport.getExposedPorts());
|
||||
containerReportStr.print("\tDiagnostics : ");
|
||||
containerReportStr.print(containerReport.getDiagnosticsInfo());
|
||||
} else {
|
||||
|
|
|
@ -96,11 +96,14 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
public class TestYarnCLI {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestYarnCLI.class);
|
||||
private YarnClient client = mock(YarnClient.class);
|
||||
ByteArrayOutputStream sysOutStream;
|
||||
private PrintStream sysOut;
|
||||
|
@ -277,10 +280,17 @@ public class TestYarnCLI {
|
|||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
|
||||
applicationId, 1);
|
||||
ContainerId containerId = ContainerId.newContainerId(attemptId, 1);
|
||||
Map<String, List<Map<String, String>>> ports = new HashMap<>();
|
||||
ArrayList<Map<String, String>> list = new ArrayList();
|
||||
HashMap<String, String> map = new HashMap();
|
||||
map.put("abc", "123");
|
||||
list.add(map);
|
||||
ports.put("192.168.0.1", list);
|
||||
ContainerReport container = ContainerReport.newInstance(containerId, null,
|
||||
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
|
||||
"diagnosticInfo", "logURL", 0, ContainerState.COMPLETE,
|
||||
"http://" + NodeId.newInstance("host", 2345).toString());
|
||||
container.setExposedPorts(ports);
|
||||
when(client.getContainerReport(any(ContainerId.class))).thenReturn(
|
||||
container);
|
||||
int result = cli.run(new String[] { "container", "-status",
|
||||
|
@ -298,9 +308,11 @@ public class TestYarnCLI {
|
|||
pw.println("\tLOG-URL : logURL");
|
||||
pw.println("\tHost : host:1234");
|
||||
pw.println("\tNodeHttpAddress : http://host:2345");
|
||||
pw.println("\tExposedPorts : {\"192.168.0.1\":[{\"abc\":\"123\"}]}");
|
||||
pw.println("\tDiagnostics : diagnosticInfo");
|
||||
pw.close();
|
||||
String appReportStr = baos.toString("UTF-8");
|
||||
|
||||
Assert.assertEquals(appReportStr, sysOutStream.toString());
|
||||
verify(sysOut, times(1)).println(isA(String.class));
|
||||
}
|
||||
|
@ -315,18 +327,22 @@ public class TestYarnCLI {
|
|||
ContainerId containerId1 = ContainerId.newContainerId(attemptId, 2);
|
||||
ContainerId containerId2 = ContainerId.newContainerId(attemptId, 3);
|
||||
long time1=1234,time2=5678;
|
||||
Map<String, List<Map<String, String>>> ports = new HashMap<>();
|
||||
ContainerReport container = ContainerReport.newInstance(containerId, null,
|
||||
NodeId.newInstance("host", 1234), Priority.UNDEFINED, time1, time2,
|
||||
"diagnosticInfo", "logURL", 0, ContainerState.COMPLETE,
|
||||
"http://" + NodeId.newInstance("host", 2345).toString());
|
||||
container.setExposedPorts(ports);
|
||||
ContainerReport container1 = ContainerReport.newInstance(containerId1, null,
|
||||
NodeId.newInstance("host", 1234), Priority.UNDEFINED, time1, time2,
|
||||
"diagnosticInfo", "logURL", 0, ContainerState.COMPLETE,
|
||||
"http://" + NodeId.newInstance("host", 2345).toString());
|
||||
container1.setExposedPorts(ports);
|
||||
ContainerReport container2 = ContainerReport.newInstance(containerId2, null,
|
||||
NodeId.newInstance("host", 1234), Priority.UNDEFINED, time1,0,
|
||||
"diagnosticInfo", "", 0, ContainerState.RUNNING,
|
||||
"http://" + NodeId.newInstance("host", 2345).toString());
|
||||
container2.setExposedPorts(ports);
|
||||
List<ContainerReport> reports = new ArrayList<ContainerReport>();
|
||||
reports.add(container);
|
||||
reports.add(container1);
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
|
@ -37,6 +39,8 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
|||
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@Private
|
||||
|
@ -53,6 +57,7 @@ public class ContainerPBImpl extends Container {
|
|||
private Priority priority = null;
|
||||
private Token containerToken = null;
|
||||
private Set<String> allocationTags = null;
|
||||
private Map<String, List<Map<String, String>>> exposedPorts = null;
|
||||
|
||||
public ContainerPBImpl() {
|
||||
builder = ContainerProto.newBuilder();
|
||||
|
@ -114,6 +119,11 @@ public class ContainerPBImpl extends Container {
|
|||
builder.clearAllocationTags();
|
||||
builder.addAllAllocationTags(this.allocationTags);
|
||||
}
|
||||
if (this.exposedPorts != null) {
|
||||
Gson gson = new Gson();
|
||||
String strExposedPorts = gson.toJson(this.exposedPorts);
|
||||
builder.setExposedPorts(strExposedPorts);
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
|
@ -208,11 +218,38 @@ public class ContainerPBImpl extends Container {
|
|||
@Override
|
||||
public void setResource(Resource resource) {
|
||||
maybeInitBuilder();
|
||||
if (resource == null)
|
||||
if (resource == null) {
|
||||
builder.clearResource();
|
||||
}
|
||||
this.resource = resource;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Map<String, List<Map<String, String>>> getExposedPorts() {
|
||||
ContainerProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.exposedPorts != null) {
|
||||
return this.exposedPorts;
|
||||
}
|
||||
if (!p.hasExposedPorts()) {
|
||||
return null;
|
||||
}
|
||||
String ports = p.getExposedPorts();
|
||||
Gson gson = new Gson();
|
||||
this.exposedPorts = gson.fromJson(ports,
|
||||
new TypeToken<Map<String, List<Map<String, String>>>>(){}.getType());
|
||||
|
||||
return this.exposedPorts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setExposedPorts(Map<String, List<Map<String, String>>> ports) {
|
||||
maybeInitBuilder();
|
||||
if (resource == null) {
|
||||
builder.clearExposedPorts();
|
||||
}
|
||||
this.exposedPorts = ports;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Priority getPriority() {
|
||||
ContainerProtoOrBuilder p = viaProto ? proto : builder;
|
||||
|
@ -251,8 +288,9 @@ public class ContainerPBImpl extends Container {
|
|||
@Override
|
||||
public void setContainerToken(Token containerToken) {
|
||||
maybeInitBuilder();
|
||||
if (containerToken == null)
|
||||
if (containerToken == null) {
|
||||
builder.clearContainerToken();
|
||||
}
|
||||
this.containerToken = containerToken;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
|
@ -35,6 +36,9 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
|||
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class ContainerReportPBImpl extends ContainerReport {
|
||||
|
||||
ContainerReportProto proto = ContainerReportProto.getDefaultInstance();
|
||||
|
@ -207,6 +211,24 @@ public class ContainerReportPBImpl extends ContainerReport {
|
|||
builder.setContainerExitStatus(containerExitStatus);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExposedPorts() {
|
||||
ContainerReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getExposedPorts();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setExposedPorts(Map<String, List<Map<String, String>>> ports) {
|
||||
maybeInitBuilder();
|
||||
if (ports == null) {
|
||||
builder.clearExposedPorts();
|
||||
return;
|
||||
}
|
||||
Gson gson = new Gson();
|
||||
String strPorts = gson.toJson(ports);
|
||||
builder.setExposedPorts(strPorts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFinishTime(long finishTime) {
|
||||
maybeInitBuilder();
|
||||
|
@ -239,7 +261,6 @@ public class ContainerReportPBImpl extends ContainerReport {
|
|||
}
|
||||
|
||||
public ContainerReportProto getProto() {
|
||||
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
|
|
|
@ -52,6 +52,7 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
|||
private ContainerId containerId = null;
|
||||
private static final String HOST = "HOST";
|
||||
private static final String IPS = "IPS";
|
||||
private static final String PORTS = "PORTS";
|
||||
private Map<String, String> containerAttributes = new HashMap<>();
|
||||
|
||||
|
||||
|
@ -98,6 +99,7 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
|||
sb.append("ExitStatus: ").append(getExitStatus()).append(", ");
|
||||
sb.append("IP: ").append(getIPs()).append(", ");
|
||||
sb.append("Host: ").append(getHost()).append(", ");
|
||||
sb.append("ExposedPorts: ").append(getExposedPorts()).append(", ");
|
||||
sb.append("ContainerSubState: ").append(getContainerSubState());
|
||||
sb.append("]");
|
||||
return sb.toString();
|
||||
|
@ -318,6 +320,25 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
|||
containerAttributes.put(IPS, StringUtils.join(",", ips));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String getExposedPorts() {
|
||||
if (!containerAttributes.containsKey(PORTS)) {
|
||||
initContainerAttributes();
|
||||
}
|
||||
String ports = containerAttributes.get((PORTS));
|
||||
return ports == null ? "" : ports;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setExposedPorts(String ports) {
|
||||
maybeInitBuilder();
|
||||
if (ports == null) {
|
||||
containerAttributes.remove(PORTS);
|
||||
return;
|
||||
}
|
||||
containerAttributes.put(PORTS, ports);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String getHost() {
|
||||
if (containerAttributes.get(HOST) == null) {
|
||||
|
|
|
@ -73,6 +73,9 @@ public class ContainerMetricsConstants {
|
|||
public static final String ALLOCATED_HOST_HTTP_ADDRESS_INFO =
|
||||
"YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS";
|
||||
|
||||
public static final String ALLOCATED_EXPOSED_PORTS =
|
||||
"YARN_CONTAINER_ALLOCATED_EXPOSED_PORTS";
|
||||
|
||||
// Event of this type will be emitted by NM.
|
||||
public static final String LOCALIZATION_START_EVENT_TYPE =
|
||||
"YARN_NM_CONTAINER_LOCALIZATION_STARTED";
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
|||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Set;
|
||||
|
@ -68,6 +69,8 @@ public final class TimelineEntityV2Converter {
|
|||
int exitStatus = ContainerExitStatus.INVALID;
|
||||
ContainerState state = null;
|
||||
String nodeHttpAddress = null;
|
||||
Map<String, List<Map<String, String>>> exposedPorts = null;
|
||||
|
||||
Map<String, Object> entityInfo = entity.getInfo();
|
||||
if (entityInfo != null) {
|
||||
if (entityInfo
|
||||
|
@ -103,6 +106,12 @@ public final class TimelineEntityV2Converter {
|
|||
(String) entityInfo.get(
|
||||
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO);
|
||||
}
|
||||
if (entityInfo.containsKey(
|
||||
ContainerMetricsConstants.ALLOCATED_EXPOSED_PORTS)) {
|
||||
exposedPorts =
|
||||
(Map<String, List<Map<String, String>>>) entityInfo
|
||||
.get(ContainerMetricsConstants.ALLOCATED_EXPOSED_PORTS);
|
||||
}
|
||||
if (entityInfo.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO)) {
|
||||
diagnosticsInfo =
|
||||
entityInfo.get(
|
||||
|
@ -136,12 +145,15 @@ public final class TimelineEntityV2Converter {
|
|||
if (allocatedHost != null) {
|
||||
allocatedNode = NodeId.newInstance(allocatedHost, allocatedPort);
|
||||
}
|
||||
return ContainerReport.newInstance(
|
||||
ContainerReport container = ContainerReport.newInstance(
|
||||
ContainerId.fromString(entity.getId()),
|
||||
Resource.newInstance(allocatedMem, allocatedVcore), allocatedNode,
|
||||
Priority.newInstance(allocatedPriority),
|
||||
createdTime, finishedTime, diagnosticsInfo, logUrl, exitStatus, state,
|
||||
nodeHttpAddress);
|
||||
container.setExposedPorts(exposedPorts);
|
||||
|
||||
return container;
|
||||
}
|
||||
|
||||
public static ApplicationAttemptReport convertToApplicationAttemptReport(
|
||||
|
|
|
@ -209,13 +209,16 @@ public class ApplicationHistoryManagerImpl extends AbstractService implements
|
|||
containerHistory.getContainerId().toString(),
|
||||
containerHistory.getContainerId().toString(),
|
||||
user);
|
||||
return ContainerReport.newInstance(containerHistory.getContainerId(),
|
||||
containerHistory.getAllocatedResource(),
|
||||
containerHistory.getAssignedNode(), containerHistory.getPriority(),
|
||||
containerHistory.getStartTime(), containerHistory.getFinishTime(),
|
||||
containerHistory.getDiagnosticsInfo(), logUrl,
|
||||
containerHistory.getContainerExitStatus(),
|
||||
containerHistory.getContainerState(), null);
|
||||
ContainerReport container = ContainerReport.newInstance(
|
||||
containerHistory.getContainerId(),
|
||||
containerHistory.getAllocatedResource(),
|
||||
containerHistory.getAssignedNode(), containerHistory.getPriority(),
|
||||
containerHistory.getStartTime(), containerHistory.getFinishTime(),
|
||||
containerHistory.getDiagnosticsInfo(), logUrl,
|
||||
containerHistory.getContainerExitStatus(),
|
||||
containerHistory.getContainerState(), null);
|
||||
container.setExposedPorts(containerHistory.getExposedPorts());
|
||||
return container;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -572,6 +572,8 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
int exitStatus = ContainerExitStatus.INVALID;
|
||||
ContainerState state = null;
|
||||
String nodeHttpAddress = null;
|
||||
Map<String, List<Map<String, String>>> exposedPorts = null;
|
||||
|
||||
Map<String, Object> entityInfo = entity.getOtherInfo();
|
||||
if (entityInfo != null) {
|
||||
if (entityInfo
|
||||
|
@ -607,6 +609,12 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
(String) entityInfo
|
||||
.get(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO);
|
||||
}
|
||||
if (entityInfo.containsKey(
|
||||
ContainerMetricsConstants.ALLOCATED_EXPOSED_PORTS)) {
|
||||
exposedPorts =
|
||||
(Map<String, List<Map<String, String>>>) entityInfo
|
||||
.get(ContainerMetricsConstants.ALLOCATED_EXPOSED_PORTS);
|
||||
}
|
||||
}
|
||||
List<TimelineEvent> events = entity.getEvents();
|
||||
if (events != null) {
|
||||
|
@ -655,12 +663,15 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
containerId.toString(),
|
||||
user);
|
||||
}
|
||||
return ContainerReport.newInstance(
|
||||
ContainerReport container = ContainerReport.newInstance(
|
||||
ContainerId.fromString(entity.getEntityId()),
|
||||
Resource.newInstance(allocatedMem, allocatedVcore), allocatedNode,
|
||||
Priority.newInstance(allocatedPriority),
|
||||
createdTime, finishedTime, diagnosticsInfo, logUrl, exitStatus, state,
|
||||
nodeHttpAddress);
|
||||
container.setExposedPorts(exposedPorts);
|
||||
|
||||
return container;
|
||||
}
|
||||
|
||||
private ApplicationReportExt generateApplicationReport(TimelineEntity entity,
|
||||
|
|
|
@ -26,6 +26,9 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The class contains all the fields that are stored persistently for
|
||||
* <code>RMContainer</code>.
|
||||
|
@ -52,6 +55,8 @@ public class ContainerHistoryData {
|
|||
|
||||
private ContainerState containerState;
|
||||
|
||||
private Map<String, List<Map<String, String>>> exposedPorts;
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public static ContainerHistoryData newInstance(ContainerId containerId,
|
||||
|
@ -68,6 +73,7 @@ public class ContainerHistoryData {
|
|||
containerHD.setDiagnosticsInfo(diagnosticsInfo);
|
||||
containerHD.setContainerExitStatus(containerExitCode);
|
||||
containerHD.setContainerState(containerState);
|
||||
|
||||
return containerHD;
|
||||
}
|
||||
|
||||
|
@ -179,4 +185,11 @@ public class ContainerHistoryData {
|
|||
this.containerState = containerState;
|
||||
}
|
||||
|
||||
public Map<String, List<Map<String, String>>> getExposedPorts() {
|
||||
return exposedPorts;
|
||||
}
|
||||
|
||||
public void setExposedPorts(Map<String, List<Map<String, String>>> ports) {
|
||||
this.exposedPorts = ports;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,6 +55,7 @@ public class ContainerInfo {
|
|||
protected String nodeHttpAddress;
|
||||
protected String nodeId;
|
||||
protected Map<String, Long> allocatedResources;
|
||||
private String exposedPorts;
|
||||
|
||||
public ContainerInfo() {
|
||||
// JAXB needs this
|
||||
|
@ -76,9 +77,9 @@ public class ContainerInfo {
|
|||
containerState = container.getContainerState();
|
||||
nodeHttpAddress = container.getNodeHttpAddress();
|
||||
nodeId = container.getAssignedNode().toString();
|
||||
exposedPorts = container.getExposedPorts();
|
||||
|
||||
Resource allocated = container.getAllocatedResource();
|
||||
|
||||
if (allocated != null) {
|
||||
allocatedMB = allocated.getMemorySize();
|
||||
allocatedVCores = allocated.getVirtualCores();
|
||||
|
@ -159,4 +160,8 @@ public class ContainerInfo {
|
|||
public Map<String, Long> getAllocatedResources() {
|
||||
return Collections.unmodifiableMap(allocatedResources);
|
||||
}
|
||||
|
||||
public String getExposedPorts() {
|
||||
return exposedPorts;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -299,10 +299,12 @@ public class TestAMRMClientRelayerMetrics {
|
|||
List<UpdatedContainer> updated = new ArrayList<>();
|
||||
updated.add(UpdatedContainer
|
||||
.newInstance(ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Container
|
||||
.newInstance(createContainerId(2), null, null, null, null, null)));
|
||||
.newInstance(createContainerId(2), null, null, null,
|
||||
null, null)));
|
||||
updated.add(UpdatedContainer
|
||||
.newInstance(ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Container
|
||||
.newInstance(createContainerId(5), null, null, null, null, null)));
|
||||
.newInstance(createContainerId(5), null, null, null,
|
||||
null, null)));
|
||||
this.mockAMS.response.setUpdatedContainers(updated);
|
||||
|
||||
this.homeRelayer.allocate(getAllocateRequest());
|
||||
|
|
|
@ -930,4 +930,9 @@ public abstract class ContainerExecutor implements Configurable {
|
|||
}
|
||||
return symLinks;
|
||||
}
|
||||
|
||||
public String getExposedPorts(Container container)
|
||||
throws ContainerExecutionException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1038,4 +1038,9 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExposedPorts(Container container)
|
||||
throws ContainerExecutionException {
|
||||
return linuxContainerRuntime.getExposedPorts(container);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1340,8 +1340,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
if (isResourceChange) {
|
||||
increasedContainer =
|
||||
org.apache.hadoop.yarn.api.records.Container.newInstance(
|
||||
containerId, null, null, targetResource, null, null,
|
||||
currentExecType);
|
||||
containerId, null, null, targetResource, null,
|
||||
null, currentExecType);
|
||||
if (context.getIncreasedContainers().putIfAbsent(containerId,
|
||||
increasedContainer) != null){
|
||||
throw RPCUtil.getRemoteException("Container " + containerId.toString()
|
||||
|
@ -1510,6 +1510,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
sb.append(status.getIPs()).append(", ");
|
||||
sb.append("Host: ");
|
||||
sb.append(status.getHost()).append(", ");
|
||||
sb.append("ExposedPorts: ");
|
||||
sb.append(status.getExposedPorts()).append(", ");
|
||||
sb.append("ContainerSubState: ");
|
||||
sb.append(status.getContainerSubState());
|
||||
sb.append("]");
|
||||
|
|
|
@ -81,6 +81,8 @@ public interface Container extends EventHandler<ContainerEvent> {
|
|||
|
||||
void setIpAndHost(String[] ipAndHost);
|
||||
|
||||
void setExposedPorts(String ports);
|
||||
|
||||
String toString();
|
||||
|
||||
Priority getPriority();
|
||||
|
|
|
@ -176,6 +176,7 @@ public class ContainerImpl implements Container {
|
|||
private String logDir;
|
||||
private String host;
|
||||
private String ips;
|
||||
private String exposedPorts;
|
||||
private volatile ReInitializationContext reInitContext;
|
||||
private volatile boolean isReInitializing = false;
|
||||
private volatile boolean isMarkeForKilling = false;
|
||||
|
@ -857,6 +858,7 @@ public class ContainerImpl implements Container {
|
|||
Arrays.asList(ips.split(",")));
|
||||
status.setHost(host);
|
||||
status.setContainerSubState(getContainerSubState());
|
||||
status.setExposedPorts(exposedPorts);
|
||||
return status;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
|
@ -2270,4 +2272,9 @@ public class ContainerImpl implements Container {
|
|||
|| state == ContainerState.EXITED_WITH_FAILURE
|
||||
|| state == ContainerState.EXITED_WITH_SUCCESS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setExposedPorts(String ports) {
|
||||
this.exposedPorts = ports;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -208,6 +208,11 @@ public class DefaultLinuxContainerRuntime implements LinuxContainerRuntime {
|
|||
return ContainerExecutor.getLocalIpAndHost(container);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExposedPorts(Container container) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOStreamPair execContainer(ContainerExecContext ctx)
|
||||
throws ContainerExecutionException {
|
||||
|
|
|
@ -202,6 +202,13 @@ public class DelegatingLinuxContainerRuntime implements LinuxContainerRuntime {
|
|||
return runtime.getIpAndHost(container);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExposedPorts(Container container)
|
||||
throws ContainerExecutionException {
|
||||
LinuxContainerRuntime runtime = pickContainerRuntime(container);
|
||||
return runtime.getExposedPorts(container);
|
||||
}
|
||||
|
||||
private boolean isPluggableRuntime(String runtimeType) {
|
||||
for (LinuxContainerRuntimeConstants.RuntimeType type :
|
||||
LinuxContainerRuntimeConstants.RuntimeType.values()) {
|
||||
|
|
|
@ -1268,7 +1268,23 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
|
|||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getExposedPorts(Container container)
|
||||
throws ContainerExecutionException {
|
||||
ContainerId containerId = container.getContainerId();
|
||||
String containerIdStr = containerId.toString();
|
||||
DockerInspectCommand inspectCommand =
|
||||
new DockerInspectCommand(containerIdStr).getExposedPorts();
|
||||
try {
|
||||
String output = executeDockerInspect(containerId, inspectCommand);
|
||||
return output;
|
||||
} catch (ContainerExecutionException e) {
|
||||
LOG.error("Error when writing command to temp file", e);
|
||||
} catch (PrivilegedOperationException e) {
|
||||
LOG.error("Error when executing command.", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private PrivilegedOperation buildLaunchOp(ContainerRuntimeContext ctx,
|
||||
String commandFile, DockerCommand command) {
|
||||
|
|
|
@ -75,4 +75,11 @@ public class DockerInspectCommand extends DockerCommand {
|
|||
|
||||
public static final String STATUS_TEMPLATE = "{{.State.Status}}";
|
||||
public static final String STOPSIGNAL_TEMPLATE = "{{.Config.StopSignal}}";
|
||||
|
||||
public DockerInspectCommand getExposedPorts() {
|
||||
super.addCommandArguments("format", "{{json .NetworkSettings.Ports}}");
|
||||
this.commandArguments = "--format={{json .NetworkSettings.Ports}}";
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -616,6 +616,8 @@ public class ContainersMonitorImpl extends AbstractService implements
|
|||
LOG.info("Can not get both ip and hostname: "
|
||||
+ Arrays.toString(ipAndHost));
|
||||
}
|
||||
String exposedPorts = containerExecutor.getExposedPorts(container);
|
||||
container.setExposedPorts(exposedPorts);
|
||||
} else {
|
||||
LOG.info(containerId + " is missing. Not setting ip and hostname");
|
||||
}
|
||||
|
|
|
@ -105,4 +105,14 @@ public interface ContainerRuntime {
|
|||
* and hostname
|
||||
*/
|
||||
String[] getIpAndHost(Container container) throws ContainerExecutionException;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the exposed ports of the container.
|
||||
* @param container the {@link Container}
|
||||
* @return List of exposed ports
|
||||
* @throws ContainerExecutionException if an error occurs while getting
|
||||
* the exposed ports
|
||||
*/
|
||||
String getExposedPorts(Container container)
|
||||
throws ContainerExecutionException;
|
||||
}
|
||||
|
|
|
@ -568,9 +568,10 @@ cleanup:
|
|||
}
|
||||
|
||||
int get_docker_inspect_command(const char *command_file, const struct configuration *conf, args *args) {
|
||||
const char *valid_format_strings[] = { "{{.State.Status}}",
|
||||
const char *valid_format_strings[] = {"{{.State.Status}}",
|
||||
"{{range(.NetworkSettings.Networks)}}{{.IPAddress}},{{end}}{{.Config.Hostname}}",
|
||||
"{{.State.Status}},{{.Config.StopSignal}}"};
|
||||
"{{json .NetworkSettings.Ports}}",
|
||||
"{{.State.Status}},{{.Config.StopSignal}}"};
|
||||
int ret = 0, i = 0, valid_format = 0;
|
||||
char *format = NULL, *container_name = NULL, *tmp_buffer = NULL;
|
||||
struct configuration command_config = {0, NULL};
|
||||
|
@ -590,7 +591,8 @@ int get_docker_inspect_command(const char *command_file, const struct configurat
|
|||
ret = INVALID_DOCKER_INSPECT_FORMAT;
|
||||
goto free_and_exit;
|
||||
}
|
||||
for (i = 0; i < 3; ++i) {
|
||||
|
||||
for (i = 0; i < 4; ++i) {
|
||||
if (strcmp(format, valid_format_strings[i]) == 0) {
|
||||
valid_format = 1;
|
||||
break;
|
||||
|
|
|
@ -171,14 +171,20 @@ namespace ContainerExecutor {
|
|||
" format={{range(.NetworkSettings.Networks)}}{{.IPAddress}},{{end}}{{.Config.Hostname}}\n"
|
||||
" name=container_e1_12312_11111_02_000001",
|
||||
"inspect --format={{range(.NetworkSettings.Networks)}}{{.IPAddress}},{{end}}{{.Config.Hostname}} container_e1_12312_11111_02_000001"));
|
||||
file_cmd_vec.push_back(std::make_pair<std::string, std::string>(
|
||||
"[docker-command-execution]\n docker-command=inspect\n format={{json .NetworkSettings.Ports}}\n name=container_e1_12312_11111_02_000001",
|
||||
"inspect --format={{json .NetworkSettings.Ports}} container_e1_12312_11111_02_000001"));
|
||||
file_cmd_vec.push_back(std::make_pair<std::string, std::string>(
|
||||
"[docker-command-execution]\n docker-command=inspect\n format={{.State.Status}},{{.Config.StopSignal}}\n name=container_e1_12312_11111_02_000001",
|
||||
"inspect --format={{.State.Status}},{{.Config.StopSignal}} container_e1_12312_11111_02_000001"));
|
||||
|
||||
std::vector<std::pair<std::string, int> > bad_file_cmd_vec;
|
||||
bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
|
||||
"[docker-command-execution]\n docker-command=run\n format='{{.State.Status}}'",
|
||||
static_cast<int>(INCORRECT_COMMAND)));
|
||||
bad_file_cmd_vec.push_back(
|
||||
std::make_pair<std::string, int>("docker-command=inspect\n format='{{.State.Status}}'",
|
||||
static_cast<int>(INCORRECT_COMMAND)));
|
||||
bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
|
||||
"docker-command=inspect\n format='{{.State.Status}}'",
|
||||
static_cast<int>(INCORRECT_COMMAND)));
|
||||
bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
|
||||
"[docker-command-execution]\n docker-command=inspect\n format={{.State.Status}}\n name=",
|
||||
static_cast<int>(INVALID_DOCKER_CONTAINER_NAME)));
|
||||
|
@ -194,6 +200,12 @@ namespace ContainerExecutor {
|
|||
bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
|
||||
"[docker-command-execution]\n docker-command=inspect\n format={{.IPAddress}}\n name=container_e1_12312_11111_02_000001",
|
||||
static_cast<int>(INVALID_DOCKER_INSPECT_FORMAT)));
|
||||
bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
|
||||
"[docker-command-execution]\n docker-command=inspect\n format={{.NetworkSettings.Ports}}\n name=container_e1_12312_11111_02_000001",
|
||||
static_cast<int>(INVALID_DOCKER_INSPECT_FORMAT)));
|
||||
bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
|
||||
"[docker-command-execution]\n docker-command=inspect\n format={{.Config.StopSignal}}\n name=container_e1_12312_11111_02_000001",
|
||||
static_cast<int>(INVALID_DOCKER_INSPECT_FORMAT)));
|
||||
|
||||
run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_inspect_command);
|
||||
}
|
||||
|
|
|
@ -62,6 +62,11 @@ public class MockLinuxContainerRuntime implements LinuxContainerRuntime {
|
|||
return new String[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExposedPorts(Container container) {
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOStreamPair execContainer(ContainerExecContext ctx)
|
||||
throws ContainerExecutionException {
|
||||
|
|
|
@ -192,6 +192,10 @@ public class MockContainer implements Container {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setExposedPorts(String ports) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return false;
|
||||
|
|
|
@ -398,6 +398,9 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
|
|||
container.getAllocatedNode().getPort());
|
||||
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO,
|
||||
container.getAllocatedPriority().getPriority());
|
||||
entityInfo.put(
|
||||
ContainerMetricsConstants.ALLOCATED_EXPOSED_PORTS,
|
||||
container.getExposedPorts());
|
||||
entityInfo.put(
|
||||
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
|
||||
container.getNodeHttpAddress());
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
@ -93,6 +94,10 @@ public interface RMContainer extends EventHandler<RMContainerEvent>,
|
|||
ContainerRequest getContainerRequest();
|
||||
|
||||
String getNodeHttpAddress();
|
||||
|
||||
Map<String, List<Map<String, String>>> getExposedPorts();
|
||||
|
||||
void setExposedPorts(Map<String, List<Map<String, String>>> exposed);
|
||||
|
||||
String getNodeLabelExpression();
|
||||
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
|||
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
|
@ -796,7 +798,8 @@ public class RMContainerImpl implements RMContainer {
|
|||
this.getAllocatedSchedulerKey().getPriority(), this.getCreationTime(),
|
||||
this.getFinishTime(), this.getDiagnosticsInfo(), this.getLogURL(),
|
||||
this.getContainerExitStatus(), this.getContainerState(),
|
||||
this.getNodeHttpAddress(), this.getExecutionType());
|
||||
this.getNodeHttpAddress(), this.getExecutionType());
|
||||
containerReport.setExposedPorts(this.getExposedPorts());
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
|
@ -821,6 +824,19 @@ public class RMContainerImpl implements RMContainer {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, List<Map<String, String>>> getExposedPorts() {
|
||||
if (container.getExposedPorts() == null) {
|
||||
return null;
|
||||
}
|
||||
return container.getExposedPorts();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setExposedPorts(Map<String, List<Map<String, String>>> ports) {
|
||||
container.setExposedPorts(ports);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNodeLabelExpression() {
|
||||
if (nodeLabelExpression == null) {
|
||||
|
|
|
@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import org.apache.commons.collections.keyvalue.DefaultMapEntry;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
@ -178,6 +179,14 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
private final Map<ContainerId, Container> toBeUpdatedContainers =
|
||||
new HashMap<>();
|
||||
|
||||
/*
|
||||
* Because the Docker container's Ip, Port Mapping and other properties
|
||||
* are generated after the container is launched, need to update the
|
||||
* container property information to the applications in the RM.
|
||||
*/
|
||||
private final Map<ContainerId, ContainerStatus> updatedExistContainers =
|
||||
new HashMap<>();
|
||||
|
||||
// NOTE: This is required for backward compatibility.
|
||||
private final Map<ContainerId, Container> toBeDecreasedContainers =
|
||||
new HashMap<>();
|
||||
|
@ -1371,6 +1380,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
new ArrayList<ContainerStatus>();
|
||||
List<ContainerStatus> newlyCompletedContainers =
|
||||
new ArrayList<ContainerStatus>();
|
||||
List<Map.Entry<ApplicationId, ContainerStatus>> needUpdateContainers =
|
||||
new ArrayList<Map.Entry<ApplicationId, ContainerStatus>>();
|
||||
int numRemoteRunningContainers = 0;
|
||||
for (ContainerStatus remoteContainer : containerStatuses) {
|
||||
ContainerId containerId = remoteContainer.getContainerId();
|
||||
|
@ -1412,6 +1423,26 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
containerAllocationExpirer
|
||||
.unregister(new AllocationExpirationInfo(containerId));
|
||||
}
|
||||
|
||||
// Check if you need to update the exist container status
|
||||
boolean needUpdate = false;
|
||||
if (!updatedExistContainers.containsKey(containerId)) {
|
||||
needUpdate = true;
|
||||
} else {
|
||||
ContainerStatus pContainer = updatedExistContainers.get(containerId);
|
||||
if (null != pContainer) {
|
||||
String preExposedPorts = pContainer.getExposedPorts();
|
||||
if (null != preExposedPorts &&
|
||||
!preExposedPorts.equals(remoteContainer.getExposedPorts())) {
|
||||
needUpdate = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (needUpdate) {
|
||||
updatedExistContainers.put(containerId, remoteContainer);
|
||||
needUpdateContainers.add(new DefaultMapEntry(containerAppId,
|
||||
remoteContainer));
|
||||
}
|
||||
} else {
|
||||
// A finished container
|
||||
launchedContainers.remove(containerId);
|
||||
|
@ -1434,9 +1465,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
}
|
||||
|
||||
if (newlyLaunchedContainers.size() != 0
|
||||
|| newlyCompletedContainers.size() != 0) {
|
||||
|| newlyCompletedContainers.size() != 0
|
||||
|| needUpdateContainers.size() != 0) {
|
||||
nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
|
||||
newlyCompletedContainers));
|
||||
newlyCompletedContainers, needUpdateContainers));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,20 +19,26 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
|
||||
public class UpdatedContainerInfo {
|
||||
private List<ContainerStatus> newlyLaunchedContainers;
|
||||
private List<ContainerStatus> completedContainers;
|
||||
private List<Map.Entry<ApplicationId, ContainerStatus>> updateContainers;
|
||||
|
||||
public UpdatedContainerInfo() {
|
||||
}
|
||||
|
||||
public UpdatedContainerInfo(List<ContainerStatus> newlyLaunchedContainers
|
||||
, List<ContainerStatus> completedContainers) {
|
||||
public UpdatedContainerInfo(List<ContainerStatus> newlyLaunchedContainers,
|
||||
List<ContainerStatus> completedContainers,
|
||||
List<Map.Entry<ApplicationId, ContainerStatus>>
|
||||
updateContainers) {
|
||||
this.newlyLaunchedContainers = newlyLaunchedContainers;
|
||||
this.completedContainers = completedContainers;
|
||||
this.updateContainers = updateContainers;
|
||||
}
|
||||
|
||||
public List<ContainerStatus> getNewlyLaunchedContainers() {
|
||||
|
@ -42,4 +48,8 @@ public class UpdatedContainerInfo {
|
|||
public List<ContainerStatus> getCompletedContainers() {
|
||||
return this.completedContainers;
|
||||
}
|
||||
|
||||
public List<Map.Entry<ApplicationId, ContainerStatus>> getUpdateContainers() {
|
||||
return this.updateContainers;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -31,6 +32,8 @@ import java.util.TimerTask;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
@ -1008,11 +1011,14 @@ public abstract class AbstractYarnScheduler
|
|||
new ArrayList<>();
|
||||
List<ContainerStatus> completedContainers =
|
||||
new ArrayList<>();
|
||||
List<Map.Entry<ApplicationId, ContainerStatus>> updateExistContainers =
|
||||
new ArrayList<>();
|
||||
|
||||
for(UpdatedContainerInfo containerInfo : containerInfoList) {
|
||||
newlyLaunchedContainers
|
||||
.addAll(containerInfo.getNewlyLaunchedContainers());
|
||||
completedContainers.addAll(containerInfo.getCompletedContainers());
|
||||
updateExistContainers.addAll(containerInfo.getUpdateContainers());
|
||||
}
|
||||
|
||||
// Processing the newly launched containers
|
||||
|
@ -1028,6 +1034,30 @@ public abstract class AbstractYarnScheduler
|
|||
containerIncreasedOnNode(container.getId(), schedulerNode, container);
|
||||
}
|
||||
|
||||
// Processing the update exist containers
|
||||
for (Map.Entry<ApplicationId, ContainerStatus> c : updateExistContainers) {
|
||||
SchedulerApplication<T> app = applications.get(c.getKey());
|
||||
ContainerId containerId = c.getValue().getContainerId();
|
||||
String strExposedPorts = c.getValue().getExposedPorts();
|
||||
Map<String, List<Map<String, String>>> exposedPorts = null;
|
||||
if (null != strExposedPorts && !strExposedPorts.isEmpty()) {
|
||||
Gson gson = new Gson();
|
||||
exposedPorts = gson.fromJson(strExposedPorts,
|
||||
new TypeToken<Map<String, List<Map<String, String>>>>()
|
||||
{}.getType());
|
||||
}
|
||||
|
||||
RMContainer rmContainer
|
||||
= app.getCurrentAppAttempt().getRMContainer(containerId);
|
||||
if (null != rmContainer &&
|
||||
(null == rmContainer.getExposedPorts()
|
||||
|| rmContainer.getExposedPorts().size() == 0)) {
|
||||
LOG.info("update exist container " + containerId.getContainerId()
|
||||
+ ", strExposedPorts = " + strExposedPorts);
|
||||
rmContainer.setExposedPorts(exposedPorts);
|
||||
}
|
||||
}
|
||||
|
||||
return completedContainers;
|
||||
}
|
||||
|
||||
|
|
|
@ -320,6 +320,8 @@ public class ContainerUpdateContext {
|
|||
updatedResource,
|
||||
existingRMContainer.getContainer().getPriority(), null,
|
||||
tempContainer.getExecutionType());
|
||||
newContainer.setExposedPorts(
|
||||
existingRMContainer.getContainer().getExposedPorts());
|
||||
newContainer.setAllocationRequestId(
|
||||
existingRMContainer.getContainer().getAllocationRequestId());
|
||||
newContainer.setVersion(existingRMContainer.getContainer().getVersion());
|
||||
|
|
|
@ -21,6 +21,7 @@ import javax.xml.bind.annotation.XmlAccessType;
|
|||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
|
@ -43,6 +44,7 @@ public class AppAttemptInfo {
|
|||
protected String blacklistedNodes;
|
||||
private String nodesBlacklistedBySystem;
|
||||
protected String appAttemptId;
|
||||
private String exportPorts;
|
||||
|
||||
public AppAttemptInfo() {
|
||||
}
|
||||
|
@ -55,6 +57,7 @@ public class AppAttemptInfo {
|
|||
this.nodeId = "";
|
||||
this.logsLink = "";
|
||||
this.blacklistedNodes = "";
|
||||
this.exportPorts = "";
|
||||
if (attempt != null) {
|
||||
this.id = attempt.getAppAttemptId().getAttemptId();
|
||||
this.startTime = attempt.getStartTime();
|
||||
|
@ -68,6 +71,9 @@ public class AppAttemptInfo {
|
|||
+ masterContainer.getNodeHttpAddress(),
|
||||
masterContainer.getId().toString(), user);
|
||||
|
||||
Gson gson = new Gson();
|
||||
this.exportPorts = gson.toJson(masterContainer.getExposedPorts());
|
||||
|
||||
nodesBlacklistedBySystem =
|
||||
StringUtils.join(attempt.getAMBlacklistManager()
|
||||
.getBlacklistUpdates().getBlacklistAdditions(), ", ");
|
||||
|
|
|
@ -1449,7 +1449,8 @@ public class TestClientRMService {
|
|||
RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId,
|
||||
rmContext, yarnScheduler, null, asContext, config, null, app));
|
||||
Container container = Container.newInstance(
|
||||
ContainerId.newContainerId(attemptId, 1), null, "", null, null, null);
|
||||
ContainerId.newContainerId(attemptId, 1), null,
|
||||
"", null, null, null);
|
||||
RMContainerImpl containerimpl = spy(new RMContainerImpl(container,
|
||||
SchedulerRequestKey.extractFrom(container), attemptId, null, "",
|
||||
rmContext));
|
||||
|
|
|
@ -358,7 +358,8 @@ public class TestRMWebServicesAppAttempts extends JerseyTestBase {
|
|||
WebServicesTestUtils.getXmlString(element, "containerId"),
|
||||
WebServicesTestUtils.getXmlString(element, "nodeHttpAddress"),
|
||||
WebServicesTestUtils.getXmlString(element, "nodeId"),
|
||||
WebServicesTestUtils.getXmlString(element, "logsLink"), user);
|
||||
WebServicesTestUtils.getXmlString(element, "logsLink"), user,
|
||||
WebServicesTestUtils.getXmlString(element, "exportPorts"));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -366,18 +367,17 @@ public class TestRMWebServicesAppAttempts extends JerseyTestBase {
|
|||
String user)
|
||||
throws Exception {
|
||||
|
||||
assertEquals("incorrect number of elements", 10, info.length());
|
||||
assertEquals("incorrect number of elements", 11, info.length());
|
||||
|
||||
verifyAppAttemptInfoGeneric(appAttempt, info.getInt("id"),
|
||||
info.getLong("startTime"), info.getString("containerId"),
|
||||
info.getString("nodeHttpAddress"), info.getString("nodeId"),
|
||||
info.getString("logsLink"), user);
|
||||
info.getString("logsLink"), user, info.getString("exportPorts"));
|
||||
}
|
||||
|
||||
private void verifyAppAttemptInfoGeneric(RMAppAttempt appAttempt, int id,
|
||||
long startTime, String containerId, String nodeHttpAddress, String
|
||||
nodeId,
|
||||
String logsLink, String user) {
|
||||
nodeId, String logsLink, String user, String exportPorts) {
|
||||
|
||||
assertEquals("id doesn't match", appAttempt.getAppAttemptId()
|
||||
.getAttemptId(), id);
|
||||
|
|
|
@ -376,6 +376,18 @@ export default Ember.Component.extend({
|
|||
return 'N/A';
|
||||
}
|
||||
}
|
||||
}, {
|
||||
id: 'exposedPorts',
|
||||
headerTitle: 'Exposed Ports',
|
||||
contentPath: 'exposedPorts',
|
||||
getCellContent: function(row) {
|
||||
var ports = row.get('exposedPorts');
|
||||
if (ports) {
|
||||
return ports;
|
||||
} else {
|
||||
return 'N/A';
|
||||
}
|
||||
}
|
||||
}, {
|
||||
id: 'nodeHttpAddress',
|
||||
headerTitle: 'NodeManager Web UI',
|
||||
|
@ -485,6 +497,18 @@ export default Ember.Component.extend({
|
|||
return 'N/A';
|
||||
}
|
||||
}
|
||||
}, {
|
||||
id: 'exposedPorts',
|
||||
headerTitle: 'Exposed Ports',
|
||||
contentPath: 'exposedPorts',
|
||||
getCellContent: function(row) {
|
||||
var ports = row.get('exposedPorts');
|
||||
if (ports) {
|
||||
return ports;
|
||||
} else {
|
||||
return 'N/A';
|
||||
}
|
||||
}
|
||||
}, {
|
||||
id: 'nodeHttpAddress',
|
||||
headerTitle: 'Node Manager UI',
|
||||
|
|
|
@ -27,6 +27,7 @@ export default DS.Model.extend({
|
|||
containerId: DS.attr('string'),
|
||||
amContainerId: DS.attr('string'),
|
||||
nodeHttpAddress: DS.attr('string'),
|
||||
exposedPorts: DS.attr('string'),
|
||||
nodeId: DS.attr('string'),
|
||||
hosts: DS.attr('string'),
|
||||
logsLink: DS.attr('string'),
|
||||
|
|
|
@ -31,6 +31,7 @@ export default DS.Model.extend({
|
|||
node: DS.attr('string'),
|
||||
hostUrl: DS.attr('string'),
|
||||
ipAddr: DS.attr('string'),
|
||||
exposedPorts: DS.attr('string'),
|
||||
exitStatusCode: DS.attr('string'),
|
||||
|
||||
createdDate: Ember.computed('createdTimestamp', function() {
|
||||
|
|
|
@ -30,6 +30,7 @@ export default DS.Model.extend({
|
|||
containerExitStatus: DS.attr('number'),
|
||||
containerState: DS.attr('string'),
|
||||
nodeHttpAddress: DS.attr('string'),
|
||||
exposedPorts: DS.attr('string'),
|
||||
nodeId: DS.attr('string'),
|
||||
|
||||
startTs: function() {
|
||||
|
|
|
@ -32,6 +32,7 @@ export default DS.Model.extend({
|
|||
logsLink: DS.attr('string'),
|
||||
state: DS.attr('string'),
|
||||
appAttemptId: DS.attr('string'),
|
||||
exposedPorts: DS.attr('string'),
|
||||
|
||||
appId: Ember.computed("id",function () {
|
||||
var id = this.get("id");
|
||||
|
|
|
@ -32,6 +32,7 @@ export default DS.Model.extend({
|
|||
nodeHttpAddress: DS.attr('string'),
|
||||
nodeId: DS.attr('string'),
|
||||
diagnosticsInfo: DS.attr('string'),
|
||||
exposedPorts: DS.attr('string'),
|
||||
|
||||
startTs: function() {
|
||||
return Converter.dateToTimeStamp(this.get("startedTime"));
|
||||
|
|
|
@ -36,6 +36,7 @@ export default DS.JSONAPISerializer.extend({
|
|||
containerId: payload.containerId,
|
||||
amContainerId: payload.amContainerId,
|
||||
nodeHttpAddress: payload.nodeHttpAddress,
|
||||
exposedPorts: payload.exposedPorts,
|
||||
nodeId: payload.nodeId,
|
||||
hosts: payload.host,
|
||||
state: payload.appAttemptState,
|
||||
|
|
|
@ -35,6 +35,7 @@ export default DS.JSONAPISerializer.extend({
|
|||
host: info.HOSTNAME,
|
||||
node: info.BARE_HOST,
|
||||
ipAddr: info.IP,
|
||||
exposedPorts: info.EXPOSED_PORTS,
|
||||
exitStatusCode: info.EXIT_STATUS_CODE
|
||||
}
|
||||
};
|
||||
|
|
|
@ -37,7 +37,8 @@ export default DS.JSONAPISerializer.extend({
|
|||
containerExitStatus: payload.containerExitStatus + '',
|
||||
containerState: payload.containerState,
|
||||
nodeId : payload.nodeId,
|
||||
nodeHttpAddress: payload.nodeHttpAddress
|
||||
nodeHttpAddress: payload.nodeHttpAddress,
|
||||
exposedPorts: payload.exposedPorts
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ export default DS.JSONAPISerializer.extend({
|
|||
containerId: payload.info.YARN_APPLICATION_ATTEMPT_MASTER_CONTAINER,
|
||||
amContainerId: payload.info.YARN_APPLICATION_ATTEMPT_MASTER_CONTAINER,
|
||||
nodeHttpAddress: payload.info.YARN_APPLICATION_ATTEMPT_MASTER_NODE_ADDRESS,
|
||||
exposedPorts: payload.info.YARN_CONTAINER_ALLOCATED_EXPOSED_PORTS,
|
||||
nodeId: payload.info.YARN_APPLICATION_ATTEMPT_MASTER_NODE_ID,
|
||||
hosts: payload.info.YARN_APPLICATION_ATTEMPT_HOST,
|
||||
state: payload.info.YARN_APPLICATION_ATTEMPT_STATE,
|
||||
|
|
|
@ -34,6 +34,7 @@ export default DS.JSONAPISerializer.extend({
|
|||
startedTime: Converter.timeStampToDate(payload.createdtime),
|
||||
finishedTime: Converter.timeStampToDate(payload.info.YARN_CONTAINER_FINISHED_TIME),
|
||||
nodeHttpAddress: payload.info.YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS,
|
||||
exposedPorts: payload.info.YARN_CONTAINER_ALLOCATED_EXPOSED_PORTS,
|
||||
containerExitStatus: payload.info.YARN_CONTAINER_EXIT_STATUS + '',
|
||||
containerState: payload.info.YARN_CONTAINER_STATE,
|
||||
nodeId: payload.info.YARN_CONTAINER_ALLOCATED_HOST + ':' + payload.info.YARN_CONTAINER_ALLOCATED_PORT,
|
||||
|
|
|
@ -56,6 +56,12 @@
|
|||
<td title="{{attempt.nodeHttpAddress}}"><a href="{{attempt.masterNodeURL}}">{{attempt.nodeHttpAddress}}</a></td>
|
||||
</tr>
|
||||
{{/if}}
|
||||
{{#if attempt.exposedPorts}}
|
||||
<tr>
|
||||
<td>Exposed Ports</td>
|
||||
<td title="{{attempt.exposedPorts}}">{{attempt.exposedPorts}}</td>
|
||||
</tr>
|
||||
{{/if}}
|
||||
{{#if attempt.logsLink}}
|
||||
<tr>
|
||||
<td>Log</td>
|
||||
|
|
|
@ -54,6 +54,12 @@
|
|||
<td title="{{container.nodeHttpAddress}}"><a href="{{container.masterNodeURL}}">{{container.nodeHttpAddress}}</a></td>
|
||||
</tr>
|
||||
{{/if}}
|
||||
{{#if container.exposedPorts}}
|
||||
<tr>
|
||||
<td>Exposed Ports</td>
|
||||
<td title="{{container.exposedPorts}}">{{container.exposedPorts}}</td>
|
||||
</tr>
|
||||
{{/if}}
|
||||
{{#if container.logUrl}}
|
||||
<tr>
|
||||
<td>Log</td>
|
||||
|
|
|
@ -57,6 +57,10 @@
|
|||
<td>IP Address</td>
|
||||
<td>{{check-availability model.container.ipAddr}}</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Exposed Ports</td>
|
||||
<td>{{check-availability model.container.exposedPorts}}</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Exit Status Code</td>
|
||||
<td>{{check-availability model.container.exitStatusCode}}</td>
|
||||
|
|
Loading…
Reference in New Issue