YARN-9809. Added node manager health status to resource manager registration call. Contributed by Eric Badger (ebadger).

This commit is contained in:
Eric E Payne 2020-09-28 18:50:44 +00:00
parent 25a51f2f39
commit 947b0a154a
29 changed files with 367 additions and 79 deletions

View File

@ -71,6 +71,8 @@ public class NodeHealthScriptRunner extends AbstractService {
private long lastReportedTime; private long lastReportedTime;
private TimerTask timer; private TimerTask timer;
private boolean runBeforeStartup;
private enum HealthCheckerExitStatus { private enum HealthCheckerExitStatus {
SUCCESS, SUCCESS,
@ -191,7 +193,7 @@ public class NodeHealthScriptRunner extends AbstractService {
} }
public NodeHealthScriptRunner(String scriptName, long chkInterval, long timeout, public NodeHealthScriptRunner(String scriptName, long chkInterval, long timeout,
String[] scriptArgs) { String[] scriptArgs, boolean runBeforeStartup) {
super(NodeHealthScriptRunner.class.getName()); super(NodeHealthScriptRunner.class.getName());
this.lastReportedTime = System.currentTimeMillis(); this.lastReportedTime = System.currentTimeMillis();
this.isHealthy = true; this.isHealthy = true;
@ -200,6 +202,7 @@ public class NodeHealthScriptRunner extends AbstractService {
this.intervalTime = chkInterval; this.intervalTime = chkInterval;
this.scriptTimeout = timeout; this.scriptTimeout = timeout;
this.timer = new NodeHealthMonitorExecutor(scriptArgs); this.timer = new NodeHealthMonitorExecutor(scriptArgs);
this.runBeforeStartup = runBeforeStartup;
} }
/* /*
@ -217,9 +220,16 @@ public class NodeHealthScriptRunner extends AbstractService {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
nodeHealthScriptScheduler = new Timer("NodeHealthMonitor-Timer", true); nodeHealthScriptScheduler = new Timer("NodeHealthMonitor-Timer", true);
// Start the timer task immediately and long delay = 0;
// then periodically at interval time. if (runBeforeStartup) {
nodeHealthScriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime); // Start the timer task immediately and wait for it to return.
timer.run();
delay = intervalTime;
}
// Set the script to run periodically at interval time.
nodeHealthScriptScheduler.scheduleAtFixedRate(timer, delay,
intervalTime);
super.serviceStart(); super.serviceStart();
} }

View File

@ -100,7 +100,7 @@ public class TestNodeHealthScriptRunner {
writeNodeHealthScriptFile(normalScript, true); writeNodeHealthScriptFile(normalScript, true);
NodeHealthScriptRunner nodeHealthScriptRunner = new NodeHealthScriptRunner( NodeHealthScriptRunner nodeHealthScriptRunner = new NodeHealthScriptRunner(
nodeHealthscriptFile.getAbsolutePath(), nodeHealthscriptFile.getAbsolutePath(),
500, 1000, new String[] {}); 500, 1000, new String[] {}, true);
nodeHealthScriptRunner.init(conf); nodeHealthScriptRunner.init(conf);
TimerTask timerTask = nodeHealthScriptRunner.getTimerTask(); TimerTask timerTask = nodeHealthScriptRunner.getTimerTask();

View File

@ -1928,6 +1928,13 @@ public class YarnConfiguration extends Configuration {
NM_PREFIX + "health-checker.script.timeout-ms"; NM_PREFIX + "health-checker.script.timeout-ms";
public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS =
2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS; 2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS;
/** Whether or not to run the node health script before the NM
* starts up.*/
public static final String NM_HEALTH_CHECK_RUN_BEFORE_STARTUP =
NM_PREFIX + "health-checker.run-before-startup";
public static final boolean DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP =
false;
/** The health check script to run.*/ /** The health check script to run.*/
public static final String NM_HEALTH_CHECK_SCRIPT_PATH = public static final String NM_HEALTH_CHECK_SCRIPT_PATH =

View File

@ -1595,6 +1595,13 @@
<value>1200000</value> <value>1200000</value>
</property> </property>
<property>
<description>Whether or not to run the node health script
before the NM starts up.</description>
<name>yarn.nodemanager.health-checker.run-before-startup</name>
<value>false</value>
</property>
<property> <property>
<description>The health check script to run.</description> <description>The health check script to run.</description>
<name>yarn.nodemanager.health-checker.script.path</name> <name>yarn.nodemanager.health-checker.script.path</name>

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
public abstract class RegisterNodeManagerRequest { public abstract class RegisterNodeManagerRequest {
@ -53,14 +54,15 @@ public abstract class RegisterNodeManagerRequest {
Resource physicalResource) { Resource physicalResource) {
return newInstance(nodeId, httpPort, resource, nodeManagerVersionId, return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
containerStatuses, runningApplications, nodeLabels, physicalResource, containerStatuses, runningApplications, nodeLabels, physicalResource,
null); null, null);
} }
public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId, int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses, List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels, List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels,
Resource physicalResource, Set<NodeAttribute> nodeAttributes) { Resource physicalResource, Set<NodeAttribute> nodeAttributes,
NodeStatus nodeStatus) {
RegisterNodeManagerRequest request = RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class); Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort); request.setHttpPort(httpPort);
@ -72,6 +74,7 @@ public abstract class RegisterNodeManagerRequest {
request.setNodeLabels(nodeLabels); request.setNodeLabels(nodeLabels);
request.setPhysicalResource(physicalResource); request.setPhysicalResource(physicalResource);
request.setNodeAttributes(nodeAttributes); request.setNodeAttributes(nodeAttributes);
request.setNodeStatus(nodeStatus);
return request; return request;
} }
@ -133,4 +136,16 @@ public abstract class RegisterNodeManagerRequest {
public abstract Set<NodeAttribute> getNodeAttributes(); public abstract Set<NodeAttribute> getNodeAttributes();
public abstract void setNodeAttributes(Set<NodeAttribute> nodeAttributes); public abstract void setNodeAttributes(Set<NodeAttribute> nodeAttributes);
/**
* Get the status of the node.
* @return The status of the node.
*/
public abstract NodeStatus getNodeStatus();
/**
* Set the status of the node.
* @param nodeStatus The status of the node.
*/
public abstract void setNodeStatus(NodeStatus nodeStatus);
} }

View File

@ -45,13 +45,16 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregation
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeAttributesProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeAttributesProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest { public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance(); RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
RegisterNodeManagerRequestProto.Builder builder = null; RegisterNodeManagerRequestProto.Builder builder = null;
@ -68,6 +71,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
/** Physical resources in the node. */ /** Physical resources in the node. */
private Resource physicalResource = null; private Resource physicalResource = null;
private NodeStatus nodeStatus;
public RegisterNodeManagerRequestPBImpl() { public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder(); builder = RegisterNodeManagerRequestProto.newBuilder();
@ -118,6 +122,9 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
if (this.physicalResource != null) { if (this.physicalResource != null) {
builder.setPhysicalResource(convertToProtoFormat(this.physicalResource)); builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
} }
if (this.nodeStatus != null) {
builder.setNodeStatus(convertToProtoFormat(this.nodeStatus));
}
if (this.logAggregationReportsForApps != null) { if (this.logAggregationReportsForApps != null) {
addLogAggregationStatusForAppsToProto(); addLogAggregationStatusForAppsToProto();
} }
@ -359,6 +366,29 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
this.physicalResource = pPhysicalResource; this.physicalResource = pPhysicalResource;
} }
@Override
public synchronized NodeStatus getNodeStatus() {
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.nodeStatus != null) {
return this.nodeStatus;
}
if (!p.hasNodeStatus()) {
return null;
}
this.nodeStatus = convertFromProtoFormat(p.getNodeStatus());
return this.nodeStatus;
}
@Override
public synchronized void setNodeStatus(NodeStatus pNodeStatus) {
maybeInitBuilder();
if (pNodeStatus == null) {
builder.clearNodeStatus();
}
this.nodeStatus = pNodeStatus;
}
@Override @Override
public int hashCode() { public int hashCode() {
return getProto().hashCode(); return getProto().hashCode();
@ -533,4 +563,13 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
} }
this.logAggregationReportsForApps = logAggregationStatusForApps; this.logAggregationReportsForApps = logAggregationStatusForApps;
} }
private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto s) {
return new NodeStatusPBImpl(s);
}
private NodeStatusProto convertToProtoFormat(NodeStatus s) {
return ((NodeStatusPBImpl)s).getProto();
}
} }

View File

@ -73,6 +73,7 @@ message RegisterNodeManagerRequestProto {
optional ResourceProto physicalResource = 9; optional ResourceProto physicalResource = 9;
repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
optional NodeAttributesProto nodeAttributes = 11; optional NodeAttributesProto nodeAttributes = 11;
optional NodeStatusProto nodeStatus = 12;
} }
message RegisterNodeManagerResponseProto { message RegisterNodeManagerResponseProto {

View File

@ -367,8 +367,11 @@ public class NodeManager extends CompositeService
YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS); YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS);
String[] scriptArgs = conf.getStrings( String[] scriptArgs = conf.getStrings(
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS, new String[] {}); YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS, new String[] {});
boolean runBeforeStartup = conf.getBoolean(
YarnConfiguration.NM_HEALTH_CHECK_RUN_BEFORE_STARTUP,
YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP);
return new NodeHealthScriptRunner(nodeHealthScript, return new NodeHealthScriptRunner(nodeHealthScript,
nmCheckintervalTime, scriptTimeout, scriptArgs); nmCheckintervalTime, scriptTimeout, scriptArgs, runBeforeStartup);
} }
@VisibleForTesting @VisibleForTesting

View File

@ -390,10 +390,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// during RM recovery // during RM recovery
synchronized (this.context) { synchronized (this.context) {
List<NMContainerStatus> containerReports = getNMContainerStatuses(); List<NMContainerStatus> containerReports = getNMContainerStatuses();
NodeStatus nodeStatus = getNodeStatus(0);
RegisterNodeManagerRequest request = RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerReports, getRunningApplications(), nodeManagerVersionId, containerReports, getRunningApplications(),
nodeLabels, physicalResource, nodeAttributes); nodeLabels, physicalResource, nodeAttributes, nodeStatus);
if (containerReports != null) { if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports); LOG.info("Registering with RM using containers :" + containerReports);

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import static org.mockito.Mockito.mock;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -133,6 +135,9 @@ public class TestEventFlow {
new DummyContainerManager(context, exec, del, nodeStatusUpdater, new DummyContainerManager(context, exec, del, nodeStatusUpdater,
metrics, dirsHandler); metrics, dirsHandler);
nodeStatusUpdater.init(conf); nodeStatusUpdater.init(conf);
NodeResourceMonitorImpl nodeResourceMonitor = mock(
NodeResourceMonitorImpl.class);
((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor);
((NMContext)context).setContainerManager(containerManager); ((NMContext)context).setContainerManager(containerManager);
nodeStatusUpdater.start(); nodeStatusUpdater.start();
((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater); ((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);

View File

@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitorImpl;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -156,9 +157,12 @@ public abstract class BaseContainerManagerTest {
protected NodeHealthCheckerService nodeHealthChecker; protected NodeHealthCheckerService nodeHealthChecker;
protected LocalDirsHandlerService dirsHandler; protected LocalDirsHandlerService dirsHandler;
protected final long DUMMY_RM_IDENTIFIER = 1234; protected final long DUMMY_RM_IDENTIFIER = 1234;
private NodeHealthCheckerService nodeHealthCheckerService = mock(NodeHealthCheckerService.class);
private NodeResourceMonitorImpl nodeResourceMonitor = mock(
NodeResourceMonitorImpl.class);
protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl( protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
context, new AsyncDispatcher(), null, metrics) { context, new AsyncDispatcher(), nodeHealthCheckerService, metrics) {
@Override @Override
protected ResourceTracker getRMClient() { protected ResourceTracker getRMClient() {
return new LocalRMInterface(); return new LocalRMInterface();
@ -223,6 +227,7 @@ public abstract class BaseContainerManagerTest {
nodeHealthChecker.init(conf); nodeHealthChecker.init(conf);
containerManager = createContainerManager(delSrvc); containerManager = createContainerManager(delSrvc);
((NMContext)context).setContainerManager(containerManager); ((NMContext)context).setContainerManager(containerManager);
((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor);
nodeStatusUpdater.init(conf); nodeStatusUpdater.init(conf);
containerManager.init(conf); containerManager.init(conf);
nodeStatusUpdater.start(); nodeStatusUpdater.start();

View File

@ -333,6 +333,7 @@ public class ResourceTrackerService extends AbstractService implements
Resource capability = request.getResource(); Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion(); String nodeManagerVersion = request.getNMVersion();
Resource physicalResource = request.getPhysicalResource(); Resource physicalResource = request.getPhysicalResource();
NodeStatus nodeStatus = request.getNodeStatus();
RegisterNodeManagerResponse response = recordFactory RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class); .newRecordInstance(RegisterNodeManagerResponse.class);
@ -409,7 +410,7 @@ public class ResourceTrackerService extends AbstractService implements
if (oldNode == null) { if (oldNode == null) {
RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId, RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId,
request.getNMContainerStatuses(), request.getNMContainerStatuses(),
request.getRunningApplications()); request.getRunningApplications(), nodeStatus);
if (request.getLogAggregationReportsForApps() != null if (request.getLogAggregationReportsForApps() != null
&& !request.getLogAggregationReportsForApps().isEmpty()) { && !request.getLogAggregationReportsForApps().isEmpty()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -445,7 +446,7 @@ public class ResourceTrackerService extends AbstractService implements
this.rmContext.getRMNodes().put(nodeId, rmNode); this.rmContext.getRMNodes().put(nodeId, rmNode);
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeStartedEvent(nodeId, null, null)); .handle(new RMNodeStartedEvent(nodeId, null, null, nodeStatus));
} else { } else {
// Reset heartbeat ID since node just restarted. // Reset heartbeat ID since node just restarted.
oldNode.resetLastNodeHeartBeatResponse(); oldNode.resetLastNodeHeartBeatResponse();

View File

@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.collections.keyvalue.DefaultMapEntry; import org.apache.commons.collections.keyvalue.DefaultMapEntry;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -199,7 +200,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeEventType, RMNodeEventType,
RMNodeEvent>(NodeState.NEW) RMNodeEvent>(NodeState.NEW)
//Transitions from NEW state //Transitions from NEW state
.addTransition(NodeState.NEW, NodeState.RUNNING, .addTransition(NodeState.NEW,
EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
RMNodeEventType.STARTED, new AddNodeTransition()) RMNodeEventType.STARTED, new AddNodeTransition())
.addTransition(NodeState.NEW, NodeState.NEW, .addTransition(NodeState.NEW, NodeState.NEW,
RMNodeEventType.RESOURCE_UPDATE, RMNodeEventType.RESOURCE_UPDATE,
@ -685,7 +687,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private void updateMetricsForRejoinedNode(NodeState previousNodeState) { private void updateMetricsForRejoinedNode(NodeState previousNodeState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics(); ClusterMetrics metrics = ClusterMetrics.getMetrics();
metrics.incrNumActiveNodes();
switch (previousNodeState) { switch (previousNodeState) {
case LOST: case LOST:
@ -827,10 +828,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} }
public static class AddNodeTransition implements public static class AddNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> { MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
@Override @Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) { public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler // Inform the scheduler
RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event; RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
List<NMContainerStatus> containers = null; List<NMContainerStatus> containers = null;
@ -848,8 +849,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
if (previousRMNode != null) { if (previousRMNode != null) {
ClusterMetrics.getMetrics().decrDecommisionedNMs(); ClusterMetrics.getMetrics().decrDecommisionedNMs();
} }
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
containers = startEvent.getNMContainerStatuses(); containers = startEvent.getNMContainerStatuses();
if (containers != null && !containers.isEmpty()) { if (containers != null && !containers.isEmpty()) {
for (NMContainerStatus container : containers) { for (NMContainerStatus container : containers) {
@ -866,17 +865,35 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} }
} }
rmNode.context.getDispatcher().getEventHandler() NodeState nodeState;
.handle(new NodeAddedSchedulerEvent(rmNode, containers)); NodeStatus nodeStatus = startEvent.getNodeStatus();
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent( if (nodeStatus == null) {
NodesListManagerEventType.NODE_USABLE, rmNode)); nodeState = NodeState.RUNNING;
reportNodeRunning(rmNode, containers);
} else {
RMNodeStatusEvent rmNodeStatusEvent =
new RMNodeStatusEvent(nodeId, nodeStatus);
NodeHealthStatus nodeHealthStatus =
updateRMNodeFromStatusEvents(rmNode, rmNodeStatusEvent);
if (nodeHealthStatus.getIsNodeHealthy()) {
nodeState = NodeState.RUNNING;
reportNodeRunning(rmNode, containers);
} else {
nodeState = NodeState.UNHEALTHY;
reportNodeUnusable(rmNode, nodeState);
}
}
List<LogAggregationReport> logAggregationReportsForApps = List<LogAggregationReport> logAggregationReportsForApps =
startEvent.getLogAggregationReportsForApps(); startEvent.getLogAggregationReportsForApps();
if (logAggregationReportsForApps != null if (logAggregationReportsForApps != null
&& !logAggregationReportsForApps.isEmpty()) { && !logAggregationReportsForApps.isEmpty()) {
rmNode.handleLogAggregationStatus(logAggregationReportsForApps); rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
} }
return nodeState;
} }
} }
@ -1087,6 +1104,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} }
} }
/**
* Report node is RUNNING.
* @param rmNode
* @param containers
*/
public static void reportNodeRunning(RMNodeImpl rmNode,
List<NMContainerStatus> containers) {
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeAddedSchedulerEvent(rmNode, containers));
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
}
/** /**
* Report node is UNUSABLE and update metrics. * Report node is UNUSABLE and update metrics.
* @param rmNode * @param rmNode
@ -1278,6 +1311,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// notifiers get update metadata because they will very likely query it // notifiers get update metadata because they will very likely query it
// upon notification // upon notification
// Update metrics // Update metrics
ClusterMetrics.getMetrics().incrNumActiveNodes();
rmNode.updateMetricsForRejoinedNode(NodeState.UNHEALTHY); rmNode.updateMetricsForRejoinedNode(NodeState.UNHEALTHY);
return NodeState.RUNNING; return NodeState.RUNNING;
} }

View File

@ -24,19 +24,22 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
public class RMNodeStartedEvent extends RMNodeEvent { public class RMNodeStartedEvent extends RMNodeEvent {
private final NodeStatus nodeStatus;
private List<NMContainerStatus> containerStatuses; private List<NMContainerStatus> containerStatuses;
private List<ApplicationId> runningApplications; private List<ApplicationId> runningApplications;
private List<LogAggregationReport> logAggregationReportsForApps; private List<LogAggregationReport> logAggregationReportsForApps;
public RMNodeStartedEvent(NodeId nodeId, public RMNodeStartedEvent(NodeId nodeId,
List<NMContainerStatus> containerReports, List<NMContainerStatus> containerReports,
List<ApplicationId> runningApplications) { List<ApplicationId> runningApplications, NodeStatus nodeStatus) {
super(nodeId, RMNodeEventType.STARTED); super(nodeId, RMNodeEventType.STARTED);
this.containerStatuses = containerReports; this.containerStatuses = containerReports;
this.runningApplications = runningApplications; this.runningApplications = runningApplications;
this.nodeStatus = nodeStatus;
} }
public List<NMContainerStatus> getNMContainerStatuses() { public List<NMContainerStatus> getNMContainerStatuses() {
@ -55,4 +58,8 @@ public class RMNodeStartedEvent extends RMNodeEvent {
List<LogAggregationReport> logAggregationReportsForApps) { List<LogAggregationReport> logAggregationReportsForApps) {
this.logAggregationReportsForApps = logAggregationReportsForApps; this.logAggregationReportsForApps = logAggregationReportsForApps;
} }
public NodeStatus getNodeStatus() {
return nodeStatus;
}
} }

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -185,6 +188,17 @@ public class MockNM {
req.setNodeLabels(nodeLabels); req.setNodeLabels(nodeLabels);
} }
NodeStatus status = Records.newRecord(NodeStatus.class);
status.setResponseId(0);
status.setNodeId(nodeId);
status.setContainersStatuses(new ArrayList<>(containerStats.values()));
NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
healthStatus.setHealthReport("");
healthStatus.setIsNodeHealthy(true);
healthStatus.setLastHealthReportTime(1);
status.setNodeHealthStatus(healthStatus);
req.setNodeStatus(status);
RegisterNodeManagerResponse registrationResponse = RegisterNodeManagerResponse registrationResponse =
resourceTracker.registerNodeManager(req); resourceTracker.registerNodeManager(req);
this.currentContainerTokenMasterKey = this.currentContainerTokenMasterKey =
@ -324,4 +338,12 @@ public class MockNM {
public void setResponseId(int id) { public void setResponseId(int id) {
this.responseId = id; this.responseId = id;
} }
public static NodeStatus createMockNodeStatus() {
NodeStatus mockNodeStatus = mock(NodeStatus.class);
NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class);
when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus);
when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true);
return mockNodeStatus;
}
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
@ -77,6 +79,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
@ -960,7 +963,9 @@ public class MockRM extends ResourceManager {
public void sendNodeStarted(MockNM nm) throws Exception { public void sendNodeStarted(MockNM nm) throws Exception {
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
nm.getNodeId()); nm.getNodeId());
node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null)); NodeStatus mockNodeStatus = createMockNodeStatus();
node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null,
mockNodeStatus));
drainEventsImplicitly(); drainEventsImplicitly();
} }

View File

@ -96,7 +96,7 @@ public class NodeManager implements ContainerManagementProtocol {
public NodeManager(String hostName, int containerManagerPort, int httpPort, public NodeManager(String hostName, int containerManagerPort, int httpPort,
String rackName, Resource capability, String rackName, Resource capability,
ResourceManager resourceManager) ResourceManager resourceManager, NodeStatus nodeStatus)
throws IOException, YarnException { throws IOException, YarnException {
this.containerManagerAddress = hostName + ":" + containerManagerPort; this.containerManagerAddress = hostName + ":" + containerManagerPort;
this.nodeHttpAddress = hostName + ":" + httpPort; this.nodeHttpAddress = hostName + ":" + httpPort;
@ -111,6 +111,7 @@ public class NodeManager implements ContainerManagementProtocol {
request.setResource(capability); request.setResource(capability);
request.setNodeId(this.nodeId); request.setNodeId(this.nodeId);
request.setNMVersion(YarnVersionInfo.getVersion()); request.setNMVersion(YarnVersionInfo.getVersion());
request.setNodeStatus(nodeStatus);
resourceTrackerService.registerNodeManager(request); resourceTrackerService.registerNodeManager(request);
this.resourceManager = resourceManager; this.resourceManager = resourceManager;
resourceManager.getResourceScheduler().getNodeReport(this.nodeId); resourceManager.getResourceScheduler().getNodeReport(this.nodeId);

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
@ -216,8 +217,9 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000) @Test (timeout = 5000)
public void testExpiredContainer() { public void testExpiredContainer() {
NodeStatus mockNodeStatus = createMockNodeStatus();
// Start the node // Start the node
node.handle(new RMNodeStartedEvent(null, null, null)); node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
// Expire a container // Expire a container
@ -279,12 +281,13 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000) @Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{ public void testContainerUpdate() throws InterruptedException{
NodeStatus mockNodeStatus = createMockNodeStatus();
//Start the node //Start the node
node.handle(new RMNodeStartedEvent(null, null, null)); node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
node2.handle(new RMNodeStartedEvent(null, null, null)); node2.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); ApplicationId app0 = BuilderUtils.newApplicationId(0, 0);
ApplicationId app1 = BuilderUtils.newApplicationId(1, 1); ApplicationId app1 = BuilderUtils.newApplicationId(1, 1);
@ -340,8 +343,9 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000) @Test (timeout = 5000)
public void testStatusChange(){ public void testStatusChange(){
NodeStatus mockNodeStatus = createMockNodeStatus();
//Start the node //Start the node
node.handle(new RMNodeStartedEvent(null, null, null)); node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
//Add info to the queue first //Add info to the queue first
node.setNextHeartBeat(false); node.setNextHeartBeat(false);
@ -606,6 +610,33 @@ public class TestRMNodeTransitions {
Assert.assertEquals(NodeState.REBOOTED, node.getState()); Assert.assertEquals(NodeState.REBOOTED, node.getState());
} }
@Test
public void testAddUnhealthyNode() {
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
System.currentTimeMillis());
NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
new ArrayList<>(), null, status, null, null, null);
node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
nodeStatus));
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy + 1, cm.getUnhealthyNMs());
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Decommissioned Nodes",
initialDecommissioned, cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.UNHEALTHY, node.getState());
}
@Test @Test
public void testNMShutdown() { public void testNMShutdown() {
RMNodeImpl node = getRunningNode(); RMNodeImpl node = getRunningNode();
@ -711,7 +742,9 @@ public class TestRMNodeTransitions {
Resource capability = Resource.newInstance(4096, 4); Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null,
capability, nmVersion); capability, nmVersion);
node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); NodeStatus mockNodeStatus = createMockNodeStatus();
node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
mockNodeStatus));
Assert.assertEquals(NodeState.RUNNING, node.getState()); Assert.assertEquals(NodeState.RUNNING, node.getState());
return node; return node;
} }
@ -762,7 +795,9 @@ public class TestRMNodeTransitions {
Resource capability = Resource.newInstance(4096, 4); Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
null, capability, null); null, capability, null);
node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); NodeStatus mockNodeStatus = createMockNodeStatus();
node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
mockNodeStatus));
Assert.assertEquals(NodeState.RUNNING, node.getState()); Assert.assertEquals(NodeState.RUNNING, node.getState());
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.REBOOTING)); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.REBOOTING));
Assert.assertEquals(NodeState.REBOOTED, node.getState()); Assert.assertEquals(NodeState.REBOOTED, node.getState());
@ -778,7 +813,9 @@ public class TestRMNodeTransitions {
int initialUnhealthy = cm.getUnhealthyNMs(); int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs(); int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); NodeStatus mockNodeStatus = createMockNodeStatus();
node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
mockNodeStatus));
Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes", Assert.assertEquals("Unhealthy Nodes",
@ -1074,8 +1111,9 @@ public class TestRMNodeTransitions {
@Test @Test
public void testForHandlingDuplicatedCompltedContainers() { public void testForHandlingDuplicatedCompltedContainers() {
NodeStatus mockNodeStatus = createMockNodeStatus();
// Start the node // Start the node
node.handle(new RMNodeStartedEvent(null, null, null)); node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
// Add info to the queue first // Add info to the queue first
node.setNextHeartBeat(false); node.setNextHeartBeat(false);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -26,6 +27,7 @@ import java.util.Collection;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -82,12 +84,13 @@ public class TestResourceManager {
private org.apache.hadoop.yarn.server.resourcemanager.NodeManager private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
registerNode(String hostName, int containerManagerPort, int httpPort, registerNode(String hostName, int containerManagerPort, int httpPort,
String rackName, Resource capability) throws IOException, String rackName, Resource capability, NodeStatus nodeStatus)
throws IOException,
YarnException { YarnException {
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
hostName, containerManagerPort, httpPort, rackName, capability, hostName, containerManagerPort, httpPort, rackName, capability,
resourceManager); resourceManager, nodeStatus);
NodeAddedSchedulerEvent nodeAddEvent1 = NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(resourceManager.getRMContext() new NodeAddedSchedulerEvent(resourceManager.getRMContext()
.getRMNodes().get(nm.getNodeId())); .getRMNodes().get(nm.getNodeId()));
@ -103,26 +106,30 @@ public class TestResourceManager {
final int memory = 4 * 1024; final int memory = 4 * 1024;
final int vcores = 4; final int vcores = 4;
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node1 // Register node1
String host1 = "host1"; String host1 = "host1";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 =
registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(memory, vcores)); Resources.createResource(memory, vcores), mockNodeStatus);
// Register node2 // Register node2
String host2 = "host2"; String host2 = "host2";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 =
registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(memory/2, vcores/2)); Resources.createResource(memory/2, vcores/2), mockNodeStatus);
// nodes should be in RUNNING state // nodes should be in RUNNING state
RMNodeImpl node1 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get( RMNodeImpl node1 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get(
nm1.getNodeId()); nm1.getNodeId());
RMNodeImpl node2 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get( RMNodeImpl node2 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get(
nm2.getNodeId()); nm2.getNodeId());
node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null)); node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null,
node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null)); mockNodeStatus));
node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null,
mockNodeStatus));
// Submit an application // Submit an application
Application application = new Application("user1", resourceManager); Application application = new Application("user1", resourceManager);
@ -210,9 +217,10 @@ public class TestResourceManager {
public void testNodeHealthReportIsNotNull() throws Exception{ public void testNodeHealthReportIsNotNull() throws Exception{
String host1 = "host1"; String host1 = "host1";
final int memory = 4 * 1024; final int memory = 4 * 1024;
NodeStatus mockNodeStatus = createMockNodeStatus();
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 =
registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(memory, 1)); Resources.createResource(memory, 1), mockNodeStatus);
nm1.heartbeat(); nm1.heartbeat();
nm1.heartbeat(); nm1.heartbeat();
Collection<RMNode> values = resourceManager.getRMContext().getRMNodes().values(); Collection<RMNode> values = resourceManager.getRMContext().getRMNodes().values();

View File

@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil; import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -2682,10 +2684,14 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
RegisterNodeManagerRequest.class); RegisterNodeManagerRequest.class);
NodeId nodeId = NodeId.newInstance("host2", 1234); NodeId nodeId = NodeId.newInstance("host2", 1234);
Resource capability = BuilderUtils.newResource(1024, 1); Resource capability = BuilderUtils.newResource(1024, 1);
NodeStatus mockNodeStatus = createMockNodeStatus();
req.setResource(capability); req.setResource(capability);
req.setNodeId(nodeId); req.setNodeId(nodeId);
req.setHttpPort(1234); req.setHttpPort(1234);
req.setNMVersion(YarnVersionInfo.getVersion()); req.setNMVersion(YarnVersionInfo.getVersion());
req.setNodeStatus(mockNodeStatus);
ContainerId c1 = ContainerId.newContainerId(appAttemptId, 1); ContainerId c1 = ContainerId.newContainerId(appAttemptId, 1);
ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2); ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2);
ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3); ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.logaggregationstatus; package org.apache.hadoop.yarn.server.resourcemanager.logaggregationstatus;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -139,13 +140,15 @@ public class TestRMAppLogAggregationStatus {
Resource capability = Resource.newInstance(4096, 4); Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node1 = RMNodeImpl node1 =
new RMNodeImpl(nodeId1, rmContext, null, 0, 0, null, capability, null); new RMNodeImpl(nodeId1, rmContext, null, 0, 0, null, capability, null);
node1.handle(new RMNodeStartedEvent(nodeId1, null, null)); NodeStatus mockNodeStatus = createMockNodeStatus();
node1.handle(new RMNodeStartedEvent(nodeId1, null, null, mockNodeStatus));
rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId1)); rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId1));
NodeId nodeId2 = NodeId.newInstance("localhost", 2345); NodeId nodeId2 = NodeId.newInstance("localhost", 2345);
RMNodeImpl node2 = RMNodeImpl node2 =
new RMNodeImpl(nodeId2, rmContext, null, 0, 0, null, capability, null); new RMNodeImpl(nodeId2, rmContext, null, 0, 0, null, capability, null);
node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null)); node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null,
mockNodeStatus));
rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId2)); rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId2));
// The initial log aggregation status for these two nodes // The initial log aggregation status for these two nodes

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.junit.Assert; import org.junit.Assert;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -135,12 +138,15 @@ public class TestNMExpiry {
String hostname3 = "localhost3"; String hostname3 = "localhost3";
Resource capability = BuilderUtils.newResource(1024, 1); Resource capability = BuilderUtils.newResource(1024, 1);
NodeStatus mockNodeStatus = createMockNodeStatus();
RegisterNodeManagerRequest request1 = recordFactory RegisterNodeManagerRequest request1 = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class); .newRecordInstance(RegisterNodeManagerRequest.class);
NodeId nodeId1 = NodeId.newInstance(hostname1, 0); NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
request1.setNodeId(nodeId1); request1.setNodeId(nodeId1);
request1.setHttpPort(0); request1.setHttpPort(0);
request1.setResource(capability); request1.setResource(capability);
request1.setNodeStatus(mockNodeStatus);
resourceTrackerService.registerNodeManager(request1); resourceTrackerService.registerNodeManager(request1);
RegisterNodeManagerRequest request2 = recordFactory RegisterNodeManagerRequest request2 = recordFactory
@ -149,6 +155,7 @@ public class TestNMExpiry {
request2.setNodeId(nodeId2); request2.setNodeId(nodeId2);
request2.setHttpPort(0); request2.setHttpPort(0);
request2.setResource(capability); request2.setResource(capability);
request2.setNodeStatus(mockNodeStatus);
resourceTrackerService.registerNodeManager(request2); resourceTrackerService.registerNodeManager(request2);
int waitCount = 0; int waitCount = 0;

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase; import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
@ -178,9 +181,13 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase {
RegisterNodeManagerRequest request1 = recordFactory RegisterNodeManagerRequest request1 = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class); .newRecordInstance(RegisterNodeManagerRequest.class);
NodeId nodeId1 = NodeId.newInstance(hostname1, 0); NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
NodeStatus mockNodeStatus = createMockNodeStatus();
request1.setNodeId(nodeId1); request1.setNodeId(nodeId1);
request1.setHttpPort(0); request1.setHttpPort(0);
request1.setResource(capability); request1.setResource(capability);
request1.setNodeStatus(mockNodeStatus);
resourceTrackerService.registerNodeManager(request1); resourceTrackerService.registerNodeManager(request1);
Assert.assertNotNull(context.getRMNodes().get(nodeId1)); Assert.assertNotNull(context.getRMNodes().get(nodeId1));
// verify Scheduler and RMContext use same RMNode reference. // verify Scheduler and RMContext use same RMNode reference.

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@ -913,9 +915,13 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
RegisterNodeManagerRequest request1 = RegisterNodeManagerRequest request1 =
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
NodeId nodeId1 = NodeId.newInstance(hostname1, 0); NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
NodeStatus mockNodeStatus = createMockNodeStatus();
request1.setNodeId(nodeId1); request1.setNodeId(nodeId1);
request1.setHttpPort(0); request1.setHttpPort(0);
request1.setResource(capability); request1.setResource(capability);
request1.setNodeStatus(mockNodeStatus);
privateResourceTrackerService.registerNodeManager(request1); privateResourceTrackerService.registerNodeManager(request1);
privateDispatcher.await(); privateDispatcher.await();
Resource clusterResource = Resource clusterResource =

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.Application; import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@ -43,6 +44,7 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
public class TestSchedulerHealth { public class TestSchedulerHealth {
@ -170,11 +172,11 @@ public class TestSchedulerHealth {
} }
private NodeManager registerNode(String hostName, int containerManagerPort, private NodeManager registerNode(String hostName, int containerManagerPort,
int httpPort, String rackName, Resource capability) throws IOException, int httpPort, String rackName, Resource capability, NodeStatus nodeStatus)
YarnException { throws IOException, YarnException {
NodeManager nm = NodeManager nm =
new NodeManager(hostName, containerManagerPort, httpPort, rackName, new NodeManager(hostName, containerManagerPort, httpPort, rackName,
capability, resourceManager); capability, resourceManager, nodeStatus);
NodeAddedSchedulerEvent nodeAddEvent1 = NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
.get(nm.getNodeId())); .get(nm.getNodeId()));
@ -200,11 +202,13 @@ public class TestSchedulerHealth {
assumeTrue("This test is only supported on Capacity Scheduler", assumeTrue("This test is only supported on Capacity Scheduler",
isCapacityScheduler); isCapacityScheduler);
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node1 // Register node1
String host_0 = "host_0"; String host_0 = "host_0";
NodeManager nm_0 = NodeManager nm_0 =
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(5 * 1024, 1)); Resources.createResource(5 * 1024, 1), mockNodeStatus);
// ResourceRequest priorities // ResourceRequest priorities
Priority priority_0 = Priority.newInstance(0); Priority priority_0 = Priority.newInstance(0);
@ -275,15 +279,17 @@ public class TestSchedulerHealth {
assumeTrue("This test is only supported on Capacity Scheduler", assumeTrue("This test is only supported on Capacity Scheduler",
isCapacityScheduler); isCapacityScheduler);
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register nodes // Register nodes
String host_0 = "host_0"; String host_0 = "host_0";
NodeManager nm_0 = NodeManager nm_0 =
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(2 * 1024, 1)); Resources.createResource(2 * 1024, 1), mockNodeStatus);
String host_1 = "host_1"; String host_1 = "host_1";
NodeManager nm_1 = NodeManager nm_1 =
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(5 * 1024, 1)); Resources.createResource(5 * 1024, 1), mockNodeStatus);
nodeUpdate(nm_0); nodeUpdate(nm_0);
nodeUpdate(nm_1); nodeUpdate(nm_1);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -43,6 +44,7 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -227,9 +229,10 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
private NodeManager registerNode(ResourceManager rm, String hostName, private NodeManager registerNode(ResourceManager rm, String hostName,
int containerManagerPort, int httpPort, String rackName, int containerManagerPort, int httpPort, String rackName,
Resource capability) throws IOException, YarnException { Resource capability, NodeStatus nodeStatus)
throws IOException, YarnException {
NodeManager nm = new NodeManager(hostName, NodeManager nm = new NodeManager(hostName,
containerManagerPort, httpPort, rackName, capability, rm); containerManagerPort, httpPort, rackName, capability, rm, nodeStatus);
NodeAddedSchedulerEvent nodeAddEvent1 = NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes() new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes()
.get(nm.getNodeId())); .get(nm.getNodeId()));
@ -272,10 +275,10 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
private NodeManager registerNode(String hostName, int containerManagerPort, private NodeManager registerNode(String hostName, int containerManagerPort,
int httpPort, String rackName, int httpPort, String rackName,
Resource capability) Resource capability, NodeStatus nodeStatus)
throws IOException, YarnException { throws IOException, YarnException {
NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
rackName, capability, resourceManager); rackName, capability, resourceManager, nodeStatus);
NodeAddedSchedulerEvent nodeAddEvent1 = NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(resourceManager.getRMContext() new NodeAddedSchedulerEvent(resourceManager.getRMContext()
.getRMNodes().get(nm.getNodeId())); .getRMNodes().get(nm.getNodeId()));
@ -288,17 +291,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
LOG.info("--- START: testCapacityScheduler ---"); LOG.info("--- START: testCapacityScheduler ---");
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node1 // Register node1
String host_0 = "host_0"; String host_0 = "host_0";
NodeManager nm_0 = NodeManager nm_0 =
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(4 * GB, 1)); Resources.createResource(4 * GB, 1), mockNodeStatus);
// Register node2 // Register node2
String host_1 = "host_1"; String host_1 = "host_1";
NodeManager nm_1 = NodeManager nm_1 =
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(2 * GB, 1)); Resources.createResource(2 * GB, 1), mockNodeStatus);
// ResourceRequest priorities // ResourceRequest priorities
Priority priority_0 = Priority.newInstance(0); Priority priority_0 = Priority.newInstance(0);
@ -428,11 +433,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
when(mC.getConfigurationProvider()).thenReturn( when(mC.getConfigurationProvider()).thenReturn(
new LocalConfigurationProvider()); new LocalConfigurationProvider());
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node1 // Register node1
String host0 = "host_0"; String host0 = "host_0";
NodeManager nm0 = NodeManager nm0 =
registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(10 * GB, 10)); Resources.createResource(10 * GB, 10), mockNodeStatus);
// ResourceRequest priorities // ResourceRequest priorities
Priority priority0 = Priority.newInstance(0); Priority priority0 = Priority.newInstance(0);
@ -530,11 +537,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
when(mC.getConfigurationProvider()).thenReturn( when(mC.getConfigurationProvider()).thenReturn(
new LocalConfigurationProvider()); new LocalConfigurationProvider());
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node1 // Register node1
String host0 = "host_0"; String host0 = "host_0";
NodeManager nm0 = NodeManager nm0 =
registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(10 * GB, 10)); Resources.createResource(10 * GB, 10), mockNodeStatus);
// ResourceRequest priorities // ResourceRequest priorities
Priority priority0 = Priority.newInstance(0); Priority priority0 = Priority.newInstance(0);
@ -1948,17 +1957,20 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { public void testMoveAppForMoveToQueueWithFreeCap() throws Exception {
ResourceScheduler scheduler = resourceManager.getResourceScheduler(); ResourceScheduler scheduler = resourceManager.getResourceScheduler();
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node1 // Register node1
String host_0 = "host_0"; String host_0 = "host_0";
NodeManager nm_0 = NodeManager nm_0 =
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(4 * GB, 1)); Resources.createResource(4 * GB, 1), mockNodeStatus);
// Register node2 // Register node2
String host_1 = "host_1"; String host_1 = "host_1";
NodeManager nm_1 = NodeManager nm_1 =
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(2 * GB, 1)); Resources.createResource(2 * GB, 1), mockNodeStatus);
// ResourceRequest priorities // ResourceRequest priorities
Priority priority_0 = Priority.newInstance(0); Priority priority_0 = Priority.newInstance(0);
@ -2064,17 +2076,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
ResourceScheduler scheduler = resourceManager.getResourceScheduler(); ResourceScheduler scheduler = resourceManager.getResourceScheduler();
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node1 // Register node1
String host_0 = "host_0"; String host_0 = "host_0";
NodeManager nm_0 = NodeManager nm_0 =
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(5 * GB, 1)); Resources.createResource(5 * GB, 1), mockNodeStatus);
// Register node2 // Register node2
String host_1 = "host_1"; String host_1 = "host_1";
NodeManager nm_1 = NodeManager nm_1 =
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(5 * GB, 1)); Resources.createResource(5 * GB, 1), mockNodeStatus);
// ResourceRequest priorities // ResourceRequest priorities
Priority priority_0 = Priority.newInstance(0); Priority priority_0 = Priority.newInstance(0);
@ -2186,11 +2200,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
ResourceScheduler scheduler = resourceManager.getResourceScheduler(); ResourceScheduler scheduler = resourceManager.getResourceScheduler();
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node1 // Register node1
String host_0 = "host_0"; String host_0 = "host_0";
NodeManager nm_0 = NodeManager nm_0 =
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(6 * GB, 1)); Resources.createResource(6 * GB, 1), mockNodeStatus);
// ResourceRequest priorities // ResourceRequest priorities
Priority priority_0 = Priority.newInstance(0); Priority priority_0 = Priority.newInstance(0);
@ -2234,17 +2250,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
public void testMoveAppQueueMetricsCheck() throws Exception { public void testMoveAppQueueMetricsCheck() throws Exception {
ResourceScheduler scheduler = resourceManager.getResourceScheduler(); ResourceScheduler scheduler = resourceManager.getResourceScheduler();
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node1 // Register node1
String host_0 = "host_0"; String host_0 = "host_0";
NodeManager nm_0 = NodeManager nm_0 =
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(5 * GB, 1)); Resources.createResource(5 * GB, 1), mockNodeStatus);
// Register node2 // Register node2
String host_1 = "host_1"; String host_1 = "host_1";
NodeManager nm_1 = NodeManager nm_1 =
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(5 * GB, 1)); Resources.createResource(5 * GB, 1), mockNodeStatus);
// ResourceRequest priorities // ResourceRequest priorities
Priority priority_0 = Priority.newInstance(0); Priority priority_0 = Priority.newInstance(0);
@ -4126,9 +4144,12 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
} }
@Test @Test
public void testRemovedNodeDecomissioningNode() throws Exception { public void testRemovedNodeDecomissioningNode() throws Exception {
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register nodemanager // Register nodemanager
NodeManager nm = registerNode("host_decom", 1234, 2345, NodeManager nm = registerNode("host_decom", 1234, 2345,
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
mockNodeStatus);
RMNode node = RMNode node =
resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
@ -4171,10 +4192,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
((CapacityScheduler) resourceManager.getResourceScheduler()) ((CapacityScheduler) resourceManager.getResourceScheduler())
.setRMContext(spyContext); .setRMContext(spyContext);
((AsyncDispatcher) mockDispatcher).start(); ((AsyncDispatcher) mockDispatcher).start();
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node // Register node
String host_0 = "host_0"; String host_0 = "host_0";
NodeManager nm_0 = registerNode(host_0, 1234, 2345, NodeManager nm_0 = registerNode(host_0, 1234, 2345,
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
mockNodeStatus);
// ResourceRequest priorities // ResourceRequest priorities
Priority priority_0 = Priority.newInstance(0); Priority priority_0 = Priority.newInstance(0);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@ -5101,9 +5103,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
@Test @Test
public void testRemovedNodeDecomissioningNode() throws Exception { public void testRemovedNodeDecomissioningNode() throws Exception {
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register nodemanager // Register nodemanager
NodeManager nm = registerNode("host_decom", 1234, 2345, NodeManager nm = registerNode("host_decom", 1234, 2345,
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
mockNodeStatus);
RMNode node = RMNode node =
resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
@ -5146,10 +5150,13 @@ public class TestFairScheduler extends FairSchedulerTestBase {
((FairScheduler) resourceManager.getResourceScheduler()) ((FairScheduler) resourceManager.getResourceScheduler())
.setRMContext(spyContext); .setRMContext(spyContext);
((AsyncDispatcher) mockDispatcher).start(); ((AsyncDispatcher) mockDispatcher).start();
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node // Register node
String host_0 = "host_0"; String host_0 = "host_0";
NodeManager nm_0 = registerNode(host_0, 1234, 2345, NodeManager nm_0 = registerNode(host_0, 1234, 2345,
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
mockNodeStatus);
RMNode node = RMNode node =
resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId()); resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId());
@ -5189,10 +5196,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
private NodeManager registerNode(String hostName, int containerManagerPort, private NodeManager registerNode(String hostName, int containerManagerPort,
int httpPort, String rackName, int httpPort, String rackName,
Resource capability) Resource capability, NodeStatus nodeStatus)
throws IOException, YarnException { throws IOException, YarnException {
NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
rackName, capability, resourceManager); rackName, capability, resourceManager, nodeStatus);
// after YARN-5375, scheduler event is processed in rm main dispatcher, // after YARN-5375, scheduler event is processed in rm main dispatcher,
// wait it processed, or may lead dead lock // wait it processed, or may lead dead lock

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -32,6 +33,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.event.Level; import org.slf4j.event.Level;
@ -141,10 +143,10 @@ public class TestFifoScheduler {
private NodeManager registerNode(String hostName, int containerManagerPort, private NodeManager registerNode(String hostName, int containerManagerPort,
int nmHttpPort, String rackName, int nmHttpPort, String rackName,
Resource capability) Resource capability, NodeStatus nodeStatus)
throws IOException, YarnException { throws IOException, YarnException {
NodeManager nm = new NodeManager(hostName, containerManagerPort, NodeManager nm = new NodeManager(hostName, containerManagerPort,
nmHttpPort, rackName, capability, resourceManager); nmHttpPort, rackName, capability, resourceManager, nodeStatus);
NodeAddedSchedulerEvent nodeAddEvent1 = NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
.get(nm.getNodeId())); .get(nm.getNodeId()));
@ -403,19 +405,21 @@ public class TestFifoScheduler {
LOG.info("--- START: testFifoScheduler ---"); LOG.info("--- START: testFifoScheduler ---");
final int GB = 1024; final int GB = 1024;
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node1 // Register node1
String host_0 = "host_0"; String host_0 = "host_0";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(4 * GB, 1)); Resources.createResource(4 * GB, 1), mockNodeStatus);
nm_0.heartbeat(); nm_0.heartbeat();
// Register node2 // Register node2
String host_1 = "host_1"; String host_1 = "host_1";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 =
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(2 * GB, 1)); Resources.createResource(2 * GB, 1), mockNodeStatus);
nm_1.heartbeat(); nm_1.heartbeat();
// ResourceRequest priorities // ResourceRequest priorities
@ -1194,9 +1198,12 @@ public class TestFifoScheduler {
@Test @Test
public void testRemovedNodeDecomissioningNode() throws Exception { public void testRemovedNodeDecomissioningNode() throws Exception {
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register nodemanager // Register nodemanager
NodeManager nm = registerNode("host_decom", 1234, 2345, NodeManager nm = registerNode("host_decom", 1234, 2345,
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
mockNodeStatus);
RMNode node = RMNode node =
resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
@ -1239,10 +1246,13 @@ public class TestFifoScheduler {
((FifoScheduler) resourceManager.getResourceScheduler()) ((FifoScheduler) resourceManager.getResourceScheduler())
.setRMContext(spyContext); .setRMContext(spyContext);
((AsyncDispatcher) mockDispatcher).start(); ((AsyncDispatcher) mockDispatcher).start();
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node // Register node
String host_0 = "host_0"; String host_0 = "host_0";
NodeManager nm_0 = registerNode(host_0, 1234, 2345, NodeManager nm_0 = registerNode(host_0, 1234, 2345,
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
mockNodeStatus);
// ResourceRequest priorities // ResourceRequest priorities
Priority priority_0 = Priority.newInstance(0); Priority priority_0 = Priority.newInstance(0);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp; package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseStatusCode; import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseStatusCode;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -191,8 +192,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
} }
private void sendStartedEvent(RMNode node) { private void sendStartedEvent(RMNode node) {
NodeStatus mockNodeStatus = createMockNodeStatus();
((RMNodeImpl) node) ((RMNodeImpl) node)
.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); .handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
mockNodeStatus));
} }
private void sendLostEvent(RMNode node) { private void sendLostEvent(RMNode node) {