From 0a2d3e717d9c42090a32ff177991a222a1e34132 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Mon, 19 Jan 2015 16:48:50 -0800 Subject: [PATCH] YARN-2933. Capacity Scheduler preemption policy should only consider capacity without labels temporarily. Contributed by Mayank Bansal --- hadoop-yarn-project/CHANGES.txt | 3 + .../ProportionalCapacityPreemptionPolicy.java | 53 ++++++++- ...tProportionalCapacityPreemptionPolicy.java | 106 ++++++++++++++++-- 3 files changed, 149 insertions(+), 13 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 56709630f44..a29d3168eb0 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -391,6 +391,9 @@ Release 2.7.0 - UNRELEASED YARN-3015. yarn classpath command should support same options as hadoop classpath. (Contributed by Varun Saxena) + YARN-2933. Capacity Scheduler preemption policy should only consider capacity + without labels temporarily. (Mayank Bansal via wangda) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 1a3f804b831..0743f60ee2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -30,15 +30,19 @@ import java.util.NavigableSet; import 
java.util.PriorityQueue; import java.util.Set; +import org.apache.commons.collections.map.HashedMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; @@ -129,6 +133,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic private float percentageClusterPreemptionAllowed; private double naturalTerminationFactor; private boolean observeOnly; + private Map> labels; public ProportionalCapacityPreemptionPolicy() { clock = new SystemClock(); @@ -168,6 +173,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1); observeOnly = config.getBoolean(OBSERVE_ONLY, false); rc = scheduler.getResourceCalculator(); + labels = null; } @VisibleForTesting @@ -176,13 +182,38 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } @Override - public void editSchedule(){ + public void editSchedule() { CSQueue root = scheduler.getRootQueue(); - Resource clusterResources = - Resources.clone(scheduler.getClusterResource()); + Resource clusterResources = 
Resources.clone(scheduler.getClusterResource()); + clusterResources = getNonLabeledResources(clusterResources); + setNodeLabels(scheduler.getRMContext().getNodeLabelManager() + .getNodeLabels()); containerBasedPreemptOrKill(root, clusterResources); } + /** + * Setting Node Labels + * + * @param nodelabels + */ + public void setNodeLabels(Map> nodelabels) { + labels = nodelabels; + } + + /** + * This method returns all non labeled resources. + * + * @param clusterResources + * @return Resources + */ + private Resource getNonLabeledResources(Resource clusterResources) { + RMContext rmcontext = scheduler.getRMContext(); + RMNodeLabelsManager lm = rmcontext.getNodeLabelManager(); + Resource res = lm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, + clusterResources); + return res == null ? clusterResources : res; + } + /** * This method selects and tracks containers to be preempted. If a container * is in the target list for more than maxWaitTime it is killed. @@ -593,7 +624,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * @param app * @param clusterResource * @param rsrcPreempt - * @return + * @return Set Set of RMContainers */ private Set preemptFrom(FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt, @@ -635,12 +666,26 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic Resources.addTo(skippedAMSize, c.getContainer().getResource()); continue; } + // skip Labeled resource + if(isLabeledContainer(c)){ + continue; + } ret.add(c); Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); } return ret; } + + /** + * Checking if given container is a labeled container + * + * @param c + * @return true/false + */ + private boolean isLabeledContainer(RMContainer c) { + return labels.containsKey(c.getAllocatedNode()); + } /** * Compare by reversed priority order first, and then reversed containerId diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index ca67ef0d6e0..0a147f480ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -38,27 +38,37 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.NavigableSet; import java.util.Random; +import java.util.Set; import java.util.TreeSet; +import org.apache.commons.collections.map.HashedMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import 
org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; @@ -72,12 +82,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; +import org.mortbay.log.Log; public class TestProportionalCapacityPreemptionPolicy { @@ -85,14 +97,18 @@ public class TestProportionalCapacityPreemptionPolicy { int appAlloc = 0; boolean setAMContainer = false; + boolean setLabeledContainer = false; float setAMResourcePercent = 0.0f; Random rand = null; Clock mClock = null; Configuration conf = null; CapacityScheduler mCS = null; + RMContext rmContext = null; + RMNodeLabelsManager lm = null; CapacitySchedulerConfiguration schedConf = null; EventHandler mDisp = null; ResourceCalculator rc = new DefaultResourceCalculator(); + Resource clusterResources = null; final ApplicationAttemptId appA = ApplicationAttemptId.newInstance( ApplicationId.newInstance(TS, 0), 0); final ApplicationAttemptId appB = ApplicationAttemptId.newInstance( @@ -108,6 +124,19 @@ public class TestProportionalCapacityPreemptionPolicy { 
final ArgumentCaptor evtCaptor = ArgumentCaptor.forClass(ContainerPreemptEvent.class); + public enum priority { + AMCONTAINER(0), CONTAINER(1), LABELEDCONTAINER(2); + int value; + + private priority(int value) { + this.value = value; + } + + public int getValue() { + return this.value; + } + }; + @Rule public TestName name = new TestName(); @Before @@ -130,8 +159,12 @@ public class TestProportionalCapacityPreemptionPolicy { mClock = mock(Clock.class); mCS = mock(CapacityScheduler.class); when(mCS.getResourceCalculator()).thenReturn(rc); + lm = mock(RMNodeLabelsManager.class); schedConf = new CapacitySchedulerConfiguration(); when(mCS.getConfiguration()).thenReturn(schedConf); + rmContext = mock(RMContext.class); + when(mCS.getRMContext()).thenReturn(rmContext); + when(rmContext.getNodeLabelManager()).thenReturn(lm); mDisp = mock(EventHandler.class); rand = new Random(); long seed = rand.nextLong(); @@ -746,7 +779,51 @@ public class TestProportionalCapacityPreemptionPolicy { verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB))); setAMContainer = false; } - + + @Test + public void testIdealAllocationForLabels() { + int[][] qData = new int[][] { + // / A B + { 80, 40, 40 }, // abs + { 80, 80, 80 }, // maxcap + { 80, 80, 0 }, // used + { 70, 20, 50 }, // pending + { 0, 0, 0 }, // reserved + { 5, 4, 1 }, // apps + { -1, 1, 1 }, // req granularity + { 2, 0, 0 }, // subqueues + }; + setAMContainer = true; + setLabeledContainer = true; + Map> labels = new HashMap>(); + NodeId node = NodeId.newInstance("node1", 0); + Set labelSet = new HashSet(); + labelSet.add("x"); + labels.put(node, labelSet); + when(lm.getNodeLabels()).thenReturn(labels); + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + // Subtracting Label X resources from cluster resources + when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn( + Resources.clone(Resource.newInstance(80, 0))); + clusterResources.setMemory(100); + policy.editSchedule(); + + // 
By skipping AM Container and Labeled container, all other 19 containers + // of appD will be + // preempted + verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appD))); + + // By skipping AM Container and Labeled container, all other 19 containers + // of appC will be + // preempted + verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appC))); + + // the remaining 2 containers will be preempted from appB + verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB))); + setAMContainer = false; + setLabeledContainer = false; + } + @Test public void testPreemptSkippedAMContainers() { int[][] qData = new int[][] { @@ -846,7 +923,7 @@ public class TestProportionalCapacityPreemptionPolicy { ParentQueue mRoot = buildMockRootQueue(rand, qData); when(mCS.getRootQueue()).thenReturn(mRoot); - Resource clusterResources = + clusterResources = Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0); when(mCS.getClusterResource()).thenReturn(clusterResources); return policy; @@ -965,7 +1042,8 @@ public class TestProportionalCapacityPreemptionPolicy { Resource unit = Resource.newInstance(gran, 0); List cReserved = new ArrayList(); for (int i = 0; i < reserved; i += gran) { - cReserved.add(mockContainer(appAttId, cAlloc, unit, 1)); + cReserved.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER + .getValue())); ++cAlloc; } when(app.getReservedContainers()).thenReturn(cReserved); @@ -973,9 +1051,16 @@ public class TestProportionalCapacityPreemptionPolicy { List cLive = new ArrayList(); for (int i = 0; i < used; i += gran) { if(setAMContainer && i == 0){ - cLive.add(mockContainer(appAttId, cAlloc, unit, 0)); - }else{ - cLive.add(mockContainer(appAttId, cAlloc, unit, 1)); + cLive.add(mockContainer(appAttId, cAlloc, unit, priority.AMCONTAINER + .getValue())); + }else if(setLabeledContainer && i ==1){ + cLive.add(mockContainer(appAttId, cAlloc, unit, + priority.LABELEDCONTAINER.getValue())); + ++used; + } + else{ + 
cLive.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER + .getValue())); } ++cAlloc; } @@ -984,18 +1069,21 @@ public class TestProportionalCapacityPreemptionPolicy { } RMContainer mockContainer(ApplicationAttemptId appAttId, int id, - Resource r, int priority) { + Resource r, int cpriority) { ContainerId cId = ContainerId.newContainerId(appAttId, id); Container c = mock(Container.class); when(c.getResource()).thenReturn(r); - when(c.getPriority()).thenReturn(Priority.create(priority)); + when(c.getPriority()).thenReturn(Priority.create(cpriority)); RMContainer mC = mock(RMContainer.class); when(mC.getContainerId()).thenReturn(cId); when(mC.getContainer()).thenReturn(c); when(mC.getApplicationAttemptId()).thenReturn(appAttId); - if(0 == priority){ + if (priority.AMCONTAINER.getValue() == cpriority) { when(mC.isAMContainer()).thenReturn(true); } + if (priority.LABELEDCONTAINER.getValue() == cpriority) { + when(mC.getAllocatedNode()).thenReturn(NodeId.newInstance("node1", 0)); + } return mC; }