YARN-8138. Add unit test to validate queue priority preemption works under node partition. (Zian Chen via wangda)

Change-Id: Ibebfab98a714c12c2dc643b6d7b9754a7f813632
This commit is contained in:
Wangda Tan 2018-04-14 11:04:49 -07:00
parent 0b88683627
commit 6ee62e6b1c
1 changed files with 150 additions and 0 deletions

View File

@ -18,8 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -38,18 +42,24 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.hamcrest.MatcherAssert.assertThat;
public class TestCapacitySchedulerSurgicalPreemption
extends CapacitySchedulerPreemptionTestBase {
private static final int NUM_NM = 5;
@Override
@Before
public void setUp() throws Exception {
@ -860,6 +870,146 @@ public class TestCapacitySchedulerSurgicalPreemption
rm1.close();
}
private void initializeConfProperties(CapacitySchedulerConfiguration conf)
throws IOException {
conf.setQueues("root", new String[] {"A", "B"});
conf.setCapacity("root.A", 50);
conf.setCapacity("root.B", 50);
conf.setQueuePriority("root.A", 1);
conf.setQueuePriority("root.B", 2);
conf.set(PREFIX + "root.ordering-policy", "priority-utilization");
conf.set(PREFIX + "ordering-policy.priority-utilization.underutilized-preemption.enabled", "true");
conf.set(PREFIX + "ordering-policy.priority-utilization.underutilized-preemption.allow-move-reservation", "false");
conf.set(PREFIX + "ordering-policy.priority-utilization.underutilized-preemption.reserved-container-delay-ms", "0");
conf.set(PREFIX + "root.accessible-node-labels.x.capacity", "100");
// Setup queue access to node labels
conf.set(PREFIX + "root.A.accessible-node-labels", "x");
conf.set(PREFIX + "root.B.accessible-node-labels", "x");
conf.set(PREFIX + "root.A.default-node-label-expression", "x");
conf.set(PREFIX + "root.B.default-node-label-expression", "x");
conf.set(PREFIX + "root.A.accessible-node-labels.x.capacity", "50");
conf.set(PREFIX + "root.B.accessible-node-labels.x.capacity", "50");
conf.set(PREFIX + "root.A.user-limit-factor", "100");
conf.set(PREFIX + "root.B.user-limit-factor", "100");
conf.set(PREFIX + "maximum-am-resource-percent", "1");
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
conf.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "1");
conf.set(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, "1000");
conf.set(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, "1000");
conf.set(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, "0.5");
conf.set(CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, "1");
}
@Test
public void testPriorityPreemptionWithNodeLabels() throws Exception {
// set up queue priority and capacity
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
initializeConfProperties(conf);
MockRM rm1 = new MockRM(conf) {
protected RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.start();
MockNM[] mockNMs = new MockNM[NUM_NM];
for (int i = 0; i < NUM_NM; i++) {
mockNMs[i] = rm1.registerNode("h" + i + ":1234", 6144);
}
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
mgr.addToCluserNodeLabels(Arrays.asList(NodeLabel.newInstance("x")));
RMNode[] rmNodes = new RMNode[5];
for (int i = 0; i < NUM_NM; i++) {
rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
mgr.replaceLabelsOnNode(
ImmutableMap.of(rmNodes[i].getNodeID(), ImmutableSet.of("x")));
}
// launch an app to queue B, AM container launched in nm4
RMApp app1 = rm1.submitApp(4096, "app", "user", null, "B");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]);
am1.allocate("*", 4096, NUM_NM-1, new ArrayList<>());
// Do allocation for nm0-nm3
for (int i = 0; i < NUM_NM-1; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// App1 should have 5 containers now, one for each node
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(NUM_NM, schedulerApp1.getLiveContainers().size());
for (int i = 0; i < NUM_NM; i++) {
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(
rmNodes[i].getNodeID()), am1.getApplicationAttemptId(), 1);
}
// Submit app2 to queue A and asks for a 750MB container for AM (on n0)
RMApp app2 = rm1.submitApp(1024, "app", "user", null, "A");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]);
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
// Ask NUM_NM-1 * 1500MB containers
am2.allocate("*", 2048, NUM_NM-1, new ArrayList<>());
// Do allocation for n1-n4
for (int i = 1; i < NUM_NM; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// kill app1
rm1.killApp(app1.getApplicationId());
// Submit app3 to queue B and asks for a 5000MB container for AM (on n2)
RMApp app3 = rm1.submitApp(1024, "app", "user", null, "B");
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
// Ask NUM_NM * 5000MB containers
am3.allocate("*", 5120, NUM_NM, new ArrayList<>());
// Do allocation for n0-n4
for (int i = 0; i < NUM_NM; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// Sleep the timeout interval, we should see 2 containers selected
Thread.sleep(1000);
SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
editPolicy.editSchedule();
// We should only allow to preempt 2 containers, on node1 and node2
Set<RMContainer> selectedToPreempt =
editPolicy.getToPreemptContainers().keySet();
Assert.assertEquals(2, selectedToPreempt.size());
List<NodeId> selectedToPreemptNodeIds = new ArrayList<>();
for (RMContainer rmc : selectedToPreempt) {
selectedToPreemptNodeIds.add(rmc.getAllocatedNode());
}
assertThat(selectedToPreemptNodeIds, CoreMatchers.hasItems(
mockNMs[1].getNodeId(), mockNMs[2].getNodeId()));
rm1.close();
}
@Test(timeout = 60000)
public void testPreemptionForFragmentatedCluster() throws Exception {