YARN-9903: Support reservations continue looking for Node Labels. Contributed by Jim Brennan (Jim_Brennan).
(cherry picked from commit 74fc13cf91)
This commit is contained in:
parent
3e9422d1c7
commit
d7696453a0
|
@ -1066,14 +1066,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
|
||||
usedExceptKillable, currentLimitResource)) {
|
||||
|
||||
// if reservation continous looking enabled, check to see if could we
|
||||
// if reservation continue looking enabled, check to see if could we
|
||||
// potentially use this node instead of a reserved node if the application
|
||||
// has reserved containers.
|
||||
// TODO, now only consider reservation cases when the node has no label
|
||||
if (this.reservationsContinueLooking && nodePartition.equals(
|
||||
RMNodeLabelsManager.NO_LABEL) && Resources.greaterThan(
|
||||
resourceCalculator, clusterResource, resourceCouldBeUnreserved,
|
||||
Resources.none())) {
|
||||
if (this.reservationsContinueLooking
|
||||
&& Resources.greaterThan(resourceCalculator, clusterResource,
|
||||
resourceCouldBeUnreserved, Resources.none())) {
|
||||
// resource-without-reserved = used - reserved
|
||||
Resource newTotalWithoutReservedResource = Resources.subtract(
|
||||
usedExceptKillable, resourceCouldBeUnreserved);
|
||||
|
|
|
@ -1574,8 +1574,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
user.getUsed(nodePartition), limit)) {
|
||||
// if enabled, check to see if could we potentially use this node instead
|
||||
// of a reserved node if the application has reserved containers
|
||||
if (this.reservationsContinueLooking && nodePartition.equals(
|
||||
CommonNodeLabelsManager.NO_LABEL)) {
|
||||
if (this.reservationsContinueLooking) {
|
||||
if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
|
||||
Resources.subtract(user.getUsed(),
|
||||
application.getCurrentReservation()), limit)) {
|
||||
|
|
|
@ -79,12 +79,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|||
String nodePartition) {
|
||||
// If headroom + currentReservation < required, we cannot allocate this
|
||||
// require
|
||||
Resource resourceCouldBeUnReserved = application.getCurrentReservation();
|
||||
if (!application.getCSLeafQueue().getReservationContinueLooking()
|
||||
|| !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||
// If we don't allow reservation continuous looking, OR we're looking at
|
||||
// non-default node partition, we won't allow to unreserve before
|
||||
// allocation.
|
||||
Resource resourceCouldBeUnReserved =
|
||||
application.getAppAttemptResourceUsage().getReserved(nodePartition);
|
||||
if (!application.getCSLeafQueue().getReservationContinueLooking()) {
|
||||
// If we don't allow reservation continuous looking,
|
||||
// we won't allow to unreserve before allocation.
|
||||
resourceCouldBeUnReserved = Resources.none();
|
||||
}
|
||||
return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
|
||||
|
@ -583,13 +582,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|||
// Allocate...
|
||||
// We will only do continuous reservation when this is not allocated from
|
||||
// reserved container
|
||||
if (rmContainer == null && reservationsContinueLooking
|
||||
&& node.getLabels().isEmpty()) {
|
||||
if (rmContainer == null && reservationsContinueLooking) {
|
||||
// when reservationsContinueLooking is set, we may need to unreserve
|
||||
// some containers to meet this queue, its parents', or the users'
|
||||
// resource limits.
|
||||
// TODO, need change here when we want to support continuous reservation
|
||||
// looking for labeled partitions.
|
||||
if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
|
||||
if (!needToUnreserve) {
|
||||
// If we shouldn't allocate/reserve new container then we should
|
||||
|
|
|
@ -642,6 +642,295 @@ public class TestNodeLabelContainerAllocation {
|
|||
rm1.close();
|
||||
}
|
||||
|
||||
@Test (timeout = 120000)
|
||||
public void testContainerReservationContinueLookingWithLabels()
|
||||
throws Exception {
|
||||
// set node -> label
|
||||
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
|
||||
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
|
||||
toSet("x"), NodeId.newInstance("h2", 0), toSet("x")));
|
||||
|
||||
// inject node label manager
|
||||
MockRM rm1 = new MockRM(
|
||||
TestUtils.getConfigurationWithQueueLabels(conf)) {
|
||||
@Override
|
||||
public RMNodeLabelsManager createNodeLabelManager() {
|
||||
return mgr;
|
||||
}
|
||||
};
|
||||
|
||||
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||
rm1.start();
|
||||
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
|
||||
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); // label = x
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
|
||||
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
|
||||
|
||||
ContainerId containerId;
|
||||
|
||||
// launch an app to queue a1 (label = x)
|
||||
MockRMAppSubmissionData data1 =
|
||||
MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
|
||||
.withAppName("app1")
|
||||
.withUser("user")
|
||||
.withAcls(null)
|
||||
.withQueue("a1")
|
||||
.withUnmanagedAM(false)
|
||||
.withAmLabel("x")
|
||||
.build();
|
||||
RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
|
||||
.getApplicationAttemptId());
|
||||
|
||||
// Verify live on node1
|
||||
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
||||
"h1");
|
||||
|
||||
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
|
||||
Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
|
||||
Assert.assertEquals(2 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getReserved("x").getMemorySize());
|
||||
Assert.assertEquals(2 * GB,
|
||||
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB,
|
||||
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
|
||||
|
||||
// request map containers for app1.
|
||||
am1.allocate("*", 5 * GB, 2, 5, new ArrayList<ContainerId>(), "x");
|
||||
|
||||
// Do node heartbeat to allocate first mapper on node1
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||
|
||||
// Verify live on node1
|
||||
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
||||
"h1");
|
||||
|
||||
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
|
||||
Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
|
||||
Assert.assertEquals(7 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getReserved("x").getMemorySize());
|
||||
Assert.assertEquals(7 * GB,
|
||||
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB,
|
||||
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
|
||||
|
||||
// Do node heartbeat to allocate second mapper on node2
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
|
||||
|
||||
// Verify live on node2
|
||||
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
|
||||
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
||||
"h2");
|
||||
|
||||
// node1 7 GB used, node2 5 GB used
|
||||
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
|
||||
Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
|
||||
Assert.assertEquals(12 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getReserved("x").getMemorySize());
|
||||
Assert.assertEquals(12 * GB,
|
||||
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB,
|
||||
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
|
||||
|
||||
// request reducer containers for app1.
|
||||
am1.allocate("*", 3 * GB, 2, 10, new ArrayList<ContainerId>(), "x");
|
||||
|
||||
// Do node heartbeat to reserve reducer on node1
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||
|
||||
// node1 7 GB used and 3 GB reserved, node2 5 GB used
|
||||
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
|
||||
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
|
||||
Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(3 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getReserved("x").getMemorySize());
|
||||
Assert.assertEquals(15 * GB,
|
||||
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(3 * GB,
|
||||
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
|
||||
|
||||
// Do node heartbeat to allocate container for second reducer on node2
|
||||
// This should unreserve the reserved container
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
|
||||
|
||||
// Verify live on node2
|
||||
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 5);
|
||||
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
||||
"h2");
|
||||
|
||||
// node1 7 GB used and 0 GB reserved, node2 8 GB used
|
||||
Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
|
||||
Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
|
||||
Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getReserved("x").getMemorySize());
|
||||
Assert.assertEquals(15 * GB,
|
||||
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB,
|
||||
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
|
||||
|
||||
rm1.close();
|
||||
}
|
||||
|
||||
@Test (timeout = 120000)
|
||||
public void testContainerReservationContinueLookingWithDefaultLabels()
|
||||
throws Exception {
|
||||
// This is the same as testContainerReservationContinueLookingWithLabels,
|
||||
// but this test doesn't specify the label expression in the
|
||||
// ResourceRequest, instead it uses default queue label expressions
|
||||
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
|
||||
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
|
||||
toSet("x"), NodeId.newInstance("h2", 0), toSet("x")));
|
||||
|
||||
// inject node label manager
|
||||
MockRM rm1 = new MockRM(
|
||||
TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
|
||||
@Override
|
||||
public RMNodeLabelsManager createNodeLabelManager() {
|
||||
return mgr;
|
||||
}
|
||||
};
|
||||
|
||||
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||
rm1.start();
|
||||
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
|
||||
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); // label = x
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
|
||||
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
|
||||
|
||||
ContainerId containerId;
|
||||
|
||||
// launch an app to queue a1 (label = x)
|
||||
MockRMAppSubmissionData data1 =
|
||||
MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
|
||||
.withAppName("app1")
|
||||
.withUser("user")
|
||||
.withAcls(null)
|
||||
.withQueue("a1")
|
||||
.withUnmanagedAM(false)
|
||||
.build();
|
||||
RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
|
||||
.getApplicationAttemptId());
|
||||
|
||||
// Verify live on node1
|
||||
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
||||
"h1");
|
||||
|
||||
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
|
||||
Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
|
||||
Assert.assertEquals(2 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getReserved("x").getMemorySize());
|
||||
Assert.assertEquals(2 * GB,
|
||||
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB,
|
||||
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
|
||||
|
||||
// request map containers for app1.
|
||||
am1.allocate("*", 5 * GB, 2, 5, new ArrayList<ContainerId>(), null);
|
||||
|
||||
// Do node heartbeat to allocate first mapper on node1
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||
|
||||
// Verify live on node1
|
||||
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
||||
"h1");
|
||||
|
||||
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
|
||||
Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
|
||||
Assert.assertEquals(7 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getReserved("x").getMemorySize());
|
||||
Assert.assertEquals(7 * GB,
|
||||
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB,
|
||||
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
|
||||
|
||||
// Do node heartbeat to allocate second mapper on node2
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
|
||||
|
||||
// Verify live on node2
|
||||
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
|
||||
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
||||
"h2");
|
||||
|
||||
// node1 7 GB used, node2 5 GB used
|
||||
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
|
||||
Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
|
||||
Assert.assertEquals(12 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getReserved("x").getMemorySize());
|
||||
Assert.assertEquals(12 * GB,
|
||||
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB,
|
||||
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
|
||||
|
||||
// request reducer containers for app1.
|
||||
am1.allocate("*", 3 * GB, 2, 10, new ArrayList<ContainerId>(), null);
|
||||
|
||||
// Do node heartbeat to reserve reducer on node1
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||
|
||||
// node1 7 GB used and 3 GB reserved, node2 5 GB used
|
||||
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
|
||||
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
|
||||
Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(3 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getReserved("x").getMemorySize());
|
||||
Assert.assertEquals(15 * GB,
|
||||
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(3 * GB,
|
||||
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
|
||||
|
||||
// Do node heartbeat to allocate container for second reducer on node2
|
||||
// This should unreserve the reserved container
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
|
||||
|
||||
// Verify live on node2
|
||||
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 5);
|
||||
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
||||
"h2");
|
||||
|
||||
// node1 7 GB used and 0 GB reserved, node2 8 GB used
|
||||
Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
|
||||
Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
|
||||
Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||
.getReserved("x").getMemorySize());
|
||||
Assert.assertEquals(15 * GB,
|
||||
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
|
||||
Assert.assertEquals(0 * GB,
|
||||
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
|
||||
|
||||
rm1.close();
|
||||
}
|
||||
|
||||
@Test (timeout = 120000)
|
||||
public void testRMContainerLeakInLeafQueue() throws Exception {
|
||||
// set node -> label
|
||||
|
|
Loading…
Reference in New Issue