diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java index 26d75922ab3..b6cb581b68e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java @@ -20,14 +20,18 @@ package org.apache.hadoop.yarn.util.resource; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; +import org.apache.hadoop.yarn.api.records.impl.LightWeightResource; import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.UnitsConversionUtil; @@ -71,6 +75,8 @@ public class ResourceUtils { "^(((\\p{Alnum}([\\p{Alnum}-]*\\p{Alnum})?\\.)*" + "\\p{Alnum}([\\p{Alnum}-]*\\p{Alnum})?)/)?\\p{Alpha}([\\w.-]*)$"); + private final static String RES_PATTERN = "^[^=]+=\\d+\\s?\\w*$"; + private static volatile boolean initializedResources = false; private static final Map RESOURCE_NAME_TO_INDEX = new ConcurrentHashMap(); @@ -776,4 +782,108 @@ public class ResourceUtils { return info; } + + /** + * Return a new {@link Resource} instance with all resource values + * initialized to {@code value}. + * @param value the value to use for all resources + * @return a new {@link Resource} instance + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + public static Resource createResourceWithSameValue(long value) { + LightWeightResource res = new LightWeightResource(value, + Long.valueOf(value).intValue()); + int numberOfResources = getNumberOfKnownResourceTypes(); + for (int i = 2; i < numberOfResources; i++) { + res.setResourceValue(i, value); + } + + return res; + } + + @InterfaceAudience.Private + @InterfaceStability.Unstable + public static Resource createResourceFromString( + String resourceStr, + List resourceTypeInfos) { + Map typeToValue = parseResourcesString(resourceStr); + validateResourceTypes(typeToValue.keySet(), resourceTypeInfos); + Resource resource = Resource.newInstance(0, 0); + for (Entry entry : typeToValue.entrySet()) { + resource.setResourceValue(entry.getKey(), entry.getValue()); + } + return resource; + } + + private static Map parseResourcesString(String resourcesStr) { + Map resources = new HashMap<>(); + String[] pairs = resourcesStr.trim().split(","); + for (String resource : pairs) { + resource = resource.trim(); + if (!resource.matches(RES_PATTERN)) { + throw new IllegalArgumentException("\"" + resource + "\" is not a " + + "valid resource type/amount pair. " + + "Please provide key=amount pairs separated by commas."); + } + String[] splits = resource.split("="); + String key = splits[0], value = splits[1]; + String units = getUnits(value); + + String valueWithoutUnit = value.substring(0, + value.length()- units.length()).trim(); + long resourceValue = Long.parseLong(valueWithoutUnit); + + // Convert commandline unit to standard YARN unit. + if (units.equals("M") || units.equals("m")) { + units = "Mi"; + } else if (units.equals("G") || units.equals("g")) { + units = "Gi"; + } else if (units.isEmpty()) { + // do nothing; + } else { + throw new IllegalArgumentException("Acceptable units are M/G or empty"); + } + + // special handle memory-mb and memory + if (key.equals(ResourceInformation.MEMORY_URI)) { + if (!units.isEmpty()) { + resourceValue = UnitsConversionUtil.convert(units, "Mi", + resourceValue); + } + } + + if (key.equals("memory")) { + key = ResourceInformation.MEMORY_URI; + resourceValue = UnitsConversionUtil.convert(units, "Mi", + resourceValue); + } + + // special handle gpu + if (key.equals("gpu")) { + key = ResourceInformation.GPU_URI; + } + + // special handle fpga + if (key.equals("fpga")) { + key = ResourceInformation.FPGA_URI; + } + + resources.put(key, resourceValue); + } + return resources; + } + + private static void validateResourceTypes( + Iterable resourceNames, + List resourceTypeInfos) + throws ResourceNotFoundException { + for (String resourceName : resourceNames) { + if (!resourceTypeInfos.stream().anyMatch( + e -> e.getName().equals(resourceName))) { + throw new ResourceNotFoundException( + "Unknown resource: " + resourceName); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java index f3eee7c274e..c00bc2c6498 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java @@ -16,23 +16,15 @@ package org.apache.hadoop.yarn.submarine.client.cli; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceInformation; -import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; -import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters; import org.apache.hadoop.yarn.submarine.common.exception.SubmarineRuntimeException; import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager; -import org.apache.hadoop.yarn.util.UnitsConversionUtil; -import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.apache.hadoop.yarn.submarine.client.cli.CliConstants.KEYTAB; @@ -41,7 +33,6 @@ import static org.apache.hadoop.yarn.submarine.client.cli.CliConstants.PRINCIPAL public class CliUtils { private static final Logger LOG = LoggerFactory.getLogger(CliUtils.class); - private final static String RES_PATTERN = "^[^=]+=\\d+\\s?\\w*$"; /** * Replace patterns inside cli * @@ -74,86 +65,6 @@ public class CliUtils { return newCli; } - private static Map parseResourcesString(String resourcesStr) { - Map resources = new HashMap<>(); - String[] pairs = resourcesStr.trim().split(","); - for (String resource : pairs) { - resource = resource.trim(); - if (!resource.matches(RES_PATTERN)) { - throw new IllegalArgumentException("\"" + resource + "\" is not a " - + "valid resource type/amount pair. " - + "Please provide key=amount pairs separated by commas."); - } - String[] splits = resource.split("="); - String key = splits[0], value = splits[1]; - String units = ResourceUtils.getUnits(value); - - String valueWithoutUnit = value.substring(0, - value.length()- units.length()).trim(); - long resourceValue = Long.parseLong(valueWithoutUnit); - - // Convert commandline unit to standard YARN unit. - if (units.equals("M") || units.equals("m")) { - units = "Mi"; - } else if (units.equals("G") || units.equals("g")) { - units = "Gi"; - } else if (units.isEmpty()) { - // do nothing; - } else { - throw new IllegalArgumentException("Acceptable units are M/G or empty"); - } - - // special handle memory-mb and memory - if (key.equals(ResourceInformation.MEMORY_URI)) { - if (!units.isEmpty()) { - resourceValue = UnitsConversionUtil.convert(units, "Mi", - resourceValue); - } - } - - if (key.equals("memory")) { - key = ResourceInformation.MEMORY_URI; - resourceValue = UnitsConversionUtil.convert(units, "Mi", - resourceValue); - } - - // special handle gpu - if (key.equals("gpu")) { - key = ResourceInformation.GPU_URI; - } - - // special handle fpga - if (key.equals("fpga")) { - key = ResourceInformation.FPGA_URI; - } - - resources.put(key, resourceValue); - } - return resources; - } - - private static void validateResourceTypes(Iterable resourceNames, - List resourceTypes) throws IOException, YarnException { - for (String resourceName : resourceNames) { - if (!resourceTypes.stream().anyMatch( - e -> e.getName().equals(resourceName))) { - throw new ResourceNotFoundException( - "Unknown resource: " + resourceName); - } - } - } - - public static Resource createResourceFromString(String resourceStr, - List resourceTypes) throws IOException, YarnException { - Map typeToValue = parseResourcesString(resourceStr); - validateResourceTypes(typeToValue.keySet(), resourceTypes); - Resource resource = Resource.newInstance(0, 0); - for (Map.Entry entry : typeToValue.entrySet()) { - resource.setResourceValue(entry.getKey(), entry.getValue()); - } - return resource; - } - // Is it for help? public static boolean argsForHelp(String[] args) { if (args == null || args.length == 0) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java index 111d4ebd61a..9a01dadf8a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.submarine.client.cli.CliConstants; import org.apache.hadoop.yarn.submarine.client.cli.CliUtils; import org.apache.hadoop.yarn.submarine.common.ClientContext; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import java.io.IOException; import java.util.ArrayList; @@ -104,7 +105,7 @@ public class RunJobParameters extends RunParameters { throw new ParseException( "--" + CliConstants.WORKER_RES + " is absent."); } - workerResource = CliUtils.createResourceFromString( + workerResource = ResourceUtils.createResourceFromString( workerResourceStr, clientContext.getOrCreateYarnClient().getResourceTypeInfo()); } @@ -115,7 +116,7 @@ public class RunJobParameters extends RunParameters { if (psResourceStr == null) { throw new ParseException("--" + CliConstants.PS_RES + " is absent."); } - psResource = CliUtils.createResourceFromString(psResourceStr, + psResource = ResourceUtils.createResourceFromString(psResourceStr, clientContext.getOrCreateYarnClient().getResourceTypeInfo()); } @@ -127,7 +128,7 @@ public class RunJobParameters extends RunParameters { if (tensorboardResourceStr == null || tensorboardResourceStr.isEmpty()) { tensorboardResourceStr = CliConstants.TENSORBOARD_DEFAULT_RESOURCES; } - tensorboardResource = CliUtils.createResourceFromString( + tensorboardResource = ResourceUtils.createResourceFromString( tensorboardResourceStr, clientContext.getOrCreateYarnClient().getResourceTypeInfo()); tensorboardDockerImage = parsedCommandLine.getOptionValue( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java index 184d53d7a01..2a8f1da6316 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java @@ -21,9 +21,6 @@ package org.apache.hadoop.yarn.submarine.client.cli; import org.apache.commons.cli.ParseException; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceInformation; -import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters; import org.apache.hadoop.yarn.submarine.common.MockClientContext; @@ -32,15 +29,12 @@ import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory; import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor; import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter; import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage; -import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -204,72 +198,4 @@ public class TestRunJobCliParsing { "python run-ps.py --input=hdfs://input --model_dir=hdfs://output/model", runJobCli.getRunJobParameters().getPSLaunchCmd()); } - - @Test - public void testResourceUnitParsing() throws Exception { - Resource res = CliUtils.createResourceFromString("memory=20g,vcores=3", - ResourceUtils.getResourcesTypeInfo()); - Assert.assertEquals(Resources.createResource(20 * 1024, 3), res); - - res = CliUtils.createResourceFromString("memory=20G,vcores=3", - ResourceUtils.getResourcesTypeInfo()); - Assert.assertEquals(Resources.createResource(20 * 1024, 3), res); - - res = CliUtils.createResourceFromString("memory=20M,vcores=3", - ResourceUtils.getResourcesTypeInfo()); - Assert.assertEquals(Resources.createResource(20, 3), res); - - res = CliUtils.createResourceFromString("memory=20m,vcores=3", - ResourceUtils.getResourcesTypeInfo()); - Assert.assertEquals(Resources.createResource(20, 3), res); - - res = CliUtils.createResourceFromString("memory-mb=20,vcores=3", - ResourceUtils.getResourcesTypeInfo()); - Assert.assertEquals(Resources.createResource(20, 3), res); - - res = CliUtils.createResourceFromString("memory-mb=20m,vcores=3", - ResourceUtils.getResourcesTypeInfo()); - Assert.assertEquals(Resources.createResource(20, 3), res); - - res = CliUtils.createResourceFromString("memory-mb=20G,vcores=3", - ResourceUtils.getResourcesTypeInfo()); - Assert.assertEquals(Resources.createResource(20 * 1024, 3), res); - - // W/o unit for memory means bits, and 20 bits will be rounded to 0 - res = CliUtils.createResourceFromString("memory=20,vcores=3", - ResourceUtils.getResourcesTypeInfo()); - Assert.assertEquals(Resources.createResource(0, 3), res); - - // Test multiple resources - List resTypes = new ArrayList<>( - ResourceUtils.getResourcesTypeInfo()); - resTypes.add(ResourceTypeInfo.newInstance(ResourceInformation.GPU_URI, "")); - ResourceUtils.reinitializeResources(resTypes); - res = CliUtils.createResourceFromString("memory=2G,vcores=3,gpu=0", - resTypes); - Assert.assertEquals(2 * 1024, res.getMemorySize()); - Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI)); - - res = CliUtils.createResourceFromString("memory=2G,vcores=3,gpu=3", - resTypes); - Assert.assertEquals(2 * 1024, res.getMemorySize()); - Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI)); - - res = CliUtils.createResourceFromString("memory=2G,vcores=3", - resTypes); - Assert.assertEquals(2 * 1024, res.getMemorySize()); - Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI)); - - res = CliUtils.createResourceFromString("memory=2G,vcores=3,yarn.io/gpu=0", - resTypes); - Assert.assertEquals(2 * 1024, res.getMemorySize()); - Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI)); - - res = CliUtils.createResourceFromString("memory=2G,vcores=3,yarn.io/gpu=3", - resTypes); - Assert.assertEquals(2 * 1024, res.getMemorySize()); - Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI)); - - // TODO, add more negative tests. - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index bf1df8dbfa8..47641475455 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -25,7 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; -import org.apache.hadoop.yarn.api.records.impl.LightWeightResource; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; /** @@ -39,25 +38,6 @@ public class Resources { private static final Log LOG = LogFactory.getLog(Resources.class); - /** - * Return a new {@link Resource} instance with all resource values - * initialized to {@code value}. - * @param value the value to use for all resources - * @return a new {@link Resource} instance - */ - @Private - @Unstable - public static Resource createResourceWithSameValue(long value) { - LightWeightResource res = new LightWeightResource(value, - Long.valueOf(value).intValue()); - int numberOfResources = ResourceUtils.getNumberOfKnownResourceTypes(); - for (int i = 2; i < numberOfResources; i++) { - res.setResourceValue(i, value); - } - - return res; - } - /** * Helper class to create a resource with a fixed value for all resource * types. For example, a NONE resource which returns 0 for any resource type. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java index d6e0565776c..f7ec4f803f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.junit.After; @@ -31,7 +32,9 @@ import org.junit.Before; import org.junit.Test; import java.io.File; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -391,6 +394,74 @@ public class TestResourceUtils { } } + @Test + public void testResourceUnitParsing() throws Exception { + Resource res = ResourceUtils.createResourceFromString("memory=20g,vcores=3", + ResourceUtils.getResourcesTypeInfo()); + Assert.assertEquals(Resources.createResource(20 * 1024, 3), res); + + res = ResourceUtils.createResourceFromString("memory=20G,vcores=3", + ResourceUtils.getResourcesTypeInfo()); + Assert.assertEquals(Resources.createResource(20 * 1024, 3), res); + + res = ResourceUtils.createResourceFromString("memory=20M,vcores=3", + ResourceUtils.getResourcesTypeInfo()); + Assert.assertEquals(Resources.createResource(20, 3), res); + + res = ResourceUtils.createResourceFromString("memory=20m,vcores=3", + ResourceUtils.getResourcesTypeInfo()); + Assert.assertEquals(Resources.createResource(20, 3), res); + + res = ResourceUtils.createResourceFromString("memory-mb=20,vcores=3", + ResourceUtils.getResourcesTypeInfo()); + Assert.assertEquals(Resources.createResource(20, 3), res); + + res = ResourceUtils.createResourceFromString("memory-mb=20m,vcores=3", + ResourceUtils.getResourcesTypeInfo()); + Assert.assertEquals(Resources.createResource(20, 3), res); + + res = ResourceUtils.createResourceFromString("memory-mb=20G,vcores=3", + ResourceUtils.getResourcesTypeInfo()); + Assert.assertEquals(Resources.createResource(20 * 1024, 3), res); + + // W/o unit for memory means bits, and 20 bits will be rounded to 0 + res = ResourceUtils.createResourceFromString("memory=20,vcores=3", + ResourceUtils.getResourcesTypeInfo()); + Assert.assertEquals(Resources.createResource(0, 3), res); + + // Test multiple resources + List resTypes = new ArrayList<>( + ResourceUtils.getResourcesTypeInfo()); + resTypes.add(ResourceTypeInfo.newInstance(ResourceInformation.GPU_URI, "")); + ResourceUtils.reinitializeResources(resTypes); + res = ResourceUtils.createResourceFromString("memory=2G,vcores=3,gpu=0", + resTypes); + Assert.assertEquals(2 * 1024, res.getMemorySize()); + Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI)); + + res = ResourceUtils.createResourceFromString("memory=2G,vcores=3,gpu=3", + resTypes); + Assert.assertEquals(2 * 1024, res.getMemorySize()); + Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI)); + + res = ResourceUtils.createResourceFromString("memory=2G,vcores=3", + resTypes); + Assert.assertEquals(2 * 1024, res.getMemorySize()); + Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI)); + + res = ResourceUtils.createResourceFromString( + "memory=2G,vcores=3,yarn.io/gpu=0", resTypes); + Assert.assertEquals(2 * 1024, res.getMemorySize()); + Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI)); + + res = ResourceUtils.createResourceFromString( + "memory=2G,vcores=3,yarn.io/gpu=3", resTypes); + Assert.assertEquals(2 * 1024, res.getMemorySize()); + Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI)); + + // TODO, add more negative tests. + } + public static String setupResourceTypes(Configuration conf, String filename) throws Exception { File source = new File( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResources.java index 07b24eb2618..ecd940ea98b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResources.java @@ -269,7 +269,7 @@ public class TestResources { unsetExtraResourceType(); setupExtraResourceType(); - Resource res = Resources.createResourceWithSameValue(11L); + Resource res = ResourceUtils.createResourceWithSameValue(11L); assertEquals(11L, res.getMemorySize()); assertEquals(11, res.getVirtualCores()); assertEquals(11L, res.getResourceInformation(EXTRA_RESOURCE_TYPE).getValue()); @@ -280,7 +280,7 @@ public class TestResources { unsetExtraResourceType(); setupExtraResourceType(); - Resource res = Resources.createResourceWithSameValue(11); + Resource res = ResourceUtils.createResourceWithSameValue(11); assertEquals(11, res.getMemorySize()); assertEquals(11, res.getVirtualCores()); assertEquals(11, res.getResourceInformation(EXTRA_RESOURCE_TYPE).getValue()); @@ -288,14 +288,14 @@ public class TestResources { @Test public void testCreateSimpleResourceWithSameLongValue() { - Resource res = Resources.createResourceWithSameValue(11L); + Resource res = ResourceUtils.createResourceWithSameValue(11L); assertEquals(11L, res.getMemorySize()); assertEquals(11, res.getVirtualCores()); } @Test public void testCreateSimpleResourceWithSameIntValue() { - Resource res = Resources.createResourceWithSameValue(11); + Resource res = ResourceUtils.createResourceWithSameValue(11); assertEquals(11, res.getMemorySize()); assertEquals(11, res.getVirtualCores()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 2c9f9a37255..caa88cf9d6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueStatistics; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -69,10 +70,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.Sets; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.UNDEFINED; + public abstract class AbstractCSQueue implements CSQueue { private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); @@ -361,9 +365,9 @@ public abstract class AbstractCSQueue implements CSQueue { capacityConfigType = CapacityConfigType.NONE; updateConfigurableResourceRequirement(getQueuePath(), clusterResource); - this.maximumAllocation = - configuration.getMaximumAllocationPerQueue( - getQueuePath()); + // Setup queue's maximumAllocation respecting the global setting + // and queue setting + setupMaximumAllocation(configuration); // initialized the queue state based on previous state, configured state // and its parent state. @@ -425,6 +429,51 @@ public abstract class AbstractCSQueue implements CSQueue { } } + private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) { + String queue = getQueuePath(); + Resource clusterMax = ResourceUtils + .fetchMaximumAllocationFromConfig(csConf); + Resource queueMax = csConf.getQueueMaximumAllocation(queue); + + maximumAllocation = Resources.clone( + parent == null ? clusterMax : parent.getMaximumAllocation()); + + String errMsg = + "Queue maximum allocation cannot be larger than the cluster setting" + + " for queue " + queue + + " max allocation per queue: %s" + + " cluster setting: " + clusterMax; + + if (queueMax == Resources.none()) { + // Handle backward compatibility + long queueMemory = csConf.getQueueMaximumAllocationMb(queue); + int queueVcores = csConf.getQueueMaximumAllocationVcores(queue); + if (queueMemory != UNDEFINED) { + maximumAllocation.setMemorySize(queueMemory); + } + + if (queueVcores != UNDEFINED) { + maximumAllocation.setVirtualCores(queueVcores); + } + + if ((queueMemory != UNDEFINED && queueMemory > clusterMax.getMemorySize() + || (queueVcores != UNDEFINED + && queueVcores > clusterMax.getVirtualCores()))) { + throw new IllegalArgumentException( + String.format(errMsg, maximumAllocation)); + } + } else { + // Queue level maximum-allocation can't be larger than cluster setting + for (ResourceInformation ri : queueMax.getResources()) { + if (ri.compareTo(clusterMax.getResourceInformation(ri.getName())) > 0) { + throw new IllegalArgumentException(String.format(errMsg, queueMax)); + } + + maximumAllocation.setResourceInformation(ri.getName(), ri); + } + } + } + private Map getUserWeightsFromHierarchy (CapacitySchedulerConfiguration configuration) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index c0c280e9f1e..1af325023d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -86,6 +86,10 @@ public interface CSQueue extends SchedulerQueue { public PrivilegedEntity getPrivilegedEntity(); + Resource getMaximumAllocation(); + + Resource getMinimumAllocation(); + /** * Get the configured capacity of the queue. * @return configured queue capacity diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index dec6a84ed1a..ead1f575e1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -140,13 +141,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true; + @Private + public static final String MAXIMUM_ALLOCATION = "maximum-allocation"; + @Private public static final String MAXIMUM_ALLOCATION_MB = "maximum-allocation-mb"; @Private public static final String MAXIMUM_ALLOCATION_VCORES = - "maximum-allocation-vcores"; - + "maximum-allocation-vcores"; /** * Ordering policy of queues */ @@ -889,50 +892,32 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur } /** - * Get the per queue setting for the maximum limit to allocate to - * each container request. + * Get maximum_allocation setting for the specified queue from the + * configuration. * * @param queue * name of the queue - * @return setting specified per queue else falls back to the cluster setting + * @return Resource object or Resource.none if not set */ - public Resource getMaximumAllocationPerQueue(String queue) { - // Only support to specify memory and vcores maximum allocation per queue - // for now. + public Resource getQueueMaximumAllocation(String queue) { String queuePrefix = getQueuePrefix(queue); - long maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, - (int)UNDEFINED); - int maxAllocationVcoresPerQueue = getInt( - queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED); - if (LOG.isDebugEnabled()) { - LOG.debug("max alloc mb per queue for " + queue + " is " - + maxAllocationMbPerQueue); - LOG.debug("max alloc vcores per queue for " + queue + " is " - + maxAllocationVcoresPerQueue); + String rawQueueMaxAllocation = get(queuePrefix + MAXIMUM_ALLOCATION, null); + if (Strings.isNullOrEmpty(rawQueueMaxAllocation)) { + return Resources.none(); + } else { + return ResourceUtils.createResourceFromString(rawQueueMaxAllocation, + ResourceUtils.getResourcesTypeInfo()); } - Resource clusterMax = ResourceUtils.fetchMaximumAllocationFromConfig(this); - if (maxAllocationMbPerQueue == (int)UNDEFINED) { - LOG.info("max alloc mb per queue for " + queue + " is undefined"); - maxAllocationMbPerQueue = clusterMax.getMemorySize(); - } - if (maxAllocationVcoresPerQueue == (int)UNDEFINED) { - LOG.info("max alloc vcore per queue for " + queue + " is undefined"); - maxAllocationVcoresPerQueue = clusterMax.getVirtualCores(); - } - // Copy from clusterMax and overwrite per-queue's maximum memory/vcore - // allocation. - Resource result = Resources.clone(clusterMax); - result.setMemorySize(maxAllocationMbPerQueue); - result.setVirtualCores(maxAllocationVcoresPerQueue); - if (maxAllocationMbPerQueue > clusterMax.getMemorySize() - || maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) { - throw new IllegalArgumentException( - "Queue maximum allocation cannot be larger than the cluster setting" - + " for queue " + queue - + " max allocation per queue: " + result - + " cluster setting: " + clusterMax); - } - return result; + } + + public long getQueueMaximumAllocationMb(String queue) { + String queuePrefix = getQueuePrefix(queue); + return getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, (int)UNDEFINED); + } + + public int getQueueMaximumAllocationVcores(String queue) { + String queuePrefix = getQueuePrefix(queue); + return getInt(queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED); } public boolean getEnableUserMetrics() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java index 62bad4414b2..44049fdf595 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java @@ -25,7 +25,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.util.resource.ResourceUtils; -import org.apache.hadoop.yarn.util.resource.Resources; /** * A {@code ConfigurableResource} object represents an entity that is used to @@ -53,7 +52,7 @@ public class ConfigurableResource { * @param value the value to use for all resources */ ConfigurableResource(long value) { - this(Resources.createResourceWithSameValue(value)); + this(ResourceUtils.createResourceWithSameValue(value)); } public ConfigurableResource(Resource resource) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 56882a70f96..aac7f15a5a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -18,8 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -2996,8 +2998,8 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, cs.getMaximumResourceCapability().getMemorySize()); assertEquals("max allocation for A1", - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - conf.getMaximumAllocationPerQueue(A1).getMemorySize()); + Resources.none(), + conf.getQueueMaximumAllocation(A1)); assertEquals("max allocation", YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize()); @@ -3087,6 +3089,10 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { cs.reinitialize(conf, mockContext); checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + CSQueue rootQueue = cs.getRootQueue(); + CSQueue queueA = findQueue(rootQueue, A); + CSQueue queueA1 = findQueue(queueA, A1); + assertEquals("max capability MB in CS", YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, cs.getMaximumResourceCapability().getMemorySize()); @@ -3095,10 +3101,10 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { cs.getMaximumResourceCapability().getVirtualCores()); assertEquals("max allocation MB A1", 4096, - conf.getMaximumAllocationPerQueue(A1).getMemorySize()); + queueA1.getMaximumAllocation().getMemorySize()); assertEquals("max allocation vcores A1", 2, - conf.getMaximumAllocationPerQueue(A1).getVirtualCores()); + queueA1.getMaximumAllocation().getVirtualCores()); assertEquals("cluster max allocation MB", YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize()); @@ -3106,11 +3112,8 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores()); - CSQueue rootQueue = cs.getRootQueue(); - CSQueue queueA = findQueue(rootQueue, A); - CSQueue queueA1 = findQueue(queueA, A1); - assertEquals("queue max allocation", 4096, ((LeafQueue) queueA1) - .getMaximumAllocation().getMemorySize()); + assertEquals("queue max allocation", 4096, + queueA1.getMaximumAllocation().getMemorySize()); setMaxAllocMb(conf, A1, 6144); setMaxAllocVcores(conf, A1, 3); @@ -3118,9 +3121,9 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { // conf will have changed but we shouldn't be able to change max allocation // for the actual queue assertEquals("max allocation MB A1", 6144, - conf.getMaximumAllocationPerQueue(A1).getMemorySize()); + queueA1.getMaximumAllocation().getMemorySize()); assertEquals("max allocation vcores A1", 3, - conf.getMaximumAllocationPerQueue(A1).getVirtualCores()); + queueA1.getMaximumAllocation().getVirtualCores()); assertEquals("max allocation MB cluster", YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize()); @@ -3128,9 +3131,9 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores()); assertEquals("queue max allocation MB", 6144, - ((LeafQueue) queueA1).getMaximumAllocation().getMemorySize()); + queueA1.getMaximumAllocation().getMemorySize()); assertEquals("queue max allocation vcores", 3, - ((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores()); + queueA1.getMaximumAllocation().getVirtualCores()); assertEquals("max capability MB cluster", YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, cs.getMaximumResourceCapability().getMemorySize()); @@ -3216,17 +3219,17 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { CSQueue queueB2 = findQueue(queueB, B2); assertEquals("queue A1 max allocation MB", 4096, - ((LeafQueue) queueA1).getMaximumAllocation().getMemorySize()); + queueA1.getMaximumAllocation().getMemorySize()); assertEquals("queue A1 max allocation vcores", 4, - ((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores()); + queueA1.getMaximumAllocation().getVirtualCores()); assertEquals("queue A2 max allocation MB", 10240, - ((LeafQueue) queueA2).getMaximumAllocation().getMemorySize()); + queueA2.getMaximumAllocation().getMemorySize()); assertEquals("queue A2 max allocation vcores", 10, - ((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores()); + queueA2.getMaximumAllocation().getVirtualCores()); assertEquals("queue B2 max allocation MB", 10240, - ((LeafQueue) queueB2).getMaximumAllocation().getMemorySize()); + queueB2.getMaximumAllocation().getMemorySize()); assertEquals("queue B2 max allocation vcores", 10, - ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores()); + queueB2.getMaximumAllocation().getVirtualCores()); setMaxAllocMb(conf, 12288); setMaxAllocVcores(conf, 12); @@ -3238,17 +3241,187 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { assertEquals("max allocation vcores in CS", 12, cs.getMaximumResourceCapability().getVirtualCores()); assertEquals("queue A1 max MB allocation", 4096, - ((LeafQueue) queueA1).getMaximumAllocation().getMemorySize()); + queueA1.getMaximumAllocation().getMemorySize()); assertEquals("queue A1 max vcores allocation", 4, - ((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores()); + queueA1.getMaximumAllocation().getVirtualCores()); assertEquals("queue A2 max MB allocation", 12288, - ((LeafQueue) queueA2).getMaximumAllocation().getMemorySize()); + queueA2.getMaximumAllocation().getMemorySize()); assertEquals("queue A2 max vcores allocation", 12, - ((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores()); + queueA2.getMaximumAllocation().getVirtualCores()); assertEquals("queue B2 max MB allocation", 12288, - ((LeafQueue) queueB2).getMaximumAllocation().getMemorySize()); + queueB2.getMaximumAllocation().getMemorySize()); assertEquals("queue B2 max vcores allocation", 12, - ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores()); + queueB2.getMaximumAllocation().getVirtualCores()); + } + + @Test + public void testQueuesMaxAllocationInheritance() throws Exception { + // queue level max allocation is set by the queue configuration explicitly + // or inherits from the parent. + + CapacityScheduler cs = new CapacityScheduler(); + cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + setMaxAllocMb(conf, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + setMaxAllocVcores(conf, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + + // Test the child queue overrides + setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT, + "memory-mb=4096,vcores=2"); + setMaxAllocation(conf, A1, "memory-mb=6144,vcores=2"); + setMaxAllocation(conf, B, "memory-mb=5120, vcores=2"); + setMaxAllocation(conf, B2, "memory-mb=1024, vcores=2"); + + cs.init(conf); + cs.start(); + cs.reinitialize(conf, mockContext); + checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + CSQueue rootQueue = cs.getRootQueue(); + CSQueue queueA = findQueue(rootQueue, A); + CSQueue queueB = findQueue(rootQueue, B); + CSQueue queueA1 = findQueue(queueA, A1); + CSQueue queueA2 = findQueue(queueA, A2); + CSQueue queueB1 = findQueue(queueB, B1); + CSQueue queueB2 = findQueue(queueB, B2); + + assertEquals("max capability MB in CS", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + cs.getMaximumResourceCapability().getMemorySize()); + assertEquals("max capability vcores in CS", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + cs.getMaximumResourceCapability().getVirtualCores()); + assertEquals("max allocation MB A1", + 6144, + queueA1.getMaximumAllocation().getMemorySize()); + assertEquals("max allocation vcores A1", + 2, + queueA1.getMaximumAllocation().getVirtualCores()); + assertEquals("max allocation MB A2", 4096, + queueA2.getMaximumAllocation().getMemorySize()); + assertEquals("max allocation vcores A2", + 2, + queueA2.getMaximumAllocation().getVirtualCores()); + assertEquals("max allocation MB B", 5120, + queueB.getMaximumAllocation().getMemorySize()); + assertEquals("max allocation MB B1", 5120, + queueB1.getMaximumAllocation().getMemorySize()); + assertEquals("max allocation MB B2", 1024, + queueB2.getMaximumAllocation().getMemorySize()); + + // Test get the max-allocation from different parent + unsetMaxAllocation(conf, A1); + unsetMaxAllocation(conf, B); + unsetMaxAllocation(conf, B1); + setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT, + "memory-mb=6144,vcores=2"); + setMaxAllocation(conf, A, "memory-mb=8192,vcores=2"); + + cs.reinitialize(conf, mockContext); + + assertEquals("max capability MB in CS", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + cs.getMaximumResourceCapability().getMemorySize()); + assertEquals("max capability vcores in CS", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + cs.getMaximumResourceCapability().getVirtualCores()); + assertEquals("max allocation MB A1", + 8192, + queueA1.getMaximumAllocation().getMemorySize()); + assertEquals("max allocation vcores A1", + 2, + queueA1.getMaximumAllocation().getVirtualCores()); + assertEquals("max allocation MB B1", + 6144, + queueB1.getMaximumAllocation().getMemorySize()); + assertEquals("max allocation vcores B1", + 2, + queueB1.getMaximumAllocation().getVirtualCores()); + + // Test the default + unsetMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT); + unsetMaxAllocation(conf, A); + unsetMaxAllocation(conf, A1); + cs.reinitialize(conf, mockContext); + + assertEquals("max capability MB in CS", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + cs.getMaximumResourceCapability().getMemorySize()); + assertEquals("max capability vcores in CS", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + cs.getMaximumResourceCapability().getVirtualCores()); + assertEquals("max allocation MB A1", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + queueA1.getMaximumAllocation().getMemorySize()); + assertEquals("max allocation vcores A1", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + queueA1.getMaximumAllocation().getVirtualCores()); + assertEquals("max allocation MB A2", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + queueA2.getMaximumAllocation().getMemorySize()); + assertEquals("max allocation vcores A2", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + queueA2.getMaximumAllocation().getVirtualCores()); + } + + @Test + public void testVerifyQueuesMaxAllocationConf() throws Exception { + // queue level max allocation can't exceed the cluster setting + + CapacityScheduler cs = new CapacityScheduler(); + cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + setMaxAllocMb(conf, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + setMaxAllocVcores(conf, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + + long largerMem = + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1024; + long largerVcores = + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES+10; + + cs.init(conf); + cs.start(); + cs.reinitialize(conf, mockContext); + checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT, + "memory-mb=" + largerMem + ",vcores=2"); + try { + cs.reinitialize(conf, mockContext); + fail("Queue Root maximum allocation can't exceed the cluster setting"); + } catch(Exception e) { + assertTrue("maximum allocation exception", + e.getCause().getMessage().contains("maximum allocation")); + } + + setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT, + "memory-mb=4096,vcores=2"); + setMaxAllocation(conf, A, "memory-mb=6144,vcores=2"); + setMaxAllocation(conf, A1, "memory-mb=" + largerMem + ",vcores=2"); + try { + cs.reinitialize(conf, mockContext); + fail("Queue A1 maximum allocation can't exceed the cluster setting"); + } catch(Exception e) { + assertTrue("maximum allocation exception", + e.getCause().getMessage().contains("maximum allocation")); + } + setMaxAllocation(conf, A1, "memory-mb=8192" + ",vcores=" + largerVcores); + try { + cs.reinitialize(conf, mockContext); + fail("Queue A1 maximum allocation can't exceed the cluster setting"); + } catch(Exception e) { + assertTrue("maximum allocation exception", + e.getCause().getMessage().contains("maximum allocation")); + } + } private void waitContainerAllocated(MockAM am, int mem, int nContainer, @@ -4103,6 +4276,20 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { conf.setInt(propName, maxAllocVcores); } + private void setMaxAllocation(CapacitySchedulerConfiguration conf, + String queueName, String maxAllocation) { + String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName) + + MAXIMUM_ALLOCATION; + conf.set(propName, maxAllocation); + } + + private void unsetMaxAllocation(CapacitySchedulerConfiguration conf, + String queueName) { + String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName) + + MAXIMUM_ALLOCATION; + conf.unset(propName); + } + private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); RMContainer rmContainer = cs.getRMContainer(containerId);