YARN-7332. Compute effectiveCapacity per each resource vector. (Sunil G via wangda)

This commit is contained in:
Wangda Tan 2017-10-27 10:16:33 -07:00
parent d52627a7cb
commit aa3f62740f
2 changed files with 153 additions and 7 deletions

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
@ -68,7 +69,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@ -928,24 +931,25 @@ public class ParentQueue extends AbstractCSQueue {
// Factor to scale down effective resource: When cluster has sufficient
// resources, effective_min_resources will be same as configured
// min_resources.
float effectiveMinRatio = 1;
Resource numeratorForMinRatio = null;
ResourceCalculator rc = this.csContext.getResourceCalculator();
if (getQueueName().equals("root")) {
if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(rc,
clusterResource, resourceByLabel, configuredMinResources)) {
effectiveMinRatio = Resources.divide(rc, clusterResource,
resourceByLabel, configuredMinResources);
numeratorForMinRatio = resourceByLabel;
}
} else {
if (Resources.lessThan(rc, clusterResource,
queueResourceQuotas.getEffectiveMinResource(label),
configuredMinResources)) {
effectiveMinRatio = Resources.divide(rc, clusterResource,
queueResourceQuotas.getEffectiveMinResource(label),
configuredMinResources);
numeratorForMinRatio = queueResourceQuotas
.getEffectiveMinResource(label);
}
}
Map<String, Float> effectiveMinRatioPerResource = getEffectiveMinRatioPerResource(
configuredMinResources, numeratorForMinRatio);
// loop and do this for all child queues
for (CSQueue childQueue : getChildQueues()) {
Resource minResource = childQueue.getQueueResourceQuotas()
@ -955,7 +959,8 @@ public class ParentQueue extends AbstractCSQueue {
if (childQueue.getCapacityConfigType()
.equals(CapacityConfigType.ABSOLUTE_RESOURCE)) {
childQueue.getQueueResourceQuotas().setEffectiveMinResource(label,
Resources.multiply(minResource, effectiveMinRatio));
getMinResourceNormalized(childQueue.getQueueName(), effectiveMinRatioPerResource,
minResource));
// Max resource of a queue should be a minimum of {configuredMaxRes,
// parentMaxRes}. parentMaxRes could be configured value. But if not
@ -1003,6 +1008,53 @@ public class ParentQueue extends AbstractCSQueue {
}
}
private Resource getMinResourceNormalized(String name, Map<String, Float> effectiveMinRatio,
Resource minResource) {
Resource ret = Resource.newInstance(minResource);
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) {
ResourceInformation nResourceInformation = minResource
.getResourceInformation(i);
Float ratio = effectiveMinRatio.get(nResourceInformation.getName());
if (ratio != null) {
ret.setResourceValue(i,
(long) (nResourceInformation.getValue() * ratio.floatValue()));
if (LOG.isDebugEnabled()) {
LOG.debug("Updating min resource for Queue: " + name + " as "
+ ret.getResourceInformation(i) + ", Actual resource: "
+ nResourceInformation.getValue() + ", ratio: "
+ ratio.floatValue());
}
}
}
return ret;
}
private Map<String, Float> getEffectiveMinRatioPerResource(
Resource configuredMinResources, Resource numeratorForMinRatio) {
Map<String, Float> effectiveMinRatioPerResource = new HashMap<>();
if (numeratorForMinRatio != null) {
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) {
ResourceInformation nResourceInformation = numeratorForMinRatio
.getResourceInformation(i);
ResourceInformation dResourceInformation = configuredMinResources
.getResourceInformation(i);
long nValue = nResourceInformation.getValue();
long dValue = UnitsConversionUtil.convert(
dResourceInformation.getUnits(), nResourceInformation.getUnits(),
dResourceInformation.getValue());
if (dValue != 0) {
effectiveMinRatioPerResource.put(nResourceInformation.getName(),
(float) nValue / dValue);
}
}
}
return effectiveMinRatioPerResource;
}
private void deriveCapacityFromAbsoluteConfigurations(String label,
Resource clusterResource, ResourceCalculator rc, CSQueue childQueue) {

View File

@ -68,6 +68,11 @@ import org.mockito.stubbing.Answer;
public class TestParentQueue {
private static final Resource QUEUE_B_RESOURCE = Resource
.newInstance(14 * 1024, 22);
private static final Resource QUEUE_A_RESOURCE = Resource
.newInstance(6 * 1024, 10);
private static final Log LOG = LogFactory.getLog(TestParentQueue.class);
RMContext rmContext;
@ -118,6 +123,23 @@ public class TestParentQueue {
LOG.info("Setup top-level queues a and b");
}
private void setupSingleLevelQueuesWithAbsoluteResource(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A, B});
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
conf.setMinimumResourceRequirement("", Q_A,
QUEUE_A_RESOURCE);
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
conf.setMinimumResourceRequirement("", Q_B,
QUEUE_B_RESOURCE);
LOG.info("Setup top-level queues a and b with absolute resource");
}
private FiCaSchedulerApp getMockApplication(int appId, String user) {
FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
doReturn(user).when(application).getUser();
@ -931,6 +953,78 @@ public class TestParentQueue {
reset(c);
}
@Test
public void testAbsoluteResourceWithChangeInClusterResource()
throws Exception {
// Setup queue configs
setupSingleLevelQueuesWithAbsoluteResource(csConf);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf,
null, CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
// Setup some nodes
final int memoryPerNode = 10;
int coresPerNode = 16;
int numNodes = 2;
Resource clusterResource = Resources.createResource(
numNodes * (memoryPerNode * GB), numNodes * coresPerNode);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Start testing
LeafQueue a = (LeafQueue) queues.get(A);
LeafQueue b = (LeafQueue) queues.get(B);
assertEquals(a.getQueueResourceQuotas().getConfiguredMinResource(),
QUEUE_A_RESOURCE);
assertEquals(b.getQueueResourceQuotas().getConfiguredMinResource(),
QUEUE_B_RESOURCE);
assertEquals(a.getQueueResourceQuotas().getEffectiveMinResource(),
QUEUE_A_RESOURCE);
assertEquals(b.getQueueResourceQuotas().getEffectiveMinResource(),
QUEUE_B_RESOURCE);
numNodes = 1;
clusterResource = Resources.createResource(numNodes * (memoryPerNode * GB),
numNodes * coresPerNode);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
Resource QUEUE_B_RESOURCE_HALF = Resource.newInstance(7 * 1024, 11);
Resource QUEUE_A_RESOURCE_HALF = Resource.newInstance(3 * 1024, 5);
assertEquals(a.getQueueResourceQuotas().getConfiguredMinResource(),
QUEUE_A_RESOURCE);
assertEquals(b.getQueueResourceQuotas().getConfiguredMinResource(),
QUEUE_B_RESOURCE);
assertEquals(a.getQueueResourceQuotas().getEffectiveMinResource(),
QUEUE_A_RESOURCE_HALF);
assertEquals(b.getQueueResourceQuotas().getEffectiveMinResource(),
QUEUE_B_RESOURCE_HALF);
coresPerNode = 40;
clusterResource = Resources.createResource(numNodes * (memoryPerNode * GB),
numNodes * coresPerNode);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
Resource QUEUE_B_RESOURCE_70PERC = Resource.newInstance(7 * 1024, 27);
Resource QUEUE_A_RESOURCE_30PERC = Resource.newInstance(3 * 1024, 12);
assertEquals(a.getQueueResourceQuotas().getConfiguredMinResource(),
QUEUE_A_RESOURCE);
assertEquals(b.getQueueResourceQuotas().getConfiguredMinResource(),
QUEUE_B_RESOURCE);
assertEquals(a.getQueueResourceQuotas().getEffectiveMinResource(),
QUEUE_A_RESOURCE_30PERC);
assertEquals(b.getQueueResourceQuotas().getEffectiveMinResource(),
QUEUE_B_RESOURCE_70PERC);
}
@After
public void tearDown() throws Exception {
}