YARN-10903. Fix the headroom check in ParentQueue and RegularContainerAllocator for DRF (#3352)

Contributed by Jie Wang <jie.wang@hulu.com>
This commit is contained in:
Jack 2021-09-13 10:54:11 +08:00 committed by GitHub
parent edfde6eebc
commit d8026e387e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 67 additions and 16 deletions

View File

@ -1038,10 +1038,9 @@ public class ParentQueue extends AbstractCSQueue {
// Two conditions need to meet when trying to allocate: // Two conditions need to meet when trying to allocate:
// 1) Node doesn't have reserved container // 1) Node doesn't have reserved container
// 2) Node's available-resource + killable-resource should > 0 // 2) Node's available-resource + killable-resource should > 0
boolean accept = node.getReservedContainer() == null && Resources boolean accept = node.getReservedContainer() == null &&
.greaterThanOrEqual(resourceCalculator, clusterResource, Resources Resources.fitsIn(resourceCalculator, minimumAllocation,
.add(node.getUnallocatedResource(), Resources.add(node.getUnallocatedResource(), node.getTotalKillableResources()));
node.getTotalKillableResources()), minimumAllocation);
if (!accept) { if (!accept) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParentName(), getQueuePath(), ActivityState.REJECTED, getParentName(), getQueuePath(), ActivityState.REJECTED,

View File

@ -74,9 +74,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
super(application, rc, rmContext, activitiesManager); super(application, rc, rmContext, activitiesManager);
} }
private boolean checkHeadroom(Resource clusterResource, private boolean checkHeadroom(ResourceLimits currentResourceLimits,
ResourceLimits currentResourceLimits, Resource required, Resource required, String nodePartition) {
String nodePartition) {
// If headroom + currentReservation < required, we cannot allocate this // If headroom + currentReservation < required, we cannot allocate this
// require // require
Resource resourceCouldBeUnReserved = Resource resourceCouldBeUnReserved =
@ -86,9 +85,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// we won't allow to unreserve before allocation. // we won't allow to unreserve before allocation.
resourceCouldBeUnReserved = Resources.none(); resourceCouldBeUnReserved = Resources.none();
} }
return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add( return Resources.fitsIn(rc, required,
currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved), Resources.add(currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved));
required);
} }
/* /*
@ -97,8 +95,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
* We will consider stuffs like exclusivity, pending resource, node partition, * We will consider stuffs like exclusivity, pending resource, node partition,
* headroom, etc. * headroom, etc.
*/ */
private ContainerAllocation preCheckForNodeCandidateSet( private ContainerAllocation preCheckForNodeCandidateSet(FiCaSchedulerNode node,
Resource clusterResource, FiCaSchedulerNode node,
SchedulingMode schedulingMode, ResourceLimits resourceLimits, SchedulingMode schedulingMode, ResourceLimits resourceLimits,
SchedulerRequestKey schedulerKey) { SchedulerRequestKey schedulerKey) {
PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey, PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
@ -168,8 +165,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
} }
} }
if (!checkHeadroom(clusterResource, resourceLimits, required, if (!checkHeadroom(resourceLimits, required, node.getPartition())) {
node.getPartition())) {
LOG.debug("cannot allocate required resource={} because of headroom", LOG.debug("cannot allocate required resource={} because of headroom",
required); required);
ActivitiesLogger.APP.recordAppActivityWithoutAllocation( ActivitiesLogger.APP.recordAppActivityWithoutAllocation(
@ -857,7 +853,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
FiCaSchedulerNode node = iter.next(); FiCaSchedulerNode node = iter.next();
if (reservedContainer == null) { if (reservedContainer == null) {
result = preCheckForNodeCandidateSet(clusterResource, node, result = preCheckForNodeCandidateSet(node,
schedulingMode, resourceLimits, schedulerKey); schedulingMode, resourceLimits, schedulerKey);
if (null != result) { if (null != result) {
continue; continue;

View File

@ -800,6 +800,62 @@ public class TestLeafQueue {
assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()), assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()),
a.getMetrics().getAvailableMB()); a.getMetrics().getAvailableMB());
} }
/**
 * Exercises the headroom check when the dominant resource calculator is in
 * use: a first 90 GB request is expected to be allocated off-switch, after
 * which a follow-up 10 GB request must be turned away because granting it
 * would push the queue past its maximum.
 */
@Test
public void testHeadroomCheckWithDRF() throws Exception {
  setUpWithDominantResourceCalculator();

  // Queue under test (stubbed leaf queue B).
  LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B));

  // A single user with a single application attempt submitted to the queue.
  final String submitter = "user_0";
  final ApplicationAttemptId attemptId =
      TestUtils.getMockApplicationAttemptId(0, 0);
  FiCaSchedulerApp app0 = new FiCaSchedulerApp(attemptId, submitter, b,
      b.getAbstractUsersManager(), spyRMContext);
  b.submitApplicationAttempt(app0, submitter);

  // One mock node: 100 GB of memory and 100 vcores.
  FiCaSchedulerNode node0 =
      TestUtils.getMockNode("127.0.0.1", DEFAULT_RACK, 0, 100 * GB, 100);

  // The cluster consists of exactly that one node.
  int nodeCount = 1;
  Resource clusterResource =
      Resources.createResource(nodeCount * (100 * GB), nodeCount * 100);
  when(csContext.getNumClusterNodes()).thenReturn(nodeCount);
  root.updateClusterResource(clusterResource,
      new ResourceLimits(clusterResource));

  // Raise the user-limit-factor so that user_0 alone may consume the queue's
  // maximum, which the original author notes is 0.99 * [100 * GB, 100].
  b.setUserLimitFactor(10.0f);

  Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
      ImmutableMap.of(app0.getApplicationAttemptId(), app0);
  Map<NodeId, FiCaSchedulerNode> nodes =
      ImmutableMap.of(node0.getNodeID(), node0);

  Priority priority = TestUtils.createMockPriority(1);

  // First pass: ask for 90 GB; this one should be placed off-switch.
  ResourceRequest largeAsk = TestUtils.createResourceRequest(
      ResourceRequest.ANY, 90 * GB, 10, 1, true, priority, recordFactory,
      NO_LABEL);
  app0.updateResourceRequests(Collections.singletonList(largeAsk));
  CSAssignment firstPass = b.assignContainers(clusterResource, node0,
      new ResourceLimits(clusterResource),
      SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
  applyCSAssignment(clusterResource, firstPass, b, nodes, apps);
  verifyContainerAllocated(firstPass, NodeType.OFF_SWITCH);

  // Second pass: ask for another 10 GB on top of the 90 GB already in use.
  ResourceRequest smallAsk = TestUtils.createResourceRequest(
      ResourceRequest.ANY, 10 * GB, 10, 1, true, priority, recordFactory,
      NO_LABEL);
  app0.updateResourceRequests(Collections.singletonList(smallAsk));
  CSAssignment secondPass = b.assignContainers(clusterResource, node0,
      new ResourceLimits(clusterResource),
      SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);

  // Nothing may be assigned here: (90 + 10) GB of used memory would exceed
  // the 99 GB maximum.
  verifyNoContainerAllocated(secondPass);
}
@Test @Test
public void testDRFUsageRatioRounding() throws Exception { public void testDRFUsageRatioRounding() throws Exception {
CSAssignment assign; CSAssignment assign;