From 74fc13cf91818a70f434401244f7560c4db3a676 Mon Sep 17 00:00:00 2001
From: Eric E Payne
Date: Mon, 29 Jun 2020 18:39:53 +0000
Subject: [PATCH] YARN-9903: Support reservations continue looking for Node
 Labels. Contributed by Jim Brennan (Jim_Brennan).

---
 .../scheduler/capacity/AbstractCSQueue.java   |  10 +-
 .../scheduler/capacity/LeafQueue.java         |   3 +-
 .../allocator/RegularContainerAllocator.java  |  16 +-
 .../TestNodeLabelContainerAllocation.java     | 289 ++++++++++++++++++
 4 files changed, 300 insertions(+), 18 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 968d971ce1f..f1467a10626 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -1076,14 +1076,12 @@ public abstract class AbstractCSQueue implements CSQueue {
     if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
         usedExceptKillable, currentLimitResource)) {
 
-      // if reservation continous looking enabled, check to see if could we
+      // if reservation continue looking enabled, check to see if could we
       // potentially use this node instead of a reserved node if the application
       // has reserved containers.
-      // TODO, now only consider reservation cases when the node has no label
-      if (this.reservationsContinueLooking && nodePartition.equals(
-          RMNodeLabelsManager.NO_LABEL) && Resources.greaterThan(
-          resourceCalculator, clusterResource, resourceCouldBeUnreserved,
-          Resources.none())) {
+      if (this.reservationsContinueLooking
+          && Resources.greaterThan(resourceCalculator, clusterResource,
+              resourceCouldBeUnreserved, Resources.none())) {
         // resource-without-reserved = used - reserved
         Resource newTotalWithoutReservedResource = Resources.subtract(
             usedExceptKillable, resourceCouldBeUnreserved);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 4d83538c981..05150a373ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1574,8 +1574,7 @@ public class LeafQueue extends AbstractCSQueue {
         user.getUsed(nodePartition), limit)) {
       // if enabled, check to see if could we potentially use this node instead
       // of a reserved node if the application has reserved containers
-      if (this.reservationsContinueLooking && nodePartition.equals(
-          CommonNodeLabelsManager.NO_LABEL)) {
+      if (this.reservationsContinueLooking) {
         if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
             Resources.subtract(user.getUsed(),
                 application.getCurrentReservation()), limit)) {
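Both hunks above remove the same restriction: the reservations-continue-looking path used to run only for the default (empty) partition, and now runs for labeled partitions as well. A minimal, self-contained sketch of the before/after check (illustrative only; the plain-long memory model and every name below are made up, not the YARN code):

// Minimal sketch of the queue/user limit check, not the YARN code itself:
// memory is a plain long and all names here are illustrative only.
public class ReservationContinueLookingSketch {

  // Pre-patch: only the default (empty) partition may consider unreserving.
  static boolean mayTryUnreserveOld(boolean continueLooking, String partition,
      long reservedMb) {
    return continueLooking && partition.isEmpty() && reservedMb > 0;
  }

  // Post-patch: any partition may consider unreserving.
  static boolean mayTryUnreserveNew(boolean continueLooking, long reservedMb) {
    return continueLooking && reservedMb > 0;
  }

  public static void main(String[] args) {
    long reservedMb = 3 * 1024;  // 3 GB reserved on labeled partition "x"
    System.out.println(mayTryUnreserveOld(true, "x", reservedMb));  // false
    System.out.println(mayTryUnreserveNew(true, reservedMb));       // true
  }
}

Under the old check, a queue or user sitting at its limit on a labeled partition could not count its own reserved containers as reclaimable space the way it could on the default partition.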
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 287dc67ded4..cced238b601 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -79,12 +79,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       String nodePartition) {
     // If headroom + currentReservation < required, we cannot allocate this
     // require
-    Resource resourceCouldBeUnReserved = application.getCurrentReservation();
-    if (!application.getCSLeafQueue().getReservationContinueLooking()
-        || !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      // If we don't allow reservation continuous looking, OR we're looking at
-      // non-default node partition, we won't allow to unreserve before
-      // allocation.
+    Resource resourceCouldBeUnReserved =
+        application.getAppAttemptResourceUsage().getReserved(nodePartition);
+    if (!application.getCSLeafQueue().getReservationContinueLooking()) {
+      // If we don't allow reservation continuous looking,
+      // we won't allow to unreserve before allocation.
       resourceCouldBeUnReserved = Resources.none();
     }
     return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
@@ -583,13 +582,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     // Allocate...
     // We will only do continuous reservation when this is not allocated from
     // reserved container
-    if (rmContainer == null && reservationsContinueLooking
-        && node.getLabels().isEmpty()) {
+    if (rmContainer == null && reservationsContinueLooking) {
      // when reservationsContinueLooking is set, we may need to unreserve
      // some containers to meet this queue, its parents', or the users'
      // resource limits.
-      // TODO, need change here when we want to support continuous reservation
-      // looking for labeled partitions.
       if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
         if (!needToUnreserve) {
           // If we shouldn't allocate/reserve new container then we should
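In the allocator, the headroom check now counts only what the application has reserved in the partition being scheduled (getAppAttemptResourceUsage().getReserved(nodePartition)) instead of refusing to consider unreserving on any non-default partition, and the second hunk lets the continuous-reservation branch run even when the node carries labels. A rough, self-contained sketch of that headroom test (hypothetical names and a plain map of per-partition reservations, not the actual YARN API):

// Rough sketch of the headroom test under these assumptions: reservations are
// tracked per partition in a plain map, memory is a long, and all names are
// hypothetical (this is not the actual YARN API).
import java.util.HashMap;
import java.util.Map;

public class PerPartitionReservedSketch {

  static boolean fitsWithUnreserve(long headroomMb, long requiredMb,
      Map<String, Long> reservedMbByPartition, String nodePartition,
      boolean continueLooking) {
    // Only the resources reserved in the partition being scheduled count.
    long couldBeUnreservedMb = continueLooking
        ? reservedMbByPartition.getOrDefault(nodePartition, 0L) : 0L;
    return headroomMb + couldBeUnreservedMb >= requiredMb;
  }

  public static void main(String[] args) {
    Map<String, Long> reserved = new HashMap<>();
    reserved.put("x", 3L * 1024); // 3 GB already reserved on partition "x"
    // 2 GB of headroom alone cannot satisfy a 4 GB ask, but counting the
    // 3 GB reserved on "x" the request can still go ahead.
    System.out.println(
        fitsWithUnreserve(2 * 1024, 4 * 1024, reserved, "x", true)); // true
  }
}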
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 55f98d2ec80..4ac57dd809c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -642,6 +642,295 @@ public class TestNodeLabelContainerAllocation {
     rm1.close();
   }
 
+  @Test (timeout = 120000)
+  public void testContainerReservationContinueLookingWithLabels()
+      throws Exception {
+    // set node -> label
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        toSet("x"), NodeId.newInstance("h2", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(
+        TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); // label = x
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
+
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x)
+    MockRMAppSubmissionData data1 =
+        MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
+            .withAppName("app1")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("a1")
+            .withUnmanagedAM(false)
+            .withAmLabel("x")
+            .build();
+    RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
+        .getApplicationAttemptId());
+
+    // Verify live on node1
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(2 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(2 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // request map containers for app1.
+    am1.allocate("*", 5 * GB, 2, 5, new ArrayList(), "x");
+
+    // Do node heartbeat to allocate first mapper on node1
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // Verify live on node1
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(7 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(7 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // Do node heartbeat to allocate second mapper on node2
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // Verify live on node2
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // node1 7 GB used, node2 5 GB used
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(12 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(12 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // request reducer containers for app1.
+    am1.allocate("*", 3 * GB, 2, 10, new ArrayList(), "x");
+
+    // Do node heartbeat to reserve reducer on node1
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // node1 7 GB used and 3 GB reserved, node2 5 GB used
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
+    Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(3 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(15 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(3 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // Do node heartbeat to allocate container for second reducer on node2
+    // This should unreserve the reserved container
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // Verify live on node2
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 5);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // node1 7 GB used and 0 GB reserved, node2 8 GB used
+    Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
+    Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(15 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 120000)
+  public void testContainerReservationContinueLookingWithDefaultLabels()
+      throws Exception {
+    // This is the same as testContainerReservationContinueLookingWithLabels,
+    // but this test doesn't specify the label expression in the
+    // ResourceRequest, instead it uses default queue label expressions
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        toSet("x"), NodeId.newInstance("h2", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(
+        TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); // label = x
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
+
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x)
+    MockRMAppSubmissionData data1 =
+        MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
+            .withAppName("app1")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("a1")
+            .withUnmanagedAM(false)
+            .build();
+    RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
+        .getApplicationAttemptId());
+
+    // Verify live on node1
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(2 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(2 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // request map containers for app1.
+    am1.allocate("*", 5 * GB, 2, 5, new ArrayList(), null);
+
+    // Do node heartbeat to allocate first mapper on node1
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // Verify live on node1
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(7 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(7 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // Do node heartbeat to allocate second mapper on node2
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // Verify live on node2
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // node1 7 GB used, node2 5 GB used
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(12 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(12 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // request reducer containers for app1.
+    am1.allocate("*", 3 * GB, 2, 10, new ArrayList(), null);
+
+    // Do node heartbeat to reserve reducer on node1
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // node1 7 GB used and 3 GB reserved, node2 5 GB used
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
+    Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(3 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(15 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(3 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // Do node heartbeat to allocate container for second reducer on node2
+    // This should unreserve the reserved container
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // Verify live on node2
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 5);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // node1 7 GB used and 0 GB reserved, node2 8 GB used
+    Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
+    Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(15 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    rm1.close();
+  }
+
   @Test (timeout = 120000)
   public void testRMContainerLeakInLeafQueue() throws Exception {
     // set node -> label