YARN-3215. Respect labels in CapacityScheduler when computing headroom. (Naganarasimha G R via wangda)

This commit is contained in:
Wangda Tan 2016-04-22 15:59:49 -07:00
parent 9891f51829
commit 358b54d063
11 changed files with 405 additions and 119 deletions

View File

@ -18,19 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -45,10 +32,24 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* This class keeps track of all the consumption of an application. This also
* keeps track of current running/completed containers for the application.
@ -74,6 +75,7 @@ public class AppSchedulingInfo {
private final Set<String> amBlacklist = new HashSet<>();
private Set<String> userBlacklist = new HashSet<>();
private Set<String> requestedPartitions = new HashSet<>();
final Set<Priority> priorities = new TreeSet<>(COMPARATOR);
final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap =
@ -118,6 +120,10 @@ public synchronized boolean isPending() {
return pending;
}
public Set<String> getRequestedPartitions() {
return requestedPartitions;
}
/**
* Clear any pending requests from this application.
*/
@ -339,6 +345,10 @@ public synchronized boolean updateResourceRequests(
asks.put(resourceName, request);
if (resourceName.equals(ResourceRequest.ANY)) {
//update the applications requested labels set
requestedPartitions.add(request.getNodeLabelExpression() == null
? RMNodeLabelsManager.NO_LABEL : request.getNodeLabelExpression());
anyResourcesUpdated = true;
// Activate application. Metrics activation is done here.

View File

@ -434,14 +434,12 @@ private Resource getCurrentLimitResource(String nodePartition,
* limit-set-by-parent)
*/
Resource queueMaxResource =
Resources.multiplyAndNormalizeDown(resourceCalculator,
labelManager.getResourceByLabel(nodePartition, clusterResource),
queueCapacities.getAbsoluteMaximumCapacity(nodePartition), minimumAllocation);
getQueueMaxResource(nodePartition, clusterResource);
if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
return Resources.min(resourceCalculator, clusterResource,
queueMaxResource, currentResourceLimits.getLimit());
}
return queueMaxResource;
return queueMaxResource;
} else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
// When we doing non-exclusive resource allocation, maximum capacity of
// all queues on this label equals to total resource with the label.
@ -450,7 +448,14 @@ private Resource getCurrentLimitResource(String nodePartition,
return Resources.none();
}
Resource getQueueMaxResource(String nodePartition, Resource clusterResource) {
return Resources.multiplyAndNormalizeDown(resourceCalculator,
labelManager.getResourceByLabel(nodePartition, clusterResource),
queueCapacities.getAbsoluteMaximumCapacity(nodePartition),
minimumAllocation);
}
synchronized boolean canAssignToThisQueue(Resource clusterResource,
String nodePartition, ResourceLimits currentResourceLimits,
Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) {

View File

@ -17,8 +17,12 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
public class CapacityHeadroomProvider {
@ -38,22 +42,32 @@ public CapacityHeadroomProvider(LeafQueue.User user, LeafQueue queue,
}
public Resource getHeadroom() {
Resource queueCurrentLimit;
Resource clusterResource;
synchronized (queueResourceLimitsInfo) {
queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
clusterResource = queueResourceLimitsInfo.getClusterResource();
}
Resource headroom = queue.getHeadroom(user, queueCurrentLimit,
clusterResource, application);
Set<String> requestedPartitions =
application.getAppSchedulingInfo().getRequestedPartitions();
Resource headroom;
if (requestedPartitions.isEmpty() || (requestedPartitions.size() == 1
&& requestedPartitions.contains(RMNodeLabelsManager.NO_LABEL))) {
headroom = queue.getHeadroom(user, queueCurrentLimit, clusterResource,
application);
} else {
headroom = Resource.newInstance(0, 0);
for (String partition : requestedPartitions) {
Resource partitionHeadRoom = queue.getHeadroom(user, queueCurrentLimit,
clusterResource, application, partition);
Resources.addTo(headroom, partitionHeadRoom);
}
}
// Corner case to deal with applications being slightly over-limit
if (headroom.getMemory() < 0) {
headroom.setMemory(0);
}
return headroom;
}
}

View File

@ -18,24 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -101,6 +85,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -131,8 +116,23 @@
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@LimitedPrivate("yarn")
@Evolving
@ -2057,4 +2057,9 @@ public void updateApplicationPriority(Priority newPriority,
+ rmApp.getQueue() + " for application: " + applicationId
+ " for the user: " + rmApp.getUser());
}
@Override
public ResourceUsage getClusterResourceUsage() {
return root.getQueueResourceUsage();
}
}

View File

@ -18,17 +18,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.Comparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import java.util.Comparator;
/**
* Read-only interface to {@link CapacityScheduler} context.
*/
@ -61,4 +64,15 @@ public interface CapacitySchedulerContext {
PartitionedQueueComparator getPartitionedQueueComparator();
FiCaSchedulerNode getNode(NodeId nodeId);
FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId attemptId);
SchedulerHealth getSchedulerHealth();
/**
* @return QueueCapacities root queue of the Capacity Scheduler Queue, root
* queue used capacities for different labels are same as that of the
* cluster.
*/
ResourceUsage getClusterResourceUsage();
}

View File

@ -945,14 +945,21 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
protected Resource getHeadroom(User user, Resource queueCurrentLimit,
Resource clusterResource, FiCaSchedulerApp application) {
return getHeadroom(user, queueCurrentLimit, clusterResource,
computeUserLimit(application, clusterResource, user,
RMNodeLabelsManager.NO_LABEL,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
return getHeadroom(user, queueCurrentLimit, clusterResource, application,
RMNodeLabelsManager.NO_LABEL);
}
private Resource getHeadroom(User user, Resource currentResourceLimit,
Resource clusterResource, Resource userLimit) {
protected Resource getHeadroom(User user, Resource queueCurrentLimit,
Resource clusterResource, FiCaSchedulerApp application,
String partition) {
return getHeadroom(user, queueCurrentLimit, clusterResource,
computeUserLimit(application, clusterResource, user, partition,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition);
}
private Resource getHeadroom(User user,
Resource currentPartitionResourceLimit, Resource clusterResource,
Resource userLimitResource, String partition) {
/**
* Headroom is:
* min(
@ -969,15 +976,33 @@ private Resource getHeadroom(User user, Resource currentResourceLimit,
*
* >> min (userlimit - userConsumed, queueMaxCap - queueUsedResources) <<
*
* sum of queue max capacities of multiple queue's will be greater than the
* actual capacity of a given partition, hence we need to ensure that the
* headroom is not greater than the available resource for a given partition
*
* headroom = min (unused resourcelimit of a label, calculated headroom )
*/
Resource headroom =
Resources.componentwiseMin(
Resources.subtract(userLimit, user.getUsed()),
Resources.subtract(currentResourceLimit, queueUsage.getUsed())
);
currentPartitionResourceLimit =
partition.equals(RMNodeLabelsManager.NO_LABEL)
? currentPartitionResourceLimit
: getQueueMaxResource(partition, clusterResource);
Resource headroom = Resources.componentwiseMin(
Resources.subtract(userLimitResource, user.getUsed(partition)),
Resources.subtract(currentPartitionResourceLimit,
queueUsage.getUsed(partition)));
// Normalize it before return
headroom =
Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
//headroom = min (unused resourcelimit of a label, calculated headroom )
Resource clusterPartitionResource =
labelManager.getResourceByLabel(partition, clusterResource);
Resource clusterFreePartitionResource =
Resources.subtract(clusterPartitionResource,
csContext.getClusterResourceUsage().getUsed(partition));
headroom = Resources.min(resourceCalculator, clusterPartitionResource,
clusterFreePartitionResource, headroom);
return headroom;
}
@ -1004,10 +1029,10 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
nodePartition, schedulingMode);
setQueueResourceLimitsInfo(clusterResource);
Resource headroom =
getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
clusterResource, userLimit);
clusterResource, userLimit, nodePartition);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " +

View File

@ -17,23 +17,6 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
@ -52,6 +35,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -64,6 +48,23 @@
import org.mockito.Matchers;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class TestApplicationLimits {
private static final Log LOG = LogFactory.getLog(TestApplicationLimits.class);
@ -577,8 +578,12 @@ public void testHeadroom() throws Exception {
when(csContext.getClusterResource()).thenReturn(clusterResource);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
queues, queues, TestUtils.spyHook);
CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null,
"root", queues, queues, TestUtils.spyHook);
ResourceUsage queueCapacities = rootQueue.getQueueResourceUsage();
when(csContext.getClusterResourceUsage())
.thenReturn(queueCapacities);
// Manipulate queue 'a'
LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A));
@ -655,8 +660,7 @@ public void testHeadroom() throws Exception {
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
// TODO, need fix headroom in future patch
// assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
// Submit first application from user_1, check for new headroom
final ApplicationAttemptId appAttemptId_1_0 =
@ -677,9 +681,8 @@ public void testHeadroom() throws Exception {
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
// TODO, need fix headroom in future patch
// assertEquals(expectedHeadroom, app_0_1.getHeadroom());
// assertEquals(expectedHeadroom, app_1_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());
assertEquals(expectedHeadroom, app_1_0.getHeadroom());
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
@ -687,9 +690,8 @@ public void testHeadroom() throws Exception {
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
// TODO, need fix headroom in future patch
// assertEquals(expectedHeadroom, app_0_1.getHeadroom());
// assertEquals(expectedHeadroom, app_1_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());
assertEquals(expectedHeadroom, app_1_0.getHeadroom());
}

View File

@ -18,12 +18,29 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@ -31,11 +48,21 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@ -47,7 +74,8 @@ public class TestApplicationLimitsByPartition {
RMNodeLabelsManager mgr;
private YarnConfiguration conf;
RMContext rmContext = null;
private final ResourceCalculator resourceCalculator =
new DefaultResourceCalculator();
@Before
public void setUp() throws IOException {
@ -538,4 +566,174 @@ public RMNodeLabelsManager createNodeLabelManager() {
rm1.close();
}
@Test
public void testHeadroom() throws Exception {
/*
* Test Case: Verify Headroom calculated is sum of headrooms for each
* partition requested. So submit a app with requests for default partition
* and 'x' partition, so the total headroom for the user should be sum of
* the head room for both labels.
*/
simpleNodeLabelMappingToManager();
CapacitySchedulerConfiguration csConf =
(CapacitySchedulerConfiguration) TestUtils
.getComplexConfigurationWithQueueLabels(conf);
final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1";
final String B2 = CapacitySchedulerConfiguration.ROOT + ".b" + ".b2";
csConf.setUserLimit(A1, 25);
csConf.setUserLimit(B2, 25);
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(conf);
when(csContext.getMinimumResourceCapability())
.thenReturn(Resources.createResource(GB));
when(csContext.getMaximumResourceCapability())
.thenReturn(Resources.createResource(16 * GB));
when(csContext.getNonPartitionedQueueComparator())
.thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
RMContext rmContext = TestUtils.getMockRMContext();
RMContext spyRMContext = spy(rmContext);
when(spyRMContext.getNodeLabelManager()).thenReturn(mgr);
when(csContext.getRMContext()).thenReturn(spyRMContext);
mgr.activateNode(NodeId.newInstance("h0", 0),
Resource.newInstance(160 * GB, 16)); // default Label
mgr.activateNode(NodeId.newInstance("h1", 0),
Resource.newInstance(160 * GB, 16)); // label x
mgr.activateNode(NodeId.newInstance("h2", 0),
Resource.newInstance(160 * GB, 16)); // label y
// Say cluster has 100 nodes of 16G each
Resource clusterResource = Resources.createResource(160 * GB);
when(csContext.getClusterResource()).thenReturn(clusterResource);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null,
"root", queues, queues, TestUtils.spyHook);
ResourceUsage queueResUsage = rootQueue.getQueueResourceUsage();
when(csContext.getClusterResourceUsage())
.thenReturn(queueResUsage);
// Manipulate queue 'a'
LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue) queues.get("b2"));
queue.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
String rack_0 = "rack_0";
FiCaSchedulerNode node_0 = TestUtils.getMockNode("h0", rack_0, 0, 160 * GB);
FiCaSchedulerNode node_1 = TestUtils.getMockNode("h1", rack_0, 0, 160 * GB);
final String user_0 = "user_0";
final String user_1 = "user_1";
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
ConcurrentMap<ApplicationId, RMApp> spyApps =
spy(new ConcurrentHashMap<ApplicationId, RMApp>());
RMApp rmApp = mock(RMApp.class);
ResourceRequest amResourceRequest = mock(ResourceRequest.class);
Resource amResource = Resources.createResource(0, 0);
when(amResourceRequest.getCapability()).thenReturn(amResource);
when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
when(spyRMContext.getRMApps()).thenReturn(spyApps);
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any()))
.thenReturn(rmAppAttempt);
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
Mockito.doReturn(true).when(spyApps)
.containsKey((ApplicationId) Matchers.any());
Priority priority_1 = TestUtils.createMockPriority(1);
// Submit first application with some resource-requests from user_0,
// and check headroom
final ApplicationAttemptId appAttemptId_0_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(appAttemptId_0_0, user_0,
queue, queue.getActiveUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_0, user_0);
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
app_0_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 2, true, priority_1, recordFactory));
app_0_0.updateResourceRequests(app_0_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
//head room = queue capacity = 50 % 90% 160 GB
Resource expectedHeadroom =
Resources.createResource((int) (0.5 * 0.9 * 160) * GB, 1);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
// Submit second application from user_0, check headroom
final ApplicationAttemptId appAttemptId_0_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(appAttemptId_0_1, user_0,
queue, queue.getActiveUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_1, user_0);
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 2, true, priority_1, recordFactory));
app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 2, true, priority_1, recordFactory, "y"));
app_0_1.updateResourceRequests(app_0_1_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
queue.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
assertEquals(expectedHeadroom, app_0_0.getHeadroom());// no change
//head room for default label + head room for y partition
//head room for y partition = 100% 50%(b queue capacity ) * 160 * GB
Resource expectedHeadroomWithReqInY =
Resources.add(Resources.createResource((int) (0.5 * 160) * GB, 1), expectedHeadroom);
assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom());
// Submit first application from user_1, check for new headroom
final ApplicationAttemptId appAttemptId_1_0 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(appAttemptId_1_0, user_1,
queue, queue.getActiveUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_1_0, user_1);
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 2, true, priority_1, recordFactory));
app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 2, true, priority_1, recordFactory, "y"));
app_1_0.updateResourceRequests(app_1_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
//head room = queue capacity = (50 % 90% 160 GB)/2 (for 2 users)
expectedHeadroom =
Resources.createResource((int) (0.5 * 0.9 * 160 * 0.5) * GB, 1);
//head room for default label + head room for y partition
//head room for y partition = 100% 50%(b queue capacity ) * 160 * GB
expectedHeadroomWithReqInY =
Resources.add(Resources.createResource((int) (0.5 * 0.5 * 160) * GB, 1),
expectedHeadroom);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom());
assertEquals(expectedHeadroomWithReqInY, app_1_0.getHeadroom());
}
}

View File

@ -18,36 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -71,6 +44,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -90,7 +64,29 @@
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mortbay.log.Log;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class TestLeafQueue {
private final RecordFactory recordFactory =
@ -163,6 +159,10 @@ public void setUp() throws Exception {
queues, queues,
TestUtils.spyHook);
ResourceUsage queueResUsage = root.getQueueResourceUsage();
when(csContext.getClusterResourceUsage())
.thenReturn(queueResUsage);
cs.setRMContext(spyRMContext);
cs.init(csConf);
cs.start();

View File

@ -54,6 +54,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -136,6 +137,10 @@ private void setup(CapacitySchedulerConfiguration csConf,
root = CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
ResourceUsage queueResUsage = root.getQueueResourceUsage();
when(csContext.getClusterResourceUsage())
.thenReturn(queueResUsage);
spyRMContext = spy(rmContext);
when(spyRMContext.getScheduler()).thenReturn(cs);
when(spyRMContext.getYarnConfiguration())

View File

@ -159,7 +159,7 @@ public static Priority createMockPriority( int priority) {
public static ResourceRequest createResourceRequest(
String resourceName, int memory, int numContainers, boolean relaxLocality,
Priority priority, RecordFactory recordFactory) {
Priority priority, RecordFactory recordFactory, String labelExpression) {
ResourceRequest request =
recordFactory.newRecordInstance(ResourceRequest.class);
Resource capability = Resources.createResource(memory, 1);
@ -169,10 +169,18 @@ public static ResourceRequest createResourceRequest(
request.setCapability(capability);
request.setRelaxLocality(relaxLocality);
request.setPriority(priority);
request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
request.setNodeLabelExpression(labelExpression);
return request;
}
public static ResourceRequest createResourceRequest(
String resourceName, int memory, int numContainers, boolean relaxLocality,
Priority priority,
RecordFactory recordFactory) {
return createResourceRequest(resourceName, memory, numContainers,
relaxLocality, priority, recordFactory, RMNodeLabelsManager.NO_LABEL);
}
public static ApplicationId getMockApplicationId(int appId) {
return ApplicationId.newInstance(0L, appId);
}