YARN-4781. Support intra-queue preemption for fairness ordering policy. Contributed by Eric Payne.
This commit is contained in:
parent
61df174e8b
commit
7c343669ba
|
@ -34,6 +34,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAFairOrderingComparator;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
@ -41,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
@ -263,8 +266,17 @@ public class FifoIntraQueuePreemptionPlugin
|
||||||
Resource queueReassignableResource,
|
Resource queueReassignableResource,
|
||||||
PriorityQueue<TempAppPerPartition> orderedByPriority) {
|
PriorityQueue<TempAppPerPartition> orderedByPriority) {
|
||||||
|
|
||||||
Comparator<TempAppPerPartition> reverseComp = Collections
|
Comparator<TempAppPerPartition> reverseComp;
|
||||||
.reverseOrder(new TAPriorityComparator());
|
OrderingPolicy<FiCaSchedulerApp> queueOrderingPolicy =
|
||||||
|
tq.leafQueue.getOrderingPolicy();
|
||||||
|
if (queueOrderingPolicy instanceof FairOrderingPolicy
|
||||||
|
&& (context.getIntraQueuePreemptionOrderPolicy()
|
||||||
|
== IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
|
||||||
|
reverseComp = Collections.reverseOrder(
|
||||||
|
new TAFairOrderingComparator(this.rc, clusterResource));
|
||||||
|
} else {
|
||||||
|
reverseComp = Collections.reverseOrder(new TAPriorityComparator());
|
||||||
|
}
|
||||||
TreeSet<TempAppPerPartition> orderedApps = new TreeSet<>(reverseComp);
|
TreeSet<TempAppPerPartition> orderedApps = new TreeSet<>(reverseComp);
|
||||||
|
|
||||||
String partition = tq.partition;
|
String partition = tq.partition;
|
||||||
|
@ -355,7 +367,16 @@ public class FifoIntraQueuePreemptionPlugin
|
||||||
TempQueuePerPartition tq, Collection<FiCaSchedulerApp> apps,
|
TempQueuePerPartition tq, Collection<FiCaSchedulerApp> apps,
|
||||||
Resource clusterResource,
|
Resource clusterResource,
|
||||||
Map<String, Resource> perUserAMUsed) {
|
Map<String, Resource> perUserAMUsed) {
|
||||||
TAPriorityComparator taComparator = new TAPriorityComparator();
|
Comparator<TempAppPerPartition> taComparator;
|
||||||
|
OrderingPolicy<FiCaSchedulerApp> orderingPolicy =
|
||||||
|
tq.leafQueue.getOrderingPolicy();
|
||||||
|
if (orderingPolicy instanceof FairOrderingPolicy
|
||||||
|
&& (context.getIntraQueuePreemptionOrderPolicy()
|
||||||
|
== IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
|
||||||
|
taComparator = new TAFairOrderingComparator(this.rc, clusterResource);
|
||||||
|
} else {
|
||||||
|
taComparator = new TAPriorityComparator();
|
||||||
|
}
|
||||||
PriorityQueue<TempAppPerPartition> orderedByPriority = new PriorityQueue<>(
|
PriorityQueue<TempAppPerPartition> orderedByPriority = new PriorityQueue<>(
|
||||||
100, taComparator);
|
100, taComparator);
|
||||||
|
|
||||||
|
@ -393,13 +414,12 @@ public class FifoIntraQueuePreemptionPlugin
|
||||||
// Set ideal allocation of app as 0.
|
// Set ideal allocation of app as 0.
|
||||||
tmpApp.idealAssigned = Resources.createResource(0, 0);
|
tmpApp.idealAssigned = Resources.createResource(0, 0);
|
||||||
|
|
||||||
orderedByPriority.add(tmpApp);
|
|
||||||
|
|
||||||
// Create a TempUserPerPartition structure to hold more information
|
// Create a TempUserPerPartition structure to hold more information
|
||||||
// regarding each user's entities such as UserLimit etc. This could
|
// regarding each user's entities such as UserLimit etc. This could
|
||||||
// be kept in a user to TempUserPerPartition map for further reference.
|
// be kept in a user to TempUserPerPartition map for further reference.
|
||||||
String userName = app.getUser();
|
String userName = app.getUser();
|
||||||
if (!usersPerPartition.containsKey(userName)) {
|
TempUserPerPartition tmpUser = usersPerPartition.get(userName);
|
||||||
|
if (tmpUser == null) {
|
||||||
ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName)
|
ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName)
|
||||||
.getResourceUsage();
|
.getResourceUsage();
|
||||||
|
|
||||||
|
@ -409,7 +429,7 @@ public class FifoIntraQueuePreemptionPlugin
|
||||||
amUsed = (userSpecificAmUsed == null)
|
amUsed = (userSpecificAmUsed == null)
|
||||||
? Resources.none() : userSpecificAmUsed;
|
? Resources.none() : userSpecificAmUsed;
|
||||||
|
|
||||||
TempUserPerPartition tmpUser = new TempUserPerPartition(
|
tmpUser = new TempUserPerPartition(
|
||||||
tq.leafQueue.getUser(userName), tq.queueName,
|
tq.leafQueue.getUser(userName), tq.queueName,
|
||||||
Resources.clone(userResourceUsage.getUsed(partition)),
|
Resources.clone(userResourceUsage.getUsed(partition)),
|
||||||
Resources.clone(amUsed),
|
Resources.clone(amUsed),
|
||||||
|
@ -432,7 +452,10 @@ public class FifoIntraQueuePreemptionPlugin
|
||||||
tmpUser.idealAssigned = Resources.createResource(0, 0);
|
tmpUser.idealAssigned = Resources.createResource(0, 0);
|
||||||
tq.addUserPerPartition(userName, tmpUser);
|
tq.addUserPerPartition(userName, tmpUser);
|
||||||
}
|
}
|
||||||
|
tmpApp.setTempUserPerPartition(tmpUser);
|
||||||
|
orderedByPriority.add(tmpApp);
|
||||||
}
|
}
|
||||||
|
|
||||||
return orderedByPriority;
|
return orderedByPriority;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.AbstractComparatorOrderingPolicy;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
@ -64,6 +66,44 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Order first by amount used from least to most. Then order from oldest to
|
||||||
|
* youngest if amount used is the same.
|
||||||
|
*/
|
||||||
|
static class TAFairOrderingComparator
|
||||||
|
implements Comparator<TempAppPerPartition> {
|
||||||
|
|
||||||
|
private ResourceCalculator rc;
|
||||||
|
private Resource clusterRes;
|
||||||
|
|
||||||
|
TAFairOrderingComparator(ResourceCalculator rc, Resource clusterRes) {
|
||||||
|
this.rc = rc;
|
||||||
|
this.clusterRes = clusterRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) {
|
||||||
|
if (ta1.getUser().equals(ta2.getUser())) {
|
||||||
|
AbstractComparatorOrderingPolicy<FiCaSchedulerApp> acop =
|
||||||
|
(AbstractComparatorOrderingPolicy<FiCaSchedulerApp>)
|
||||||
|
ta1.getFiCaSchedulerApp().getCSLeafQueue().getOrderingPolicy();
|
||||||
|
return acop.getComparator()
|
||||||
|
.compare(ta1.getFiCaSchedulerApp(), ta2.getFiCaSchedulerApp());
|
||||||
|
} else {
|
||||||
|
Resource usedByUser1 = ta1.getTempUserPerPartition().getUsedDeductAM();
|
||||||
|
Resource usedByUser2 = ta2.getTempUserPerPartition().getUsedDeductAM();
|
||||||
|
if (Resources.equals(usedByUser1, usedByUser2)) {
|
||||||
|
return ta1.getApplicationId().compareTo(ta2.getApplicationId());
|
||||||
|
}
|
||||||
|
if (Resources.lessThan(rc, clusterRes, usedByUser1, usedByUser2)) {
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null;
|
IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null;
|
||||||
final CapacitySchedulerPreemptionContext context;
|
final CapacitySchedulerPreemptionContext context;
|
||||||
|
|
||||||
|
|
|
@ -34,6 +34,7 @@ public class TempAppPerPartition extends AbstractPreemptionEntity {
|
||||||
// Following fields are settled and used by candidate selection policies
|
// Following fields are settled and used by candidate selection policies
|
||||||
private final int priority;
|
private final int priority;
|
||||||
private final ApplicationId applicationId;
|
private final ApplicationId applicationId;
|
||||||
|
private TempUserPerPartition tempUser;
|
||||||
|
|
||||||
FiCaSchedulerApp app;
|
FiCaSchedulerApp app;
|
||||||
|
|
||||||
|
@ -102,4 +103,12 @@ public class TempAppPerPartition extends AbstractPreemptionEntity {
|
||||||
Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);
|
Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setTempUserPerPartition(TempUserPerPartition tu) {
|
||||||
|
tempUser = tu;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TempUserPerPartition getTempUserPerPartition() {
|
||||||
|
return tempUser;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
|
||||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -89,7 +88,6 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public Comparator<SchedulableEntity> getComparator() {
|
public Comparator<SchedulableEntity> getComparator() {
|
||||||
return comparator;
|
return comparator;
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preempti
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
|
@ -64,6 +65,7 @@ import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.mockito.ArgumentMatcher;
|
import org.mockito.ArgumentMatcher;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
@ -337,9 +339,11 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
|
||||||
.thenReturn(pendingForDefaultPartition);
|
.thenReturn(pendingForDefaultPartition);
|
||||||
|
|
||||||
// need to set pending resource in resource usage as well
|
// need to set pending resource in resource usage as well
|
||||||
ResourceUsage ru = new ResourceUsage();
|
ResourceUsage ru = Mockito.spy(new ResourceUsage());
|
||||||
ru.setUsed(label, used);
|
ru.setUsed(label, used);
|
||||||
|
when(ru.getCachedUsed(anyString())).thenReturn(used);
|
||||||
when(app.getAppAttemptResourceUsage()).thenReturn(ru);
|
when(app.getAppAttemptResourceUsage()).thenReturn(ru);
|
||||||
|
when(app.getSchedulingResourceUsage()).thenReturn(ru);
|
||||||
|
|
||||||
start = end + 1;
|
start = end + 1;
|
||||||
}
|
}
|
||||||
|
@ -637,6 +641,12 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
|
||||||
when(leafQueue.getApplications()).thenReturn(apps);
|
when(leafQueue.getApplications()).thenReturn(apps);
|
||||||
when(leafQueue.getAllApplications()).thenReturn(apps);
|
when(leafQueue.getAllApplications()).thenReturn(apps);
|
||||||
OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
|
OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
|
||||||
|
String opName = conf.get(CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ CapacitySchedulerConfiguration.ROOT + "." + getQueueName(q)
|
||||||
|
+ ".ordering-policy", "fifo");
|
||||||
|
if (opName.equals("fair")) {
|
||||||
|
so = Mockito.spy(new FairOrderingPolicy<FiCaSchedulerApp>());
|
||||||
|
}
|
||||||
when(so.getPreemptionIterator()).thenAnswer(new Answer() {
|
when(so.getPreemptionIterator()).thenAnswer(new Answer() {
|
||||||
public Object answer(InvocationOnMock invocation) {
|
public Object answer(InvocationOnMock invocation) {
|
||||||
return apps.descendingIterator();
|
return apps.descendingIterator();
|
||||||
|
|
|
@ -0,0 +1,276 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.argThat;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Test class for testing intra-queue preemption when the fair ordering policy
|
||||||
|
* is enabled on a capacity queue.
|
||||||
|
*/
|
||||||
|
public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering
|
||||||
|
extends ProportionalCapacityPreemptionPolicyMockFramework {
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
super.setup();
|
||||||
|
conf.setBoolean(
|
||||||
|
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
|
||||||
|
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* When the capacity scheduler fair ordering policy is enabled, preempt first
|
||||||
|
* from the application owned by the user that is the farthest over their
|
||||||
|
* user limit.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testIntraQueuePreemptionFairOrderingPolicyEnabledOneAppPerUser()
|
||||||
|
throws IOException {
|
||||||
|
// Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a
|
||||||
|
conf.set(CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair");
|
||||||
|
// Make sure all containers will be preempted in a single round.
|
||||||
|
conf.setFloat(CapacitySchedulerConfiguration.
|
||||||
|
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
|
||||||
|
(float) 1.0);
|
||||||
|
|
||||||
|
String labelsConfig = "=100,true;";
|
||||||
|
String nodesConfig = // n1 has no label
|
||||||
|
"n1= res=100";
|
||||||
|
String queuesConfig =
|
||||||
|
// guaranteed,max,used,pending,reserved
|
||||||
|
"root(=[100 100 100 1 0]);" + // root
|
||||||
|
"-a(=[100 100 100 1 0])"; // a
|
||||||
|
|
||||||
|
// user1/app1 has 60 resources in queue a
|
||||||
|
// user2/app2 has 40 resources in queue a
|
||||||
|
// user3/app3 is requesting 20 resources in queue a
|
||||||
|
// With 3 users, preemptable user limit should be around 35 resources each.
|
||||||
|
// With FairOrderingPolicy enabled on queue a, all 20 resources should be
|
||||||
|
// preempted from app1
|
||||||
|
String appsConfig =
|
||||||
|
// queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
|
||||||
|
"a\t" // app1, user1 in a
|
||||||
|
+ "(1,1,n1,,60,false,0,user1);" +
|
||||||
|
"a\t" // app2, user2 in a
|
||||||
|
+ "(1,1,n1,,40,false,0,user2);" +
|
||||||
|
"a\t" // app3, user3 in a
|
||||||
|
+ "(1,1,n1,,0,false,20,user3)"
|
||||||
|
;
|
||||||
|
|
||||||
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||||
|
policy.editSchedule();
|
||||||
|
|
||||||
|
verify(mDisp, times(20)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(1))));
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* When the capacity scheduler fifo ordering policy is enabled, preempt first
|
||||||
|
* from the youngest application until reduced to user limit, then preempt
|
||||||
|
* from next youngest app.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testIntraQueuePreemptionFifoOrderingPolicyEnabled()
|
||||||
|
throws IOException {
|
||||||
|
// Enable FifoOrderingPolicy for yarn.scheduler.capacity.root.a
|
||||||
|
conf.set(CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fifo");
|
||||||
|
// Make sure all containers will be preempted in a single round.
|
||||||
|
conf.setFloat(CapacitySchedulerConfiguration.
|
||||||
|
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
|
||||||
|
(float) 1.0);
|
||||||
|
|
||||||
|
String labelsConfig = "=100,true;";
|
||||||
|
String nodesConfig = // n1 has no label
|
||||||
|
"n1= res=100";
|
||||||
|
String queuesConfig =
|
||||||
|
// guaranteed,max,used,pending,reserved
|
||||||
|
"root(=[100 100 100 1 0]);" + // root
|
||||||
|
"-a(=[100 100 100 1 0])"; // a
|
||||||
|
|
||||||
|
// user1/app1 has 60 resources in queue a
|
||||||
|
// user2/app2 has 40 resources in queue a
|
||||||
|
// user3/app3 is requesting 20 resources in queue a
|
||||||
|
// With 3 users, preemptable user limit should be around 35 resources each.
|
||||||
|
// With FifoOrderingPolicy enabled on queue a, the first 5 should come from
|
||||||
|
// the youngest app, app2, until app2 is reduced to the user limit of 35.
|
||||||
|
String appsConfig =
|
||||||
|
// queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
|
||||||
|
"a\t" // app1, user1 in a
|
||||||
|
+ "(1,1,n1,,60,false,0,user1);" +
|
||||||
|
"a\t" // app2, user2 in a
|
||||||
|
+ "(1,1,n1,,40,false,0,user2);" +
|
||||||
|
"a\t" // app3, user3 in a
|
||||||
|
+ "(1,1,n1,,0,false,5,user3)"
|
||||||
|
;
|
||||||
|
|
||||||
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||||
|
policy.editSchedule();
|
||||||
|
|
||||||
|
verify(mDisp, times(5)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(2))));
|
||||||
|
|
||||||
|
// user1/app1 has 60 resources in queue a
|
||||||
|
// user2/app2 has 35 resources in queue a
|
||||||
|
// user3/app3 has 5 resources and is requesting 15 resources in queue a
|
||||||
|
// With 3 users, preemptable user limit should be around 35 resources each.
|
||||||
|
// The next 15 should come from app1 even though app2 is younger since app2
|
||||||
|
// has already been reduced to its user limit.
|
||||||
|
appsConfig =
|
||||||
|
// queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
|
||||||
|
"a\t" // app1, user1 in a
|
||||||
|
+ "(1,1,n1,,60,false,0,user1);" +
|
||||||
|
"a\t" // app2, user2 in a
|
||||||
|
+ "(1,1,n1,,35,false,0,user2);" +
|
||||||
|
"a\t" // app3, user3 in a
|
||||||
|
+ "(1,1,n1,,5,false,15,user3)"
|
||||||
|
;
|
||||||
|
|
||||||
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||||
|
policy.editSchedule();
|
||||||
|
|
||||||
|
verify(mDisp, times(15)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(1))));
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* When the capacity scheduler fair ordering policy is enabled, preempt first
|
||||||
|
* from the youngest application from the user that is the farthest over their
|
||||||
|
* user limit.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testIntraQueuePreemptionFairOrderingPolicyMulitipleAppsPerUser()
|
||||||
|
throws IOException {
|
||||||
|
// Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a
|
||||||
|
conf.set(CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair");
|
||||||
|
// Make sure all containers will be preempted in a single round.
|
||||||
|
conf.setFloat(CapacitySchedulerConfiguration.
|
||||||
|
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
|
||||||
|
(float) 1.0);
|
||||||
|
|
||||||
|
String labelsConfig = "=100,true;";
|
||||||
|
String nodesConfig = // n1 has no label
|
||||||
|
"n1= res=100";
|
||||||
|
String queuesConfig =
|
||||||
|
// guaranteed,max,used,pending,reserved
|
||||||
|
"root(=[100 100 100 1 0]);" + // root
|
||||||
|
"-a(=[100 100 100 1 0])"; // a
|
||||||
|
|
||||||
|
// user1/app1 has 35 resources in queue a
|
||||||
|
// user1/app2 has 25 resources in queue a
|
||||||
|
// user2/app3 has 40 resources in queue a
|
||||||
|
// user3/app4 is requesting 20 resources in queue a
|
||||||
|
// With 3 users, preemptable user limit should be around 35 resources each.
|
||||||
|
// With FairOrderingPolicy enabled on queue a, all 20 resources should be
|
||||||
|
// preempted from app1 since it's the most over served app from the most
|
||||||
|
// over served user
|
||||||
|
String appsConfig =
|
||||||
|
// queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
|
||||||
|
"a\t" // app1 and app2, user1 in a
|
||||||
|
+ "(1,1,n1,,35,false,0,user1);" +
|
||||||
|
"a\t"
|
||||||
|
+ "(1,1,n1,,25,false,0,user1);" +
|
||||||
|
"a\t" // app3, user2 in a
|
||||||
|
+ "(1,1,n1,,40,false,0,user2);" +
|
||||||
|
"a\t" // app4, user3 in a
|
||||||
|
+ "(1,1,n1,,0,false,20,user3)"
|
||||||
|
;
|
||||||
|
|
||||||
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||||
|
policy.editSchedule();
|
||||||
|
|
||||||
|
verify(mDisp, times(20)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(1))));
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* When the capacity scheduler fifo ordering policy is enabled and a user has
|
||||||
|
* multiple apps, preempt first from the youngest application.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testIntraQueuePreemptionFifoOrderingPolicyMultipleAppsPerUser()
|
||||||
|
throws IOException {
|
||||||
|
// Enable FifoOrderingPolicy for yarn.scheduler.capacity.root.a
|
||||||
|
conf.set(CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fifo");
|
||||||
|
// Make sure all containers will be preempted in a single round.
|
||||||
|
conf.setFloat(CapacitySchedulerConfiguration.
|
||||||
|
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
|
||||||
|
(float) 1.0);
|
||||||
|
|
||||||
|
String labelsConfig = "=100,true;";
|
||||||
|
String nodesConfig = // n1 has no label
|
||||||
|
"n1= res=100";
|
||||||
|
String queuesConfig =
|
||||||
|
// guaranteed,max,used,pending,reserved
|
||||||
|
"root(=[100 100 100 1 0]);" + // root
|
||||||
|
"-a(=[100 100 100 1 0])"; // a
|
||||||
|
|
||||||
|
// user1/app1 has 40 resources in queue a
|
||||||
|
// user1/app2 has 20 resources in queue a
|
||||||
|
// user3/app3 has 40 resources in queue a
|
||||||
|
// user4/app4 is requesting 20 resources in queue a
|
||||||
|
// With 3 users, preemptable user limit should be around 35 resources each.
|
||||||
|
String appsConfig =
|
||||||
|
// queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
|
||||||
|
"a\t" // app1, user1 in a
|
||||||
|
+ "(1,1,n1,,40,false,0,user1);" +
|
||||||
|
"a\t" // app2, user1 in a
|
||||||
|
+ "(1,1,n1,,20,false,0,user1);" +
|
||||||
|
"a\t" // app3, user3 in a
|
||||||
|
+ "(1,1,n1,,40,false,0,user3);" +
|
||||||
|
"a\t" // app4, user4 in a
|
||||||
|
+ "(1,1,n1,,0,false,25,user4)"
|
||||||
|
;
|
||||||
|
|
||||||
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||||
|
policy.editSchedule();
|
||||||
|
|
||||||
|
// app3 is the younges and also over its user limit. 5 should be preempted
|
||||||
|
// from app3 until it comes down to user3's user limit.
|
||||||
|
verify(mDisp, times(5)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(3))));
|
||||||
|
|
||||||
|
// User1's app2 is its youngest. 19 should be preempted from app2, leaving
|
||||||
|
// only the AM
|
||||||
|
verify(mDisp, times(19)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(2))));
|
||||||
|
|
||||||
|
// Preempt the remaining resource from User1's oldest app1.
|
||||||
|
verify(mDisp, times(1)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(1))));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue