YARN-7557. It should be possible to specify resource types in the fair scheduler increment value (grepas via rkanter)

This commit is contained in:
Robert Kanter 2018-01-05 11:15:06 -08:00
parent 83b513ac6d
commit f8e7dd9b10
4 changed files with 379 additions and 13 deletions

View File

@ -59,7 +59,7 @@ public class ResourceUtils {
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
private static final String VCORES = ResourceInformation.VCORES.getName();
private static final Pattern RESOURCE_REQUEST_VALUE_PATTERN =
public static final Pattern RESOURCE_REQUEST_VALUE_PATTERN =
Pattern.compile("^([0-9]+) ?([a-zA-Z]*)$");
private static final Pattern RESOURCE_NAME_PATTERN = Pattern.compile(

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.apache.hadoop.yarn.util.resource.ResourceUtils.RESOURCE_REQUEST_VALUE_PATTERN;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -27,8 +31,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -38,12 +44,18 @@ public class FairSchedulerConfiguration extends Configuration {
public static final Log LOG = LogFactory.getLog(
FairSchedulerConfiguration.class.getName());
/** Increment request grant-able by the RM scheduler.
* These properties are looked up in the yarn-site.xml */
/** Increment request grant-able by the RM scheduler.
* These properties are looked up in the yarn-site.xml.
* Kept for backward-compatibility - the new preferred way to configure the
* increment is the yarn.resource-types.{RESOURCE_NAME}.increment-allocation
* property, specifically: yarn.resource-types.memory-mb.increment-allocation
* for memory and yarn.resource-types.vcores.increment-allocation for CPU */
@Deprecated
public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_MB =
YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-mb";
public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB = 1024;
@Deprecated
public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES =
YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-vcores";
public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1;
@ -112,6 +124,12 @@ public class FairSchedulerConfiguration extends Configuration {
protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
/**
* Postfix for resource allocation increments in the
* yarn.resource-types.{RESOURCE_NAME}.increment-allocation property.
*/
static final String INCREMENT_ALLOCATION = ".increment-allocation";
/**
* Configurable delay (ms) before an app's starvation is considered after
* it is identified. This is to give the scheduler enough time to
@ -185,13 +203,69 @@ public Resource getMaximumAllocation() {
}
public Resource getIncrementAllocation() {
int incrementMemory = getInt(
RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
int incrementCores = getInt(
RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES,
DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES);
return Resources.createResource(incrementMemory, incrementCores);
Long memory = null;
Integer vCores = null;
Map<String, Long> others = new HashMap<>();
ResourceInformation[] resourceTypes = ResourceUtils.getResourceTypesArray();
for (int i=0; i < resourceTypes.length; ++i) {
String name = resourceTypes[i].getName();
String propertyKey = getAllocationIncrementPropKey(name);
String propValue = get(propertyKey);
if (propValue != null) {
Matcher matcher = RESOURCE_REQUEST_VALUE_PATTERN.matcher(propValue);
if (matcher.matches()) {
long value = Long.parseLong(matcher.group(1));
String unit = matcher.group(2);
long valueInDefaultUnits = getValueInDefaultUnits(value, unit, name);
others.put(name, valueInDefaultUnits);
} else {
throw new IllegalArgumentException("Property " + propertyKey +
" is not in \"value [unit]\" format: " + propValue);
}
}
}
if (others.containsKey(ResourceInformation.MEMORY_MB.getName())) {
memory = others.get(ResourceInformation.MEMORY_MB.getName());
if (get(RM_SCHEDULER_INCREMENT_ALLOCATION_MB) != null) {
String overridingKey = getAllocationIncrementPropKey(
ResourceInformation.MEMORY_MB.getName());
LOG.warn("Configuration " + overridingKey + "=" + get(overridingKey) +
" is overriding the " + RM_SCHEDULER_INCREMENT_ALLOCATION_MB +
"=" + get(RM_SCHEDULER_INCREMENT_ALLOCATION_MB) + " property");
}
others.remove(ResourceInformation.MEMORY_MB.getName());
} else {
memory = getLong(
RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
}
if (others.containsKey(ResourceInformation.VCORES.getName())) {
vCores = others.get(ResourceInformation.VCORES.getName()).intValue();
if (get(RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES) != null) {
String overridingKey = getAllocationIncrementPropKey(
ResourceInformation.VCORES.getName());
LOG.warn("Configuration " + overridingKey + "=" + get(overridingKey) +
" is overriding the " + RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES +
"=" + get(RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES) + " property");
}
others.remove(ResourceInformation.VCORES.getName());
} else {
vCores = getInt(
RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES,
DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES);
}
return Resource.newInstance(memory, vCores, others);
}
private long getValueInDefaultUnits(long value, String unit,
String resourceName) {
return unit.isEmpty() ? value : UnitsConversionUtil.convert(unit,
ResourceUtils.getDefaultUnit(resourceName), value);
}
private String getAllocationIncrementPropKey(String resourceName) {
return YarnConfiguration.RESOURCE_TYPES + "." + resourceName +
INCREMENT_ALLOCATION;
}
public float getReservationThresholdIncrementMultiple() {

View File

@ -19,12 +19,87 @@
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration.parseResourceConfigValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Assert;
import org.junit.Test;
public class TestFairSchedulerConfiguration {
private static final String A_CUSTOM_RESOURCE = "a-custom-resource";
private static class CustomResourceTypesConfigurationProvider
extends LocalConfigurationProvider {
@Override
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
String name) throws YarnException, IOException {
if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
return new ByteArrayInputStream((
"<configuration>\n" +
" <property>\n" +
" <name>yarn.resource-types</name>\n" +
" <value>" + A_CUSTOM_RESOURCE + "</value>\n" +
" </property>\n" +
" <property>\n" +
" <name>yarn.resource-types.a-custom-resource.units</name>\n" +
" <value>k</value>\n" +
" </property>\n" +
"</configuration>\n")
.getBytes());
} else {
return super.getConfigurationInputStream(bootstrapConf, name);
}
}
}
private static class TestAppender extends AppenderSkeleton {
private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
@Override
public boolean requiresLayout() {
return false;
}
@Override
public void close() {
}
@Override
protected void append(LoggingEvent arg0) {
logEvents.add(arg0);
}
private List<LoggingEvent> getLogEvents() {
return logEvents;
}
}
@Test
public void testParseResourceConfigValue() throws Exception {
assertEquals(BuilderUtils.newResource(1024, 2),
@ -122,4 +197,218 @@ public void testCpuPercentageMemoryAbsolute() throws Exception {
public void testMemoryPercentageCpuAbsolute() throws Exception {
parseResourceConfigValue("50% memory, 2 vcores");
}
@Test
public void testAllocationIncrementMemoryDefaultUnit() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RESOURCE_TYPES + "." +
ResourceInformation.MEMORY_MB.getName() +
FairSchedulerConfiguration.INCREMENT_ALLOCATION, "256");
FairSchedulerConfiguration fsc = new FairSchedulerConfiguration(conf);
Resource minimum = Resources.createResource(0L, 0);
Resource maximum =
Resources.createResource(Long.MAX_VALUE, Integer.MAX_VALUE);
Resource increment = fsc.getIncrementAllocation();
DominantResourceCalculator resourceCalculator =
new DominantResourceCalculator();
assertEquals(1024L, resourceCalculator.normalize(
Resources.createResource(769L), minimum, maximum, increment)
.getMemorySize());
assertEquals(1024L, resourceCalculator.normalize(
Resources.createResource(1023L), minimum, maximum, increment)
.getMemorySize());
assertEquals(1024L, resourceCalculator.normalize(
Resources.createResource(1024L), minimum, maximum, increment)
.getMemorySize());
assertEquals(1280L, resourceCalculator.normalize(
Resources.createResource(1025L), minimum, maximum, increment)
.getMemorySize());
}
@Test
public void testAllocationIncrementMemoryNonDefaultUnit() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RESOURCE_TYPES + "." +
ResourceInformation.MEMORY_MB.getName() +
FairSchedulerConfiguration.INCREMENT_ALLOCATION, "1 Gi");
FairSchedulerConfiguration fsc = new FairSchedulerConfiguration(conf);
Resource minimum = Resources.createResource(0L, 0);
Resource maximum =
Resources.createResource(Long.MAX_VALUE, Integer.MAX_VALUE);
Resource increment = fsc.getIncrementAllocation();
DominantResourceCalculator resourceCalculator =
new DominantResourceCalculator();
assertEquals(1024L, resourceCalculator.normalize(
Resources.createResource(1023L), minimum, maximum, increment)
.getMemorySize());
assertEquals(1024L, resourceCalculator.normalize(
Resources.createResource(1024L), minimum, maximum, increment)
.getMemorySize());
assertEquals(2048L, resourceCalculator.normalize(
Resources.createResource(1025L), minimum, maximum, increment)
.getMemorySize());
}
@Test(expected=IllegalArgumentException.class)
public void testAllocationIncrementInvalidUnit() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RESOURCE_TYPES + "." +
ResourceInformation.MEMORY_MB.getName() +
FairSchedulerConfiguration.INCREMENT_ALLOCATION, "1 Xi");
new FairSchedulerConfiguration(conf).getIncrementAllocation();
}
@Test
public void testAllocationIncrementVCoreNoUnit() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RESOURCE_TYPES + "." +
ResourceInformation.VCORES.getName() +
FairSchedulerConfiguration.INCREMENT_ALLOCATION, "10");
FairSchedulerConfiguration fsc = new FairSchedulerConfiguration(conf);
Resource min = Resources.createResource(0L, 0);
Resource max = Resources.createResource(Long.MAX_VALUE, Integer.MAX_VALUE);
Resource increment = fsc.getIncrementAllocation();
DominantResourceCalculator resourceCalculator =
new DominantResourceCalculator();
assertEquals(10, resourceCalculator.normalize(
Resources.createResource(0L, 9), min, max, increment)
.getVirtualCores());
assertEquals(10, resourceCalculator.normalize(
Resources.createResource(0L, 10), min, max, increment)
.getVirtualCores());
assertEquals(20, resourceCalculator.normalize(
Resources.createResource(0L, 11), min, max, increment)
.getVirtualCores());
}
@Test
public void testAllocationIncrementVCoreWithUnit() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RESOURCE_TYPES + "." +
ResourceInformation.VCORES.getName() +
FairSchedulerConfiguration.INCREMENT_ALLOCATION, "1k");
FairSchedulerConfiguration fsc = new FairSchedulerConfiguration(conf);
Resource min = Resources.createResource(0L, 0);
Resource max = Resources.createResource(Long.MAX_VALUE, Integer.MAX_VALUE);
Resource increment = fsc.getIncrementAllocation();
DominantResourceCalculator resourceCalculator =
new DominantResourceCalculator();
assertEquals(1000, resourceCalculator.normalize(
Resources.createResource(0L, 999), min, max, increment)
.getVirtualCores());
assertEquals(1000, resourceCalculator.normalize(
Resources.createResource(0L, 1000), min, max, increment)
.getVirtualCores());
assertEquals(2000, resourceCalculator.normalize(
Resources.createResource(0L, 1001), min, max, increment)
.getVirtualCores());
}
@Test
public void testAllocationIncrementCustomResource() throws Exception {
try {
initResourceTypes();
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RESOURCE_TYPES + ".a-custom-resource" +
FairSchedulerConfiguration.INCREMENT_ALLOCATION, "10");
FairSchedulerConfiguration fsc = new FairSchedulerConfiguration(conf);
Resource increment = fsc.getIncrementAllocation();
DominantResourceCalculator calculator =
new DominantResourceCalculator();
Resource min = Resources.createResource(0L, 0);
Resource max = Resource.newInstance(Long.MAX_VALUE,
Integer.MAX_VALUE, Collections.singletonMap(A_CUSTOM_RESOURCE,
Long.MAX_VALUE / UnitsConversionUtil.convert("k", "", 1L)));
assertEquals(customResourceInformation(10000L, ""),
calculator.normalize(customResource(9999L, ""), min, max, increment)
.getResourceInformation(A_CUSTOM_RESOURCE));
assertEquals(customResourceInformation(10000L, ""),
calculator.normalize(customResource(10000L, ""), min, max, increment)
.getResourceInformation(A_CUSTOM_RESOURCE));
assertEquals(customResourceInformation(20000L, ""),
calculator.normalize(customResource(10001L, ""), min, max, increment)
.getResourceInformation(A_CUSTOM_RESOURCE));
assertEquals(customResourceInformation(10L, "k"),
calculator.normalize(customResource(9L, "k"), min, max, increment)
.getResourceInformation(A_CUSTOM_RESOURCE));
assertEquals(customResourceInformation(10L, "k"),
calculator.normalize(customResource(10L, "k"), min, max, increment)
.getResourceInformation(A_CUSTOM_RESOURCE));
assertEquals(customResourceInformation(20L, "k"),
calculator.normalize(customResource(11L, "k"), min, max, increment)
.getResourceInformation(A_CUSTOM_RESOURCE));
} finally {
ResourceUtils.resetResourceTypes(new Configuration());
}
}
private Resource customResource(long value, String units) {
return new LightWeightResource(0L, 0, new ResourceInformation[] {
null, null, customResourceInformation(value, units) });
}
private ResourceInformation customResourceInformation(long value,
String units) {
return ResourceInformation.newInstance(A_CUSTOM_RESOURCE, units, value,
ResourceTypes.COUNTABLE, 0L, Long.MAX_VALUE);
}
private void initResourceTypes() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
CustomResourceTypesConfigurationProvider.class.getName());
ResourceUtils.resetResourceTypes(conf);
}
@Test
public void testMemoryIncrementConfiguredViaMultipleProperties() {
TestAppender testAppender = new TestAppender();
Log4JLogger logger = (Log4JLogger) FairSchedulerConfiguration.LOG;
logger.getLogger().addAppender(testAppender);
try {
Configuration conf = new Configuration();
conf.set("yarn.scheduler.increment-allocation-mb", "7");
conf.set(YarnConfiguration.RESOURCE_TYPES + "." +
ResourceInformation.MEMORY_MB.getName() +
FairSchedulerConfiguration.INCREMENT_ALLOCATION, "13");
FairSchedulerConfiguration fsc = new FairSchedulerConfiguration(conf);
Resource increment = fsc.getIncrementAllocation();
Assert.assertEquals(13L, increment.getMemorySize());
assertTrue("Warning message is not logged when specifying memory " +
"increment via multiple properties",
testAppender.getLogEvents().stream().anyMatch(
e -> e.getLevel() == Level.WARN && ("Configuration " +
"yarn.resource-types.memory-mb.increment-allocation=13 is " +
"overriding the yarn.scheduler.increment-allocation-mb=7 " +
"property").equals(e.getMessage())));
} finally {
logger.getLogger().removeAppender(testAppender);
}
}
@Test
public void testCpuIncrementConfiguredViaMultipleProperties() {
TestAppender testAppender = new TestAppender();
Log4JLogger logger = (Log4JLogger) FairSchedulerConfiguration.LOG;
logger.getLogger().addAppender(testAppender);
try {
Configuration conf = new Configuration();
conf.set("yarn.scheduler.increment-allocation-vcores", "7");
conf.set(YarnConfiguration.RESOURCE_TYPES + "." +
ResourceInformation.VCORES.getName() +
FairSchedulerConfiguration.INCREMENT_ALLOCATION, "13");
FairSchedulerConfiguration fsc = new FairSchedulerConfiguration(conf);
Resource increment = fsc.getIncrementAllocation();
Assert.assertEquals(13, increment.getVirtualCores());
assertTrue("Warning message is not logged when specifying CPU vCores " +
"increment via multiple properties",
testAppender.getLogEvents().stream().anyMatch(
e -> e.getLevel() == Level.WARN && ("Configuration " +
"yarn.resource-types.vcores.increment-allocation=13 is " +
"overriding the yarn.scheduler.increment-allocation-vcores=7 " +
"property").equals(e.getMessage())));
} finally {
logger.getLogger().removeAppender(testAppender);
}
}
}

View File

@ -74,8 +74,11 @@ Customizing the Fair Scheduler typically involves altering two files. First, sch
| `yarn.scheduler.fair.locality.threshold.rack` | For applications that request containers on particular racks, the number of scheduling opportunities since the last container assignment to wait before accepting a placement on another rack. Expressed as a float between 0 and 1, which, as a fraction of the cluster size, is the number of scheduling opportunities to pass up. The default value of -1.0 means don't pass up any scheduling opportunities. |
| `yarn.scheduler.fair.allow-undeclared-pools` | If this is true, new queues can be created at application submission time, whether because they are specified as the application's queue by the submitter or because they are placed there by the user-as-default-queue property. If this is false, any time an app would be placed in a queue that is not specified in the allocations file, it is placed in the "default" queue instead. Defaults to true. If a queue placement policy is given in the allocations file, this property is ignored. |
| `yarn.scheduler.fair.update-interval-ms` | The interval at which to lock the scheduler and recalculate fair shares, recalculate demand, and check whether anything is due for preemption. Defaults to 500 ms. |
| `yarn.scheduler.increment-allocation-mb` | The fairscheduler grants memory in increments of this value. If you submit a task with resource request that is not a multiple of increment-allocation-mb, the request will be rounded up to the nearest increment. Defaults to 1024 MB. |
| `yarn.scheduler.increment-allocation-vcores` | The fairscheduler grants vcores in increments of this value. If you submit a task with resource request that is not a multiple of increment-allocation-vcores, the request will be rounded up to the nearest increment. Defaults to 1. |
| `yarn.resource-types.memory-mb.increment-allocation` | The fairscheduler grants memory in increments of this value. If you submit a task with resource request that is not a multiple of `memory-mb.increment-allocation`, the request will be rounded up to the nearest increment. Defaults to 1024 MB. |
| `yarn.resource-types.vcores.increment-allocation` | The fairscheduler grants vcores in increments of this value. If you submit a task with resource request that is not a multiple of `vcores.increment-allocation`, the request will be rounded up to the nearest increment. Defaults to 1. |
| `yarn.resource-types.<resource>.increment-allocation` | The fairscheduler grants `<resource>` in increments of this value. If you submit a task with resource request that is not a multiple of `<resource>.increment-allocation`, the request will be rounded up to the nearest increment. If this property is not specified for a resource, the increment round-up will not be applied. If no unit is specified, the default unit for the resource is assumed. |
| `yarn.scheduler.increment-allocation-mb` | The allocation increment for memory. No longer preferred. Use `yarn.resource-types.memory-mb.increment-allocation` instead. Defaults to 1024 MB. |
| `yarn.scheduler.increment-allocation-vcores` | The allocation increment for CPU vcores. No longer preferred. Use `yarn.resource-types.vcores.increment-allocation` instead. Defaults to 1. |
###Allocation file format