YARN-9188. Port YARN-7136 to branch-2

This commit is contained in:
Jonathan Hung 2019-01-09 16:01:06 -05:00
parent 26e87be3f2
commit d6673c5491
14 changed files with 524 additions and 341 deletions

View File

@ -629,7 +629,7 @@
</Match> </Match>
<Match> <Match>
<Class name="org.apache.hadoop.yarn.api.records.impl.BaseResource" /> <Class name="org.apache.hadoop.yarn.api.records.Resource" />
<Method name="getResources" /> <Method name="getResources" />
<Bug pattern="EI_EXPOSE_REP" /> <Bug pattern="EI_EXPOSE_REP" />
</Match> </Match>

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.impl.BaseResource; import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@ -59,8 +59,15 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@Stable @Stable
public abstract class Resource implements Comparable<Resource> { public abstract class Resource implements Comparable<Resource> {
protected static final String MEMORY = ResourceInformation.MEMORY_MB.getName(); protected ResourceInformation[] resources = null;
protected static final String VCORES = ResourceInformation.VCORES.getName();
// Number of mandatory resources, this is added to avoid invoke
// MandatoryResources.values().length, since values() internally will
// copy array, etc.
protected static final int NUM_MANDATORY_RESOURCES = 2;
protected static final int MEMORY_INDEX = 0;
protected static final int VCORES_INDEX = 1;
@Public @Public
@Stable @Stable
@ -71,7 +78,7 @@ public abstract class Resource implements Comparable<Resource> {
ret.setVirtualCores(vCores); ret.setVirtualCores(vCores);
return ret; return ret;
} }
return new BaseResource(memory, vCores); return new LightWeightResource(memory, vCores);
} }
@Public @Public
@ -83,7 +90,7 @@ public abstract class Resource implements Comparable<Resource> {
ret.setVirtualCores(vCores); ret.setVirtualCores(vCores);
return ret; return ret;
} }
return new BaseResource(memory, vCores); return new LightWeightResource(memory, vCores);
} }
@InterfaceAudience.Private @InterfaceAudience.Private
@ -201,7 +208,9 @@ public abstract class Resource implements Comparable<Resource> {
*/ */
@Public @Public
@Evolving @Evolving
public abstract ResourceInformation[] getResources(); public ResourceInformation[] getResources() {
return resources;
}
/** /**
* Get ResourceInformation for a specified resource. * Get ResourceInformation for a specified resource.
@ -215,7 +224,6 @@ public abstract class Resource implements Comparable<Resource> {
public ResourceInformation getResourceInformation(String resource) public ResourceInformation getResourceInformation(String resource)
throws ResourceNotFoundException { throws ResourceNotFoundException {
Integer index = ResourceUtils.getResourceTypeIndex().get(resource); Integer index = ResourceUtils.getResourceTypeIndex().get(resource);
ResourceInformation[] resources = getResources();
if (index != null) { if (index != null) {
return resources[index]; return resources[index];
} }
@ -236,12 +244,13 @@ public abstract class Resource implements Comparable<Resource> {
@Evolving @Evolving
public ResourceInformation getResourceInformation(int index) public ResourceInformation getResourceInformation(int index)
throws ResourceNotFoundException { throws ResourceNotFoundException {
ResourceInformation[] resources = getResources(); ResourceInformation ri = null;
if (index < 0 || index >= resources.length) { try {
throw new ResourceNotFoundException("Unknown resource at index '" + index ri = resources[index];
+ "'. Vaid resources are: " + Arrays.toString(resources)); } catch (ArrayIndexOutOfBoundsException e) {
throwExceptionWhenArrayOutOfBound(index);
} }
return resources[index]; return ri;
} }
/** /**
@ -271,11 +280,11 @@ public abstract class Resource implements Comparable<Resource> {
public void setResourceInformation(String resource, public void setResourceInformation(String resource,
ResourceInformation resourceInformation) ResourceInformation resourceInformation)
throws ResourceNotFoundException { throws ResourceNotFoundException {
if (resource.equals(MEMORY)) { if (resource.equals(ResourceInformation.MEMORY_URI)) {
this.setMemorySize(resourceInformation.getValue()); this.setMemorySize(resourceInformation.getValue());
return; return;
} }
if (resource.equals(VCORES)) { if (resource.equals(ResourceInformation.VCORES_URI)) {
this.setVirtualCores((int) resourceInformation.getValue()); this.setVirtualCores((int) resourceInformation.getValue());
return; return;
} }
@ -298,7 +307,6 @@ public abstract class Resource implements Comparable<Resource> {
public void setResourceInformation(int index, public void setResourceInformation(int index,
ResourceInformation resourceInformation) ResourceInformation resourceInformation)
throws ResourceNotFoundException { throws ResourceNotFoundException {
ResourceInformation[] resources = getResources();
if (index < 0 || index >= resources.length) { if (index < 0 || index >= resources.length) {
throw new ResourceNotFoundException("Unknown resource at index '" + index throw new ResourceNotFoundException("Unknown resource at index '" + index
+ "'. Valid resources are " + Arrays.toString(resources)); + "'. Valid resources are " + Arrays.toString(resources));
@ -318,11 +326,11 @@ public abstract class Resource implements Comparable<Resource> {
@Evolving @Evolving
public void setResourceValue(String resource, long value) public void setResourceValue(String resource, long value)
throws ResourceNotFoundException { throws ResourceNotFoundException {
if (resource.equals(MEMORY)) { if (resource.equals(ResourceInformation.MEMORY_URI)) {
this.setMemorySize(value); this.setMemorySize(value);
return; return;
} }
if (resource.equals(VCORES)) { if (resource.equals(ResourceInformation.VCORES_URI)) {
this.setVirtualCores((int)value); this.setVirtualCores((int)value);
return; return;
} }
@ -346,27 +354,21 @@ public abstract class Resource implements Comparable<Resource> {
@Evolving @Evolving
public void setResourceValue(int index, long value) public void setResourceValue(int index, long value)
throws ResourceNotFoundException { throws ResourceNotFoundException {
ResourceInformation[] resources = getResources(); try {
if (index < 0 || index >= resources.length) { resources[index].setValue(value);
throw new ResourceNotFoundException("Unknown resource at index '" + index } catch (ArrayIndexOutOfBoundsException e) {
+ "'. Valid resources are " + Arrays.toString(resources)); throwExceptionWhenArrayOutOfBound(index);
} }
resources[index].setValue(value);
} }
@Override private void throwExceptionWhenArrayOutOfBound(int index) {
public int hashCode() { String exceptionMsg = String.format(
final int prime = 263167; "Trying to access ResourceInformation for given index=%d. "
+ "Acceptable index range is [0,%d), please check double check "
+ "configured resources in resource-types.xml",
index, ResourceUtils.getNumberOfKnownResourceTypes());
int result = (int) (939769357 throw new ResourceNotFoundException(exceptionMsg);
+ getMemorySize()); // prime * result = 939769357 initially
result = prime * result + getVirtualCores();
for (ResourceInformation entry : getResources()) {
if (!entry.getName().equals(MEMORY) && !entry.getName().equals(VCORES)) {
result = prime * result + entry.hashCode();
}
}
return result;
} }
@Override @Override
@ -381,20 +383,15 @@ public abstract class Resource implements Comparable<Resource> {
return false; return false;
} }
Resource other = (Resource) obj; Resource other = (Resource) obj;
if (getMemorySize() != other.getMemorySize()
|| getVirtualCores() != other.getVirtualCores()) {
return false;
}
ResourceInformation[] myVectors = getResources();
ResourceInformation[] otherVectors = other.getResources(); ResourceInformation[] otherVectors = other.getResources();
if (myVectors.length != otherVectors.length) { if (resources.length != otherVectors.length) {
return false; return false;
} }
for (int i = 0; i < myVectors.length; i++) { for (int i = 0; i < resources.length; i++) {
ResourceInformation a = myVectors[i]; ResourceInformation a = resources[i];
ResourceInformation b = otherVectors[i]; ResourceInformation b = otherVectors[i];
if ((a != b) && ((a == null) || !a.equals(b))) { if ((a != b) && ((a == null) || !a.equals(b))) {
return false; return false;
@ -403,64 +400,71 @@ public abstract class Resource implements Comparable<Resource> {
return true; return true;
} }
@Override
public int compareTo(Resource other) {
ResourceInformation[] otherResources = other.getResources();
int arrLenThis = this.resources.length;
int arrLenOther = otherResources.length;
// compare memory and vcores first(in that order) to preserve
// existing behaviour
for (int i = 0; i < arrLenThis; i++) {
ResourceInformation otherEntry;
try {
otherEntry = otherResources[i];
} catch (ArrayIndexOutOfBoundsException e) {
// For two vectors with different size and same prefix. Shorter vector
// goes first.
return 1;
}
ResourceInformation entry = resources[i];
long diff = entry.compareTo(otherEntry);
if (diff > 0) {
return 1;
} else if (diff < 0) {
return -1;
}
}
if (arrLenThis < arrLenOther) {
return -1;
}
return 0;
}
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("<memory:").append(getMemorySize()).append(", vCores:") sb.append("<memory:").append(getMemorySize()).append(", vCores:")
.append(getVirtualCores()); .append(getVirtualCores());
for (ResourceInformation entry : getResources()) {
if (entry.getName().equals(MEMORY) for (int i = 2; i < resources.length; i++) {
&& entry.getUnits() ResourceInformation ri = resources[i];
.equals(ResourceInformation.MEMORY_MB.getUnits())) { if (ri.getValue() == 0) {
continue; continue;
} }
if (entry.getName().equals(VCORES) sb.append(", ");
&& entry.getUnits() sb.append(ri.getName()).append(": ")
.equals(ResourceInformation.VCORES.getUnits())) { .append(ri.getValue());
continue; sb.append(ri.getUnits());
}
sb.append(", ").append(entry.getName()).append(": ")
.append(entry.getValue())
.append(entry.getUnits());
} }
sb.append(">"); sb.append(">");
return sb.toString(); return sb.toString();
} }
@Override @Override
public int compareTo(Resource other) { public int hashCode() {
ResourceInformation[] thisResources = this.getResources(); final int prime = 47;
ResourceInformation[] otherResources = other.getResources(); long result = 0;
for (ResourceInformation entry : resources) {
// compare memory and vcores first(in that order) to preserve result = prime * result + entry.hashCode();
// existing behaviour
long diff = this.getMemorySize() - other.getMemorySize();
if (diff == 0) {
diff = this.getVirtualCores() - other.getVirtualCores();
} }
if (diff == 0) { return (int) result;
diff = thisResources.length - otherResources.length;
if (diff == 0) {
int maxLength = ResourceUtils.getResourceTypesArray().length;
for (int i = 0; i < maxLength; i++) {
// For memory and vcores, we can skip the loop as it's already
// compared.
if (i < 2) {
continue;
}
ResourceInformation entry = thisResources[i];
ResourceInformation otherEntry = otherResources[i];
if (entry.getName().equals(otherEntry.getName())) {
diff = entry.compareTo(otherEntry);
if (diff != 0) {
break;
}
}
}
}
}
return Long.compare(diff, 0);
} }
/** /**

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.api.records; package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.UnitsConversionUtil;
@ -34,8 +35,8 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
private long minimumAllocation; private long minimumAllocation;
private long maximumAllocation; private long maximumAllocation;
private static final String MEMORY_URI = "memory-mb"; public static final String MEMORY_URI = "memory-mb";
private static final String VCORES_URI = "vcores"; public static final String VCORES_URI = "vcores";
public static final ResourceInformation MEMORY_MB = public static final ResourceInformation MEMORY_MB =
ResourceInformation.newInstance(MEMORY_URI, "Mi"); ResourceInformation.newInstance(MEMORY_URI, "Mi");
@ -83,6 +84,16 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
this.units = rUnits; this.units = rUnits;
} }
/**
* Checking if a unit included by KNOWN_UNITS is an expensive operation. This
* can be avoided in critical path in RM.
* @param rUnits units for the resource
*/
@InterfaceAudience.Private
public void setUnitsWithoutValidation(String rUnits) {
this.units = rUnits;
}
/** /**
* Get the resource type. * Get the resource type.
* *

View File

@ -18,19 +18,24 @@
package org.apache.hadoop.yarn.api.records.impl; package org.apache.hadoop.yarn.api.records.impl;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceInformation;
import java.util.Arrays; import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_MB;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
/** /**
* <p> * <p>
* <code>BaseResource</code> extends Resource to handle base resources such * <code>LightResource</code> extends Resource to handle base resources such
* as memory and CPU. * as memory and CPU.
* TODO: We have a long term plan to use AbstractResource when additional * TODO: We have a long term plan to use AbstractResource when additional
* resource types are to be handled as well. * resource types are to be handled as well.
* This will be used to speed up internal calculation to avoid creating
* costly PB-backed Resource object: <code>ResourcePBImpl</code>
* </p> * </p>
* *
* <p> * <p>
@ -54,48 +59,34 @@ import java.util.Arrays;
* *
* @see Resource * @see Resource
*/ */
@Public @InterfaceAudience.Private
@Unstable @Unstable
public class BaseResource extends Resource { public class LightWeightResource extends Resource {
private ResourceInformation memoryResInfo; private ResourceInformation memoryResInfo;
private ResourceInformation vcoresResInfo; private ResourceInformation vcoresResInfo;
protected ResourceInformation[] resources = null;
protected ResourceInformation[] readOnlyResources = null;
// Number of mandatory resources, this is added to avoid invoke public LightWeightResource(long memory, long vcores) {
// MandatoryResources.values().length, since values() internally will this.memoryResInfo = LightWeightResource.newDefaultInformation(MEMORY_URI,
// copy array, etc. MEMORY_MB.getUnits(), memory);
private static final int NUM_MANDATORY_RESOURCES = 2; this.vcoresResInfo = LightWeightResource.newDefaultInformation(VCORES_URI,
"", vcores);
protected enum MandatoryResources {
MEMORY(0), VCORES(1);
private final int id;
MandatoryResources(int id) {
this.id = id;
}
public int getId() {
return this.id;
}
}
public BaseResource() {
// Base constructor.
}
public BaseResource(long memory, long vcores) {
this.memoryResInfo = ResourceInformation.newInstance(MEMORY,
ResourceInformation.MEMORY_MB.getUnits(), memory);
this.vcoresResInfo = ResourceInformation.newInstance(VCORES, "", vcores);
resources = new ResourceInformation[NUM_MANDATORY_RESOURCES]; resources = new ResourceInformation[NUM_MANDATORY_RESOURCES];
readOnlyResources = new ResourceInformation[NUM_MANDATORY_RESOURCES]; resources[MEMORY_INDEX] = memoryResInfo;
resources[MandatoryResources.MEMORY.id] = memoryResInfo; resources[VCORES_INDEX] = vcoresResInfo;
resources[MandatoryResources.VCORES.id] = vcoresResInfo; }
readOnlyResources = Arrays.copyOf(resources, resources.length);
private static ResourceInformation newDefaultInformation(String name,
String unit, long value) {
ResourceInformation ri = new ResourceInformation();
ri.setName(name);
ri.setValue(value);
ri.setResourceType(ResourceTypes.COUNTABLE);
ri.setUnitsWithoutValidation(unit);
ri.setMinimumAllocation(0);
ri.setMaximumAllocation(Long.MAX_VALUE);
return ri;
} }
@Override @Override
@ -131,7 +122,42 @@ public class BaseResource extends Resource {
} }
@Override @Override
public ResourceInformation[] getResources() { public boolean equals(Object obj) {
return readOnlyResources; if (this == obj) {
return true;
}
if (obj == null || !(obj instanceof Resource)) {
return false;
}
Resource other = (Resource) obj;
if (getMemorySize() != other.getMemorySize()
|| getVirtualCores() != other.getVirtualCores()) {
return false;
}
return true;
}
@Override
public int compareTo(Resource other) {
// compare memory and vcores first(in that order) to preserve
// existing behaviour
long diff = this.getMemorySize() - other.getMemorySize();
if (diff == 0) {
return this.getVirtualCores() - other.getVirtualCores();
} else if (diff > 0){
return 1;
} else {
return -1;
}
}
@Override
public int hashCode() {
final int prime = 47;
long result = prime + getMemorySize();
result = prime * result + getVirtualCores();
return (int) result;
} }
} }

View File

@ -244,13 +244,28 @@ public class ResourceUtils {
minimumAllocation, maximumAllocation)); minimumAllocation, maximumAllocation));
} }
} }
checkMandatoryResources(resourceInformationMap); checkMandatoryResources(resourceInformationMap);
addMandatoryResources(resourceInformationMap); addMandatoryResources(resourceInformationMap);
setMinimumAllocationForMandatoryResources(resourceInformationMap, conf); setMinimumAllocationForMandatoryResources(resourceInformationMap, conf);
setMaximumAllocationForMandatoryResources(resourceInformationMap, conf); setMaximumAllocationForMandatoryResources(resourceInformationMap, conf);
initializeResourcesFromResourceInformationMap(resourceInformationMap);
}
/**
* This method is visible for testing, unit test can construct a
* resourceInformationMap and pass it to this method to initialize multiple resources.
* @param resourceInformationMap constructed resource information map.
*/
@VisibleForTesting
public static void initializeResourcesFromResourceInformationMap(
Map<String, ResourceInformation> resourceInformationMap) {
resourceTypes = Collections.unmodifiableMap(resourceInformationMap); resourceTypes = Collections.unmodifiableMap(resourceInformationMap);
updateKnownResources(); updateKnownResources();
updateResourceTypeIndex(); updateResourceTypeIndex();
initializedResources = true;
} }
private static void updateKnownResources() { private static void updateKnownResources() {
@ -347,14 +362,12 @@ public class ResourceUtils {
try { try {
addResourcesFileToConf(resourceFile, conf); addResourcesFileToConf(resourceFile, conf);
LOG.debug("Found " + resourceFile + ", adding to configuration"); LOG.debug("Found " + resourceFile + ", adding to configuration");
initializeResourcesMap(conf);
initializedResources = true;
} catch (FileNotFoundException fe) { } catch (FileNotFoundException fe) {
LOG.info("Unable to find '" + resourceFile LOG.info("Unable to find '" + resourceFile
+ "'. Falling back to memory and vcores as resources."); + "'. Falling back to memory and vcores as resources.");
initializeResourcesMap(conf);
initializedResources = true;
} }
initializeResourcesMap(conf);
} }
} }
} }
@ -558,7 +571,7 @@ public class ResourceUtils {
*/ */
public static String getDefaultUnit(String resourceType) { public static String getDefaultUnit(String resourceType) {
ResourceInformation ri = getResourceTypes().get(resourceType); ResourceInformation ri = getResourceTypes().get(resourceType);
if (null != ri) { if (ri != null) {
return ri.getUnits(); return ri.getUnits();
} }
return ""; return "";

View File

@ -25,7 +25,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.impl.BaseResource;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
@ -34,13 +33,12 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceInformationProto;
import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.util.Arrays;
import java.util.Map; import java.util.Map;
@Private @Private
@Unstable @Unstable
public class ResourcePBImpl extends BaseResource { public class ResourcePBImpl extends Resource {
private static final Log LOG = LogFactory.getLog(ResourcePBImpl.class); private static final Log LOG = LogFactory.getLog(ResourcePBImpl.class);
@ -95,7 +93,7 @@ public class ResourcePBImpl extends BaseResource {
@Override @Override
public long getMemorySize() { public long getMemorySize() {
// memory should always be present // memory should always be present
ResourceInformation ri = resources[MandatoryResources.MEMORY.getId()]; ResourceInformation ri = resources[MEMORY_INDEX];
if (ri.getUnits().isEmpty()) { if (ri.getUnits().isEmpty()) {
return ri.getValue(); return ri.getValue();
@ -113,19 +111,19 @@ public class ResourcePBImpl extends BaseResource {
@Override @Override
public void setMemorySize(long memory) { public void setMemorySize(long memory) {
maybeInitBuilder(); maybeInitBuilder();
getResourceInformation(MEMORY).setValue(memory); getResourceInformation(ResourceInformation.MEMORY_URI).setValue(memory);
} }
@Override @Override
public int getVirtualCores() { public int getVirtualCores() {
// vcores should always be present // vcores should always be present
return (int) resources[MandatoryResources.VCORES.getId()].getValue(); return (int) resources[VCORES_INDEX].getValue();
} }
@Override @Override
public void setVirtualCores(int vCores) { public void setVirtualCores(int vCores) {
maybeInitBuilder(); maybeInitBuilder();
getResourceInformation(VCORES).setValue(vCores); getResourceInformation(ResourceInformation.VCORES_URI).setValue(vCores);
} }
private void initResources() { private void initResources() {
@ -156,7 +154,6 @@ public class ResourcePBImpl extends BaseResource {
resources[index].setValue(value); resources[index].setValue(value);
} }
} }
readOnlyResources = Arrays.copyOf(resources, resources.length);
this.setMemorySize(p.getMemory()); this.setMemorySize(p.getMemory());
this.setVirtualCores(p.getVirtualCores()); this.setVirtualCores(p.getVirtualCores());
} }
@ -186,11 +183,6 @@ public class ResourcePBImpl extends BaseResource {
getResourceInformation(resource).setValue(value); getResourceInformation(resource).setValue(value);
} }
@Override
public ResourceInformation[] getResources() {
return super.getResources();
}
@Override @Override
public ResourceInformation getResourceInformation(String resource) public ResourceInformation getResourceInformation(String resource)
throws ResourceNotFoundException { throws ResourceNotFoundException {
@ -212,7 +204,6 @@ public class ResourcePBImpl extends BaseResource {
} }
resources = new ResourceInformation[types.length]; resources = new ResourceInformation[types.length];
readOnlyResources = new ResourceInformation[types.length];
for (ResourceInformation entry : types) { for (ResourceInformation entry : types) {
int index = ResourceUtils.getResourceTypeIndex().get(entry.getName()); int index = ResourceUtils.getResourceTypeIndex().get(entry.getName());
resources[index] = ResourceInformation.newInstance(entry); resources[index] = ResourceInformation.newInstance(entry);

View File

@ -73,7 +73,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
boolean rhsGreater = false; boolean rhsGreater = false;
int ret = 0; int ret = 0;
int maxLength = ResourceUtils.getResourceTypesArray().length; int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) { for (int i = 0; i < maxLength; i++) {
ResourceInformation lhsResourceInformation = lhs ResourceInformation lhsResourceInformation = lhs
.getResourceInformation(i); .getResourceInformation(i);
@ -111,10 +111,12 @@ public class DominantResourceCalculator extends ResourceCalculator {
// resources and then look for which resource has the biggest // resources and then look for which resource has the biggest
// share overall. // share overall.
ResourceInformation[] clusterRes = clusterResource.getResources(); ResourceInformation[] clusterRes = clusterResource.getResources();
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
// If array creation shows up as a time sink, these arrays could be cached // If array creation shows up as a time sink, these arrays could be cached
// because they're always the same length. // because they're always the same length.
double[] lhsShares = new double[clusterRes.length]; double[] lhsShares = new double[maxLength];
double[] rhsShares = new double[clusterRes.length]; double[] rhsShares = new double[maxLength];
double diff; double diff;
try { try {
@ -124,10 +126,10 @@ public class DominantResourceCalculator extends ResourceCalculator {
calculateShares(clusterRes, lhs, rhs, lhsShares, rhsShares, max); calculateShares(clusterRes, lhs, rhs, lhsShares, rhsShares, max);
diff = max[0] - max[1]; diff = max[0] - max[1];
} else if (clusterRes.length == 2) { } else if (maxLength == 2) {
// Special case to handle the common scenario of only CPU and memory // Special case to handle the common scenario of only CPU and memory
// so that we can optimize for performance // so that we can optimize for performance
diff = calculateSharesForMandatoryResources(clusterRes, lhs, rhs, diff = calculateSharesForTwoMandatoryResources(clusterRes, lhs, rhs,
lhsShares, rhsShares); lhsShares, rhsShares);
} else { } else {
calculateShares(clusterRes, lhs, rhs, lhsShares, rhsShares); calculateShares(clusterRes, lhs, rhs, lhsShares, rhsShares);
@ -182,7 +184,8 @@ public class DominantResourceCalculator extends ResourceCalculator {
ResourceInformation[] firstRes = first.getResources(); ResourceInformation[] firstRes = first.getResources();
ResourceInformation[] secondRes = second.getResources(); ResourceInformation[] secondRes = second.getResources();
for (int i = 0; i < clusterRes.length; i++) { int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) {
firstShares[i] = calculateShare(clusterRes[i], firstRes[i]); firstShares[i] = calculateShare(clusterRes[i], firstRes[i]);
secondShares[i] = calculateShare(clusterRes[i], secondRes[i]); secondShares[i] = calculateShare(clusterRes[i], secondRes[i]);
} }
@ -205,35 +208,27 @@ public class DominantResourceCalculator extends ResourceCalculator {
* second resource, respectively * second resource, respectively
* @throws NullPointerException if any parameter is null * @throws NullPointerException if any parameter is null
*/ */
private int calculateSharesForMandatoryResources( private int calculateSharesForTwoMandatoryResources(
ResourceInformation[] clusterRes, Resource first, Resource second, ResourceInformation[] clusterRes, Resource first, Resource second,
double[] firstShares, double[] secondShares) { double[] firstShares, double[] secondShares) {
ResourceInformation[] firstRes = first.getResources(); ResourceInformation[] firstRes = first.getResources();
ResourceInformation[] secondRes = second.getResources(); ResourceInformation[] secondRes = second.getResources();
firstShares[0] = calculateShare(clusterRes[0], firstRes[0]);
secondShares[0] = calculateShare(clusterRes[0], secondRes[0]);
firstShares[1] = calculateShare(clusterRes[1], firstRes[1]);
secondShares[1] = calculateShare(clusterRes[1], secondRes[1]);
int firstDom = 0; int firstDom = 0;
int firstSub = 1;
if (firstShares[1] > firstShares[0]) {
firstDom = 1;
firstSub = 0;
}
int secondDom = 0; int secondDom = 0;
int firstSub = 0; int secondSub = 1;
int secondSub = 0; if (secondShares[1] > secondShares[0]) {
secondDom = 1;
for (int i = 0; i < clusterRes.length; i++) { secondSub = 0;
firstShares[i] = calculateShare(clusterRes[i], firstRes[i]);
secondShares[i] = calculateShare(clusterRes[i], secondRes[i]);
if (firstShares[i] > firstShares[firstDom]) {
firstDom = i;
}
if (firstShares[i] < firstShares[firstSub]) {
firstSub = i;
}
if (secondShares[i] > secondShares[secondDom]) {
secondDom = i;
}
if (secondShares[i] < secondShares[secondSub]) {
secondSub = i;
}
} }
if (firstShares[firstDom] > secondShares[secondDom]) { if (firstShares[firstDom] > secondShares[secondDom]) {
@ -280,7 +275,8 @@ public class DominantResourceCalculator extends ResourceCalculator {
max[0] = 0.0; max[0] = 0.0;
max[1] = 0.0; max[1] = 0.0;
for (int i = 0; i < clusterRes.length; i++) { int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) {
firstShares[i] = calculateShare(clusterRes[i], firstRes[i]); firstShares[i] = calculateShare(clusterRes[i], firstRes[i]);
secondShares[i] = calculateShare(clusterRes[i], secondRes[i]); secondShares[i] = calculateShare(clusterRes[i], secondRes[i]);
@ -339,7 +335,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
public long computeAvailableContainers(Resource available, public long computeAvailableContainers(Resource available,
Resource required) { Resource required) {
long min = Long.MAX_VALUE; long min = Long.MAX_VALUE;
int maxLength = ResourceUtils.getResourceTypesArray().length; int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) { for (int i = 0; i < maxLength; i++) {
ResourceInformation availableResource = available ResourceInformation availableResource = available
.getResourceInformation(i); .getResourceInformation(i);
@ -358,11 +354,12 @@ public class DominantResourceCalculator extends ResourceCalculator {
@Override @Override
public float divide(Resource clusterResource, public float divide(Resource clusterResource,
Resource numerator, Resource denominator) { Resource numerator, Resource denominator) {
int nKnownResourceTypes = ResourceUtils.getNumberOfKnownResourceTypes();
ResourceInformation[] clusterRes = clusterResource.getResources(); ResourceInformation[] clusterRes = clusterResource.getResources();
// We have to provide the calculateShares() method with somewhere to store // We have to provide the calculateShares() method with somewhere to store
// the shares. We don't actually need these shares afterwards. // the shares. We don't actually need these shares afterwards.
double[] numeratorShares = new double[clusterRes.length]; double[] numeratorShares = new double[nKnownResourceTypes];
double[] denominatorShares = new double[clusterRes.length]; double[] denominatorShares = new double[nKnownResourceTypes];
// We also have to provide a place for calculateShares() to store the max // We also have to provide a place for calculateShares() to store the max
// shares so that we can use them. // shares so that we can use them.
double[] max = new double[2]; double[] max = new double[2];
@ -386,7 +383,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
@Override @Override
public float ratio(Resource a, Resource b) { public float ratio(Resource a, Resource b) {
float ratio = 0.0f; float ratio = 0.0f;
int maxLength = ResourceUtils.getResourceTypesArray().length; int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) { for (int i = 0; i < maxLength; i++) {
ResourceInformation aResourceInformation = a.getResourceInformation(i); ResourceInformation aResourceInformation = a.getResourceInformation(i);
ResourceInformation bResourceInformation = b.getResourceInformation(i); ResourceInformation bResourceInformation = b.getResourceInformation(i);
@ -407,7 +404,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
public Resource divideAndCeil(Resource numerator, long denominator) { public Resource divideAndCeil(Resource numerator, long denominator) {
Resource ret = Resource.newInstance(numerator); Resource ret = Resource.newInstance(numerator);
int maxLength = ResourceUtils.getResourceTypesArray().length; int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) { for (int i = 0; i < maxLength; i++) {
ResourceInformation resourceInformation = ret.getResourceInformation(i); ResourceInformation resourceInformation = ret.getResourceInformation(i);
resourceInformation resourceInformation
@ -428,7 +425,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
public Resource normalize(Resource r, Resource minimumResource, public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource, Resource stepFactor) { Resource maximumResource, Resource stepFactor) {
Resource ret = Resource.newInstance(r); Resource ret = Resource.newInstance(r);
int maxLength = ResourceUtils.getResourceTypesArray().length; int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) { for (int i = 0; i < maxLength; i++) {
ResourceInformation rResourceInformation = r.getResourceInformation(i); ResourceInformation rResourceInformation = r.getResourceInformation(i);
ResourceInformation minimumResourceInformation = minimumResource ResourceInformation minimumResourceInformation = minimumResource
@ -474,7 +471,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) { private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) {
Resource ret = Resource.newInstance(r); Resource ret = Resource.newInstance(r);
int maxLength = ResourceUtils.getResourceTypesArray().length; int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) { for (int i = 0; i < maxLength; i++) {
ResourceInformation rResourceInformation = r.getResourceInformation(i); ResourceInformation rResourceInformation = r.getResourceInformation(i);
ResourceInformation stepFactorResourceInformation = stepFactor ResourceInformation stepFactorResourceInformation = stepFactor
@ -513,7 +510,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
private Resource multiplyAndNormalize(Resource r, double by, private Resource multiplyAndNormalize(Resource r, double by,
Resource stepFactor, boolean roundUp) { Resource stepFactor, boolean roundUp) {
Resource ret = Resource.newInstance(r); Resource ret = Resource.newInstance(r);
int maxLength = ResourceUtils.getResourceTypesArray().length; int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) { for (int i = 0; i < maxLength; i++) {
ResourceInformation rResourceInformation = r.getResourceInformation(i); ResourceInformation rResourceInformation = r.getResourceInformation(i);
ResourceInformation stepFactorResourceInformation = stepFactor ResourceInformation stepFactorResourceInformation = stepFactor
@ -542,7 +539,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
@Override @Override
public boolean fitsIn(Resource cluster, Resource smaller, Resource bigger) { public boolean fitsIn(Resource cluster, Resource smaller, Resource bigger) {
int maxLength = ResourceUtils.getResourceTypesArray().length; int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) { for (int i = 0; i < maxLength; i++) {
ResourceInformation sResourceInformation = smaller ResourceInformation sResourceInformation = smaller
.getResourceInformation(i); .getResourceInformation(i);

View File

@ -24,12 +24,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.impl.BaseResource;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import java.util.Arrays;
/** /**
* Resources is a computation class which provides a set of apis to do * Resources is a computation class which provides a set of apis to do
* mathematical operations on Resource object. * mathematical operations on Resource object.
@ -45,9 +42,11 @@ public class Resources {
* Helper class to create a resource with a fixed value for all resource * Helper class to create a resource with a fixed value for all resource
* types. For example, a NONE resource which returns 0 for any resource type. * types. For example, a NONE resource which returns 0 for any resource type.
*/ */
static class FixedValueResource extends BaseResource { @InterfaceAudience.Private
@Unstable
static class FixedValueResource extends Resource {
private long resourceValue; private final long resourceValue;
private String name; private String name;
/** /**
@ -100,6 +99,19 @@ public class Resources {
throw new RuntimeException(name + " cannot be modified!"); throw new RuntimeException(name + " cannot be modified!");
} }
@Override
public void setResourceInformation(int index,
ResourceInformation resourceInformation)
throws ResourceNotFoundException {
throw new RuntimeException(name + " cannot be modified!");
}
@Override
public void setResourceValue(int index, long value)
throws ResourceNotFoundException {
throw new RuntimeException(name + " cannot be modified!");
}
@Override @Override
public void setResourceInformation(String resource, public void setResourceInformation(String resource,
ResourceInformation resourceInformation) ResourceInformation resourceInformation)
@ -117,19 +129,11 @@ public class Resources {
ResourceInformation[] types = ResourceUtils.getResourceTypesArray(); ResourceInformation[] types = ResourceUtils.getResourceTypesArray();
if (types != null) { if (types != null) {
resources = new ResourceInformation[types.length]; resources = new ResourceInformation[types.length];
readOnlyResources = new ResourceInformation[types.length];
for (int index = 0; index < types.length; index++) { for (int index = 0; index < types.length; index++) {
resources[index] = ResourceInformation.newInstance(types[index]); resources[index] = ResourceInformation.newInstance(types[index]);
resources[index].setValue(resourceValue); resources[index].setValue(resourceValue);
// this is a fix for getVirtualCores returning an int
if (resourceValue > Integer.MAX_VALUE && ResourceInformation.VCORES
.getName().equals(resources[index].getName())) {
resources[index].setValue((long) Integer.MAX_VALUE);
}
} }
} }
readOnlyResources = Arrays.copyOf(resources, resources.length);
} }
} }

View File

@ -37,6 +37,8 @@ import java.util.Map;
* Test class to verify all resource utility methods. * Test class to verify all resource utility methods.
*/ */
public class TestResourceUtils { public class TestResourceUtils {
public static final String TEST_CONF_RESET_RESOURCE_TYPES =
"yarn.test.reset-resource-types";
static class ResourceFileInformation { static class ResourceFileInformation {
String filename; String filename;

View File

@ -1814,8 +1814,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
if (newTrackingUrl != null && if (newTrackingUrl != null &&
!newTrackingUrl.equals(appAttempt.originalTrackingUrl)) { !newTrackingUrl.equals(appAttempt.originalTrackingUrl)) {
appAttempt.originalTrackingUrl = newTrackingUrl; appAttempt.originalTrackingUrl = newTrackingUrl;
AggregateAppResourceUsage resUsage =
appAttempt.attemptMetrics.getAggregateAppResourceUsage();
ApplicationAttemptStateData attemptState = ApplicationAttemptStateData ApplicationAttemptStateData attemptState = ApplicationAttemptStateData
.newInstance(appAttempt.applicationAttemptId, .newInstance(appAttempt.applicationAttemptId,
appAttempt.getMasterContainer(), appAttempt.getMasterContainer(),

View File

@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSe
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@ -151,7 +152,10 @@ public class MockRM extends ResourceManager {
public MockRM(Configuration conf, RMStateStore store, public MockRM(Configuration conf, RMStateStore store,
boolean useNullRMNodeLabelsManager, boolean useRealElector) { boolean useNullRMNodeLabelsManager, boolean useRealElector) {
super(); super();
ResourceUtils.resetResourceTypes(conf); if (conf.getBoolean(TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES,
true)) {
ResourceUtils.resetResourceTypes(conf);
}
this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager; this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
this.useRealElector = useRealElector; this.useRealElector = useRealElector;
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));

View File

@ -4341,143 +4341,6 @@ public class TestCapacityScheduler {
rm.stop(); rm.stop();
} }
@Test (timeout = 300000)
public void testUserLimitThroughput() throws Exception {
// Since this is more of a performance unit test, only run if
// RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
Assume.assumeTrue(Boolean.valueOf(
System.getProperty("RunUserLimitThroughput")));
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
100.0f);
csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
csconf.setResourceComparator(DominantResourceCalculator.class);
YarnConfiguration conf = new YarnConfiguration(csconf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
LeafQueue qb = (LeafQueue)cs.getQueue("default");
// For now make user limit large so we can activate all applications
qb.setUserLimitFactor((float)100.0);
qb.setupConfigurableCapacities();
SchedulerEvent addAppEvent;
SchedulerEvent addAttemptEvent;
Container container = mock(Container.class);
ApplicationSubmissionContext submissionContext =
mock(ApplicationSubmissionContext.class);
final int appCount = 100;
ApplicationId[] appids = new ApplicationId[appCount];
RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount];
ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount];
RMAppImpl[] apps = new RMAppImpl[appCount];
RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount];
for (int i=0; i<appCount; i++) {
appids[i] = BuilderUtils.newApplicationId(100, i);
appAttemptIds[i] =
BuilderUtils.newApplicationAttemptId(appids[i], 1);
attemptMetrics[i] =
new RMAppAttemptMetrics(appAttemptIds[i], rm.getRMContext());
apps[i] = mock(RMAppImpl.class);
when(apps[i].getApplicationId()).thenReturn(appids[i]);
attempts[i] = mock(RMAppAttemptImpl.class);
when(attempts[i].getMasterContainer()).thenReturn(container);
when(attempts[i].getSubmissionContext()).thenReturn(submissionContext);
when(attempts[i].getAppAttemptId()).thenReturn(appAttemptIds[i]);
when(attempts[i].getRMAppAttemptMetrics()).thenReturn(attemptMetrics[i]);
when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
rm.getRMContext().getRMApps().put(appids[i], apps[i]);
addAppEvent =
new AppAddedSchedulerEvent(appids[i], "default", "user1");
cs.handle(addAppEvent);
addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
cs.handle(addAttemptEvent);
}
// add nodes to cluster, so cluster has 20GB and 20 vcores
Resource newResource = Resource.newInstance(10 * GB, 10);
RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
cs.handle(new NodeAddedSchedulerEvent(node));
Resource newResource2 = Resource.newInstance(10 * GB, 10);
RMNode node2 = MockNodes.newNodeInfo(0, newResource2, 1, "127.0.0.2");
cs.handle(new NodeAddedSchedulerEvent(node2));
Priority u0Priority = TestUtils.createMockPriority(1);
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount];
for (int i=0;i<appCount;i++) {
fiCaApps[i] =
cs.getSchedulerApplications().get(apps[i].getApplicationId())
.getCurrentAppAttempt();
// allocate container for app2 with 1GB memory and 1 vcore
fiCaApps[i].updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
u0Priority, recordFactory)));
}
// Now force everything to be over user limit
qb.setUserLimitFactor((float)0.0);
// Quiet the loggers while measuring throughput
for (Enumeration<?> loggers=LogManager.getCurrentLoggers();
loggers.hasMoreElements(); ) {
Logger logger = (Logger) loggers.nextElement();
logger.setLevel(Level.WARN);
}
final int topn = 20;
final int iterations = 2000000;
final int printInterval = 20000;
final float numerator = 1000.0f * printInterval;
PriorityQueue<Long> queue = new PriorityQueue<>(topn,
Collections.reverseOrder());
long n = Time.monotonicNow();
long timespent = 0;
for (int i = 0; i < iterations; i+=2) {
if (i > 0 && i % printInterval == 0){
long ts = (Time.monotonicNow() - n);
if (queue.size() < topn) {
queue.offer(ts);
} else {
Long last = queue.peek();
if (last > ts) {
queue.poll();
queue.offer(ts);
}
}
System.out.println(i + " " + (numerator / ts));
n= Time.monotonicNow();
}
cs.handle(new NodeUpdateSchedulerEvent(node));
cs.handle(new NodeUpdateSchedulerEvent(node2));
}
timespent=0;
int entries = queue.size();
while(queue.size() > 0){
long l = queue.poll();
timespent += l;
}
System.out.println("Avg of fastest " + entries + ": "
+ numerator / (timespent / entries));
rm.stop();
}
@Test @Test
public void testCSQueueBlocked() throws Exception { public void testCSQueueBlocked() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();

View File

@ -0,0 +1,265 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assume;
import org.junit.Test;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import static org.apache.hadoop.yarn.util.resource.TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestCapacitySchedulerPerf {
private final int GB = 1024;
private String getResourceName(int idx) {
return "resource-" + idx;
}
private void testUserLimitThroughputWithNumberOfResourceTypes(
int numOfResourceTypes)
throws Exception {
if (numOfResourceTypes > 2) {
// Initialize resource map
Map<String, ResourceInformation> riMap = new HashMap<>();
// Initialize mandatory resources
riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB);
riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES);
for (int i = 2; i < numOfResourceTypes; i++) {
String resourceName = getResourceName(i);
riMap.put(resourceName, ResourceInformation
.newInstance(resourceName, "", 0, ResourceTypes.COUNTABLE, 0,
Integer.MAX_VALUE));
}
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
}
// Since this is more of a performance unit test, only run if
// RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
Assume.assumeTrue(Boolean.valueOf(
System.getProperty("RunCapacitySchedulerPerfTests")));
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
100.0f);
csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
csconf.setResourceComparator(DominantResourceCalculator.class);
YarnConfiguration conf = new YarnConfiguration(csconf);
// Don't reset resource types since we have already configured resource types
conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
LeafQueue qb = (LeafQueue)cs.getQueue("default");
// For now make user limit large so we can activate all applications
qb.setUserLimitFactor((float)100.0);
qb.setupConfigurableCapacities();
SchedulerEvent addAppEvent;
SchedulerEvent addAttemptEvent;
Container container = mock(Container.class);
ApplicationSubmissionContext submissionContext =
mock(ApplicationSubmissionContext.class);
final int appCount = 100;
ApplicationId[] appids = new ApplicationId[appCount];
RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount];
ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount];
RMAppImpl[] apps = new RMAppImpl[appCount];
RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount];
for (int i=0; i<appCount; i++) {
appids[i] = BuilderUtils.newApplicationId(100, i);
appAttemptIds[i] =
BuilderUtils.newApplicationAttemptId(appids[i], 1);
attemptMetrics[i] =
new RMAppAttemptMetrics(appAttemptIds[i], rm.getRMContext());
apps[i] = mock(RMAppImpl.class);
when(apps[i].getApplicationId()).thenReturn(appids[i]);
attempts[i] = mock(RMAppAttemptImpl.class);
when(attempts[i].getMasterContainer()).thenReturn(container);
when(attempts[i].getSubmissionContext()).thenReturn(submissionContext);
when(attempts[i].getAppAttemptId()).thenReturn(appAttemptIds[i]);
when(attempts[i].getRMAppAttemptMetrics()).thenReturn(attemptMetrics[i]);
when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
rm.getRMContext().getRMApps().put(appids[i], apps[i]);
addAppEvent =
new AppAddedSchedulerEvent(appids[i], "default", "user1");
cs.handle(addAppEvent);
addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
cs.handle(addAttemptEvent);
}
// add nodes to cluster, so cluster has 20GB and 20 vcores
Resource nodeResource = Resource.newInstance(10 * GB, 10);
if (numOfResourceTypes > 2) {
for (int i = 2; i < numOfResourceTypes; i++) {
nodeResource.setResourceValue(getResourceName(i), 10);
}
}
RMNode node = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.1");
cs.handle(new NodeAddedSchedulerEvent(node));
RMNode node2 = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.2");
cs.handle(new NodeAddedSchedulerEvent(node2));
Priority u0Priority = TestUtils.createMockPriority(1);
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount];
for (int i=0;i<appCount;i++) {
fiCaApps[i] =
cs.getSchedulerApplications().get(apps[i].getApplicationId())
.getCurrentAppAttempt();
ResourceRequest resourceRequest = TestUtils.createResourceRequest(
ResourceRequest.ANY, 1 * GB, 1, true, u0Priority, recordFactory);
if (numOfResourceTypes > 2) {
for (int j = 2; j < numOfResourceTypes; j++) {
resourceRequest.getCapability().setResourceValue(getResourceName(j),
10);
}
}
// allocate container for app2 with 1GB memory and 1 vcore
fiCaApps[i].updateResourceRequests(
Collections.singletonList(resourceRequest));
}
// Now force everything to be over user limit
qb.setUserLimitFactor((float)0.0);
// Quiet the loggers while measuring throughput
for (Enumeration<?> loggers = LogManager.getCurrentLoggers();
loggers.hasMoreElements(); ) {
Logger logger = (Logger) loggers.nextElement();
logger.setLevel(Level.WARN);
}
final int topn = 20;
final int iterations = 2000000;
final int printInterval = 20000;
final float numerator = 1000.0f * printInterval;
PriorityQueue<Long> queue = new PriorityQueue<>(topn,
Collections.reverseOrder());
long n = Time.monotonicNow();
long timespent = 0;
for (int i = 0; i < iterations; i+=2) {
if (i > 0 && i % printInterval == 0){
long ts = (Time.monotonicNow() - n);
if (queue.size() < topn) {
queue.offer(ts);
} else {
Long last = queue.peek();
if (last > ts) {
queue.poll();
queue.offer(ts);
}
}
System.out.println(i + " " + (numerator / ts));
n= Time.monotonicNow();
}
cs.handle(new NodeUpdateSchedulerEvent(node));
cs.handle(new NodeUpdateSchedulerEvent(node2));
}
timespent=0;
int entries = queue.size();
while(queue.size() > 0){
long l = queue.poll();
timespent += l;
}
System.out.println(
"#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries
+ ": " + numerator / (timespent / entries));
rm.stop();
}
@Test(timeout = 300000)
public void testUserLimitThroughputForTwoResources() throws Exception {
testUserLimitThroughputWithNumberOfResourceTypes(2);
}
@Test(timeout = 300000)
public void testUserLimitThroughputForThreeResources() throws Exception {
testUserLimitThroughputWithNumberOfResourceTypes(3);
}
@Test(timeout = 300000)
public void testUserLimitThroughputForFourResources() throws Exception {
testUserLimitThroughputWithNumberOfResourceTypes(4);
}
@Test(timeout = 300000)
public void testUserLimitThroughputForFiveResources() throws Exception {
testUserLimitThroughputWithNumberOfResourceTypes(5);
}
}

View File

@ -102,6 +102,8 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import static org.apache.hadoop.yarn.util.resource.TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES;
/** /**
* <p> * <p>
* Embedded Yarn minicluster for testcases that need to interact with a cluster. * Embedded Yarn minicluster for testcases that need to interact with a cluster.
@ -261,7 +263,10 @@ public class MiniYARNCluster extends CompositeService {
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC); YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
ResourceUtils.resetResourceTypes(conf);
if (conf.getBoolean(TEST_CONF_RESET_RESOURCE_TYPES, true)) {
ResourceUtils.resetResourceTypes(conf);
}
if (useRpc && !useFixedPorts) { if (useRpc && !useFixedPorts) {
throw new YarnRuntimeException("Invalid configuration!" + throw new YarnRuntimeException("Invalid configuration!" +