From 6fc26ad5392a2a61ace60b88ed931fed3859365d Mon Sep 17 00:00:00 2001 From: bibinchundatt Date: Wed, 3 Feb 2021 08:50:45 +0530 Subject: [PATCH] YARN-10352 Skip schedule on not heartbeated nodes in Multi Node Placement. Contributed by Prabhu Joseph and Qi Zhu --- .../hadoop/yarn/conf/YarnConfiguration.java | 22 ++++ .../src/main/resources/yarn-default.xml | 7 ++ .../scheduler/AbstractYarnScheduler.java | 6 + .../scheduler/SchedulerUtils.java | 8 ++ .../scheduler/capacity/CapacityScheduler.java | 100 ++++++++++------ .../scheduler/placement/MultiNodeSorter.java | 2 +- .../placement/MultiNodeSortingManager.java | 43 ++++++- .../TestCapacitySchedulerAsyncScheduling.java | 108 +++++++++++++++++- .../TestCapacitySchedulerMultiNodes.java | 38 ++++++ ...citySchedulerMultiNodesWithPreemption.java | 1 + ...edulerActivitiesWithMultiNodesEnabled.java | 1 + 11 files changed, 299 insertions(+), 37 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5fa3ea9a0c1..d56fc64dce9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -712,6 +712,14 @@ public class YarnConfiguration extends Configuration { public static final float DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR = 1.0f; + /** + * Number of consecutive missed heartbeats after which node will be + * skipped from scheduling. + */ + public static final String SCHEDULER_SKIP_NODE_MULTIPLIER = + YARN_PREFIX + "scheduler.skip.node.multiplier"; + public static final int DEFAULT_SCHEDULER_SKIP_NODE_MULTIPLIER = 2; + /** Number of worker threads that write the history data. */ public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE = RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size"; @@ -4751,6 +4759,20 @@ public class YarnConfiguration extends Configuration { DEFAULT_NM_NUMA_AWARENESS_ENABLED); } + /** + * Returns Timeout to skip node from scheduling if not heartbeated. + * @param conf the configuration + * @return timeout in milliseconds. + */ + public static long getSkipNodeInterval(Configuration conf) { + long heartbeatIntvl = conf.getLong( + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); + int multiplier = conf.getInt(SCHEDULER_SKIP_NODE_MULTIPLIER, + DEFAULT_SCHEDULER_SKIP_NODE_MULTIPLIER); + return multiplier * heartbeatIntvl; + } + /* For debugging. mp configurations to system output as XML format. */ public static void main(String[] args) throws Exception { new YarnConfiguration(new Configuration()).writeXml(System.out); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 4349d56731f..23eba6e6075 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -910,6 +910,13 @@ 1.0 + + The Number of consecutive missed heartbeats after which node will be + skipped from scheduling + yarn.scheduler.skip.node.multiplier + 2 + + The minimum allowed version of a connecting nodemanager. The valid values are NONE (no version checking), EqualToRM (the nodemanager's version is equal to diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 41442363711..f95b30b1e5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -159,6 +159,7 @@ public abstract class AbstractYarnScheduler protected ConcurrentMap> applications; protected int nmExpireInterval; protected long nmHeartbeatInterval; + private long skipNodeInterval; private final static List EMPTY_CONTAINER_LIST = new ArrayList(); @@ -210,6 +211,7 @@ public abstract class AbstractYarnScheduler nmHeartbeatInterval = conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); + skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf); long configuredMaximumAllocationWaitTime = conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); @@ -368,6 +370,10 @@ public abstract class AbstractYarnScheduler return lastNodeUpdateTime; } + public long getSkipNodeInterval(){ + return skipNodeInterval; + } + protected void containerLaunchedOnNode( ContainerId containerId, SchedulerNode node) { readLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 58e25979d17..abb274e5099 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -27,6 +27,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -602,4 +603,11 @@ public class SchedulerUtils { node.allocateContainer(rmContainer); return rmContainer; } + + public static boolean isNodeHeartbeated(SchedulerNode node, + long skipNodeInterval) { + long timeElapsedFromLastHeartbeat = + Time.monotonicNow() - node.getLastHeartbeatMonotonicTime(); + return timeElapsedFromLastHeartbeat <= skipNodeInterval; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 1bb74a092e6..158c9cd7daa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -232,7 +233,7 @@ public class CapacityScheduler extends private CapacitySchedulerAutoQueueHandler autoQueueHandler; - private static boolean printedVerboseLoggingForAsyncScheduling = false; + private boolean printedVerboseLoggingForAsyncScheduling; /** * EXPERT @@ -518,22 +519,47 @@ public class CapacityScheduler extends private final static Random random = new Random(System.currentTimeMillis()); - private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node, + @VisibleForTesting + public static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node, CapacityScheduler cs, boolean printVerboseLog) { - // Skip node which missed 2 heartbeats since the node might be dead and - // we should not continue allocate containers on that. - long timeElapsedFromLastHeartbeat = - Time.monotonicNow() - node.getLastHeartbeatMonotonicTime(); - if (timeElapsedFromLastHeartbeat > cs.nmHeartbeatInterval * 2) { + // Skip node which missed YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER + // heartbeats since the node might be dead and we should not continue + // allocate containers on that. + if (!SchedulerUtils.isNodeHeartbeated(node, cs.getSkipNodeInterval())) { if (printVerboseLog && LOG.isDebugEnabled()) { - LOG.debug("Skip scheduling on node because it haven't heartbeated for " + long timeElapsedFromLastHeartbeat = + Time.monotonicNow() - node.getLastHeartbeatMonotonicTime(); + LOG.debug("Skip scheduling on node " + node.getNodeID() + + " because it haven't heartbeated for " + timeElapsedFromLastHeartbeat / 1000.0f + " secs"); } return true; } + + if (node.getRMNode().getState() != NodeState.RUNNING) { + if (printVerboseLog && LOG.isDebugEnabled()) { + LOG.debug("Skip scheduling on node because it is in " + + node.getRMNode().getState() + " state"); + } + return true; + } return false; } + private static boolean isPrintSkippedNodeLogging(CapacityScheduler cs) { + // To avoid too verbose DEBUG logging, only print debug log once for + // every 10 secs. + boolean printSkipedNodeLogging = false; + if (LOG.isDebugEnabled()) { + if (Time.monotonicNow() / 1000 % 10 == 0) { + printSkipedNodeLogging = (!cs.printedVerboseLoggingForAsyncScheduling); + } else { + cs.printedVerboseLoggingForAsyncScheduling = false; + } + } + return printSkipedNodeLogging; + } + /** * Schedule on all nodes by starting at a random point. * Schedule on all partitions by starting at a random partition @@ -555,19 +581,12 @@ public class CapacityScheduler extends if (!cs.multiNodePlacementEnabled) { int start = random.nextInt(nodeSize); - // To avoid too verbose DEBUG logging, only print debug log once for - // every 10 secs. - boolean printSkipedNodeLogging = false; - if (Time.monotonicNow() / 1000 % 10 == 0) { - printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling); - } else { - printedVerboseLoggingForAsyncScheduling = false; - } + boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(cs); // Allocate containers of node [start, end) for (FiCaSchedulerNode node : nodes) { if (current++ >= start) { - if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) { + if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) { continue; } cs.allocateContainersToNode(node.getNodeID(), false); @@ -581,14 +600,14 @@ public class CapacityScheduler extends if (current++ > start) { break; } - if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) { + if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) { continue; } cs.allocateContainersToNode(node.getNodeID(), false); } - if (printSkipedNodeLogging) { - printedVerboseLoggingForAsyncScheduling = true; + if (printSkippedNodeLogging) { + cs.printedVerboseLoggingForAsyncScheduling = true; } } else { // Get all partitions @@ -1541,20 +1560,37 @@ public class CapacityScheduler extends || assignedContainers < maxAssignPerHeartbeat); } - private CandidateNodeSet getCandidateNodeSet( - String partition) { - CandidateNodeSet candidates = null; + private Map getNodesHeartbeated(String partition) { Map nodesByPartition = new HashMap<>(); + boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(this); List nodes = nodeTracker - .getNodesPerPartition(partition); + .getNodesPerPartition(partition); + if (nodes != null && !nodes.isEmpty()) { //Filter for node heartbeat too long nodes.stream() - .filter(node -> !shouldSkipNodeSchedule(node, this, true)) - .forEach(n -> nodesByPartition.put(n.getNodeID(), n)); - candidates = new SimpleCandidateNodeSet( - nodesByPartition, partition); + .filter(node -> + !shouldSkipNodeSchedule(node, this, printSkippedNodeLogging)) + .forEach(n -> nodesByPartition.put(n.getNodeID(), n)); } + + if (printSkippedNodeLogging) { + printedVerboseLoggingForAsyncScheduling = true; + } + return nodesByPartition; + } + + private CandidateNodeSet getCandidateNodeSet( + String partition) { + CandidateNodeSet candidates = null; + Map nodesByPartition + = getNodesHeartbeated(partition); + + if (!nodesByPartition.isEmpty()) { + candidates = new SimpleCandidateNodeSet( + nodesByPartition, partition); + } + return candidates; } @@ -1563,11 +1599,9 @@ public class CapacityScheduler extends CandidateNodeSet candidates = null; candidates = new SimpleCandidateNodeSet<>(node); if (multiNodePlacementEnabled) { - Map nodesByPartition = new HashMap<>(); - List nodes = nodeTracker - .getNodesPerPartition(node.getPartition()); - if (nodes != null && !nodes.isEmpty()) { - nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); + Map nodesByPartition = + getNodesHeartbeated(node.getPartition()); + if (!nodesByPartition.isEmpty()) { candidates = new SimpleCandidateNodeSet( nodesByPartition, node.getPartition()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java index f9fcdfdd531..f77a55d36bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java @@ -135,7 +135,7 @@ public class MultiNodeSorter extends AbstractService { Map nodesByPartition = new HashMap<>(); List nodes = ((AbstractYarnScheduler) rmContext .getScheduler()).getNodeTracker().getNodesPerPartition(label); - if (nodes != null && !nodes.isEmpty()) { + if (nodes != null) { nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); multiNodePolicy.addAndRefreshNodesSet( (Collection) nodesByPartition.values(), label); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java index c8a7e66f5fe..8c5691f189f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -30,8 +31,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; /** * Node Sorting Manager which runs all sorter threads and policies. @@ -48,6 +51,7 @@ public class MultiNodeSortingManager private Set policySpecs = new HashSet(); private Configuration conf; private boolean multiNodePlacementEnabled; + private long skipNodeInterval; public MultiNodeSortingManager() { super("MultiNodeSortingManager"); @@ -59,6 +63,7 @@ public class MultiNodeSortingManager LOG.info("Initializing NodeSortingService=" + getName()); super.serviceInit(configuration); this.conf = configuration; + this.skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf); } @Override @@ -134,6 +139,42 @@ public class MultiNodeSortingManager policy.addAndRefreshNodesSet(nodes, partition); } - return policy.getPreferredNodeIterator(nodes, partition); + Iterator nodesIterator = policy.getPreferredNodeIterator(nodes, + partition); + + // Skip node which missed YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER + // heartbeats since the node might be dead and we should not continue + // allocate containers on that. + Iterator filteringIterator = new Iterator() { + private N cached; + private boolean hasCached; + @Override + public boolean hasNext() { + if (hasCached) { + return true; + } + while (nodesIterator.hasNext()) { + cached = nodesIterator.next(); + if (SchedulerUtils.isNodeHeartbeated(cached, skipNodeInterval)) { + hasCached = true; + return true; + } + } + return false; + } + + @Override + public N next() { + if (hasCached) { + hasCached = false; + return cached; + } + if (!hasNext()) { + throw new NoSuchElementException(); + } + return next(); + } + }; + return filteringIterator; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 5f2bbf0190c..653a6ba0e93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -574,8 +574,6 @@ public class TestCapacitySchedulerAsyncScheduling { + ".scheduling-interval-ms", 100); // Heartbeat interval is 100 ms. conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, heartbeatInterval); - - final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); mgr.init(conf); // inject node label manager @@ -648,6 +646,112 @@ public class TestCapacitySchedulerAsyncScheduling { rm.close(); } + /** + * Make sure scheduler skips NMs which are not RUNNING. + * @throws Exception + */ + @Test + public void testAsyncSchedulerSkipNoRunningNMs() throws Exception { + int heartbeatInterval = 100; + conf.setInt( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, + 1); + conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + + ".scheduling-interval-ms", 100); + // Heartbeat interval is 100 ms. + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, + heartbeatInterval); + conf.setInt(YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER, + 5); + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + + // inject node label manager + MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + List nms = new ArrayList<>(); + // Add 10 nodes to the cluster, in the cluster we have 200 GB resource + for (int i = 0; i < 10; i++) { + nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB)); + } + + keepNMHeartbeat(nms, heartbeatInterval); + + List ams = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + RMApp rmApp = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder.createWithMemory(1024, rm) + .withAppName("app") + .withUser("user") + .withAcls(null) + .withUnmanagedAM(false) + .withQueue(Character.toString((char) (i % 34 + 97))) + .withMaxAppAttempts(1) + .withCredentials(null) + .withAppType(null) + .withWaitForAppAcceptedState(false) + .build()); + MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm); + am.registerAppAttempt(); + ams.add(am); + } + + // Test for no NodeState.RUNNING node + for (int i = 0; i < 5; i++) { + RMNode rmNode = cs.getNode(nms.get(i).getNodeId()).getRMNode(); + cs.getRMContext().getDispatcher().getEventHandler().handle( + new RMNodeEvent(rmNode.getNodeID(), + RMNodeEventType.GRACEFUL_DECOMMISSION)); + rm.drainEvents(); + Assert.assertEquals(NodeState.DECOMMISSIONING, rmNode.getState()); + boolean shouldSkip = + cs.shouldSkipNodeSchedule(cs.getNode(nms.get(i).getNodeId()), + cs, true); + // make sure should skip + Assert.assertTrue(shouldSkip); + } + + for (int i = 5; i < 9; i++) { + boolean shouldSkip = + cs.shouldSkipNodeSchedule(cs.getNode(nms.get(i).getNodeId()), + cs, true); + // make sure should not skip + Assert.assertFalse(shouldSkip); + } + + pauseNMHeartbeat(); + + //Not exceed configured 5 + Thread.sleep(heartbeatInterval * 3); + + // Applications request containers. + for (int i = 0; i < 3; i++) { + ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<>()); + } + + // Wait for 2000 ms. + Thread.sleep(2000); + + //Make sure that NM 0-5 don't have non-AM containers. + for (int i = 0; i < 9; i++) { + if (i < 5) { + Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) == 0); + } else { + Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) > 0); + } + } + rm.close(); + } + public static class NMHeartbeatThread extends Thread { private List mockNMS; private int interval; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java index 29de815040e..b20f8e99b19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java @@ -19,11 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.collect.Iterators; + import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -437,4 +440,39 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase { rm1.close(); } + + @Test + public void testMultiNodeSorterAfterHeartbeatInterval() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + rm.registerNode("127.0.0.1:1234", 10 * GB); + rm.registerNode("127.0.0.2:1234", 10 * GB); + rm.registerNode("127.0.0.3:1234", 10 * GB); + rm.registerNode("127.0.0.4:1234", 10 * GB); + + Set nodes = new HashSet<>(); + String partition = ""; + + ResourceScheduler scheduler = rm.getRMContext().getScheduler(); + waitforNMRegistered(scheduler, 4, 5); + MultiNodeSortingManager mns = rm.getRMContext() + .getMultiNodeSortingManager(); + MultiNodeSorter sorter = mns + .getMultiNodePolicy(POLICY_CLASS_NAME); + sorter.reSortClusterNodes(); + + Iterator nodeIterator = mns.getMultiNodeSortIterator( + nodes, partition, POLICY_CLASS_NAME); + Assert.assertEquals(4, Iterators.size(nodeIterator)); + + // Validate the count after missing 3 node heartbeats + Thread.sleep(YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS * 3); + + nodeIterator = mns.getMultiNodeSortIterator( + nodes, partition, POLICY_CLASS_NAME); + Assert.assertEquals(0, Iterators.size(nodeIterator)); + + rm.stop(); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java index 65e0a1743e6..e1435ba62b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java @@ -111,6 +111,7 @@ public class TestCapacitySchedulerMultiNodesWithPreemption conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, ProportionalCapacityPreemptionPolicy.class.getCanonicalName()); conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 60000); } @Test(timeout=60000) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java index 67ff8cc0a51..e37a8d83c05 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java @@ -127,6 +127,7 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled conf.set(policyConfPrefix + ".sorting-interval.ms", "0"); conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); + conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 30000); rm = new MockRM(conf); bind(ResourceManager.class).toInstance(rm); serve("/*").with(GuiceContainer.class);