From 1df39c1efc9ed26d3f1a5887c31c38c873e0b784 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Sat, 11 Jul 2015 10:26:46 -0700 Subject: [PATCH] YARN-3849. Too much of preemption activity causing continuos killing of containers across queues. (Sunil G via wangda) --- hadoop-yarn-project/CHANGES.txt | 3 + .../ProportionalCapacityPreemptionPolicy.java | 4 +- ...tProportionalCapacityPreemptionPolicy.java | 251 +++++++++++++----- ...cityPreemptionPolicyForNodePartitions.java | 114 ++++++-- 4 files changed, 288 insertions(+), 84 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f78bbfa1548..1365747516d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -613,6 +613,9 @@ Release 2.8.0 - UNRELEASED YARN-3888. ApplicationMaster link is broken in RM WebUI when appstate is NEW. (Bibin A Chundatt via xgong) + YARN-3849. Too much of preemption activity causing continuos killing of + containers across queues. (Sunil G via wangda) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 5a20a6f7bf9..6e661d4b670 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -840,12 +840,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic synchronized (curQueue) { String queueName = curQueue.getQueueName(); QueueCapacities qc = curQueue.getQueueCapacities(); - float absUsed = qc.getAbsoluteUsedCapacity(partitionToLookAt); float absCap = qc.getAbsoluteCapacity(partitionToLookAt); float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt); boolean preemptionDisabled = curQueue.getPreemptionDisabled(); - Resource current = Resources.multiply(partitionResource, absUsed); + Resource current = curQueue.getQueueResourceUsage().getUsed( + partitionToLookAt); Resource guaranteed = Resources.multiply(partitionResource, absCap); Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 2c0c6d75658..305736030fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -81,7 +81,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -372,7 +374,7 @@ public class TestProportionalCapacityPreemptionPolicy { appA.getApplicationId(), appA.getAttemptId()); assertTrue("appA should be running on queueB", mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB)); - verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA))); + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA))); // Need to call setup() again to reset mDisp setup(); @@ -395,7 +397,7 @@ public class TestProportionalCapacityPreemptionPolicy { // Resources should have come from queueE (appC) and neither of queueA's // children. verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); - verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC))); + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC))); } @Test @@ -470,8 +472,8 @@ public class TestProportionalCapacityPreemptionPolicy { // With all queues preemptable, resources should be taken from queueC(appA) // and queueD(appB). Resources taken more from queueD(appB) than // queueC(appA) because it's over its capacity by a larger percentage. - verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA))); - verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB))); + verify(mDisp, times(17)).handle(argThat(new IsPreemptionRequestFor(appA))); + verify(mDisp, times(183)).handle(argThat(new IsPreemptionRequestFor(appB))); // Turn off preemption for queueA and it's children. queueF(appC)'s request // should starve. @@ -635,7 +637,7 @@ public class TestProportionalCapacityPreemptionPolicy { policy.editSchedule(); // verify capacity taken from A1, not B1 despite B1 being far over // its absolute guaranteed capacity - verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA))); + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA))); } @Test @@ -676,7 +678,7 @@ public class TestProportionalCapacityPreemptionPolicy { // we verify both that C has priority on B and D (has it has >0 guarantees) // and that B and D are force to share their over capacity fairly (as they // are both zero-guarantees) hence D sees some of its containers preempted - verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC))); + verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC))); } @@ -703,8 +705,8 @@ public class TestProportionalCapacityPreemptionPolicy { // XXX note: compensating for rounding error in Resources.multiplyTo // which is likely triggered since we use small numbers for readability - verify(mDisp, times(7)).handle(argThat(new IsPreemptionRequestFor(appA))); - verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE))); + verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA))); + verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appE))); } @Test @@ -868,6 +870,34 @@ public class TestProportionalCapacityPreemptionPolicy { setAMContainer = false; } + @Test + public void testPreemptionWithVCoreResource() { + int[][] qData = new int[][]{ + // / A B + {100, 100, 100}, // maxcap + {5, 1, 1}, // apps + {2, 0, 0}, // subqueues + }; + + // Resources can be set like memory:vcores + String[][] resData = new String[][]{ + // / A B + {"100:100", "50:50", "50:50"}, // abs + {"10:100", "10:100", "0"}, // used + {"70:20", "70:20", "10:100"}, // pending + {"0", "0", "0"}, // reserved + {"-1", "1:10", "1:10"}, // req granularity + }; + + // Passing last param as TRUE to use DominantResourceCalculator + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData, + true); + policy.editSchedule(); + + // 5 containers will be preempted here + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA))); + } + static class IsPreemptionRequestFor extends ArgumentMatcher { private final ApplicationAttemptId appAttId; @@ -892,13 +922,40 @@ public class TestProportionalCapacityPreemptionPolicy { } ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) { - ProportionalCapacityPreemptionPolicy policy = - new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock); + ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy( + conf, rmContext, mCS, mClock); + clusterResources = Resource.newInstance( + leafAbsCapacities(qData[0], qData[7]), 0); ParentQueue mRoot = buildMockRootQueue(rand, qData); when(mCS.getRootQueue()).thenReturn(mRoot); - clusterResources = - Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0); + setResourceAndNodeDetails(); + return policy; + } + + ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData, + String[][] resData) { + return buildPolicy(qData, resData, false); + } + + ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData, + String[][] resData, boolean useDominantResourceCalculator) { + if (useDominantResourceCalculator) { + when(mCS.getResourceCalculator()).thenReturn( + new DominantResourceCalculator()); + } + ProportionalCapacityPreemptionPolicy policy = + new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock); + clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]), + qData[2]); + ParentQueue mRoot = buildMockRootQueue(rand, resData, qData); + when(mCS.getRootQueue()).thenReturn(mRoot); + + setResourceAndNodeDetails(); + return policy; + } + + private void setResourceAndNodeDetails() { when(mCS.getClusterResource()).thenReturn(clusterResources); when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn( clusterResources); @@ -906,35 +963,78 @@ public class TestProportionalCapacityPreemptionPolicy { SchedulerNode mNode = mock(SchedulerNode.class); when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL); when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode); - return policy; } ParentQueue buildMockRootQueue(Random r, int[]... queueData) { - int[] abs = queueData[0]; - int[] maxCap = queueData[1]; - int[] used = queueData[2]; - int[] pending = queueData[3]; - int[] reserved = queueData[4]; - int[] apps = queueData[5]; - int[] gran = queueData[6]; - int[] queues = queueData[7]; + Resource[] abs = generateResourceList(queueData[0]); + Resource[] used = generateResourceList(queueData[2]); + Resource[] pending = generateResourceList(queueData[3]); + Resource[] reserved = generateResourceList(queueData[4]); + Resource[] gran = generateResourceList(queueData[6]); + int[] maxCap = queueData[1]; + int[] apps = queueData[5]; + int[] queues = queueData[7]; - return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues); + return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues); } - ParentQueue mockNested(int[] abs, int[] maxCap, int[] used, - int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) { - float tot = leafAbsCapacities(abs, queues); + ParentQueue buildMockRootQueue(Random r, String[][] resData, + int[]... queueData) { + Resource[] abs = parseResourceDetails(resData[0]); + Resource[] used = parseResourceDetails(resData[1]); + Resource[] pending = parseResourceDetails(resData[2]); + Resource[] reserved = parseResourceDetails(resData[3]); + Resource[] gran = parseResourceDetails(resData[4]); + int[] maxCap = queueData[0]; + int[] apps = queueData[1]; + int[] queues = queueData[2]; + + return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues); + } + + Resource[] parseResourceDetails(String[] resData) { + List resourceList = new ArrayList(); + for (int i = 0; i < resData.length; i++) { + String[] resource = resData[i].split(":"); + if (resource.length == 1) { + resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]), 0)); + } else { + resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]), + Integer.valueOf(resource[1]))); + } + } + return resourceList.toArray(new Resource[resourceList.size()]); + } + + Resource[] generateResourceList(int[] qData) { + List resourceList = new ArrayList(); + for (int i = 0; i < qData.length; i++) { + resourceList.add(Resource.newInstance(qData[i], 0)); + } + return resourceList.toArray(new Resource[resourceList.size()]); + } + + ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used, + Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran, + int[] queues) { + ResourceCalculator rc = mCS.getResourceCalculator(); + Resource tot = leafAbsCapacities(abs, queues); Deque pqs = new LinkedList(); ParentQueue root = mockParentQueue(null, queues[0], pqs); + ResourceUsage resUsage = new ResourceUsage(); + resUsage.setUsed(used[0]); when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT); - when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot); - when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot); - when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot); + when(root.getAbsoluteUsedCapacity()).thenReturn( + Resources.divide(rc, tot, used[0], tot)); + when(root.getAbsoluteCapacity()).thenReturn( + Resources.divide(rc, tot, abs[0], tot)); + when(root.getAbsoluteMaximumCapacity()).thenReturn( + maxCap[0] / (float) tot.getMemory()); + when(root.getQueueResourceUsage()).thenReturn(resUsage); QueueCapacities rootQc = new QueueCapacities(true); - rootQc.setAbsoluteUsedCapacity(used[0] / tot); - rootQc.setAbsoluteCapacity(abs[0] / tot); - rootQc.setAbsoluteMaximumCapacity(maxCap[0] / tot); + rootQc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[0], tot)); + rootQc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[0], tot)); + rootQc.setAbsoluteMaximumCapacity(maxCap[0] / (float) tot.getMemory()); when(root.getQueueCapacities()).thenReturn(rootQc); when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT); boolean preemptionDisabled = mockPreemptionStatus("root"); @@ -943,28 +1043,35 @@ public class TestProportionalCapacityPreemptionPolicy { for (int i = 1; i < queues.length; ++i) { final CSQueue q; final ParentQueue p = pqs.removeLast(); - final String queueName = "queue" + ((char)('A' + i - 1)); + final String queueName = "queue" + ((char) ('A' + i - 1)); if (queues[i] > 0) { q = mockParentQueue(p, queues[i], pqs); + ResourceUsage resUsagePerQueue = new ResourceUsage(); + resUsagePerQueue.setUsed(used[i]); + when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue); } else { q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran); } when(q.getParent()).thenReturn(p); when(q.getQueueName()).thenReturn(queueName); - when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot); - when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot); - when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot); + when(q.getAbsoluteUsedCapacity()).thenReturn( + Resources.divide(rc, tot, used[i], tot)); + when(q.getAbsoluteCapacity()).thenReturn( + Resources.divide(rc, tot, abs[i], tot)); + when(q.getAbsoluteMaximumCapacity()).thenReturn( + maxCap[i] / (float) tot.getMemory()); // We need to make these fields to QueueCapacities QueueCapacities qc = new QueueCapacities(false); - qc.setAbsoluteUsedCapacity(used[i] / tot); - qc.setAbsoluteCapacity(abs[i] / tot); - qc.setAbsoluteMaximumCapacity(maxCap[i] / tot); + qc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[i], tot)); + qc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[i], tot)); + qc.setAbsoluteMaximumCapacity(maxCap[i] / (float) tot.getMemory()); when(q.getQueueCapacities()).thenReturn(qc); String parentPathName = p.getQueuePath(); parentPathName = (parentPathName == null) ? "root" : parentPathName; - String queuePathName = (parentPathName+"."+queueName).replace("/","root"); + String queuePathName = (parentPathName + "." + queueName).replace("/", + "root"); when(q.getQueuePath()).thenReturn(queuePathName); preemptionDisabled = mockPreemptionStatus(queuePathName); when(q.getPreemptionDisabled()).thenReturn(preemptionDisabled); @@ -1004,16 +1111,18 @@ public class TestProportionalCapacityPreemptionPolicy { } @SuppressWarnings("rawtypes") - LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, - int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) { + LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs, + Resource[] used, Resource[] pending, Resource[] reserved, int[] apps, + Resource[] gran) { LeafQueue lq = mock(LeafQueue.class); + ResourceCalculator rc = mCS.getResourceCalculator(); List appAttemptIdList = new ArrayList(); - when(lq.getTotalResourcePending()).thenReturn( - Resource.newInstance(pending[i], 0)); + when(lq.getTotalResourcePending()).thenReturn(pending[i]); // need to set pending resource in resource usage as well ResourceUsage ru = new ResourceUsage(); - ru.setPending(Resource.newInstance(pending[i], 0)); + ru.setPending(pending[i]); + ru.setUsed(used[i]); when(lq.getQueueResourceUsage()).thenReturn(ru); // consider moving where CapacityScheduler::comparator accessible final NavigableSet qApps = new TreeSet( @@ -1026,9 +1135,9 @@ public class TestProportionalCapacityPreemptionPolicy { }); // applications are added in global L->R order in queues if (apps[i] != 0) { - int aUsed = used[i] / apps[i]; - int aPending = pending[i] / apps[i]; - int aReserve = reserved[i] / apps[i]; + Resource aUsed = Resources.divideAndCeil(rc, used[i], apps[i]); + Resource aPending = Resources.divideAndCeil(rc, pending[i], apps[i]); + Resource aReserve = Resources.divideAndCeil(rc, reserved[i], apps[i]); for (int a = 0; a < apps[i]; ++a) { FiCaSchedulerApp mockFiCaApp = mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]); @@ -1055,9 +1164,10 @@ public class TestProportionalCapacityPreemptionPolicy { return lq; } - FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved, - int gran) { + FiCaSchedulerApp mockApp(int qid, int id, Resource used, Resource pending, + Resource reserved, Resource gran) { FiCaSchedulerApp app = mock(FiCaSchedulerApp.class); + ResourceCalculator rc = mCS.getResourceCalculator(); ApplicationId appId = ApplicationId.newInstance(TS, id); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0); @@ -1065,30 +1175,35 @@ public class TestProportionalCapacityPreemptionPolicy { when(app.getApplicationAttemptId()).thenReturn(appAttId); int cAlloc = 0; - Resource unit = Resource.newInstance(gran, 0); + Resource unit = gran; List cReserved = new ArrayList(); - for (int i = 0; i < reserved; i += gran) { - cReserved.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER - .getValue())); + Resource resIter = Resource.newInstance(0, 0); + for (; Resources.lessThan(rc, clusterResources, resIter, reserved); Resources + .addTo(resIter, gran)) { + cReserved.add(mockContainer(appAttId, cAlloc, unit, + priority.CONTAINER.getValue())); ++cAlloc; } when(app.getReservedContainers()).thenReturn(cReserved); List cLive = new ArrayList(); - for (int i = 0; i < used; i += gran) { - if(setAMContainer && i == 0){ - cLive.add(mockContainer(appAttId, cAlloc, unit, priority.AMCONTAINER - .getValue())); - }else if(setLabeledContainer && i ==1){ + Resource usedIter = Resource.newInstance(0, 0); + int i = 0; + for (; Resources.lessThan(rc, clusterResources, usedIter, used); Resources + .addTo(usedIter, gran)) { + if (setAMContainer && i == 0) { + cLive.add(mockContainer(appAttId, cAlloc, unit, + priority.AMCONTAINER.getValue())); + } else if (setLabeledContainer && i == 1) { cLive.add(mockContainer(appAttId, cAlloc, unit, priority.LABELEDCONTAINER.getValue())); - ++used; - } - else{ - cLive.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER - .getValue())); + Resources.addTo(used, Resource.newInstance(1, 1)); + } else { + cLive.add(mockContainer(appAttId, cAlloc, unit, + priority.CONTAINER.getValue())); } ++cAlloc; + ++i; } when(app.getLiveContainers()).thenReturn(cLive); return app; @@ -1124,6 +1239,16 @@ public class TestProportionalCapacityPreemptionPolicy { return ret; } + static Resource leafAbsCapacities(Resource[] abs, int[] subqueues) { + Resource ret = Resource.newInstance(0, 0); + for (int i = 0; i < abs.length; ++i) { + if (0 == subqueues[i]) { + Resources.addTo(ret, abs[i]); + } + } + return ret; + } + void printString(CSQueue nq, String indent) { if (nq instanceof ParentQueue) { System.out.println(indent + nq.getQueueName() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java index 114769c9838..b3ac79bb072 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -771,6 +772,60 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); } + @Test + public void testNodePartitionPreemptionWithVCoreResource() throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * + * Both a/b can access x, and guaranteed capacity of them is 50:50. Two + * nodes, n1 has 100 x, n2 has 100 NO_LABEL 4 applications in the cluster, + * app1/app2 in a, and app3/app4 in b. app1 uses 80 x, app2 uses 20 + * NO_LABEL, app3 uses 20 x, app4 uses 80 NO_LABEL. Both a/b have 50 pending + * resource for x and NO_LABEL + * + * After preemption, it should preempt 30 from app1, and 30 from app4. + */ + String labelsConfig = "=100:200,true;" + // default partition + "x=100:200,true"; // partition=x + String nodesConfig = "n1=x;" + // n1 has partition=x + "n2="; // n2 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100:200 100:200 100:200 100:200],x=[100:200 100:200 100:200 100:200]);" + + // root + "-a(=[50:100 100:200 20:40 50:100],x=[50:100 100:200 80:160 50:100]);" + // a + "-b(=[50:100 100:200 80:160 50:100],x=[50:100 100:200 20:40 50:100])"; // b + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1:2,n1,x,80,false);" + // 80 * x in n1 + "a\t" // app2 in a + + "(1,1:2,n2,,20,false);" + // 20 default in n2 + "b\t" // app3 in b + + "(1,1:2,n1,x,20,false);" + // 20 * x in n1 + "b\t" // app4 in b + + "(1,1:2,n2,,80,false)"; // 80 default in n2 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true); + policy.editSchedule(); + + // 30 preempted from app1, 30 preempted from app4, and nothing preempted + // from app2/app3 + verify(mDisp, times(30)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + verify(mDisp, times(30)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(4)))); + verify(mDisp, never()).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); + verify(mDisp, never()).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(3)))); + } private ApplicationAttemptId getAppAttemptId(int id) { ApplicationId appId = ApplicationId.newInstance(0L, id); @@ -821,6 +876,16 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { private void buildEnv(String labelsConfig, String nodesConfig, String queuesConfig, String appsConfig) throws IOException { + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false); + } + + private void buildEnv(String labelsConfig, String nodesConfig, + String queuesConfig, String appsConfig, + boolean useDominantResourceCalculator) throws IOException { + if (useDominantResourceCalculator) { + when(cs.getResourceCalculator()).thenReturn( + new DominantResourceCalculator()); + } mockNodeLabelsManager(labelsConfig); mockSchedulerNodes(nodesConfig); for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) { @@ -832,7 +897,8 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { when(cs.getClusterResource()).thenReturn(clusterResource); mockApplications(appsConfig); - policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock); + policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, + mClock); } private void mockContainers(String containersConfig, ApplicationAttemptId attemptId, @@ -868,7 +934,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { + "(priority,resource,host,expression,repeat,reserved)"); } Priority pri = Priority.newInstance(Integer.valueOf(values[0])); - Resource res = Resources.createResource(Integer.valueOf(values[1])); + Resource res = parseResourceFromString(values[1]); NodeId host = NodeId.newInstance(values[2], 1); String exp = values[3]; int repeat = Integer.valueOf(values[4]); @@ -1002,11 +1068,10 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { clusterResource = Resources.createResource(0); for (String p : partitionConfigArr) { String partitionName = p.substring(0, p.indexOf("=")); - int totalResource = - Integer.valueOf(p.substring(p.indexOf("=") + 1, p.indexOf(","))); - boolean exclusivity = + Resource res = parseResourceFromString(p.substring(p.indexOf("=") + 1, + p.indexOf(","))); + boolean exclusivity = Boolean.valueOf(p.substring(p.indexOf(",") + 1, p.length())); - Resource res = Resources.createResource(totalResource); when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class))) .thenReturn(res); when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity); @@ -1022,6 +1087,18 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { partitionToResource.keySet()); } + private Resource parseResourceFromString(String p) { + String[] resource = p.split(":"); + Resource res = Resources.createResource(0); + if (resource.length == 1) { + res = Resources.createResource(Integer.valueOf(resource[0])); + } else { + res = Resources.createResource(Integer.valueOf(resource[0]), + Integer.valueOf(resource[1])); + } + return res; + } + /** * Format is: *
@@ -1120,23 +1197,22 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
       // Add a small epsilon to capacities to avoid truncate when doing
       // Resources.multiply
       float epsilon = 1e-6f;
-      float absGuaranteed =
-          Integer.valueOf(values[0].trim())
-              / (float) (partitionToResource.get(partitionName).getMemory())
-              + epsilon;
-      float absMax =
-          Integer.valueOf(values[1].trim())
-              / (float) (partitionToResource.get(partitionName).getMemory())
-              + epsilon;
-      float absUsed =
-          Integer.valueOf(values[2].trim())
-              / (float) (partitionToResource.get(partitionName).getMemory())
-              + epsilon;
-      Resource pending = Resources.createResource(Integer.valueOf(values[3].trim()));
+      Resource totResoucePerPartition = partitionToResource.get(partitionName);
+      float absGuaranteed = Resources.divide(rc, totResoucePerPartition,
+          parseResourceFromString(values[0].trim()), totResoucePerPartition)
+          + epsilon;
+      float absMax = Resources.divide(rc, totResoucePerPartition,
+          parseResourceFromString(values[1].trim()), totResoucePerPartition)
+          + epsilon;
+      float absUsed = Resources.divide(rc, totResoucePerPartition,
+          parseResourceFromString(values[2].trim()), totResoucePerPartition)
+          + epsilon;
+      Resource pending = parseResourceFromString(values[3].trim());
       qc.setAbsoluteCapacity(partitionName, absGuaranteed);
       qc.setAbsoluteMaximumCapacity(partitionName, absMax);
       qc.setAbsoluteUsedCapacity(partitionName, absUsed);
       ru.setPending(partitionName, pending);
+      ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
       LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
           + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
           + ",abs_used" + absUsed + ",pending_resource=" + pending + "]");