YARN-2933. Capacity Scheduler preemption policy should only consider capacity without labels temporarily. Contributed by Mayank Bansal

This commit is contained in:
Wangda Tan 2015-01-19 16:48:50 -08:00
parent cb0a15d201
commit 0a2d3e717d
3 changed files with 149 additions and 13 deletions

View File

@ -391,6 +391,9 @@ Release 2.7.0 - UNRELEASED
YARN-3015. yarn classpath command should support same options as hadoop YARN-3015. yarn classpath command should support same options as hadoop
classpath. (Contributed by Varun Saxena) classpath. (Contributed by Varun Saxena)
YARN-2933. Capacity Scheduler preemption policy should only consider capacity
without labels temporarily. (Mayank Bansal via wangda)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -30,15 +30,19 @@ import java.util.NavigableSet;
import java.util.PriorityQueue; import java.util.PriorityQueue;
import java.util.Set; import java.util.Set;
import org.apache.commons.collections.map.HashedMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
@ -129,6 +133,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
private float percentageClusterPreemptionAllowed; private float percentageClusterPreemptionAllowed;
private double naturalTerminationFactor; private double naturalTerminationFactor;
private boolean observeOnly; private boolean observeOnly;
private Map<NodeId, Set<String>> labels;
public ProportionalCapacityPreemptionPolicy() { public ProportionalCapacityPreemptionPolicy() {
clock = new SystemClock(); clock = new SystemClock();
@ -168,6 +173,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1); config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
observeOnly = config.getBoolean(OBSERVE_ONLY, false); observeOnly = config.getBoolean(OBSERVE_ONLY, false);
rc = scheduler.getResourceCalculator(); rc = scheduler.getResourceCalculator();
labels = null;
} }
@VisibleForTesting @VisibleForTesting
@ -178,11 +184,36 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
@Override @Override
public void editSchedule() { public void editSchedule() {
CSQueue root = scheduler.getRootQueue(); CSQueue root = scheduler.getRootQueue();
Resource clusterResources = Resource clusterResources = Resources.clone(scheduler.getClusterResource());
Resources.clone(scheduler.getClusterResource()); clusterResources = getNonLabeledResources(clusterResources);
setNodeLabels(scheduler.getRMContext().getNodeLabelManager()
.getNodeLabels());
containerBasedPreemptOrKill(root, clusterResources); containerBasedPreemptOrKill(root, clusterResources);
} }
/**
* Setting Node Labels
*
* @param nodelabels
*/
public void setNodeLabels(Map<NodeId, Set<String>> nodelabels) {
labels = nodelabels;
}
/**
* This method returns all non labeled resources.
*
* @param clusterResources
* @return Resources
*/
private Resource getNonLabeledResources(Resource clusterResources) {
RMContext rmcontext = scheduler.getRMContext();
RMNodeLabelsManager lm = rmcontext.getNodeLabelManager();
Resource res = lm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
clusterResources);
return res == null ? clusterResources : res;
}
/** /**
* This method selects and tracks containers to be preempted. If a container * This method selects and tracks containers to be preempted. If a container
* is in the target list for more than maxWaitTime it is killed. * is in the target list for more than maxWaitTime it is killed.
@ -593,7 +624,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
* @param app * @param app
* @param clusterResource * @param clusterResource
* @param rsrcPreempt * @param rsrcPreempt
* @return * @return Set<RMContainer> Set of RMContainers
*/ */
private Set<RMContainer> preemptFrom(FiCaSchedulerApp app, private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
Resource clusterResource, Resource rsrcPreempt, Resource clusterResource, Resource rsrcPreempt,
@ -635,6 +666,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
Resources.addTo(skippedAMSize, c.getContainer().getResource()); Resources.addTo(skippedAMSize, c.getContainer().getResource());
continue; continue;
} }
// skip Labeled resource
if(isLabeledContainer(c)){
continue;
}
ret.add(c); ret.add(c);
Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
} }
@ -642,6 +677,16 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
return ret; return ret;
} }
/**
* Checking if given container is a labeled container
*
* @param c
* @return true/false
*/
private boolean isLabeledContainer(RMContainer c) {
return labels.containsKey(c.getAllocatedNode());
}
/** /**
* Compare by reversed priority order first, and then reversed containerId * Compare by reversed priority order first, and then reversed containerId
* order * order

View File

@ -38,27 +38,37 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.Deque; import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.commons.collections.map.HashedMap;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
@ -72,12 +82,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import org.mortbay.log.Log;
public class TestProportionalCapacityPreemptionPolicy { public class TestProportionalCapacityPreemptionPolicy {
@ -85,14 +97,18 @@ public class TestProportionalCapacityPreemptionPolicy {
int appAlloc = 0; int appAlloc = 0;
boolean setAMContainer = false; boolean setAMContainer = false;
boolean setLabeledContainer = false;
float setAMResourcePercent = 0.0f; float setAMResourcePercent = 0.0f;
Random rand = null; Random rand = null;
Clock mClock = null; Clock mClock = null;
Configuration conf = null; Configuration conf = null;
CapacityScheduler mCS = null; CapacityScheduler mCS = null;
RMContext rmContext = null;
RMNodeLabelsManager lm = null;
CapacitySchedulerConfiguration schedConf = null; CapacitySchedulerConfiguration schedConf = null;
EventHandler<ContainerPreemptEvent> mDisp = null; EventHandler<ContainerPreemptEvent> mDisp = null;
ResourceCalculator rc = new DefaultResourceCalculator(); ResourceCalculator rc = new DefaultResourceCalculator();
Resource clusterResources = null;
final ApplicationAttemptId appA = ApplicationAttemptId.newInstance( final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 0), 0); ApplicationId.newInstance(TS, 0), 0);
final ApplicationAttemptId appB = ApplicationAttemptId.newInstance( final ApplicationAttemptId appB = ApplicationAttemptId.newInstance(
@ -108,6 +124,19 @@ public class TestProportionalCapacityPreemptionPolicy {
final ArgumentCaptor<ContainerPreemptEvent> evtCaptor = final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
ArgumentCaptor.forClass(ContainerPreemptEvent.class); ArgumentCaptor.forClass(ContainerPreemptEvent.class);
public enum priority {
AMCONTAINER(0), CONTAINER(1), LABELEDCONTAINER(2);
int value;
private priority(int value) {
this.value = value;
}
public int getValue() {
return this.value;
}
};
@Rule public TestName name = new TestName(); @Rule public TestName name = new TestName();
@Before @Before
@ -130,8 +159,12 @@ public class TestProportionalCapacityPreemptionPolicy {
mClock = mock(Clock.class); mClock = mock(Clock.class);
mCS = mock(CapacityScheduler.class); mCS = mock(CapacityScheduler.class);
when(mCS.getResourceCalculator()).thenReturn(rc); when(mCS.getResourceCalculator()).thenReturn(rc);
lm = mock(RMNodeLabelsManager.class);
schedConf = new CapacitySchedulerConfiguration(); schedConf = new CapacitySchedulerConfiguration();
when(mCS.getConfiguration()).thenReturn(schedConf); when(mCS.getConfiguration()).thenReturn(schedConf);
rmContext = mock(RMContext.class);
when(mCS.getRMContext()).thenReturn(rmContext);
when(rmContext.getNodeLabelManager()).thenReturn(lm);
mDisp = mock(EventHandler.class); mDisp = mock(EventHandler.class);
rand = new Random(); rand = new Random();
long seed = rand.nextLong(); long seed = rand.nextLong();
@ -747,6 +780,50 @@ public class TestProportionalCapacityPreemptionPolicy {
setAMContainer = false; setAMContainer = false;
} }
@Test
public void testIdealAllocationForLabels() {
int[][] qData = new int[][] {
// / A B
{ 80, 40, 40 }, // abs
{ 80, 80, 80 }, // maxcap
{ 80, 80, 0 }, // used
{ 70, 20, 50 }, // pending
{ 0, 0, 0 }, // reserved
{ 5, 4, 1 }, // apps
{ -1, 1, 1 }, // req granularity
{ 2, 0, 0 }, // subqueues
};
setAMContainer = true;
setLabeledContainer = true;
Map<NodeId, Set<String>> labels = new HashMap<NodeId, Set<String>>();
NodeId node = NodeId.newInstance("node1", 0);
Set<String> labelSet = new HashSet<String>();
labelSet.add("x");
labels.put(node, labelSet);
when(lm.getNodeLabels()).thenReturn(labels);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
// Subtracting Label X resources from cluster resources
when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
Resources.clone(Resource.newInstance(80, 0)));
clusterResources.setMemory(100);
policy.editSchedule();
// By skipping AM Container and Labeled container, all other 18 containers
// of appD will be
// preempted
verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appD)));
// By skipping AM Container and Labeled container, all other 18 containers
// of appC will be
// preempted
verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appC)));
// rest 4 containers from appB will be preempted
verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
setAMContainer = false;
setLabeledContainer = false;
}
@Test @Test
public void testPreemptSkippedAMContainers() { public void testPreemptSkippedAMContainers() {
int[][] qData = new int[][] { int[][] qData = new int[][] {
@ -846,7 +923,7 @@ public class TestProportionalCapacityPreemptionPolicy {
ParentQueue mRoot = buildMockRootQueue(rand, qData); ParentQueue mRoot = buildMockRootQueue(rand, qData);
when(mCS.getRootQueue()).thenReturn(mRoot); when(mCS.getRootQueue()).thenReturn(mRoot);
Resource clusterResources = clusterResources =
Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0); Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
when(mCS.getClusterResource()).thenReturn(clusterResources); when(mCS.getClusterResource()).thenReturn(clusterResources);
return policy; return policy;
@ -965,7 +1042,8 @@ public class TestProportionalCapacityPreemptionPolicy {
Resource unit = Resource.newInstance(gran, 0); Resource unit = Resource.newInstance(gran, 0);
List<RMContainer> cReserved = new ArrayList<RMContainer>(); List<RMContainer> cReserved = new ArrayList<RMContainer>();
for (int i = 0; i < reserved; i += gran) { for (int i = 0; i < reserved; i += gran) {
cReserved.add(mockContainer(appAttId, cAlloc, unit, 1)); cReserved.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
.getValue()));
++cAlloc; ++cAlloc;
} }
when(app.getReservedContainers()).thenReturn(cReserved); when(app.getReservedContainers()).thenReturn(cReserved);
@ -973,9 +1051,16 @@ public class TestProportionalCapacityPreemptionPolicy {
List<RMContainer> cLive = new ArrayList<RMContainer>(); List<RMContainer> cLive = new ArrayList<RMContainer>();
for (int i = 0; i < used; i += gran) { for (int i = 0; i < used; i += gran) {
if(setAMContainer && i == 0){ if(setAMContainer && i == 0){
cLive.add(mockContainer(appAttId, cAlloc, unit, 0)); cLive.add(mockContainer(appAttId, cAlloc, unit, priority.AMCONTAINER
}else{ .getValue()));
cLive.add(mockContainer(appAttId, cAlloc, unit, 1)); }else if(setLabeledContainer && i ==1){
cLive.add(mockContainer(appAttId, cAlloc, unit,
priority.LABELEDCONTAINER.getValue()));
++used;
}
else{
cLive.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
.getValue()));
} }
++cAlloc; ++cAlloc;
} }
@ -984,18 +1069,21 @@ public class TestProportionalCapacityPreemptionPolicy {
} }
RMContainer mockContainer(ApplicationAttemptId appAttId, int id, RMContainer mockContainer(ApplicationAttemptId appAttId, int id,
Resource r, int priority) { Resource r, int cpriority) {
ContainerId cId = ContainerId.newContainerId(appAttId, id); ContainerId cId = ContainerId.newContainerId(appAttId, id);
Container c = mock(Container.class); Container c = mock(Container.class);
when(c.getResource()).thenReturn(r); when(c.getResource()).thenReturn(r);
when(c.getPriority()).thenReturn(Priority.create(priority)); when(c.getPriority()).thenReturn(Priority.create(cpriority));
RMContainer mC = mock(RMContainer.class); RMContainer mC = mock(RMContainer.class);
when(mC.getContainerId()).thenReturn(cId); when(mC.getContainerId()).thenReturn(cId);
when(mC.getContainer()).thenReturn(c); when(mC.getContainer()).thenReturn(c);
when(mC.getApplicationAttemptId()).thenReturn(appAttId); when(mC.getApplicationAttemptId()).thenReturn(appAttId);
if(0 == priority){ if (priority.AMCONTAINER.getValue() == cpriority) {
when(mC.isAMContainer()).thenReturn(true); when(mC.isAMContainer()).thenReturn(true);
} }
if (priority.LABELEDCONTAINER.getValue() == cpriority) {
when(mC.getAllocatedNode()).thenReturn(NodeId.newInstance("node1", 0));
}
return mC; return mC;
} }