From 2ad1386b2b3d4602c787bd2cce455bec5f8ca666 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Fri, 20 Nov 2015 11:45:23 -0800 Subject: [PATCH] YARN-3849. Too much of preemption activity causing continuos killing of containers across queues. (Sunil G via wangda) --- hadoop-yarn-project/CHANGES.txt | 3 + .../ProportionalCapacityPreemptionPolicy.java | 3 +- ...tProportionalCapacityPreemptionPolicy.java | 243 ++++++++++++++---- 3 files changed, 190 insertions(+), 59 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1fe828f9ea3..8eea1e7f23e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -22,6 +22,9 @@ Release 2.7.3 - UNRELEASED YARN-4374. RM capacity scheduler UI rounds user limit factor (Chang Li via jlowe) + YARN-3849. Too much of preemption activity causing continuos killing of containers + across queues. (Sunil G via wangda) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index abcb1a2bc8e..1fe38b2da46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -742,12 +742,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic TempQueue ret; synchronized (root) { String queueName = root.getQueueName(); - float absUsed = root.getAbsoluteUsedCapacity(); float absCap = root.getAbsoluteCapacity(); float absMaxCap = root.getAbsoluteMaximumCapacity(); boolean preemptionDisabled = root.getPreemptionDisabled(); - Resource current = Resources.multiply(clusterResources, absUsed); + Resource current = root.getQueueResourceUsage().getUsed(); Resource guaranteed = Resources.multiply(clusterResources, absCap); Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 8e9545d2764..cc55eda280c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; @@ -81,6 +82,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; @@ -371,7 +373,7 @@ public class TestProportionalCapacityPreemptionPolicy { appA.getApplicationId(), appA.getAttemptId()); assertTrue("appA should be running on queueB", mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB)); - verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA))); + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA))); // Need to call setup() again to reset mDisp setup(); @@ -394,7 +396,7 @@ public class TestProportionalCapacityPreemptionPolicy { // Resources should have come from queueE (appC) and neither of queueA's // children. verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); - verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC))); + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC))); } @Test @@ -469,8 +471,8 @@ public class TestProportionalCapacityPreemptionPolicy { // With all queues preemptable, resources should be taken from queueC(appA) // and queueD(appB). Resources taken more from queueD(appB) than // queueC(appA) because it's over its capacity by a larger percentage. - verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA))); - verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB))); + verify(mDisp, times(17)).handle(argThat(new IsPreemptionRequestFor(appA))); + verify(mDisp, times(183)).handle(argThat(new IsPreemptionRequestFor(appB))); // Turn off preemption for queueA and it's children. queueF(appC)'s request // should starve. @@ -634,7 +636,7 @@ public class TestProportionalCapacityPreemptionPolicy { policy.editSchedule(); // verify capacity taken from A1, not B1 despite B1 being far over // its absolute guaranteed capacity - verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA))); + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA))); } @Test @@ -675,7 +677,7 @@ public class TestProportionalCapacityPreemptionPolicy { // we verify both that C has priority on B and D (has it has >0 guarantees) // and that B and D are force to share their over capacity fairly (as they // are both zero-guarantees) hence D sees some of its containers preempted - verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC))); + verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC))); } @@ -702,8 +704,8 @@ public class TestProportionalCapacityPreemptionPolicy { // XXX note: compensating for rounding error in Resources.multiplyTo // which is likely triggered since we use small numbers for readability - verify(mDisp, times(7)).handle(argThat(new IsPreemptionRequestFor(appA))); - verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE))); + verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA))); + verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appE))); } @Test @@ -828,15 +830,15 @@ public class TestProportionalCapacityPreemptionPolicy { // By skipping AM Container and Labeled container, all other 18 containers // of appD will be // preempted - verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appD))); + verify(mDisp, times(18)).handle(argThat(new IsPreemptionRequestFor(appD))); // By skipping AM Container and Labeled container, all other 18 containers // of appC will be // preempted - verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appC))); + verify(mDisp, times(18)).handle(argThat(new IsPreemptionRequestFor(appC))); // rest 4 containers from appB will be preempted - verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB))); + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); setAMContainer = false; setLabeledContainer = false; } @@ -911,6 +913,34 @@ public class TestProportionalCapacityPreemptionPolicy { setAMContainer = false; } + @Test + public void testPreemptionWithVCoreResource() { + int[][] qData = new int[][]{ + // / A B + {100, 100, 100}, // maxcap + {5, 1, 1}, // apps + {2, 0, 0}, // subqueues + }; + + // Resources can be set like memory:vcores + String[][] resData = new String[][]{ + // / A B + {"100:100", "50:50", "50:50"}, // abs + {"10:100", "10:100", "0"}, // used + {"70:20", "70:20", "10:100"}, // pending + {"0", "0", "0"}, // reserved + {"-1", "1:10", "1:10"}, // req granularity + }; + + // Passing last param as TRUE to use DominantResourceCalculator + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData, + true); + policy.editSchedule(); + + // 5 containers will be preempted here + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA))); + } + static class IsPreemptionRequestFor extends ArgumentMatcher { private final ApplicationAttemptId appAttId; @@ -937,37 +967,109 @@ public class TestProportionalCapacityPreemptionPolicy { ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) { ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock); + clusterResources = + Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0); ParentQueue mRoot = buildMockRootQueue(rand, qData); when(mCS.getRootQueue()).thenReturn(mRoot); - clusterResources = - Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0); - when(mCS.getClusterResource()).thenReturn(clusterResources); + setResourceAndNodeDetails(); return policy; } - ParentQueue buildMockRootQueue(Random r, int[]... queueData) { - int[] abs = queueData[0]; - int[] maxCap = queueData[1]; - int[] used = queueData[2]; - int[] pending = queueData[3]; - int[] reserved = queueData[4]; - int[] apps = queueData[5]; - int[] gran = queueData[6]; - int[] queues = queueData[7]; - - return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues); + ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData, + String[][] resData) { + return buildPolicy(qData, resData, false); } - ParentQueue mockNested(int[] abs, int[] maxCap, int[] used, - int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) { - float tot = leafAbsCapacities(abs, queues); + ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData, + String[][] resData, boolean useDominantResourceCalculator) { + if (useDominantResourceCalculator) { + when(mCS.getResourceCalculator()).thenReturn( + new DominantResourceCalculator()); + } + ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy( + conf, rmContext, mCS, mClock); + clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]), + qData[2]); + ParentQueue mRoot = buildMockRootQueue(rand, resData, qData); + when(mCS.getRootQueue()).thenReturn(mRoot); + + setResourceAndNodeDetails(); + return policy; + } + + private void setResourceAndNodeDetails() { + when(mCS.getClusterResource()).thenReturn(clusterResources); + when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn( + clusterResources); + } + + ParentQueue buildMockRootQueue(Random r, int[]... queueData) { + Resource[] abs = generateResourceList(queueData[0]); + Resource[] used = generateResourceList(queueData[2]); + Resource[] pending = generateResourceList(queueData[3]); + Resource[] reserved = generateResourceList(queueData[4]); + Resource[] gran = generateResourceList(queueData[6]); + int[] maxCap = queueData[1]; + int[] apps = queueData[5]; + int[] queues = queueData[7]; + + return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues); + } + + private ParentQueue buildMockRootQueue(Random rand2, String[][] resData, + int[][] queueData) { + Resource[] abs = parseResourceDetails(resData[0]); + Resource[] used = parseResourceDetails(resData[1]); + Resource[] pending = parseResourceDetails(resData[2]); + Resource[] reserved = parseResourceDetails(resData[3]); + Resource[] gran = parseResourceDetails(resData[4]); + int[] maxCap = queueData[0]; + int[] apps = queueData[1]; + int[] queues = queueData[2]; + + return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues); + } + + Resource[] parseResourceDetails(String[] resData) { + List resourceList = new ArrayList(); + for (int i = 0; i < resData.length; i++) { + String[] resource = resData[i].split(":"); + if (resource.length == 1) { + resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]), 0)); + } else { + resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]), + Integer.valueOf(resource[1]))); + } + } + return resourceList.toArray(new Resource[resourceList.size()]); + } + + Resource[] generateResourceList(int[] qData) { + List resourceList = new ArrayList(); + for (int i = 0; i < qData.length; i++) { + resourceList.add(Resource.newInstance(qData[i], 0)); + } + return resourceList.toArray(new Resource[resourceList.size()]); + } + + ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used, + Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran, + int[] queues) { + ResourceCalculator rc = mCS.getResourceCalculator(); + Resource tot = leafAbsCapacities(abs, queues); Deque pqs = new LinkedList(); ParentQueue root = mockParentQueue(null, queues[0], pqs); + ResourceUsage resUsage = new ResourceUsage(); + resUsage.setUsed(used[0]); when(root.getQueueName()).thenReturn("/"); - when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot); - when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot); - when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot); + when(root.getAbsoluteUsedCapacity()).thenReturn( + Resources.divide(rc, tot, used[0], tot)); + when(root.getAbsoluteCapacity()).thenReturn( + Resources.divide(rc, tot, abs[0], tot)); + when(root.getAbsoluteMaximumCapacity()).thenReturn( + maxCap[0] / (float) tot.getMemory()); + when(root.getQueueResourceUsage()).thenReturn(resUsage); when(root.getQueuePath()).thenReturn("root"); boolean preemptionDisabled = mockPreemptionStatus("root"); when(root.getPreemptionDisabled()).thenReturn(preemptionDisabled); @@ -978,14 +1080,20 @@ public class TestProportionalCapacityPreemptionPolicy { final String queueName = "queue" + ((char)('A' + i - 1)); if (queues[i] > 0) { q = mockParentQueue(p, queues[i], pqs); + ResourceUsage resUsagePerQueue = new ResourceUsage(); + resUsagePerQueue.setUsed(used[i]); + when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue); } else { q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran); } when(q.getParent()).thenReturn(p); when(q.getQueueName()).thenReturn(queueName); - when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot); - when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot); - when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot); + when(q.getAbsoluteUsedCapacity()).thenReturn( + Resources.divide(rc, tot, used[i], tot)); + when(q.getAbsoluteCapacity()).thenReturn( + Resources.divide(rc, tot, abs[i], tot)); + when(q.getAbsoluteMaximumCapacity()).thenReturn( + maxCap[i] / (float) tot.getMemory()); String parentPathName = p.getQueuePath(); parentPathName = (parentPathName == null) ? "root" : parentPathName; String queuePathName = (parentPathName+"."+queueName).replace("/","root"); @@ -1027,13 +1135,19 @@ public class TestProportionalCapacityPreemptionPolicy { return pq; } - LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, - int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) { + LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs, + Resource[] used, Resource[] pending, Resource[] reserved, int[] apps, + Resource[] gran) { LeafQueue lq = mock(LeafQueue.class); + ResourceCalculator rc = mCS.getResourceCalculator(); List appAttemptIdList = new ArrayList(); - when(lq.getTotalResourcePending()).thenReturn( - Resource.newInstance(pending[i], 0)); + when(lq.getTotalResourcePending()).thenReturn(pending[i]); + // need to set pending resource in resource usage as well + ResourceUsage ru = new ResourceUsage(); + ru.setPending(pending[i]); + ru.setUsed(used[i]); + when(lq.getQueueResourceUsage()).thenReturn(ru); // consider moving where CapacityScheduler::comparator accessible NavigableSet qApps = new TreeSet( new Comparator() { @@ -1045,9 +1159,9 @@ public class TestProportionalCapacityPreemptionPolicy { }); // applications are added in global L->R order in queues if (apps[i] != 0) { - int aUsed = used[i] / apps[i]; - int aPending = pending[i] / apps[i]; - int aReserve = reserved[i] / apps[i]; + Resource aUsed = Resources.divideAndCeil(rc, used[i], apps[i]); + Resource aPending = Resources.divideAndCeil(rc, pending[i], apps[i]); + Resource aReserve = Resources.divideAndCeil(rc, reserved[i], apps[i]); for (int a = 0; a < apps[i]; ++a) { FiCaSchedulerApp mockFiCaApp = mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]); @@ -1066,9 +1180,10 @@ public class TestProportionalCapacityPreemptionPolicy { return lq; } - FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved, - int gran) { + FiCaSchedulerApp mockApp(int qid, int id, Resource used, Resource pending, + Resource reserved, Resource gran) { FiCaSchedulerApp app = mock(FiCaSchedulerApp.class); + ResourceCalculator rc = mCS.getResourceCalculator(); ApplicationId appId = ApplicationId.newInstance(TS, id); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0); @@ -1076,30 +1191,34 @@ public class TestProportionalCapacityPreemptionPolicy { when(app.getApplicationAttemptId()).thenReturn(appAttId); int cAlloc = 0; - Resource unit = Resource.newInstance(gran, 0); + Resource unit = gran; List cReserved = new ArrayList(); - for (int i = 0; i < reserved; i += gran) { - cReserved.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER - .getValue())); + Resource resIter = Resource.newInstance(0, 0); + for (; Resources.lessThan(rc, clusterResources, resIter, reserved); Resources + .addTo(resIter, gran)) { + cReserved.add(mockContainer(appAttId, cAlloc, unit, + priority.CONTAINER.getValue())); ++cAlloc; } when(app.getReservedContainers()).thenReturn(cReserved); List cLive = new ArrayList(); - for (int i = 0; i < used; i += gran) { - if(setAMContainer && i == 0){ - cLive.add(mockContainer(appAttId, cAlloc, unit, priority.AMCONTAINER - .getValue())); - }else if(setLabeledContainer && i ==1){ + Resource usedIter = Resource.newInstance(0, 0); + int i = 0; + for (; Resources.lessThan(rc, clusterResources, usedIter, used); Resources + .addTo(usedIter, gran)) { + if (setAMContainer && i == 0) { + cLive.add(mockContainer(appAttId, cAlloc, unit, + priority.AMCONTAINER.getValue())); + } else if (setLabeledContainer && i == 1) { cLive.add(mockContainer(appAttId, cAlloc, unit, priority.LABELEDCONTAINER.getValue())); - ++used; - } - else{ - cLive.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER - .getValue())); + } else { + cLive.add(mockContainer(appAttId, cAlloc, unit, + priority.CONTAINER.getValue())); } ++cAlloc; + ++i; } when(app.getLiveContainers()).thenReturn(cLive); return app; @@ -1134,6 +1253,16 @@ public class TestProportionalCapacityPreemptionPolicy { return ret; } + static Resource leafAbsCapacities(Resource[] abs, int[] subqueues) { + Resource ret = Resource.newInstance(0, 0); + for (int i = 0; i < abs.length; ++i) { + if (0 == subqueues[i]) { + Resources.addTo(ret, abs[i]); + } + } + return ret; + } + void printString(CSQueue nq, String indent) { if (nq instanceof ParentQueue) { System.out.println(indent + nq.getQueueName()