From 947b0a154a6be7611dc623da66be6133a54d9b44 Mon Sep 17 00:00:00 2001 From: Eric E Payne Date: Mon, 28 Sep 2020 18:50:44 +0000 Subject: [PATCH] YARN-9809. Added node manager health status to resource manager registration call. Contributed by Eric Badger (ebadger). --- .../hadoop/util/NodeHealthScriptRunner.java | 18 ++++-- .../util/TestNodeHealthScriptRunner.java | 2 +- .../hadoop/yarn/conf/YarnConfiguration.java | 7 +++ .../src/main/resources/yarn-default.xml | 7 +++ .../RegisterNodeManagerRequest.java | 19 +++++- .../pb/RegisterNodeManagerRequestPBImpl.java | 41 ++++++++++++- .../yarn_server_common_service_protos.proto | 1 + .../yarn/server/nodemanager/NodeManager.java | 5 +- .../nodemanager/NodeStatusUpdaterImpl.java | 3 +- .../server/nodemanager/TestEventFlow.java | 5 ++ .../BaseContainerManagerTest.java | 7 ++- .../ResourceTrackerService.java | 5 +- .../resourcemanager/rmnode/RMNodeImpl.java | 56 ++++++++++++++---- .../rmnode/RMNodeStartedEvent.java | 9 ++- .../yarn/server/resourcemanager/MockNM.java | 22 +++++++ .../yarn/server/resourcemanager/MockRM.java | 7 ++- .../server/resourcemanager/NodeManager.java | 3 +- .../TestRMNodeTransitions.java | 54 ++++++++++++++--- .../resourcemanager/TestResourceManager.java | 22 ++++--- .../TestResourceTrackerService.java | 6 ++ .../TestRMAppLogAggregationStatus.java | 7 ++- .../resourcetracker/TestNMExpiry.java | 7 +++ .../resourcetracker/TestNMReconnect.java | 7 +++ .../scheduler/TestAbstractYarnScheduler.java | 6 ++ .../scheduler/TestSchedulerHealth.java | 18 ++++-- .../capacity/TestCapacityScheduler.java | 58 +++++++++++++------ .../scheduler/fair/TestFairScheduler.java | 15 +++-- .../scheduler/fifo/TestFifoScheduler.java | 24 +++++--- .../webapp/TestRMWebServicesNodes.java | 5 +- 29 files changed, 367 insertions(+), 79 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeHealthScriptRunner.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeHealthScriptRunner.java index 1d2e4b585dc..66d6bcd6936 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeHealthScriptRunner.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeHealthScriptRunner.java @@ -71,6 +71,8 @@ public class NodeHealthScriptRunner extends AbstractService { private long lastReportedTime; private TimerTask timer; + + private boolean runBeforeStartup; private enum HealthCheckerExitStatus { SUCCESS, @@ -191,7 +193,7 @@ public class NodeHealthScriptRunner extends AbstractService { } public NodeHealthScriptRunner(String scriptName, long chkInterval, long timeout, - String[] scriptArgs) { + String[] scriptArgs, boolean runBeforeStartup) { super(NodeHealthScriptRunner.class.getName()); this.lastReportedTime = System.currentTimeMillis(); this.isHealthy = true; @@ -200,6 +202,7 @@ public class NodeHealthScriptRunner extends AbstractService { this.intervalTime = chkInterval; this.scriptTimeout = timeout; this.timer = new NodeHealthMonitorExecutor(scriptArgs); + this.runBeforeStartup = runBeforeStartup; } /* @@ -217,9 +220,16 @@ public class NodeHealthScriptRunner extends AbstractService { @Override protected void serviceStart() throws Exception { nodeHealthScriptScheduler = new Timer("NodeHealthMonitor-Timer", true); - // Start the timer task immediately and - // then periodically at interval time. - nodeHealthScriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime); + long delay = 0; + if (runBeforeStartup) { + // Start the timer task immediately and wait for it to return. + timer.run(); + delay = intervalTime; + } + + // Set the script to run periodically at interval time. + nodeHealthScriptScheduler.scheduleAtFixedRate(timer, delay, + intervalTime); super.serviceStart(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestNodeHealthScriptRunner.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestNodeHealthScriptRunner.java index 2748c0b581a..86b9f46e97e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestNodeHealthScriptRunner.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestNodeHealthScriptRunner.java @@ -100,7 +100,7 @@ public class TestNodeHealthScriptRunner { writeNodeHealthScriptFile(normalScript, true); NodeHealthScriptRunner nodeHealthScriptRunner = new NodeHealthScriptRunner( nodeHealthscriptFile.getAbsolutePath(), - 500, 1000, new String[] {}); + 500, 1000, new String[] {}, true); nodeHealthScriptRunner.init(conf); TimerTask timerTask = nodeHealthScriptRunner.getTimerTask(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index e40861d16d6..ba440e6d960 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1928,6 +1928,13 @@ public class YarnConfiguration extends Configuration { NM_PREFIX + "health-checker.script.timeout-ms"; public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = 2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS; + + /** Whether or not to run the node health script before the NM + * starts up.*/ + public static final String NM_HEALTH_CHECK_RUN_BEFORE_STARTUP = + NM_PREFIX + "health-checker.run-before-startup"; + public static final boolean DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP = + false; /** The health check script to run.*/ public static final String NM_HEALTH_CHECK_SCRIPT_PATH = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index acb70150725..7236e4f835c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1595,6 +1595,13 @@ 1200000 + + Whether or not to run the node health script + before the NM starts up. + yarn.nodemanager.health-checker.run-before-startup + false + + The health check script to run. yarn.nodemanager.health-checker.script.path diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index acec16fd56b..54b39155c63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; public abstract class RegisterNodeManagerRequest { @@ -53,14 +54,15 @@ public abstract class RegisterNodeManagerRequest { Resource physicalResource) { return newInstance(nodeId, httpPort, resource, nodeManagerVersionId, containerStatuses, runningApplications, nodeLabels, physicalResource, - null); + null, null); } public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, List containerStatuses, List runningApplications, Set nodeLabels, - Resource physicalResource, Set nodeAttributes) { + Resource physicalResource, Set nodeAttributes, + NodeStatus nodeStatus) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -72,6 +74,7 @@ public abstract class RegisterNodeManagerRequest { request.setNodeLabels(nodeLabels); request.setPhysicalResource(physicalResource); request.setNodeAttributes(nodeAttributes); + request.setNodeStatus(nodeStatus); return request; } @@ -133,4 +136,16 @@ public abstract class RegisterNodeManagerRequest { public abstract Set getNodeAttributes(); public abstract void setNodeAttributes(Set nodeAttributes); + + /** + * Get the status of the node. + * @return The status of the node. + */ + public abstract NodeStatus getNodeStatus(); + + /** + * Set the status of the node. + * @param nodeStatus The status of the node. + */ + public abstract void setNodeStatus(NodeStatus nodeStatus); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 317f8abd6f1..1ad414931a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -45,13 +45,16 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregation import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeAttributesProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; - +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl; + public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest { RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance(); RegisterNodeManagerRequestProto.Builder builder = null; @@ -68,6 +71,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest /** Physical resources in the node. */ private Resource physicalResource = null; + private NodeStatus nodeStatus; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -118,6 +122,9 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest if (this.physicalResource != null) { builder.setPhysicalResource(convertToProtoFormat(this.physicalResource)); } + if (this.nodeStatus != null) { + builder.setNodeStatus(convertToProtoFormat(this.nodeStatus)); + } if (this.logAggregationReportsForApps != null) { addLogAggregationStatusForAppsToProto(); } @@ -359,6 +366,29 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest this.physicalResource = pPhysicalResource; } + @Override + public synchronized NodeStatus getNodeStatus() { + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.nodeStatus != null) { + return this.nodeStatus; + } + if (!p.hasNodeStatus()) { + return null; + } + this.nodeStatus = convertFromProtoFormat(p.getNodeStatus()); + return this.nodeStatus; + } + + @Override + public synchronized void setNodeStatus(NodeStatus pNodeStatus) { + maybeInitBuilder(); + if (pNodeStatus == null) { + builder.clearNodeStatus(); + } + this.nodeStatus = pNodeStatus; + } + + @Override public int hashCode() { return getProto().hashCode(); @@ -533,4 +563,13 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest } this.logAggregationReportsForApps = logAggregationStatusForApps; } + + private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto s) { + return new NodeStatusPBImpl(s); + } + + private NodeStatusProto convertToProtoFormat(NodeStatus s) { + return ((NodeStatusPBImpl)s).getProto(); + } + } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index fb74237b5b8..54b097e207e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -73,6 +73,7 @@ message RegisterNodeManagerRequestProto { optional ResourceProto physicalResource = 9; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10; optional NodeAttributesProto nodeAttributes = 11; + optional NodeStatusProto nodeStatus = 12; } message RegisterNodeManagerResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 6edde643a39..62ae7b361ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -367,8 +367,11 @@ public class NodeManager extends CompositeService YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS); String[] scriptArgs = conf.getStrings( YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS, new String[] {}); + boolean runBeforeStartup = conf.getBoolean( + YarnConfiguration.NM_HEALTH_CHECK_RUN_BEFORE_STARTUP, + YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP); return new NodeHealthScriptRunner(nodeHealthScript, - nmCheckintervalTime, scriptTimeout, scriptArgs); + nmCheckintervalTime, scriptTimeout, scriptArgs, runBeforeStartup); } @VisibleForTesting diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 682937d34ab..e50f713a64e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -390,10 +390,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // during RM recovery synchronized (this.context) { List containerReports = getNMContainerStatuses(); + NodeStatus nodeStatus = getNodeStatus(0); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, nodeManagerVersionId, containerReports, getRunningApplications(), - nodeLabels, physicalResource, nodeAttributes); + nodeLabels, physicalResource, nodeAttributes, nodeStatus); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index 54e090a29e2..e47bdbadaf5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static org.mockito.Mockito.mock; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -133,6 +135,9 @@ public class TestEventFlow { new DummyContainerManager(context, exec, del, nodeStatusUpdater, metrics, dirsHandler); nodeStatusUpdater.init(conf); + NodeResourceMonitorImpl nodeResourceMonitor = mock( + NodeResourceMonitorImpl.class); + ((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor); ((NMContext)context).setContainerManager(containerManager); nodeStatusUpdater.start(); ((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 493aa4ca762..5b1d84c3f47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; +import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitorImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,9 +157,12 @@ public abstract class BaseContainerManagerTest { protected NodeHealthCheckerService nodeHealthChecker; protected LocalDirsHandlerService dirsHandler; protected final long DUMMY_RM_IDENTIFIER = 1234; + private NodeHealthCheckerService nodeHealthCheckerService = mock(NodeHealthCheckerService.class); + private NodeResourceMonitorImpl nodeResourceMonitor = mock( + NodeResourceMonitorImpl.class); protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl( - context, new AsyncDispatcher(), null, metrics) { + context, new AsyncDispatcher(), nodeHealthCheckerService, metrics) { @Override protected ResourceTracker getRMClient() { return new LocalRMInterface(); @@ -223,6 +227,7 @@ public abstract class BaseContainerManagerTest { nodeHealthChecker.init(conf); containerManager = createContainerManager(delSrvc); ((NMContext)context).setContainerManager(containerManager); + ((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor); nodeStatusUpdater.init(conf); containerManager.init(conf); nodeStatusUpdater.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 8963d72a524..48401e062fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -333,6 +333,7 @@ public class ResourceTrackerService extends AbstractService implements Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); Resource physicalResource = request.getPhysicalResource(); + NodeStatus nodeStatus = request.getNodeStatus(); RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); @@ -409,7 +410,7 @@ public class ResourceTrackerService extends AbstractService implements if (oldNode == null) { RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(), - request.getRunningApplications()); + request.getRunningApplications(), nodeStatus); if (request.getLogAggregationReportsForApps() != null && !request.getLogAggregationReportsForApps().isEmpty()) { if (LOG.isDebugEnabled()) { @@ -445,7 +446,7 @@ public class ResourceTrackerService extends AbstractService implements this.rmContext.getRMNodes().put(nodeId, rmNode); this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeStartedEvent(nodeId, null, null)); + .handle(new RMNodeStartedEvent(nodeId, null, null, nodeStatus)); } else { // Reset heartbeat ID since node just restarted. oldNode.resetLastNodeHeartBeatResponse(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 7fe53c17f1e..464aaaea93d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.collections.keyvalue.DefaultMapEntry; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -199,7 +200,8 @@ public class RMNodeImpl implements RMNode, EventHandler { RMNodeEventType, RMNodeEvent>(NodeState.NEW) //Transitions from NEW state - .addTransition(NodeState.NEW, NodeState.RUNNING, + .addTransition(NodeState.NEW, + EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), RMNodeEventType.STARTED, new AddNodeTransition()) .addTransition(NodeState.NEW, NodeState.NEW, RMNodeEventType.RESOURCE_UPDATE, @@ -685,7 +687,6 @@ public class RMNodeImpl implements RMNode, EventHandler { private void updateMetricsForRejoinedNode(NodeState previousNodeState) { ClusterMetrics metrics = ClusterMetrics.getMetrics(); - metrics.incrNumActiveNodes(); switch (previousNodeState) { case LOST: @@ -827,10 +828,10 @@ public class RMNodeImpl implements RMNode, EventHandler { } public static class AddNodeTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event; List containers = null; @@ -848,8 +849,6 @@ public class RMNodeImpl implements RMNode, EventHandler { if (previousRMNode != null) { ClusterMetrics.getMetrics().decrDecommisionedNMs(); } - // Increment activeNodes explicitly because this is a new node. - ClusterMetrics.getMetrics().incrNumActiveNodes(); containers = startEvent.getNMContainerStatuses(); if (containers != null && !containers.isEmpty()) { for (NMContainerStatus container : containers) { @@ -866,17 +865,35 @@ public class RMNodeImpl implements RMNode, EventHandler { } } - rmNode.context.getDispatcher().getEventHandler() - .handle(new NodeAddedSchedulerEvent(rmNode, containers)); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); + NodeState nodeState; + NodeStatus nodeStatus = startEvent.getNodeStatus(); + + if (nodeStatus == null) { + nodeState = NodeState.RUNNING; + reportNodeRunning(rmNode, containers); + } else { + RMNodeStatusEvent rmNodeStatusEvent = + new RMNodeStatusEvent(nodeId, nodeStatus); + + NodeHealthStatus nodeHealthStatus = + updateRMNodeFromStatusEvents(rmNode, rmNodeStatusEvent); + + if (nodeHealthStatus.getIsNodeHealthy()) { + nodeState = NodeState.RUNNING; + reportNodeRunning(rmNode, containers); + } else { + nodeState = NodeState.UNHEALTHY; + reportNodeUnusable(rmNode, nodeState); + } + } + List logAggregationReportsForApps = startEvent.getLogAggregationReportsForApps(); if (logAggregationReportsForApps != null && !logAggregationReportsForApps.isEmpty()) { rmNode.handleLogAggregationStatus(logAggregationReportsForApps); } + return nodeState; } } @@ -1087,6 +1104,22 @@ public class RMNodeImpl implements RMNode, EventHandler { } } + /** + * Report node is RUNNING. + * @param rmNode + * @param containers + */ + public static void reportNodeRunning(RMNodeImpl rmNode, + List containers) { + rmNode.context.getDispatcher().getEventHandler() + .handle(new NodeAddedSchedulerEvent(rmNode, containers)); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); + // Increment activeNodes explicitly because this is a new node. + ClusterMetrics.getMetrics().incrNumActiveNodes(); + } + /** * Report node is UNUSABLE and update metrics. * @param rmNode @@ -1278,6 +1311,7 @@ public class RMNodeImpl implements RMNode, EventHandler { // notifiers get update metadata because they will very likely query it // upon notification // Update metrics + ClusterMetrics.getMetrics().incrNumActiveNodes(); rmNode.updateMetricsForRejoinedNode(NodeState.UNHEALTHY); return NodeState.RUNNING; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java index 397699453fb..fbbda9a37b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java @@ -24,19 +24,22 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; public class RMNodeStartedEvent extends RMNodeEvent { + private final NodeStatus nodeStatus; private List containerStatuses; private List runningApplications; private List logAggregationReportsForApps; public RMNodeStartedEvent(NodeId nodeId, List containerReports, - List runningApplications) { + List runningApplications, NodeStatus nodeStatus) { super(nodeId, RMNodeEventType.STARTED); this.containerStatuses = containerReports; this.runningApplications = runningApplications; + this.nodeStatus = nodeStatus; } public List getNMContainerStatuses() { @@ -55,4 +58,8 @@ public class RMNodeStartedEvent extends RMNodeEvent { List logAggregationReportsForApps) { this.logAggregationReportsForApps = logAggregationReportsForApps; } + + public NodeStatus getNodeStatus() { + return nodeStatus; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 2e2839552ee..04948aa8722 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -185,6 +188,17 @@ public class MockNM { req.setNodeLabels(nodeLabels); } + NodeStatus status = Records.newRecord(NodeStatus.class); + status.setResponseId(0); + status.setNodeId(nodeId); + status.setContainersStatuses(new ArrayList<>(containerStats.values())); + NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class); + healthStatus.setHealthReport(""); + healthStatus.setIsNodeHealthy(true); + healthStatus.setLastHealthReportTime(1); + status.setNodeHealthStatus(healthStatus); + req.setNodeStatus(status); + RegisterNodeManagerResponse registrationResponse = resourceTracker.registerNodeManager(req); this.currentContainerTokenMasterKey = @@ -324,4 +338,12 @@ public class MockNM { public void setResponseId(int id) { this.responseId = id; } + + public static NodeStatus createMockNodeStatus() { + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); + return mockNodeStatus; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 9bbd148ff41..9b33c838807 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; + import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; @@ -77,6 +79,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; @@ -960,7 +963,9 @@ public class MockRM extends ResourceManager { public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); - node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null, + mockNodeStatus)); drainEventsImplicitly(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index 41ae7b89977..5af33e6ea5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -96,7 +96,7 @@ public class NodeManager implements ContainerManagementProtocol { public NodeManager(String hostName, int containerManagerPort, int httpPort, String rackName, Resource capability, - ResourceManager resourceManager) + ResourceManager resourceManager, NodeStatus nodeStatus) throws IOException, YarnException { this.containerManagerAddress = hostName + ":" + containerManagerPort; this.nodeHttpAddress = hostName + ":" + httpPort; @@ -111,6 +111,7 @@ public class NodeManager implements ContainerManagementProtocol { request.setResource(capability); request.setNodeId(this.nodeId); request.setNMVersion(YarnVersionInfo.getVersion()); + request.setNodeStatus(nodeStatus); resourceTrackerService.registerNodeManager(request); this.resourceManager = resourceManager; resourceManager.getResourceScheduler().getNodeReport(this.nodeId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 67decaf9316..d9a1812da05 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; @@ -216,8 +217,9 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testExpiredContainer() { + NodeStatus mockNodeStatus = createMockNodeStatus(); // Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); // Expire a container @@ -279,12 +281,13 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testContainerUpdate() throws InterruptedException{ + NodeStatus mockNodeStatus = createMockNodeStatus(); //Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); - node2.handle(new RMNodeStartedEvent(null, null, null)); + node2.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); ApplicationId app1 = BuilderUtils.newApplicationId(1, 1); @@ -340,8 +343,9 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testStatusChange(){ + NodeStatus mockNodeStatus = createMockNodeStatus(); //Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); //Add info to the queue first node.setNextHeartBeat(false); @@ -606,6 +610,33 @@ public class TestRMNodeTransitions { Assert.assertEquals(NodeState.REBOOTED, node.getState()); } + @Test + public void testAddUnhealthyNode() { + ClusterMetrics cm = ClusterMetrics.getMetrics(); + int initialUnhealthy = cm.getUnhealthyNMs(); + int initialActive = cm.getNumActiveNMs(); + int initialLost = cm.getNumLostNMs(); + int initialDecommissioned = cm.getNumDecommisionedNMs(); + int initialRebooted = cm.getNumRebootedNMs(); + + NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", + System.currentTimeMillis()); + NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0, + new ArrayList<>(), null, status, null, null, null); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + nodeStatus)); + + Assert.assertEquals("Unhealthy Nodes", + initialUnhealthy + 1, cm.getUnhealthyNMs()); + Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs()); + Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); + Assert.assertEquals("Decommissioned Nodes", + initialDecommissioned, cm.getNumDecommisionedNMs()); + Assert.assertEquals("Rebooted Nodes", + initialRebooted, cm.getNumRebootedNMs()); + Assert.assertEquals(NodeState.UNHEALTHY, node.getState()); + } + @Test public void testNMShutdown() { RMNodeImpl node = getRunningNode(); @@ -711,7 +742,9 @@ public class TestRMNodeTransitions { Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, capability, nmVersion); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; } @@ -762,7 +795,9 @@ public class TestRMNodeTransitions { Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, null, capability, null); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); Assert.assertEquals(NodeState.RUNNING, node.getState()); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.REBOOTING)); Assert.assertEquals(NodeState.REBOOTED, node.getState()); @@ -778,7 +813,9 @@ public class TestRMNodeTransitions { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", @@ -1074,8 +1111,9 @@ public class TestRMNodeTransitions { @Test public void testForHandlingDuplicatedCompltedContainers() { + NodeStatus mockNodeStatus = createMockNodeStatus(); // Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); // Add info to the queue first node.setNextHeartBeat(false); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java index efa7496259d..e5f75d66f70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -26,6 +27,7 @@ import java.util.Collection; import java.util.concurrent.TimeoutException; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -82,12 +84,13 @@ public class TestResourceManager { private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode(String hostName, int containerManagerPort, int httpPort, - String rackName, Resource capability) throws IOException, + String rackName, Resource capability, NodeStatus nodeStatus) + throws IOException, YarnException { org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( hostName, containerManagerPort, httpPort, rackName, capability, - resourceManager); + resourceManager, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext() .getRMNodes().get(nm.getNodeId())); @@ -103,26 +106,30 @@ public class TestResourceManager { final int memory = 4 * 1024; final int vcores = 4; + + NodeStatus mockNodeStatus = createMockNodeStatus(); // Register node1 String host1 = "host1"; org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory, vcores)); + Resources.createResource(memory, vcores), mockNodeStatus); // Register node2 String host2 = "host2"; org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory/2, vcores/2)); + Resources.createResource(memory/2, vcores/2), mockNodeStatus); // nodes should be in RUNNING state RMNodeImpl node1 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get( nm1.getNodeId()); RMNodeImpl node2 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get( nm2.getNodeId()); - node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null)); - node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null)); + node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null, + mockNodeStatus)); + node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null, + mockNodeStatus)); // Submit an application Application application = new Application("user1", resourceManager); @@ -210,9 +217,10 @@ public class TestResourceManager { public void testNodeHealthReportIsNotNull() throws Exception{ String host1 = "host1"; final int memory = 4 * 1024; + NodeStatus mockNodeStatus = createMockNodeStatus(); org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory, 1)); + Resources.createResource(memory, 1), mockNodeStatus); nm1.heartbeat(); nm1.heartbeat(); Collection values = resourceManager.getRMContext().getRMNodes().values(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 698546d53e0..94d440a6d82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore; import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore; + +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -2682,10 +2684,14 @@ public class TestResourceTrackerService extends NodeLabelTestBase { RegisterNodeManagerRequest.class); NodeId nodeId = NodeId.newInstance("host2", 1234); Resource capability = BuilderUtils.newResource(1024, 1); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + req.setResource(capability); req.setNodeId(nodeId); req.setHttpPort(1234); req.setNMVersion(YarnVersionInfo.getVersion()); + req.setNodeStatus(mockNodeStatus); ContainerId c1 = ContainerId.newContainerId(appAttemptId, 1); ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2); ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index c2bc6117806..a09be0cc349 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.logaggregationstatus; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -139,13 +140,15 @@ public class TestRMAppLogAggregationStatus { Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node1 = new RMNodeImpl(nodeId1, rmContext, null, 0, 0, null, capability, null); - node1.handle(new RMNodeStartedEvent(nodeId1, null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + node1.handle(new RMNodeStartedEvent(nodeId1, null, null, mockNodeStatus)); rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId1)); NodeId nodeId2 = NodeId.newInstance("localhost", 2345); RMNodeImpl node2 = new RMNodeImpl(nodeId2, rmContext, null, 0, 0, null, capability, null); - node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null)); + node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null, + mockNodeStatus)); rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId2)); // The initial log aggregation status for these two nodes diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java index f69faf4ea55..017a1e021d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; + +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.junit.Assert; import org.slf4j.Logger; @@ -135,12 +138,15 @@ public class TestNMExpiry { String hostname3 = "localhost3"; Resource capability = BuilderUtils.newResource(1024, 1); + NodeStatus mockNodeStatus = createMockNodeStatus(); + RegisterNodeManagerRequest request1 = recordFactory .newRecordInstance(RegisterNodeManagerRequest.class); NodeId nodeId1 = NodeId.newInstance(hostname1, 0); request1.setNodeId(nodeId1); request1.setHttpPort(0); request1.setResource(capability); + request1.setNodeStatus(mockNodeStatus); resourceTrackerService.registerNodeManager(request1); RegisterNodeManagerRequest request2 = recordFactory @@ -149,6 +155,7 @@ public class TestNMExpiry { request2.setNodeId(nodeId2); request2.setHttpPort(0); request2.setResource(capability); + request2.setNodeStatus(mockNodeStatus); resourceTrackerService.registerNodeManager(request2); int waitCount = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index 3c4e6b424de..817fb9dfc33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase; @@ -178,9 +181,13 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase { RegisterNodeManagerRequest request1 = recordFactory .newRecordInstance(RegisterNodeManagerRequest.class); NodeId nodeId1 = NodeId.newInstance(hostname1, 0); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + request1.setNodeId(nodeId1); request1.setHttpPort(0); request1.setResource(capability); + request1.setNodeStatus(mockNodeStatus); resourceTrackerService.registerNodeManager(request1); Assert.assertNotNull(context.getRMNodes().get(nodeId1)); // verify Scheduler and RMContext use same RMNode reference. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index ba409b1386b..e66d5390382 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -913,9 +915,13 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { RegisterNodeManagerRequest request1 = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); NodeId nodeId1 = NodeId.newInstance(hostname1, 0); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + request1.setNodeId(nodeId1); request1.setHttpPort(0); request1.setResource(capability); + request1.setNodeStatus(mockNodeStatus); privateResourceTrackerService.registerNodeManager(request1); privateDispatcher.await(); Resource clusterResource = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java index 83a354de5a2..a75be7745fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.Application; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -43,6 +44,7 @@ import org.junit.Test; import java.io.IOException; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assume.assumeTrue; public class TestSchedulerHealth { @@ -170,11 +172,11 @@ public class TestSchedulerHealth { } private NodeManager registerNode(String hostName, int containerManagerPort, - int httpPort, String rackName, Resource capability) throws IOException, - YarnException { + int httpPort, String rackName, Resource capability, NodeStatus nodeStatus) + throws IOException, YarnException { NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, rackName, - capability, resourceManager); + capability, resourceManager, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -200,11 +202,13 @@ public class TestSchedulerHealth { assumeTrue("This test is only supported on Capacity Scheduler", isCapacityScheduler); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * 1024, 1)); + Resources.createResource(5 * 1024, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -275,15 +279,17 @@ public class TestSchedulerHealth { assumeTrue("This test is only supported on Capacity Scheduler", isCapacityScheduler); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register nodes String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * 1024, 1)); + Resources.createResource(2 * 1024, 1), mockNodeStatus); String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * 1024, 1)); + Resources.createResource(5 * 1024, 1), mockNodeStatus); nodeUpdate(nm_0); nodeUpdate(nm_1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index b60394876f2..3b2a5574827 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES; import static org.junit.Assert.assertEquals; @@ -43,6 +44,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import com.google.common.collect.Sets; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -227,9 +229,10 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { private NodeManager registerNode(ResourceManager rm, String hostName, int containerManagerPort, int httpPort, String rackName, - Resource capability) throws IOException, YarnException { + Resource capability, NodeStatus nodeStatus) + throws IOException, YarnException { NodeManager nm = new NodeManager(hostName, - containerManagerPort, httpPort, rackName, capability, rm); + containerManagerPort, httpPort, rackName, capability, rm, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -272,10 +275,10 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { private NodeManager registerNode(String hostName, int containerManagerPort, int httpPort, String rackName, - Resource capability) + Resource capability, NodeStatus nodeStatus) throws IOException, YarnException { NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, - rackName, capability, resourceManager); + rackName, capability, resourceManager, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext() .getRMNodes().get(nm.getNodeId())); @@ -288,17 +291,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { LOG.info("--- START: testCapacityScheduler ---"); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + Resources.createResource(4 * GB, 1), mockNodeStatus); // Register node2 String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + Resources.createResource(2 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -428,11 +433,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { when(mC.getConfigurationProvider()).thenReturn( new LocalConfigurationProvider()); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host0 = "host_0"; NodeManager nm0 = registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(10 * GB, 10)); + Resources.createResource(10 * GB, 10), mockNodeStatus); // ResourceRequest priorities Priority priority0 = Priority.newInstance(0); @@ -530,11 +537,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { when(mC.getConfigurationProvider()).thenReturn( new LocalConfigurationProvider()); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host0 = "host_0"; NodeManager nm0 = registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(10 * GB, 10)); + Resources.createResource(10 * GB, 10), mockNodeStatus); // ResourceRequest priorities Priority priority0 = Priority.newInstance(0); @@ -1948,17 +1957,20 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + Resources.createResource(4 * GB, 1), mockNodeStatus); // Register node2 String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + Resources.createResource(2 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2064,17 +2076,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + Resources.createResource(5 * GB, 1), mockNodeStatus); // Register node2 String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + Resources.createResource(5 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2186,11 +2200,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(6 * GB, 1)); + Resources.createResource(6 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2234,17 +2250,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { public void testMoveAppQueueMetricsCheck() throws Exception { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + Resources.createResource(5 * GB, 1), mockNodeStatus); // Register node2 String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + Resources.createResource(5 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -4126,9 +4144,12 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { } @Test public void testRemovedNodeDecomissioningNode() throws Exception { + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register nodemanager NodeManager nm = registerNode("host_decom", 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); @@ -4171,10 +4192,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { ((CapacityScheduler) resourceManager.getResourceScheduler()) .setRMContext(spyContext); ((AsyncDispatcher) mockDispatcher).start(); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 1e2465081e5..660b15f8568 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.exceptions import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -5101,9 +5103,11 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testRemovedNodeDecomissioningNode() throws Exception { + NodeStatus mockNodeStatus = createMockNodeStatus(); // Register nodemanager NodeManager nm = registerNode("host_decom", 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); @@ -5146,10 +5150,13 @@ public class TestFairScheduler extends FairSchedulerTestBase { ((FairScheduler) resourceManager.getResourceScheduler()) .setRMContext(spyContext); ((AsyncDispatcher) mockDispatcher).start(); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId()); @@ -5189,10 +5196,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { private NodeManager registerNode(String hostName, int containerManagerPort, int httpPort, String rackName, - Resource capability) + Resource capability, NodeStatus nodeStatus) throws IOException, YarnException { NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, - rackName, capability, resourceManager); + rackName, capability, resourceManager, nodeStatus); // after YARN-5375, scheduler event is processed in rm main dispatcher, // wait it processed, or may lead dead lock diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index f9093c18d4e..d34e8b09c60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -32,6 +33,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; @@ -141,10 +143,10 @@ public class TestFifoScheduler { private NodeManager registerNode(String hostName, int containerManagerPort, int nmHttpPort, String rackName, - Resource capability) + Resource capability, NodeStatus nodeStatus) throws IOException, YarnException { NodeManager nm = new NodeManager(hostName, containerManagerPort, - nmHttpPort, rackName, capability, resourceManager); + nmHttpPort, rackName, capability, resourceManager, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -403,19 +405,21 @@ public class TestFifoScheduler { LOG.info("--- START: testFifoScheduler ---"); final int GB = 1024; - + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + Resources.createResource(4 * GB, 1), mockNodeStatus); nm_0.heartbeat(); // Register node2 String host_1 = "host_1"; org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + Resources.createResource(2 * GB, 1), mockNodeStatus); nm_1.heartbeat(); // ResourceRequest priorities @@ -1194,9 +1198,12 @@ public class TestFifoScheduler { @Test public void testRemovedNodeDecomissioningNode() throws Exception { + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register nodemanager NodeManager nm = registerNode("host_decom", 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); @@ -1239,10 +1246,13 @@ public class TestFifoScheduler { ((FifoScheduler) resourceManager.getResourceScheduler()) .setRMContext(spyContext); ((AsyncDispatcher) mockDispatcher).start(); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index f7d470d7cdd..33345a9085f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseStatusCode; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -191,8 +192,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase { } private void sendStartedEvent(RMNode node) { + NodeStatus mockNodeStatus = createMockNodeStatus(); ((RMNodeImpl) node) - .handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + .handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); } private void sendLostEvent(RMNode node) {