From 896b473f1b477976e449184ceea075bedd71d6e8 Mon Sep 17 00:00:00 2001
From: Wangda Tan <wangda@apache.org>
Date: Sat, 14 Apr 2018 11:04:49 -0700
Subject: [PATCH] YARN-8138. Add unit test to validate queue priority
 preemption works under node partition. (Zian Chen via wangda)

Change-Id: Ibebfab98a714c12c2dc643b6d7b9754a7f813632
(cherry picked from commit 6ee62e6b1c9b4bc3447ce870446068e626b1a492)
---
 ...stCapacitySchedulerSurgicalPreemption.java | 150 ++++++++++++++++++
 1 file changed, 150 insertions(+)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
index 9b183c06176..2aff82d090c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -18,8 +18,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -38,18 +42,24 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import static org.hamcrest.MatcherAssert.assertThat;
+
 public class TestCapacitySchedulerSurgicalPreemption
     extends CapacitySchedulerPreemptionTestBase {
 
+  private static final int NUM_NM = 5;
+
   @Override
   @Before
   public void setUp() throws Exception {
@@ -860,6 +870,146 @@ protected RMNodeLabelsManager createNodeLabelManager() {
     rm1.close();
   }
 
+  private void initializeConfProperties(CapacitySchedulerConfiguration conf)
+      throws IOException {
+
+    conf.setQueues("root", new String[] {"A", "B"});
+    conf.setCapacity("root.A", 50);
+    conf.setCapacity("root.B", 50);
+    conf.setQueuePriority("root.A", 1);
+    conf.setQueuePriority("root.B", 2);
+
+    conf.set(PREFIX + "root.ordering-policy", "priority-utilization");
+    conf.set(PREFIX + "ordering-policy.priority-utilization.underutilized-preemption.enabled", "true");
+    conf.set(PREFIX + "ordering-policy.priority-utilization.underutilized-preemption.allow-move-reservation", "false");
+    conf.set(PREFIX +
"ordering-policy.priority-utilization.underutilized-preemption.reserved-container-delay-ms", "0"); + conf.set(PREFIX + "root.accessible-node-labels.x.capacity", "100"); + + // Setup queue access to node labels + conf.set(PREFIX + "root.A.accessible-node-labels", "x"); + conf.set(PREFIX + "root.B.accessible-node-labels", "x"); + conf.set(PREFIX + "root.A.default-node-label-expression", "x"); + conf.set(PREFIX + "root.B.default-node-label-expression", "x"); + conf.set(PREFIX + "root.A.accessible-node-labels.x.capacity", "50"); + conf.set(PREFIX + "root.B.accessible-node-labels.x.capacity", "50"); + conf.set(PREFIX + "root.A.user-limit-factor", "100"); + conf.set(PREFIX + "root.B.user-limit-factor", "100"); + conf.set(PREFIX + "maximum-am-resource-percent", "1"); + + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + conf.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "1"); + conf.set(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, "1000"); + conf.set(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, "1000"); + conf.set(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, "0.5"); + conf.set(CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, "1"); + + } + + @Test + public void testPriorityPreemptionWithNodeLabels() throws Exception { + // set up queue priority and capacity + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + + initializeConfProperties(conf); + + MockRM rm1 = new MockRM(conf) { + protected RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm1.start(); + + MockNM[] mockNMs = new MockNM[NUM_NM]; + for (int i = 0; i < NUM_NM; i++) { + mockNMs[i] = rm1.registerNode("h" + i + ":1234", 6144); + } + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + mgr.addToCluserNodeLabels(Arrays.asList(NodeLabel.newInstance("x"))); + + RMNode[] rmNodes = new RMNode[5]; + for (int i = 0; i < NUM_NM; i++) { + rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId()); + mgr.replaceLabelsOnNode( + ImmutableMap.of(rmNodes[i].getNodeID(), ImmutableSet.of("x"))); + } + + // launch an app to queue B, AM container launched in nm4 + RMApp app1 = rm1.submitApp(4096, "app", "user", null, "B"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]); + + am1.allocate("*", 4096, NUM_NM-1, new ArrayList<>()); + + // Do allocation for nm0-nm3 + for (int i = 0; i < NUM_NM-1; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + // App1 should have 5 containers now, one for each node + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(NUM_NM, schedulerApp1.getLiveContainers().size()); + for (int i = 0; i < NUM_NM; i++) { + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode( + rmNodes[i].getNodeID()), am1.getApplicationAttemptId(), 1); + } + + // Submit app2 to queue A and asks for a 750MB container for AM (on n0) + RMApp app2 = rm1.submitApp(1024, "app", "user", null, "A"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]); + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); + + // Ask NUM_NM-1 * 1500MB containers + am2.allocate("*", 2048, NUM_NM-1, new ArrayList<>()); + + // Do allocation for n1-n4 + for (int i = 1; i < NUM_NM; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + // kill app1 + rm1.killApp(app1.getApplicationId()); + + // Submit app3 to queue B 
+    RMApp app3 = rm1.submitApp(1024, "app", "user", null, "B");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
+    FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
+
+    // Ask for NUM_NM containers of 5GB each
+    am3.allocate("*", 5120, NUM_NM, new ArrayList<>());
+
+    // Do allocation for nm0-nm4
+    for (int i = 0; i < NUM_NM; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // Sleep for the preemption monitoring interval; 2 containers should be
+    // selected afterwards
+    Thread.sleep(1000);
+
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1
+        .getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
+    editPolicy.editSchedule();
+
+    // Only 2 containers should be selected for preemption, on nm1 and nm2
+    Set<RMContainer> selectedToPreempt =
+        editPolicy.getToPreemptContainers().keySet();
+    Assert.assertEquals(2, selectedToPreempt.size());
+    List<NodeId> selectedToPreemptNodeIds = new ArrayList<>();
+    for (RMContainer rmc : selectedToPreempt) {
+      selectedToPreemptNodeIds.add(rmc.getAllocatedNode());
+    }
+    assertThat(selectedToPreemptNodeIds, CoreMatchers.hasItems(
+        mockNMs[1].getNodeId(), mockNMs[2].getNodeId()));
+
+    rm1.close();
+
+  }
+
   @Test(timeout = 60000)
   public void testPreemptionForFragmentatedCluster() throws Exception {