From d6673c5491fcb0059de15190d1d16e7b3090f3b6 Mon Sep 17 00:00:00 2001 From: Jonathan Hung Date: Wed, 9 Jan 2019 16:01:06 -0500 Subject: [PATCH] YARN-9188. Port YARN-7136 to branch-2 --- .../dev-support/findbugs-exclude.xml | 2 +- .../hadoop/yarn/api/records/Resource.java | 176 ++++++------ .../yarn/api/records/ResourceInformation.java | 15 +- ...Resource.java => LightWeightResource.java} | 106 ++++--- .../yarn/util/resource/ResourceUtils.java | 23 +- .../api/records/impl/pb/ResourcePBImpl.java | 19 +- .../resource/DominantResourceCalculator.java | 75 +++-- .../hadoop/yarn/util/resource/Resources.java | 30 +- .../yarn/util/resource/TestResourceUtils.java | 2 + .../rmapp/attempt/RMAppAttemptImpl.java | 2 - .../yarn/server/resourcemanager/MockRM.java | 6 +- .../capacity/TestCapacityScheduler.java | 137 --------- .../capacity/TestCapacitySchedulerPerf.java | 265 ++++++++++++++++++ .../hadoop/yarn/server/MiniYARNCluster.java | 7 +- 14 files changed, 524 insertions(+), 341 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/{BaseResource.java => LightWeightResource.java} (55%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index e086fbeca21..45aa868830c 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -629,7 +629,7 @@ - + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java index f3a5bc25db9..37b50f28ba7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java @@ -27,7 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.records.impl.BaseResource; +import org.apache.hadoop.yarn.api.records.impl.LightWeightResource; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -59,8 +59,15 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils; @Stable public abstract class Resource implements Comparable { - protected static final String MEMORY = ResourceInformation.MEMORY_MB.getName(); - protected static final String VCORES = ResourceInformation.VCORES.getName(); + protected ResourceInformation[] resources = null; + + // Number of mandatory resources, this is added to avoid invoke + // MandatoryResources.values().length, since values() internally will + // copy array, etc. + protected static final int NUM_MANDATORY_RESOURCES = 2; + + protected static final int MEMORY_INDEX = 0; + protected static final int VCORES_INDEX = 1; @Public @Stable @@ -71,7 +78,7 @@ public abstract class Resource implements Comparable { ret.setVirtualCores(vCores); return ret; } - return new BaseResource(memory, vCores); + return new LightWeightResource(memory, vCores); } @Public @@ -83,7 +90,7 @@ public abstract class Resource implements Comparable { ret.setVirtualCores(vCores); return ret; } - return new BaseResource(memory, vCores); + return new LightWeightResource(memory, vCores); } @InterfaceAudience.Private @@ -201,7 +208,9 @@ public abstract class Resource implements Comparable { */ @Public @Evolving - public abstract ResourceInformation[] getResources(); + public ResourceInformation[] getResources() { + return resources; + } /** * Get ResourceInformation for a specified resource. @@ -215,7 +224,6 @@ public abstract class Resource implements Comparable { public ResourceInformation getResourceInformation(String resource) throws ResourceNotFoundException { Integer index = ResourceUtils.getResourceTypeIndex().get(resource); - ResourceInformation[] resources = getResources(); if (index != null) { return resources[index]; } @@ -236,12 +244,13 @@ public abstract class Resource implements Comparable { @Evolving public ResourceInformation getResourceInformation(int index) throws ResourceNotFoundException { - ResourceInformation[] resources = getResources(); - if (index < 0 || index >= resources.length) { - throw new ResourceNotFoundException("Unknown resource at index '" + index - + "'. Vaid resources are: " + Arrays.toString(resources)); + ResourceInformation ri = null; + try { + ri = resources[index]; + } catch (ArrayIndexOutOfBoundsException e) { + throwExceptionWhenArrayOutOfBound(index); } - return resources[index]; + return ri; } /** @@ -271,11 +280,11 @@ public abstract class Resource implements Comparable { public void setResourceInformation(String resource, ResourceInformation resourceInformation) throws ResourceNotFoundException { - if (resource.equals(MEMORY)) { + if (resource.equals(ResourceInformation.MEMORY_URI)) { this.setMemorySize(resourceInformation.getValue()); return; } - if (resource.equals(VCORES)) { + if (resource.equals(ResourceInformation.VCORES_URI)) { this.setVirtualCores((int) resourceInformation.getValue()); return; } @@ -298,7 +307,6 @@ public abstract class Resource implements Comparable { public void setResourceInformation(int index, ResourceInformation resourceInformation) throws ResourceNotFoundException { - ResourceInformation[] resources = getResources(); if (index < 0 || index >= resources.length) { throw new ResourceNotFoundException("Unknown resource at index '" + index + "'. Valid resources are " + Arrays.toString(resources)); @@ -318,11 +326,11 @@ public abstract class Resource implements Comparable { @Evolving public void setResourceValue(String resource, long value) throws ResourceNotFoundException { - if (resource.equals(MEMORY)) { + if (resource.equals(ResourceInformation.MEMORY_URI)) { this.setMemorySize(value); return; } - if (resource.equals(VCORES)) { + if (resource.equals(ResourceInformation.VCORES_URI)) { this.setVirtualCores((int)value); return; } @@ -346,27 +354,21 @@ public abstract class Resource implements Comparable { @Evolving public void setResourceValue(int index, long value) throws ResourceNotFoundException { - ResourceInformation[] resources = getResources(); - if (index < 0 || index >= resources.length) { - throw new ResourceNotFoundException("Unknown resource at index '" + index - + "'. Valid resources are " + Arrays.toString(resources)); + try { + resources[index].setValue(value); + } catch (ArrayIndexOutOfBoundsException e) { + throwExceptionWhenArrayOutOfBound(index); } - resources[index].setValue(value); } - @Override - public int hashCode() { - final int prime = 263167; + private void throwExceptionWhenArrayOutOfBound(int index) { + String exceptionMsg = String.format( + "Trying to access ResourceInformation for given index=%d. " + + "Acceptable index range is [0,%d), please check double check " + + "configured resources in resource-types.xml", + index, ResourceUtils.getNumberOfKnownResourceTypes()); - int result = (int) (939769357 - + getMemorySize()); // prime * result = 939769357 initially - result = prime * result + getVirtualCores(); - for (ResourceInformation entry : getResources()) { - if (!entry.getName().equals(MEMORY) && !entry.getName().equals(VCORES)) { - result = prime * result + entry.hashCode(); - } - } - return result; + throw new ResourceNotFoundException(exceptionMsg); } @Override @@ -381,20 +383,15 @@ public abstract class Resource implements Comparable { return false; } Resource other = (Resource) obj; - if (getMemorySize() != other.getMemorySize() - || getVirtualCores() != other.getVirtualCores()) { - return false; - } - ResourceInformation[] myVectors = getResources(); ResourceInformation[] otherVectors = other.getResources(); - if (myVectors.length != otherVectors.length) { + if (resources.length != otherVectors.length) { return false; } - for (int i = 0; i < myVectors.length; i++) { - ResourceInformation a = myVectors[i]; + for (int i = 0; i < resources.length; i++) { + ResourceInformation a = resources[i]; ResourceInformation b = otherVectors[i]; if ((a != b) && ((a == null) || !a.equals(b))) { return false; @@ -403,64 +400,71 @@ public abstract class Resource implements Comparable { return true; } + @Override + public int compareTo(Resource other) { + ResourceInformation[] otherResources = other.getResources(); + + int arrLenThis = this.resources.length; + int arrLenOther = otherResources.length; + + // compare memory and vcores first(in that order) to preserve + // existing behaviour + for (int i = 0; i < arrLenThis; i++) { + ResourceInformation otherEntry; + try { + otherEntry = otherResources[i]; + } catch (ArrayIndexOutOfBoundsException e) { + // For two vectors with different size and same prefix. Shorter vector + // goes first. + return 1; + } + ResourceInformation entry = resources[i]; + + long diff = entry.compareTo(otherEntry); + if (diff > 0) { + return 1; + } else if (diff < 0) { + return -1; + } + } + + if (arrLenThis < arrLenOther) { + return -1; + } + + return 0; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); + sb.append(""); return sb.toString(); } @Override - public int compareTo(Resource other) { - ResourceInformation[] thisResources = this.getResources(); - ResourceInformation[] otherResources = other.getResources(); - - // compare memory and vcores first(in that order) to preserve - // existing behaviour - long diff = this.getMemorySize() - other.getMemorySize(); - if (diff == 0) { - diff = this.getVirtualCores() - other.getVirtualCores(); + public int hashCode() { + final int prime = 47; + long result = 0; + for (ResourceInformation entry : resources) { + result = prime * result + entry.hashCode(); } - if (diff == 0) { - diff = thisResources.length - otherResources.length; - if (diff == 0) { - int maxLength = ResourceUtils.getResourceTypesArray().length; - for (int i = 0; i < maxLength; i++) { - // For memory and vcores, we can skip the loop as it's already - // compared. - if (i < 2) { - continue; - } - - ResourceInformation entry = thisResources[i]; - ResourceInformation otherEntry = otherResources[i]; - if (entry.getName().equals(otherEntry.getName())) { - diff = entry.compareTo(otherEntry); - if (diff != 0) { - break; - } - } - } - } - } - return Long.compare(diff, 0); + return (int) result; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java index 4717d820d4f..0cc1e9c3897 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.records; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.util.UnitsConversionUtil; @@ -34,8 +35,8 @@ public class ResourceInformation implements Comparable { private long minimumAllocation; private long maximumAllocation; - private static final String MEMORY_URI = "memory-mb"; - private static final String VCORES_URI = "vcores"; + public static final String MEMORY_URI = "memory-mb"; + public static final String VCORES_URI = "vcores"; public static final ResourceInformation MEMORY_MB = ResourceInformation.newInstance(MEMORY_URI, "Mi"); @@ -83,6 +84,16 @@ public class ResourceInformation implements Comparable { this.units = rUnits; } + /** + * Checking if a unit included by KNOWN_UNITS is an expensive operation. This + * can be avoided in critical path in RM. + * @param rUnits units for the resource + */ + @InterfaceAudience.Private + public void setUnitsWithoutValidation(String rUnits) { + this.units = rUnits; + } + /** * Get the resource type. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/BaseResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/LightWeightResource.java similarity index 55% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/BaseResource.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/LightWeightResource.java index b5cc4d63e4d..b80e13388cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/BaseResource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/LightWeightResource.java @@ -18,19 +18,24 @@ package org.apache.hadoop.yarn.api.records.impl; -import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; -import java.util.Arrays; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_MB; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI; /** *

- * BaseResource extends Resource to handle base resources such + * LightResource extends Resource to handle base resources such * as memory and CPU. * TODO: We have a long term plan to use AbstractResource when additional * resource types are to be handled as well. + * This will be used to speed up internal calculation to avoid creating + * costly PB-backed Resource object: ResourcePBImpl *

* *

@@ -54,48 +59,34 @@ import java.util.Arrays; * * @see Resource */ -@Public +@InterfaceAudience.Private @Unstable -public class BaseResource extends Resource { +public class LightWeightResource extends Resource { private ResourceInformation memoryResInfo; private ResourceInformation vcoresResInfo; - protected ResourceInformation[] resources = null; - protected ResourceInformation[] readOnlyResources = null; - // Number of mandatory resources, this is added to avoid invoke - // MandatoryResources.values().length, since values() internally will - // copy array, etc. - private static final int NUM_MANDATORY_RESOURCES = 2; - - protected enum MandatoryResources { - MEMORY(0), VCORES(1); - - private final int id; - - MandatoryResources(int id) { - this.id = id; - } - - public int getId() { - return this.id; - } - } - - public BaseResource() { - // Base constructor. - } - - public BaseResource(long memory, long vcores) { - this.memoryResInfo = ResourceInformation.newInstance(MEMORY, - ResourceInformation.MEMORY_MB.getUnits(), memory); - this.vcoresResInfo = ResourceInformation.newInstance(VCORES, "", vcores); + public LightWeightResource(long memory, long vcores) { + this.memoryResInfo = LightWeightResource.newDefaultInformation(MEMORY_URI, + MEMORY_MB.getUnits(), memory); + this.vcoresResInfo = LightWeightResource.newDefaultInformation(VCORES_URI, + "", vcores); resources = new ResourceInformation[NUM_MANDATORY_RESOURCES]; - readOnlyResources = new ResourceInformation[NUM_MANDATORY_RESOURCES]; - resources[MandatoryResources.MEMORY.id] = memoryResInfo; - resources[MandatoryResources.VCORES.id] = vcoresResInfo; - readOnlyResources = Arrays.copyOf(resources, resources.length); + resources[MEMORY_INDEX] = memoryResInfo; + resources[VCORES_INDEX] = vcoresResInfo; + } + + private static ResourceInformation newDefaultInformation(String name, + String unit, long value) { + ResourceInformation ri = new ResourceInformation(); + ri.setName(name); + ri.setValue(value); + ri.setResourceType(ResourceTypes.COUNTABLE); + ri.setUnitsWithoutValidation(unit); + ri.setMinimumAllocation(0); + ri.setMaximumAllocation(Long.MAX_VALUE); + return ri; } @Override @@ -131,7 +122,42 @@ public class BaseResource extends Resource { } @Override - public ResourceInformation[] getResources() { - return readOnlyResources; + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || !(obj instanceof Resource)) { + return false; + } + Resource other = (Resource) obj; + if (getMemorySize() != other.getMemorySize() + || getVirtualCores() != other.getVirtualCores()) { + return false; + } + + return true; + } + + @Override + public int compareTo(Resource other) { + // compare memory and vcores first(in that order) to preserve + // existing behaviour + long diff = this.getMemorySize() - other.getMemorySize(); + if (diff == 0) { + return this.getVirtualCores() - other.getVirtualCores(); + } else if (diff > 0){ + return 1; + } else { + return -1; + } + } + + @Override + public int hashCode() { + final int prime = 47; + long result = prime + getMemorySize(); + result = prime * result + getVirtualCores(); + + return (int) result; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java index e3e25d18b6a..110453abde7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java @@ -244,13 +244,28 @@ public class ResourceUtils { minimumAllocation, maximumAllocation)); } } + checkMandatoryResources(resourceInformationMap); addMandatoryResources(resourceInformationMap); + setMinimumAllocationForMandatoryResources(resourceInformationMap, conf); setMaximumAllocationForMandatoryResources(resourceInformationMap, conf); + + initializeResourcesFromResourceInformationMap(resourceInformationMap); + } + + /** + * This method is visible for testing, unit test can construct a + * resourceInformationMap and pass it to this method to initialize multiple resources. + * @param resourceInformationMap constructed resource information map. + */ + @VisibleForTesting + public static void initializeResourcesFromResourceInformationMap( + Map resourceInformationMap) { resourceTypes = Collections.unmodifiableMap(resourceInformationMap); updateKnownResources(); updateResourceTypeIndex(); + initializedResources = true; } private static void updateKnownResources() { @@ -347,14 +362,12 @@ public class ResourceUtils { try { addResourcesFileToConf(resourceFile, conf); LOG.debug("Found " + resourceFile + ", adding to configuration"); - initializeResourcesMap(conf); - initializedResources = true; } catch (FileNotFoundException fe) { LOG.info("Unable to find '" + resourceFile + "'. Falling back to memory and vcores as resources."); - initializeResourcesMap(conf); - initializedResources = true; } + initializeResourcesMap(conf); + } } } @@ -558,7 +571,7 @@ public class ResourceUtils { */ public static String getDefaultUnit(String resourceType) { ResourceInformation ri = getResourceTypes().get(resourceType); - if (null != ri) { + if (ri != null) { return ri.getUnits(); } return ""; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java index 9f34beccfa3..06c30ffb708 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java @@ -25,7 +25,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; -import org.apache.hadoop.yarn.api.records.impl.BaseResource; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; @@ -34,13 +33,12 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceInformationProto; import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.ResourceUtils; -import java.util.Arrays; import java.util.Map; @Private @Unstable -public class ResourcePBImpl extends BaseResource { +public class ResourcePBImpl extends Resource { private static final Log LOG = LogFactory.getLog(ResourcePBImpl.class); @@ -95,7 +93,7 @@ public class ResourcePBImpl extends BaseResource { @Override public long getMemorySize() { // memory should always be present - ResourceInformation ri = resources[MandatoryResources.MEMORY.getId()]; + ResourceInformation ri = resources[MEMORY_INDEX]; if (ri.getUnits().isEmpty()) { return ri.getValue(); @@ -113,19 +111,19 @@ public class ResourcePBImpl extends BaseResource { @Override public void setMemorySize(long memory) { maybeInitBuilder(); - getResourceInformation(MEMORY).setValue(memory); + getResourceInformation(ResourceInformation.MEMORY_URI).setValue(memory); } @Override public int getVirtualCores() { // vcores should always be present - return (int) resources[MandatoryResources.VCORES.getId()].getValue(); + return (int) resources[VCORES_INDEX].getValue(); } @Override public void setVirtualCores(int vCores) { maybeInitBuilder(); - getResourceInformation(VCORES).setValue(vCores); + getResourceInformation(ResourceInformation.VCORES_URI).setValue(vCores); } private void initResources() { @@ -156,7 +154,6 @@ public class ResourcePBImpl extends BaseResource { resources[index].setValue(value); } } - readOnlyResources = Arrays.copyOf(resources, resources.length); this.setMemorySize(p.getMemory()); this.setVirtualCores(p.getVirtualCores()); } @@ -186,11 +183,6 @@ public class ResourcePBImpl extends BaseResource { getResourceInformation(resource).setValue(value); } - @Override - public ResourceInformation[] getResources() { - return super.getResources(); - } - @Override public ResourceInformation getResourceInformation(String resource) throws ResourceNotFoundException { @@ -212,7 +204,6 @@ public class ResourcePBImpl extends BaseResource { } resources = new ResourceInformation[types.length]; - readOnlyResources = new ResourceInformation[types.length]; for (ResourceInformation entry : types) { int index = ResourceUtils.getResourceTypeIndex().get(entry.getName()); resources[index] = ResourceInformation.newInstance(entry); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index ffd4fec0ee0..d64f03ec33b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -73,7 +73,7 @@ public class DominantResourceCalculator extends ResourceCalculator { boolean rhsGreater = false; int ret = 0; - int maxLength = ResourceUtils.getResourceTypesArray().length; + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation lhsResourceInformation = lhs .getResourceInformation(i); @@ -111,10 +111,12 @@ public class DominantResourceCalculator extends ResourceCalculator { // resources and then look for which resource has the biggest // share overall. ResourceInformation[] clusterRes = clusterResource.getResources(); + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + // If array creation shows up as a time sink, these arrays could be cached // because they're always the same length. - double[] lhsShares = new double[clusterRes.length]; - double[] rhsShares = new double[clusterRes.length]; + double[] lhsShares = new double[maxLength]; + double[] rhsShares = new double[maxLength]; double diff; try { @@ -124,10 +126,10 @@ public class DominantResourceCalculator extends ResourceCalculator { calculateShares(clusterRes, lhs, rhs, lhsShares, rhsShares, max); diff = max[0] - max[1]; - } else if (clusterRes.length == 2) { + } else if (maxLength == 2) { // Special case to handle the common scenario of only CPU and memory // so that we can optimize for performance - diff = calculateSharesForMandatoryResources(clusterRes, lhs, rhs, + diff = calculateSharesForTwoMandatoryResources(clusterRes, lhs, rhs, lhsShares, rhsShares); } else { calculateShares(clusterRes, lhs, rhs, lhsShares, rhsShares); @@ -182,7 +184,8 @@ public class DominantResourceCalculator extends ResourceCalculator { ResourceInformation[] firstRes = first.getResources(); ResourceInformation[] secondRes = second.getResources(); - for (int i = 0; i < clusterRes.length; i++) { + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + for (int i = 0; i < maxLength; i++) { firstShares[i] = calculateShare(clusterRes[i], firstRes[i]); secondShares[i] = calculateShare(clusterRes[i], secondRes[i]); } @@ -205,35 +208,27 @@ public class DominantResourceCalculator extends ResourceCalculator { * second resource, respectively * @throws NullPointerException if any parameter is null */ - private int calculateSharesForMandatoryResources( + private int calculateSharesForTwoMandatoryResources( ResourceInformation[] clusterRes, Resource first, Resource second, double[] firstShares, double[] secondShares) { ResourceInformation[] firstRes = first.getResources(); ResourceInformation[] secondRes = second.getResources(); + firstShares[0] = calculateShare(clusterRes[0], firstRes[0]); + secondShares[0] = calculateShare(clusterRes[0], secondRes[0]); + firstShares[1] = calculateShare(clusterRes[1], firstRes[1]); + secondShares[1] = calculateShare(clusterRes[1], secondRes[1]); + int firstDom = 0; + int firstSub = 1; + if (firstShares[1] > firstShares[0]) { + firstDom = 1; + firstSub = 0; + } int secondDom = 0; - int firstSub = 0; - int secondSub = 0; - - for (int i = 0; i < clusterRes.length; i++) { - firstShares[i] = calculateShare(clusterRes[i], firstRes[i]); - secondShares[i] = calculateShare(clusterRes[i], secondRes[i]); - - if (firstShares[i] > firstShares[firstDom]) { - firstDom = i; - } - - if (firstShares[i] < firstShares[firstSub]) { - firstSub = i; - } - - if (secondShares[i] > secondShares[secondDom]) { - secondDom = i; - } - - if (secondShares[i] < secondShares[secondSub]) { - secondSub = i; - } + int secondSub = 1; + if (secondShares[1] > secondShares[0]) { + secondDom = 1; + secondSub = 0; } if (firstShares[firstDom] > secondShares[secondDom]) { @@ -280,7 +275,8 @@ public class DominantResourceCalculator extends ResourceCalculator { max[0] = 0.0; max[1] = 0.0; - for (int i = 0; i < clusterRes.length; i++) { + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + for (int i = 0; i < maxLength; i++) { firstShares[i] = calculateShare(clusterRes[i], firstRes[i]); secondShares[i] = calculateShare(clusterRes[i], secondRes[i]); @@ -339,7 +335,7 @@ public class DominantResourceCalculator extends ResourceCalculator { public long computeAvailableContainers(Resource available, Resource required) { long min = Long.MAX_VALUE; - int maxLength = ResourceUtils.getResourceTypesArray().length; + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation availableResource = available .getResourceInformation(i); @@ -358,11 +354,12 @@ public class DominantResourceCalculator extends ResourceCalculator { @Override public float divide(Resource clusterResource, Resource numerator, Resource denominator) { + int nKnownResourceTypes = ResourceUtils.getNumberOfKnownResourceTypes(); ResourceInformation[] clusterRes = clusterResource.getResources(); // We have to provide the calculateShares() method with somewhere to store // the shares. We don't actually need these shares afterwards. - double[] numeratorShares = new double[clusterRes.length]; - double[] denominatorShares = new double[clusterRes.length]; + double[] numeratorShares = new double[nKnownResourceTypes]; + double[] denominatorShares = new double[nKnownResourceTypes]; // We also have to provide a place for calculateShares() to store the max // shares so that we can use them. double[] max = new double[2]; @@ -386,7 +383,7 @@ public class DominantResourceCalculator extends ResourceCalculator { @Override public float ratio(Resource a, Resource b) { float ratio = 0.0f; - int maxLength = ResourceUtils.getResourceTypesArray().length; + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation aResourceInformation = a.getResourceInformation(i); ResourceInformation bResourceInformation = b.getResourceInformation(i); @@ -407,7 +404,7 @@ public class DominantResourceCalculator extends ResourceCalculator { public Resource divideAndCeil(Resource numerator, long denominator) { Resource ret = Resource.newInstance(numerator); - int maxLength = ResourceUtils.getResourceTypesArray().length; + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation resourceInformation = ret.getResourceInformation(i); resourceInformation @@ -428,7 +425,7 @@ public class DominantResourceCalculator extends ResourceCalculator { public Resource normalize(Resource r, Resource minimumResource, Resource maximumResource, Resource stepFactor) { Resource ret = Resource.newInstance(r); - int maxLength = ResourceUtils.getResourceTypesArray().length; + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation rResourceInformation = r.getResourceInformation(i); ResourceInformation minimumResourceInformation = minimumResource @@ -474,7 +471,7 @@ public class DominantResourceCalculator extends ResourceCalculator { private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) { Resource ret = Resource.newInstance(r); - int maxLength = ResourceUtils.getResourceTypesArray().length; + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation rResourceInformation = r.getResourceInformation(i); ResourceInformation stepFactorResourceInformation = stepFactor @@ -513,7 +510,7 @@ public class DominantResourceCalculator extends ResourceCalculator { private Resource multiplyAndNormalize(Resource r, double by, Resource stepFactor, boolean roundUp) { Resource ret = Resource.newInstance(r); - int maxLength = ResourceUtils.getResourceTypesArray().length; + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation rResourceInformation = r.getResourceInformation(i); ResourceInformation stepFactorResourceInformation = stepFactor @@ -542,7 +539,7 @@ public class DominantResourceCalculator extends ResourceCalculator { @Override public boolean fitsIn(Resource cluster, Resource smaller, Resource bigger) { - int maxLength = ResourceUtils.getResourceTypesArray().length; + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation sResourceInformation = smaller .getResourceInformation(i); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 1e2ce155505..325bce465b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -24,12 +24,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; -import org.apache.hadoop.yarn.api.records.impl.BaseResource; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.util.UnitsConversionUtil; -import java.util.Arrays; - /** * Resources is a computation class which provides a set of apis to do * mathematical operations on Resource object. @@ -45,9 +42,11 @@ public class Resources { * Helper class to create a resource with a fixed value for all resource * types. For example, a NONE resource which returns 0 for any resource type. */ - static class FixedValueResource extends BaseResource { + @InterfaceAudience.Private + @Unstable + static class FixedValueResource extends Resource { - private long resourceValue; + private final long resourceValue; private String name; /** @@ -100,6 +99,19 @@ public class Resources { throw new RuntimeException(name + " cannot be modified!"); } + @Override + public void setResourceInformation(int index, + ResourceInformation resourceInformation) + throws ResourceNotFoundException { + throw new RuntimeException(name + " cannot be modified!"); + } + + @Override + public void setResourceValue(int index, long value) + throws ResourceNotFoundException { + throw new RuntimeException(name + " cannot be modified!"); + } + @Override public void setResourceInformation(String resource, ResourceInformation resourceInformation) @@ -117,19 +129,11 @@ public class Resources { ResourceInformation[] types = ResourceUtils.getResourceTypesArray(); if (types != null) { resources = new ResourceInformation[types.length]; - readOnlyResources = new ResourceInformation[types.length]; for (int index = 0; index < types.length; index++) { resources[index] = ResourceInformation.newInstance(types[index]); resources[index].setValue(resourceValue); - - // this is a fix for getVirtualCores returning an int - if (resourceValue > Integer.MAX_VALUE && ResourceInformation.VCORES - .getName().equals(resources[index].getName())) { - resources[index].setValue((long) Integer.MAX_VALUE); - } } } - readOnlyResources = Arrays.copyOf(resources, resources.length); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java index a5550a70f22..d6bab9250df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java @@ -37,6 +37,8 @@ import java.util.Map; * Test class to verify all resource utility methods. */ public class TestResourceUtils { + public static final String TEST_CONF_RESET_RESOURCE_TYPES = + "yarn.test.reset-resource-types"; static class ResourceFileInformation { String filename; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index fd6b1e9939d..d09be8b91d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1814,8 +1814,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { if (newTrackingUrl != null && !newTrackingUrl.equals(appAttempt.originalTrackingUrl)) { appAttempt.originalTrackingUrl = newTrackingUrl; - AggregateAppResourceUsage resUsage = - appAttempt.attemptMetrics.getAggregateAppResourceUsage(); ApplicationAttemptStateData attemptState = ApplicationAttemptStateData .newInstance(appAttempt.applicationAttemptId, appAttempt.getMasterContainer(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 2d7f3f6e088..07545845f5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSe import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.hadoop.yarn.util.resource.TestResourceUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -151,7 +152,10 @@ public class MockRM extends ResourceManager { public MockRM(Configuration conf, RMStateStore store, boolean useNullRMNodeLabelsManager, boolean useRealElector) { super(); - ResourceUtils.resetResourceTypes(conf); + if (conf.getBoolean(TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES, + true)) { + ResourceUtils.resetResourceTypes(conf); + } this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager; this.useRealElector = useRealElector; init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index d23ef59b3a5..bb14e1e7ada 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -4341,143 +4341,6 @@ public class TestCapacityScheduler { rm.stop(); } - @Test (timeout = 300000) - public void testUserLimitThroughput() throws Exception { - // Since this is more of a performance unit test, only run if - // RunUserLimitThroughput is set (-DRunUserLimitThroughput=true) - Assume.assumeTrue(Boolean.valueOf( - System.getProperty("RunUserLimitThroughput"))); - - CapacitySchedulerConfiguration csconf = - new CapacitySchedulerConfiguration(); - csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f); - csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f); - csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default", - 100.0f); - csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f); - csconf.setResourceComparator(DominantResourceCalculator.class); - - YarnConfiguration conf = new YarnConfiguration(csconf); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); - - MockRM rm = new MockRM(conf); - rm.start(); - - CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - LeafQueue qb = (LeafQueue)cs.getQueue("default"); - - // For now make user limit large so we can activate all applications - qb.setUserLimitFactor((float)100.0); - qb.setupConfigurableCapacities(); - - SchedulerEvent addAppEvent; - SchedulerEvent addAttemptEvent; - Container container = mock(Container.class); - ApplicationSubmissionContext submissionContext = - mock(ApplicationSubmissionContext.class); - - final int appCount = 100; - ApplicationId[] appids = new ApplicationId[appCount]; - RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount]; - ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount]; - RMAppImpl[] apps = new RMAppImpl[appCount]; - RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount]; - for (int i=0; i loggers=LogManager.getCurrentLoggers(); - loggers.hasMoreElements(); ) { - Logger logger = (Logger) loggers.nextElement(); - logger.setLevel(Level.WARN); - } - final int topn = 20; - final int iterations = 2000000; - final int printInterval = 20000; - final float numerator = 1000.0f * printInterval; - PriorityQueue queue = new PriorityQueue<>(topn, - Collections.reverseOrder()); - - long n = Time.monotonicNow(); - long timespent = 0; - for (int i = 0; i < iterations; i+=2) { - if (i > 0 && i % printInterval == 0){ - long ts = (Time.monotonicNow() - n); - if (queue.size() < topn) { - queue.offer(ts); - } else { - Long last = queue.peek(); - if (last > ts) { - queue.poll(); - queue.offer(ts); - } - } - System.out.println(i + " " + (numerator / ts)); - n= Time.monotonicNow(); - } - cs.handle(new NodeUpdateSchedulerEvent(node)); - cs.handle(new NodeUpdateSchedulerEvent(node2)); - } - timespent=0; - int entries = queue.size(); - while(queue.size() > 0){ - long l = queue.poll(); - timespent += l; - } - System.out.println("Avg of fastest " + entries + ": " - + numerator / (timespent / entries)); - rm.stop(); - } - @Test public void testCSQueueBlocked() throws Exception { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java new file mode 100644 index 00000000000..0837fd7205a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Assume; +import org.junit.Test; + +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; + +import static org.apache.hadoop.yarn.util.resource.TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestCapacitySchedulerPerf { + private final int GB = 1024; + + private String getResourceName(int idx) { + return "resource-" + idx; + } + + private void testUserLimitThroughputWithNumberOfResourceTypes( + int numOfResourceTypes) + throws Exception { + if (numOfResourceTypes > 2) { + // Initialize resource map + Map riMap = new HashMap<>(); + + // Initialize mandatory resources + riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB); + riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES); + + for (int i = 2; i < numOfResourceTypes; i++) { + String resourceName = getResourceName(i); + riMap.put(resourceName, ResourceInformation + .newInstance(resourceName, "", 0, ResourceTypes.COUNTABLE, 0, + Integer.MAX_VALUE)); + } + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + } + + // Since this is more of a performance unit test, only run if + // RunUserLimitThroughput is set (-DRunUserLimitThroughput=true) + Assume.assumeTrue(Boolean.valueOf( + System.getProperty("RunCapacitySchedulerPerfTests"))); + + CapacitySchedulerConfiguration csconf = + new CapacitySchedulerConfiguration(); + csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f); + csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f); + csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default", + 100.0f); + csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f); + csconf.setResourceComparator(DominantResourceCalculator.class); + + YarnConfiguration conf = new YarnConfiguration(csconf); + // Don't reset resource types since we have already configured resource types + conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + MockRM rm = new MockRM(conf); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + LeafQueue qb = (LeafQueue)cs.getQueue("default"); + + // For now make user limit large so we can activate all applications + qb.setUserLimitFactor((float)100.0); + qb.setupConfigurableCapacities(); + + SchedulerEvent addAppEvent; + SchedulerEvent addAttemptEvent; + Container container = mock(Container.class); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); + + final int appCount = 100; + ApplicationId[] appids = new ApplicationId[appCount]; + RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount]; + ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount]; + RMAppImpl[] apps = new RMAppImpl[appCount]; + RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount]; + for (int i=0; i 2) { + for (int i = 2; i < numOfResourceTypes; i++) { + nodeResource.setResourceValue(getResourceName(i), 10); + } + } + + RMNode node = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.1"); + cs.handle(new NodeAddedSchedulerEvent(node)); + + RMNode node2 = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.2"); + cs.handle(new NodeAddedSchedulerEvent(node2)); + + Priority u0Priority = TestUtils.createMockPriority(1); + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount]; + for (int i=0;i 2) { + for (int j = 2; j < numOfResourceTypes; j++) { + resourceRequest.getCapability().setResourceValue(getResourceName(j), + 10); + } + } + + // allocate container for app2 with 1GB memory and 1 vcore + fiCaApps[i].updateResourceRequests( + Collections.singletonList(resourceRequest)); + } + // Now force everything to be over user limit + qb.setUserLimitFactor((float)0.0); + + // Quiet the loggers while measuring throughput + for (Enumeration loggers = LogManager.getCurrentLoggers(); + loggers.hasMoreElements(); ) { + Logger logger = (Logger) loggers.nextElement(); + logger.setLevel(Level.WARN); + } + final int topn = 20; + final int iterations = 2000000; + final int printInterval = 20000; + final float numerator = 1000.0f * printInterval; + PriorityQueue queue = new PriorityQueue<>(topn, + Collections.reverseOrder()); + + long n = Time.monotonicNow(); + long timespent = 0; + for (int i = 0; i < iterations; i+=2) { + if (i > 0 && i % printInterval == 0){ + long ts = (Time.monotonicNow() - n); + if (queue.size() < topn) { + queue.offer(ts); + } else { + Long last = queue.peek(); + if (last > ts) { + queue.poll(); + queue.offer(ts); + } + } + System.out.println(i + " " + (numerator / ts)); + n= Time.monotonicNow(); + } + cs.handle(new NodeUpdateSchedulerEvent(node)); + cs.handle(new NodeUpdateSchedulerEvent(node2)); + } + timespent=0; + int entries = queue.size(); + while(queue.size() > 0){ + long l = queue.poll(); + timespent += l; + } + System.out.println( + "#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries + + ": " + numerator / (timespent / entries)); + rm.stop(); + } + + @Test(timeout = 300000) + public void testUserLimitThroughputForTwoResources() throws Exception { + testUserLimitThroughputWithNumberOfResourceTypes(2); + } + + @Test(timeout = 300000) + public void testUserLimitThroughputForThreeResources() throws Exception { + testUserLimitThroughputWithNumberOfResourceTypes(3); + } + + @Test(timeout = 300000) + public void testUserLimitThroughputForFourResources() throws Exception { + testUserLimitThroughputWithNumberOfResourceTypes(4); + } + + @Test(timeout = 300000) + public void testUserLimitThroughputForFiveResources() throws Exception { + testUserLimitThroughputWithNumberOfResourceTypes(5); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 75b7bffe199..611cfcc4d38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -102,6 +102,8 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.yarn.util.resource.TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES; + /** *

* Embedded Yarn minicluster for testcases that need to interact with a cluster. @@ -261,7 +263,10 @@ public class MiniYARNCluster extends CompositeService { YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC); failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); - ResourceUtils.resetResourceTypes(conf); + + if (conf.getBoolean(TEST_CONF_RESET_RESOURCE_TYPES, true)) { + ResourceUtils.resetResourceTypes(conf); + } if (useRpc && !useFixedPorts) { throw new YarnRuntimeException("Invalid configuration!" +