YARN-5356. NodeManager should communicate physical resource capability to ResourceManager. Contributed by Inigo Goiri

(cherry picked from commit 3f93ac0733)

Conflicts:

	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
This commit is contained in:
Jason Lowe 2016-11-08 22:16:53 +00:00
parent b77239b46d
commit 8bb264c70d
14 changed files with 166 additions and 21 deletions

View File

@ -218,6 +218,11 @@ public class NodeInfo {
public Integer getDecommissioningTimeout() { public Integer getDecommissioningTimeout() {
return null; return null;
} }
/**
* Always returns null: this node implementation provides no physical
* resource information.
*/
@Override
public Resource getPhysicalResource() {
return null;
}
} }
public static RMNode newNodeInfo(String rackName, String hostName, public static RMNode newNodeInfo(String rackName, String hostName,

View File

@ -207,4 +207,9 @@ public class RMNodeWrapper implements RMNode {
public Integer getDecommissioningTimeout() { public Integer getDecommissioningTimeout() {
return null; return null;
} }
/**
* Always returns null: this wrapper provides no physical resource
* information.
*/
@Override
public Resource getPhysicalResource() {
return null;
}
} }

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.SysInfo; import org.apache.hadoop.util.SysInfo;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/** /**
* Plugin to calculate resource information on the system. * Plugin to calculate resource information on the system.
@ -195,4 +196,42 @@ public class ResourceCalculatorPlugin extends Configured {
return null; return null;
} }
/**
 * Build and configure the ResourceCalculatorPlugin used by the containers
 * monitor in the Node Manager. The containers-monitor specific setting is
 * looked up first; the node-wide monitor setting serves as its default. If
 * neither is configured, a null class is handed to
 * {@code getResourceCalculatorPlugin}, which will try and return a plugin
 * available for this system.
 *
 * @param conf Configuration to read the plugin class from and to configure
 *          the plugin with.
 * @return ResourceCalculatorPlugin or null if ResourceCalculatorPlugin is
 *         not available for current system.
 */
public static ResourceCalculatorPlugin getContainersMonitorPlugin(
    Configuration conf) {
  // The inner lookup (node-wide key) is evaluated first and then used as the
  // default for the containers-monitor key.
  Class<? extends ResourceCalculatorPlugin> pluginClass = conf.getClass(
      YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
      conf.getClass(
          YarnConfiguration.NM_MON_RESOURCE_CALCULATOR, null,
          ResourceCalculatorPlugin.class),
      ResourceCalculatorPlugin.class);
  return ResourceCalculatorPlugin.getResourceCalculatorPlugin(
      pluginClass, conf);
}
/**
 * Build and configure the ResourceCalculatorPlugin used by the node resource
 * monitor in the Node Manager. The plugin class is read from the node-wide
 * monitor setting; if it is not configured, a null class is handed to
 * {@code getResourceCalculatorPlugin}, which will try and return a plugin
 * available for this system.
 *
 * @param conf Configuration to read the plugin class from and to configure
 *          the plugin with.
 * @return ResourceCalculatorPlugin or null if ResourceCalculatorPlugin is
 *         not available for current system.
 */
public static ResourceCalculatorPlugin getNodeResourceMonitorPlugin(
    Configuration conf) {
  final Class<? extends ResourceCalculatorPlugin> pluginClass =
      conf.getClass(YarnConfiguration.NM_MON_RESOURCE_CALCULATOR, null,
          ResourceCalculatorPlugin.class);
  return ResourceCalculatorPlugin.getResourceCalculatorPlugin(
      pluginClass, conf);
}
} }

View File

@ -41,6 +41,15 @@ public abstract class RegisterNodeManagerRequest {
int httpPort, Resource resource, String nodeManagerVersionId, int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses, List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels) { List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels) {
return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
containerStatuses, runningApplications, nodeLabels, null);
}
public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels,
Resource physicalResource) {
RegisterNodeManagerRequest request = RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class); Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort); request.setHttpPort(httpPort);
@ -50,6 +59,7 @@ public abstract class RegisterNodeManagerRequest {
request.setContainerStatuses(containerStatuses); request.setContainerStatuses(containerStatuses);
request.setRunningApplications(runningApplications); request.setRunningApplications(runningApplications);
request.setNodeLabels(nodeLabels); request.setNodeLabels(nodeLabels);
request.setPhysicalResource(physicalResource);
return request; return request;
} }
@ -88,4 +98,18 @@ public abstract class RegisterNodeManagerRequest {
*/ */
public abstract void setRunningApplications( public abstract void setRunningApplications(
List<ApplicationId> runningApplications); List<ApplicationId> runningApplications);
/**
* Get the physical resources in the node to properly estimate resource
* utilization.
* @return Physical resources in the node. May be null: the
* {@code newInstance} overload without a physical resource argument
* passes null.
*/
public abstract Resource getPhysicalResource();
/**
* Set the physical resources in the node to properly estimate resource
* utilization.
* @param physicalResource Physical resources in the node. Null is accepted;
* the {@code newInstance} overload without a physical resource argument
* passes null.
*/
public abstract void setPhysicalResource(Resource physicalResource);
} }

View File

@ -56,6 +56,9 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
private List<ApplicationId> runningApplications = null; private List<ApplicationId> runningApplications = null;
private Set<NodeLabel> labels = null; private Set<NodeLabel> labels = null;
/** Physical resources in the node. */
private Resource physicalResource = null;
public RegisterNodeManagerRequestPBImpl() { public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder(); builder = RegisterNodeManagerRequestProto.newBuilder();
} }
@ -93,6 +96,9 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
} }
builder.setNodeLabels(newBuilder.build()); builder.setNodeLabels(newBuilder.build());
} }
if (this.physicalResource != null) {
builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
}
} }
private synchronized void addNMContainerStatusesToProto() { private synchronized void addNMContainerStatusesToProto() {
@ -269,7 +275,29 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
initContainerRecoveryReports(); initContainerRecoveryReports();
this.containerStatuses.addAll(containerReports); this.containerStatuses.addAll(containerReports);
} }
/**
 * Return the physical resources carried by this request. The value is
 * converted from the underlying protobuf on first access and then kept in
 * the local field for later calls.
 *
 * @return the physical resources, or null if neither the local field nor
 *         the protobuf holds one.
 */
@Override
public synchronized Resource getPhysicalResource() {
  if (this.physicalResource == null) {
    RegisterNodeManagerRequestProtoOrBuilder source =
        viaProto ? proto : builder;
    if (source.hasPhysicalResource()) {
      this.physicalResource =
          convertFromProtoFormat(source.getPhysicalResource());
    }
  }
  return this.physicalResource;
}
/**
 * Record the physical resources for this request in the local field.
 * Passing null also clears the field from the protobuf builder.
 *
 * @param pPhysicalResource the physical resources in the node; may be null.
 */
@Override
public synchronized void setPhysicalResource(Resource pPhysicalResource) {
  maybeInitBuilder();
  this.physicalResource = pPhysicalResource;
  if (this.physicalResource == null) {
    // Nothing to carry locally any more, so drop the builder's copy too.
    builder.clearPhysicalResource();
  }
}
@Override @Override
public int hashCode() { public int hashCode() {
return getProto().hashCode(); return getProto().hashCode();

View File

@ -58,6 +58,7 @@ message RegisterNodeManagerRequestProto {
repeated NMContainerStatusProto container_statuses = 6; repeated NMContainerStatusProto container_statuses = 6;
repeated ApplicationIdProto runningApplications = 7; repeated ApplicationIdProto runningApplications = 7;
optional NodeLabelsProto nodeLabels = 8; optional NodeLabelsProto nodeLabels = 8;
optional ResourceProto physicalResource = 9;
} }
message RegisterNodeManagerResponseProto { message RegisterNodeManagerResponseProto {

View File

@ -196,6 +196,7 @@ public class TestYarnServerApiClasses {
resource.setMemorySize(10000); resource.setMemorySize(10000);
resource.setVirtualCores(2); resource.setVirtualCores(2);
original.setResource(resource); original.setResource(resource);
original.setPhysicalResource(resource);
RegisterNodeManagerRequestPBImpl copy = new RegisterNodeManagerRequestPBImpl( RegisterNodeManagerRequestPBImpl copy = new RegisterNodeManagerRequestPBImpl(
original.getProto()); original.getProto());
@ -203,6 +204,8 @@ public class TestYarnServerApiClasses {
assertEquals(9090, copy.getNodeId().getPort()); assertEquals(9090, copy.getNodeId().getPort());
assertEquals(10000, copy.getResource().getMemorySize()); assertEquals(10000, copy.getResource().getMemorySize());
assertEquals(2, copy.getResource().getVirtualCores()); assertEquals(2, copy.getResource().getVirtualCores());
assertEquals(10000, copy.getPhysicalResource().getMemorySize());
assertEquals(2, copy.getPhysicalResource().getVirtualCores());
} }

View File

@ -66,12 +66,8 @@ public class NodeResourceMonitorImpl extends AbstractService implements
conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS, conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS); YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS);
Class<? extends ResourceCalculatorPlugin> clazz =
conf.getClass(YarnConfiguration.NM_MON_RESOURCE_CALCULATOR, null,
ResourceCalculatorPlugin.class);
this.resourceCalculatorPlugin = this.resourceCalculatorPlugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf); ResourceCalculatorPlugin.getNodeResourceMonitorPlugin(conf);
LOG.info(" Using ResourceCalculatorPlugin : " LOG.info(" Using ResourceCalculatorPlugin : "
+ this.resourceCalculatorPlugin); + this.resourceCalculatorPlugin);

View File

@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.YarnVersionInfo;
@ -111,6 +112,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private long nextHeartBeatInterval; private long nextHeartBeatInterval;
private ResourceTracker resourceTracker; private ResourceTracker resourceTracker;
private Resource totalResource; private Resource totalResource;
private Resource physicalResource;
private int httpPort; private int httpPort;
private String nodeManagerVersionId; private String nodeManagerVersionId;
private String minimumResourceManagerVersion; private String minimumResourceManagerVersion;
@ -185,6 +187,19 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.totalResource = Resource.newInstance(memoryMb, virtualCores); this.totalResource = Resource.newInstance(memoryMb, virtualCores);
metrics.addResource(totalResource); metrics.addResource(totalResource);
// Get actual node physical resources
int physicalMemoryMb = memoryMb;
int physicalCores = virtualCores;
ResourceCalculatorPlugin rcp =
ResourceCalculatorPlugin.getNodeResourceMonitorPlugin(conf);
if (rcp != null) {
physicalMemoryMb = (int) (rcp.getPhysicalMemorySize() / (1024 * 1024));
physicalCores = rcp.getNumProcessors();
}
this.physicalResource =
Resource.newInstance(physicalMemoryMb, physicalCores);
this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf); this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
this.tokenRemovalDelayMs = this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
@ -341,7 +356,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
RegisterNodeManagerRequest request = RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerReports, getRunningApplications(), nodeManagerVersionId, containerReports, getRunningApplications(),
nodeLabels); nodeLabels, physicalResource);
if (containerReports != null) { if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports); LOG.info("Registering with RM using containers :" + containerReports);
} }

View File

@ -109,14 +109,8 @@ public class ContainersMonitorImpl extends AbstractService implements
conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS, conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS)); YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS));
Class<? extends ResourceCalculatorPlugin> clazz =
conf.getClass(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
conf.getClass(
YarnConfiguration.NM_MON_RESOURCE_CALCULATOR, null,
ResourceCalculatorPlugin.class),
ResourceCalculatorPlugin.class);
this.resourceCalculatorPlugin = this.resourceCalculatorPlugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf); ResourceCalculatorPlugin.getContainersMonitorPlugin(conf);
LOG.info(" Using ResourceCalculatorPlugin : " LOG.info(" Using ResourceCalculatorPlugin : "
+ this.resourceCalculatorPlugin); + this.resourceCalculatorPlugin);
processTreeClass = conf.getClass(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, null, processTreeClass = conf.getClass(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, null,

View File

@ -316,6 +316,7 @@ public class ResourceTrackerService extends AbstractService implements
int httpPort = request.getHttpPort(); int httpPort = request.getHttpPort();
Resource capability = request.getResource(); Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion(); String nodeManagerVersion = request.getNMVersion();
Resource physicalResource = request.getPhysicalResource();
RegisterNodeManagerResponse response = recordFactory RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class); .newRecordInstance(RegisterNodeManagerResponse.class);
@ -385,7 +386,7 @@ public class ResourceTrackerService extends AbstractService implements
.getCurrentKey()); .getCurrentKey());
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
resolve(host), capability, nodeManagerVersion); resolve(host), capability, nodeManagerVersion, physicalResource);
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) { if (oldNode == null) {

View File

@ -113,6 +113,12 @@ public interface RMNode {
*/ */
public ResourceUtilization getNodeUtilization(); public ResourceUtilization getNodeUtilization();
/**
* Get the physical resources in the node.
* @return the physical resources in the node; may be null, since some
* implementations always return null.
*/
Resource getPhysicalResource();
/** /**
* The rack name for this node manager. * The rack name for this node manager.
* @return the rack name. * @return the rack name.

View File

@ -133,6 +133,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/* Resource utilization for the node. */ /* Resource utilization for the node. */
private ResourceUtilization nodeUtilization; private ResourceUtilization nodeUtilization;
/** Physical resources in the node. */
private volatile Resource physicalResource;
/* Container Queue Information for the node.. Used by Distributed Scheduler */ /* Container Queue Information for the node.. Used by Distributed Scheduler */
private QueuedContainersStatus queuedContainersStatus; private QueuedContainersStatus queuedContainersStatus;
@ -353,7 +356,15 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeEvent> stateMachine; RMNodeEvent> stateMachine;
public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { int cmPort, int httpPort, Node node, Resource capability,
String nodeManagerVersion) {
this(nodeId, context, hostName, cmPort, httpPort, node, capability,
nodeManagerVersion, null);
}
public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
int cmPort, int httpPort, Node node, Resource capability,
String nodeManagerVersion, Resource physResource) {
this.nodeId = nodeId; this.nodeId = nodeId;
this.context = context; this.context = context;
this.hostName = hostName; this.hostName = hostName;
@ -367,6 +378,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.lastHealthReportTime = System.currentTimeMillis(); this.lastHealthReportTime = System.currentTimeMillis();
this.nodeManagerVersion = nodeManagerVersion; this.nodeManagerVersion = nodeManagerVersion;
this.timeStamp = 0; this.timeStamp = 0;
this.physicalResource = physResource;
this.latestNodeHeartBeatResponse.setResponseId(0); this.latestNodeHeartBeatResponse.setResponseId(0);
@ -526,6 +538,15 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} }
} }
/**
* Return the physical resources of this node as last set through the
* constructor or {@code setPhysicalResource}. May be null: the constructor
* overload without a physical resource argument passes null. The field is
* volatile and is read here without taking a lock.
*/
@Override
public Resource getPhysicalResource() {
return this.physicalResource;
}
/**
* Replace the physical resources recorded for this node. The field is
* volatile and is written here without taking a lock.
* @param physicalResource the physical resources in the node.
*/
public void setPhysicalResource(Resource physicalResource) {
this.physicalResource = physicalResource;
}
@Override @Override
public NodeState getState() { public NodeState getState() {
this.readLock.lock(); this.readLock.lock();

View File

@ -112,12 +112,13 @@ public class MockNodes {
private Set<String> labels; private Set<String> labels;
private ResourceUtilization containersUtilization; private ResourceUtilization containersUtilization;
private ResourceUtilization nodeUtilization; private ResourceUtilization nodeUtilization;
private Resource physicalResource;
public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, String healthReport, Resource perNode, String rackName, String healthReport,
long lastHealthReportTime, int cmdPort, String hostName, NodeState state, long lastHealthReportTime, int cmdPort, String hostName, NodeState state,
Set<String> labels, ResourceUtilization containersUtilization, Set<String> labels, ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization) { ResourceUtilization nodeUtilization, Resource pPhysicalResource) {
this.nodeId = nodeId; this.nodeId = nodeId;
this.nodeAddr = nodeAddr; this.nodeAddr = nodeAddr;
this.httpAddress = httpAddress; this.httpAddress = httpAddress;
@ -131,6 +132,7 @@ public class MockNodes {
this.labels = labels; this.labels = labels;
this.containersUtilization = containersUtilization; this.containersUtilization = containersUtilization;
this.nodeUtilization = nodeUtilization; this.nodeUtilization = nodeUtilization;
this.physicalResource = pPhysicalResource;
} }
@Override @Override
@ -279,6 +281,11 @@ public class MockNodes {
public Integer getDecommissioningTimeout() { public Integer getDecommissioningTimeout() {
return null; return null;
} }
/**
* Return the physical resource handed to this mock's constructor; null when
* the mock was built without one.
*/
@Override
public Resource getPhysicalResource() {
return this.physicalResource;
}
}; };
private static RMNode buildRMNode(int rack, final Resource perNode, private static RMNode buildRMNode(int rack, final Resource perNode,
@ -289,19 +296,19 @@ public class MockNodes {
private static RMNode buildRMNode(int rack, final Resource perNode, private static RMNode buildRMNode(int rack, final Resource perNode,
NodeState state, String httpAddr, Set<String> labels) { NodeState state, String httpAddr, Set<String> labels) {
return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123, return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123,
labels, null, null); labels, null, null, null);
} }
private static RMNode buildRMNode(int rack, final Resource perNode, private static RMNode buildRMNode(int rack, final Resource perNode,
NodeState state, String httpAddr, int hostnum, String hostName, int port) { NodeState state, String httpAddr, int hostnum, String hostName, int port) {
return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port, return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port,
null, null, null); null, null, null, null);
} }
private static RMNode buildRMNode(int rack, final Resource perNode, private static RMNode buildRMNode(int rack, final Resource perNode,
NodeState state, String httpAddr, int hostnum, String hostName, int port, NodeState state, String httpAddr, int hostnum, String hostName, int port,
Set<String> labels, ResourceUtilization containersUtilization, Set<String> labels, ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization) { ResourceUtilization nodeUtilization, Resource physicalResource) {
final String rackName = "rack"+ rack; final String rackName = "rack"+ rack;
final int nid = hostnum; final int nid = hostnum;
final String nodeAddr = hostName + ":" + nid; final String nodeAddr = hostName + ":" + nid;
@ -314,7 +321,7 @@ public class MockNodes {
String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe"; String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode,
rackName, healthReport, 0, nid, hostName, state, labels, rackName, healthReport, 0, nid, hostName, state, labels,
containersUtilization, nodeUtilization); containersUtilization, nodeUtilization, physicalResource);
} }
public static RMNode nodeInfo(int rack, final Resource perNode, public static RMNode nodeInfo(int rack, final Resource perNode,