diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 1776bd4d946..572fb3d5773 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAFairOrderingComparator; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -41,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -263,8 +266,17 @@ private TreeSet calculateIdealAssignedResourcePerApp( Resource queueReassignableResource, PriorityQueue orderedByPriority) { - Comparator reverseComp = Collections - .reverseOrder(new TAPriorityComparator()); + Comparator reverseComp; + OrderingPolicy queueOrderingPolicy = + tq.leafQueue.getOrderingPolicy(); + if (queueOrderingPolicy instanceof FairOrderingPolicy + && (context.getIntraQueuePreemptionOrderPolicy() + == IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) { + reverseComp = Collections.reverseOrder( + new TAFairOrderingComparator(this.rc, clusterResource)); + } else { + reverseComp = Collections.reverseOrder(new TAPriorityComparator()); + } TreeSet orderedApps = new TreeSet<>(reverseComp); String partition = tq.partition; @@ -355,7 +367,16 @@ private PriorityQueue createTempAppForResCalculation( TempQueuePerPartition tq, Collection apps, Resource clusterResource, Map perUserAMUsed) { - TAPriorityComparator taComparator = new TAPriorityComparator(); + Comparator taComparator; + OrderingPolicy orderingPolicy = + tq.leafQueue.getOrderingPolicy(); + if (orderingPolicy instanceof FairOrderingPolicy + && (context.getIntraQueuePreemptionOrderPolicy() + == IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) { + taComparator = new TAFairOrderingComparator(this.rc, clusterResource); + } else { + taComparator = new TAPriorityComparator(); + } PriorityQueue orderedByPriority = new PriorityQueue<>( 100, taComparator); @@ -393,13 +414,12 @@ private PriorityQueue createTempAppForResCalculation( // Set ideal allocation of app as 0. tmpApp.idealAssigned = Resources.createResource(0, 0); - orderedByPriority.add(tmpApp); - // Create a TempUserPerPartition structure to hold more information // regarding each user's entities such as UserLimit etc. This could // be kept in a user to TempUserPerPartition map for further reference. String userName = app.getUser(); - if (!usersPerPartition.containsKey(userName)) { + TempUserPerPartition tmpUser = usersPerPartition.get(userName); + if (tmpUser == null) { ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName) .getResourceUsage(); @@ -409,7 +429,7 @@ private PriorityQueue createTempAppForResCalculation( amUsed = (userSpecificAmUsed == null) ? Resources.none() : userSpecificAmUsed; - TempUserPerPartition tmpUser = new TempUserPerPartition( + tmpUser = new TempUserPerPartition( tq.leafQueue.getUser(userName), tq.queueName, Resources.clone(userResourceUsage.getUsed(partition)), Resources.clone(amUsed), @@ -432,7 +452,10 @@ private PriorityQueue createTempAppForResCalculation( tmpUser.idealAssigned = Resources.createResource(0, 0); tq.addUserPerPartition(userName, tmpUser); } + tmpApp.setTempUserPerPartition(tmpUser); + orderedByPriority.add(tmpApp); } + return orderedByPriority; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 5b6932e6403..3e580735e2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.AbstractComparatorOrderingPolicy; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import java.io.Serializable; @@ -64,6 +66,44 @@ public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { } } + /* + * Order first by amount used from least to most. Then order from oldest to + * youngest if amount used is the same. + */ + static class TAFairOrderingComparator + implements Comparator { + + private ResourceCalculator rc; + private Resource clusterRes; + + TAFairOrderingComparator(ResourceCalculator rc, Resource clusterRes) { + this.rc = rc; + this.clusterRes = clusterRes; + } + + @Override + public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { + if (ta1.getUser().equals(ta2.getUser())) { + AbstractComparatorOrderingPolicy acop = + (AbstractComparatorOrderingPolicy) + ta1.getFiCaSchedulerApp().getCSLeafQueue().getOrderingPolicy(); + return acop.getComparator() + .compare(ta1.getFiCaSchedulerApp(), ta2.getFiCaSchedulerApp()); + } else { + Resource usedByUser1 = ta1.getTempUserPerPartition().getUsedDeductAM(); + Resource usedByUser2 = ta2.getTempUserPerPartition().getUsedDeductAM(); + if (Resources.equals(usedByUser1, usedByUser2)) { + return ta1.getApplicationId().compareTo(ta2.getApplicationId()); + } + if (Resources.lessThan(rc, clusterRes, usedByUser1, usedByUser2)) { + return -1; + } else { + return 1; + } + } + } + } + IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null; final CapacitySchedulerPreemptionContext context; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java index e9a934b8403..05d8096a273 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java @@ -34,6 +34,7 @@ public class TempAppPerPartition extends AbstractPreemptionEntity { // Following fields are settled and used by candidate selection policies private final int priority; private final ApplicationId applicationId; + private TempUserPerPartition tempUser; FiCaSchedulerApp app; @@ -102,4 +103,12 @@ public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator, Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); } } + + public void setTempUserPerPartition(TempUserPerPartition tu) { + tempUser = tu; + } + + public TempUserPerPartition getTempUserPerPartition() { + return tempUser; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java index b7cb1bff693..09dd3bf3cc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java @@ -26,7 +26,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; -import com.google.common.annotations.VisibleForTesting; /** @@ -89,7 +88,6 @@ protected void entityRequiresReordering(S schedulableEntity) { } } - @VisibleForTesting public Comparator getComparator() { return comparator; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index 1d56a816312..d44033c709a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -60,6 +61,7 @@ import org.junit.Assert; import org.junit.Before; import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -305,9 +307,11 @@ public Integer answer(InvocationOnMock invocation) throws Throwable { .thenReturn(pendingForDefaultPartition); // need to set pending resource in resource usage as well - ResourceUsage ru = new ResourceUsage(); + ResourceUsage ru = Mockito.spy(new ResourceUsage()); ru.setUsed(label, used); + when(ru.getCachedUsed(anyString())).thenReturn(used); when(app.getAppAttemptResourceUsage()).thenReturn(ru); + when(app.getSchedulingResourceUsage()).thenReturn(ru); start = end + 1; } @@ -593,6 +597,12 @@ public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { when(leafQueue.getApplications()).thenReturn(apps); when(leafQueue.getAllApplications()).thenReturn(apps); OrderingPolicy so = mock(OrderingPolicy.class); + String opName = conf.get(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + "." + getQueueName(q) + + ".ordering-policy", "fifo"); + if (opName.equals("fair")) { + so = Mockito.spy(new FairOrderingPolicy()); + } when(so.getPreemptionIterator()).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { return apps.descendingIterator(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java new file mode 100644 index 00000000000..1678651169c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.junit.Before; +import org.junit.Test; + +/* + * Test class for testing intra-queue preemption when the fair ordering policy + * is enabled on a capacity queue. + */ +public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering + extends ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + conf.setBoolean( + CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + /* + * When the capacity scheduler fair ordering policy is enabled, preempt first + * from the application owned by the user that is the farthest over their + * user limit. + */ + @Test + public void testIntraQueuePreemptionFairOrderingPolicyEnabledOneAppPerUser() + throws IOException { + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + // user1/app1 has 60 resources in queue a + // user2/app2 has 40 resources in queue a + // user3/app3 is requesting 20 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each. + // With FairOrderingPolicy enabled on queue a, all 20 resources should be + // preempted from app1 + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, user1 in a + + "(1,1,n1,,60,false,0,user1);" + + "a\t" // app2, user2 in a + + "(1,1,n1,,40,false,0,user2);" + + "a\t" // app3, user3 in a + + "(1,1,n1,,0,false,20,user3)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(20)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + /* + * When the capacity scheduler fifo ordering policy is enabled, preempt first + * from the youngest application until reduced to user limit, then preempt + * from next youngest app. + */ + @Test + public void testIntraQueuePreemptionFifoOrderingPolicyEnabled() + throws IOException { + // Enable FifoOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fifo"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + // user1/app1 has 60 resources in queue a + // user2/app2 has 40 resources in queue a + // user3/app3 is requesting 20 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each. + // With FifoOrderingPolicy enabled on queue a, the first 5 should come from + // the youngest app, app2, until app2 is reduced to the user limit of 35. + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, user1 in a + + "(1,1,n1,,60,false,0,user1);" + + "a\t" // app2, user2 in a + + "(1,1,n1,,40,false,0,user2);" + + "a\t" // app3, user3 in a + + "(1,1,n1,,0,false,5,user3)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + + // user1/app1 has 60 resources in queue a + // user2/app2 has 35 resources in queue a + // user3/app3 has 5 resources and is requesting 15 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each. + // The next 15 should come from app1 even though app2 is younger since app2 + // has already been reduced to its user limit. + appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, user1 in a + + "(1,1,n1,,60,false,0,user1);" + + "a\t" // app2, user2 in a + + "(1,1,n1,,35,false,0,user2);" + + "a\t" // app3, user3 in a + + "(1,1,n1,,5,false,15,user3)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + /* + * When the capacity scheduler fair ordering policy is enabled, preempt first + * from the youngest application from the user that is the farthest over their + * user limit. + */ + @Test + public void testIntraQueuePreemptionFairOrderingPolicyMulitipleAppsPerUser() + throws IOException { + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + // user1/app1 has 35 resources in queue a + // user1/app2 has 25 resources in queue a + // user2/app3 has 40 resources in queue a + // user3/app4 is requesting 20 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each. + // With FairOrderingPolicy enabled on queue a, all 20 resources should be + // preempted from app1 since it's the most over served app from the most + // over served user + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,35,false,0,user1);" + + "a\t" + + "(1,1,n1,,25,false,0,user1);" + + "a\t" // app3, user2 in a + + "(1,1,n1,,40,false,0,user2);" + + "a\t" // app4, user3 in a + + "(1,1,n1,,0,false,20,user3)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(20)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + /* + * When the capacity scheduler fifo ordering policy is enabled and a user has + * multiple apps, preempt first from the youngest application. + */ + @Test + public void testIntraQueuePreemptionFifoOrderingPolicyMultipleAppsPerUser() + throws IOException { + // Enable FifoOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fifo"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + // user1/app1 has 40 resources in queue a + // user1/app2 has 20 resources in queue a + // user3/app3 has 40 resources in queue a + // user4/app4 is requesting 20 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each. + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, user1 in a + + "(1,1,n1,,40,false,0,user1);" + + "a\t" // app2, user1 in a + + "(1,1,n1,,20,false,0,user1);" + + "a\t" // app3, user3 in a + + "(1,1,n1,,40,false,0,user3);" + + "a\t" // app4, user4 in a + + "(1,1,n1,,0,false,25,user4)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app3 is the younges and also over its user limit. 5 should be preempted + // from app3 until it comes down to user3's user limit. + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + + // User1's app2 is its youngest. 19 should be preempted from app2, leaving + // only the AM + verify(mDisp, times(19)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + + // Preempt the remaining resource from User1's oldest app1. + verify(mDisp, times(1)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } +}