YARN-8709: CS preemption monitor always fails since one under-served queue was deleted. Contributed by Tao Yang.
(cherry picked from commit 987d8191ad
)
This commit is contained in:
parent
1c2c0ed209
commit
b6bc0f409a
|
@ -467,6 +467,9 @@ public class ProportionalCapacityPreemptionPolicy
|
||||||
Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
|
Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
|
||||||
percentageClusterPreemptionAllowed);
|
percentageClusterPreemptionAllowed);
|
||||||
|
|
||||||
|
//clear under served queues for every run
|
||||||
|
partitionToUnderServedQueues.clear();
|
||||||
|
|
||||||
// based on ideal allocation select containers to be preemptionCandidates from each
|
// based on ideal allocation select containers to be preemptionCandidates from each
|
||||||
// queue and each application
|
// queue and each application
|
||||||
Map<ApplicationAttemptId, Set<RMContainer>> toPreempt =
|
Map<ApplicationAttemptId, Set<RMContainer>> toPreempt =
|
||||||
|
|
|
@ -206,6 +206,11 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
|
||||||
mClock);
|
mClock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void updateQueueConfig(String queuesConfig) {
|
||||||
|
ParentQueue root = mockQueueHierarchy(queuesConfig);
|
||||||
|
when(cs.getRootQueue()).thenReturn(root);
|
||||||
|
}
|
||||||
|
|
||||||
private void mockContainers(String containersConfig, FiCaSchedulerApp app,
|
private void mockContainers(String containersConfig, FiCaSchedulerApp app,
|
||||||
ApplicationAttemptId attemptId, String queueName,
|
ApplicationAttemptId attemptId, String queueName,
|
||||||
List<RMContainer> reservedContainers, List<RMContainer> liveContainers) {
|
List<RMContainer> reservedContainers, List<RMContainer> liveContainers) {
|
||||||
|
|
|
@ -71,9 +71,9 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
|
||||||
"n1= res=100";
|
"n1= res=100";
|
||||||
String queuesConfig =
|
String queuesConfig =
|
||||||
// guaranteed,max,used,pending,reserved
|
// guaranteed,max,used,pending,reserved
|
||||||
"root(=[100 100 79 120 0]);" + // root
|
"root(=[100 100 79 110 0]);" + // root
|
||||||
"-a(=[11 100 11 50 0]);" + // a
|
"-a(=[11 100 11 50 0]);" + // a
|
||||||
"-b(=[40 100 38 60 0]);" + // b
|
"-b(=[40 100 38 50 0]);" + // b
|
||||||
"-c(=[20 100 10 10 0]);" + // c
|
"-c(=[20 100 10 10 0]);" + // c
|
||||||
"-d(=[29 100 20 0 0])"; // d
|
"-d(=[29 100 20 0 0])"; // d
|
||||||
|
|
||||||
|
@ -128,9 +128,9 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
|
||||||
"n1= res=100";
|
"n1= res=100";
|
||||||
String queuesConfig =
|
String queuesConfig =
|
||||||
// guaranteed,max,used,pending,reserved
|
// guaranteed,max,used,pending,reserved
|
||||||
"root(=[100 100 80 120 0]);" + // root
|
"root(=[100 100 80 110 0]);" + // root
|
||||||
"-a(=[11 100 11 50 0]);" + // a
|
"-a(=[11 100 11 50 0]);" + // a
|
||||||
"-b(=[40 100 38 60 0]);" + // b
|
"-b(=[40 100 38 50 0]);" + // b
|
||||||
"-c(=[20 100 10 10 0]);" + // c
|
"-c(=[20 100 10 10 0]);" + // c
|
||||||
"-d(=[29 100 20 0 0])"; // d
|
"-d(=[29 100 20 0 0])"; // d
|
||||||
|
|
||||||
|
@ -942,4 +942,86 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
|
||||||
verify(mDisp, times(22))
|
verify(mDisp, times(22))
|
||||||
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6))));
|
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIntraQueuePreemptionAfterQueueDropped()
|
||||||
|
throws IOException {
|
||||||
|
/**
|
||||||
|
* Test intra queue preemption after under-served queue dropped,
|
||||||
|
* At first, Queue structure is:
|
||||||
|
*
|
||||||
|
* <pre>
|
||||||
|
* root
|
||||||
|
* / | | \
|
||||||
|
* a b c d
|
||||||
|
* </pre>
|
||||||
|
*
|
||||||
|
* After dropped under-served queue "c", Queue structure is:
|
||||||
|
*
|
||||||
|
* <pre>
|
||||||
|
* root
|
||||||
|
* / | \
|
||||||
|
* a b d
|
||||||
|
* </pre>
|
||||||
|
*
|
||||||
|
* Verify no exception is thrown and preemption results is correct
|
||||||
|
*/
|
||||||
|
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
|
||||||
|
"priority_first");
|
||||||
|
|
||||||
|
String labelsConfig = "=100,true;";
|
||||||
|
String nodesConfig = // n1 has no label
|
||||||
|
"n1= res=100";
|
||||||
|
String queuesConfig =
|
||||||
|
// guaranteed,max,used,pending,reserved
|
||||||
|
"root(=[100 100 79 110 0]);" + // root
|
||||||
|
"-a(=[11 100 11 50 0]);" + // a
|
||||||
|
"-b(=[40 100 38 50 0]);" + // b
|
||||||
|
"-c(=[20 100 10 10 0]);" + // c
|
||||||
|
"-d(=[29 100 20 0 0])"; // d
|
||||||
|
|
||||||
|
String appsConfig =
|
||||||
|
// queueName\t(priority,resource,host,expression,#repeat,reserved,
|
||||||
|
// pending)
|
||||||
|
"a\t" // app1 in a
|
||||||
|
+ "(1,1,n1,,6,false,25);" + // app1 a
|
||||||
|
"a\t" // app2 in a
|
||||||
|
+ "(1,1,n1,,5,false,25);" + // app2 a
|
||||||
|
"b\t" // app3 in b
|
||||||
|
+ "(4,1,n1,,34,false,20);" + // app3 b
|
||||||
|
"b\t" // app4 in b
|
||||||
|
+ "(4,1,n1,,2,false,10);" + // app4 b
|
||||||
|
"b\t" // app4 in b
|
||||||
|
+ "(5,1,n1,,1,false,10);" + // app5 b
|
||||||
|
"b\t" // app4 in b
|
||||||
|
+ "(6,1,n1,,1,false,10);" + // app6 in b
|
||||||
|
"c\t" // app1 in a
|
||||||
|
+ "(1,1,n1,,10,false,10);" + "d\t" // app7 in c
|
||||||
|
+ "(1,1,n1,,20,false,0)";
|
||||||
|
|
||||||
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||||
|
policy.editSchedule();
|
||||||
|
|
||||||
|
queuesConfig =
|
||||||
|
// guaranteed,max,used,pending,reserved
|
||||||
|
"root(=[100 100 69 100 0]);" + // root
|
||||||
|
"-a(=[11 100 11 50 0]);" + // a
|
||||||
|
"-b(=[40 100 38 50 0]);" + // b
|
||||||
|
"-d(=[49 100 20 0 0])"; // d
|
||||||
|
|
||||||
|
updateQueueConfig(queuesConfig);
|
||||||
|
|
||||||
|
// will throw YarnRuntimeException(This shouldn't happen, cannot find
|
||||||
|
// TempQueuePerPartition for queueName=c) without patch in YARN-8709
|
||||||
|
policy.editSchedule();
|
||||||
|
|
||||||
|
// For queue B, app3 and app4 were of lower priority. Hence take 8
|
||||||
|
// containers from them by hitting the intraQueuePreemptionDemand of 20%.
|
||||||
|
verify(mDisp, times(1)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(4))));
|
||||||
|
verify(mDisp, times(7)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(3))));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue