YARN-7136. Additional Performance Improvement for Resource Profile Feature
(Contributed by Wangda Tan via Daniel Templeton)
(cherry picked from commit bf2b687412
)
This commit is contained in:
parent
335e3a6334
commit
0db9ddb728
|
@ -617,7 +617,7 @@
|
|||
</Match>
|
||||
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.yarn.api.records.impl.BaseResource" />
|
||||
<Class name="org.apache.hadoop.yarn.api.records.Resource" />
|
||||
<Method name="getResources" />
|
||||
<Bug pattern="EI_EXPOSE_REP" />
|
||||
</Match>
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.impl.BaseResource;
|
||||
import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
|
||||
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
|
@ -59,8 +59,15 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
|||
@Stable
|
||||
public abstract class Resource implements Comparable<Resource> {
|
||||
|
||||
protected static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
|
||||
protected static final String VCORES = ResourceInformation.VCORES.getName();
|
||||
protected ResourceInformation[] resources = null;
|
||||
|
||||
// Number of mandatory resources, this is added to avoid invoke
|
||||
// MandatoryResources.values().length, since values() internally will
|
||||
// copy array, etc.
|
||||
protected static final int NUM_MANDATORY_RESOURCES = 2;
|
||||
|
||||
protected static final int MEMORY_INDEX = 0;
|
||||
protected static final int VCORES_INDEX = 1;
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
|
@ -71,7 +78,7 @@ public abstract class Resource implements Comparable<Resource> {
|
|||
ret.setVirtualCores(vCores);
|
||||
return ret;
|
||||
}
|
||||
return new BaseResource(memory, vCores);
|
||||
return new LightWeightResource(memory, vCores);
|
||||
}
|
||||
|
||||
@Public
|
||||
|
@ -83,7 +90,7 @@ public abstract class Resource implements Comparable<Resource> {
|
|||
ret.setVirtualCores(vCores);
|
||||
return ret;
|
||||
}
|
||||
return new BaseResource(memory, vCores);
|
||||
return new LightWeightResource(memory, vCores);
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
|
@ -201,7 +208,9 @@ public abstract class Resource implements Comparable<Resource> {
|
|||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public abstract ResourceInformation[] getResources();
|
||||
public ResourceInformation[] getResources() {
|
||||
return resources;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get ResourceInformation for a specified resource.
|
||||
|
@ -215,7 +224,6 @@ public abstract class Resource implements Comparable<Resource> {
|
|||
public ResourceInformation getResourceInformation(String resource)
|
||||
throws ResourceNotFoundException {
|
||||
Integer index = ResourceUtils.getResourceTypeIndex().get(resource);
|
||||
ResourceInformation[] resources = getResources();
|
||||
if (index != null) {
|
||||
return resources[index];
|
||||
}
|
||||
|
@ -236,12 +244,13 @@ public abstract class Resource implements Comparable<Resource> {
|
|||
@Evolving
|
||||
public ResourceInformation getResourceInformation(int index)
|
||||
throws ResourceNotFoundException {
|
||||
ResourceInformation[] resources = getResources();
|
||||
if (index < 0 || index >= resources.length) {
|
||||
throw new ResourceNotFoundException("Unknown resource at index '" + index
|
||||
+ "'. Vaid resources are: " + Arrays.toString(resources));
|
||||
ResourceInformation ri = null;
|
||||
try {
|
||||
ri = resources[index];
|
||||
} catch (ArrayIndexOutOfBoundsException e) {
|
||||
throwExceptionWhenArrayOutOfBound(index);
|
||||
}
|
||||
return resources[index];
|
||||
return ri;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -271,11 +280,11 @@ public abstract class Resource implements Comparable<Resource> {
|
|||
public void setResourceInformation(String resource,
|
||||
ResourceInformation resourceInformation)
|
||||
throws ResourceNotFoundException {
|
||||
if (resource.equals(MEMORY)) {
|
||||
if (resource.equals(ResourceInformation.MEMORY_URI)) {
|
||||
this.setMemorySize(resourceInformation.getValue());
|
||||
return;
|
||||
}
|
||||
if (resource.equals(VCORES)) {
|
||||
if (resource.equals(ResourceInformation.VCORES_URI)) {
|
||||
this.setVirtualCores((int) resourceInformation.getValue());
|
||||
return;
|
||||
}
|
||||
|
@ -298,7 +307,6 @@ public abstract class Resource implements Comparable<Resource> {
|
|||
public void setResourceInformation(int index,
|
||||
ResourceInformation resourceInformation)
|
||||
throws ResourceNotFoundException {
|
||||
ResourceInformation[] resources = getResources();
|
||||
if (index < 0 || index >= resources.length) {
|
||||
throw new ResourceNotFoundException("Unknown resource at index '" + index
|
||||
+ "'. Valid resources are " + Arrays.toString(resources));
|
||||
|
@ -318,11 +326,11 @@ public abstract class Resource implements Comparable<Resource> {
|
|||
@Evolving
|
||||
public void setResourceValue(String resource, long value)
|
||||
throws ResourceNotFoundException {
|
||||
if (resource.equals(MEMORY)) {
|
||||
if (resource.equals(ResourceInformation.MEMORY_URI)) {
|
||||
this.setMemorySize(value);
|
||||
return;
|
||||
}
|
||||
if (resource.equals(VCORES)) {
|
||||
if (resource.equals(ResourceInformation.VCORES_URI)) {
|
||||
this.setVirtualCores((int)value);
|
||||
return;
|
||||
}
|
||||
|
@ -346,27 +354,21 @@ public abstract class Resource implements Comparable<Resource> {
|
|||
@Evolving
|
||||
public void setResourceValue(int index, long value)
|
||||
throws ResourceNotFoundException {
|
||||
ResourceInformation[] resources = getResources();
|
||||
if (index < 0 || index >= resources.length) {
|
||||
throw new ResourceNotFoundException("Unknown resource at index '" + index
|
||||
+ "'. Valid resources are " + Arrays.toString(resources));
|
||||
}
|
||||
try {
|
||||
resources[index].setValue(value);
|
||||
} catch (ArrayIndexOutOfBoundsException e) {
|
||||
throwExceptionWhenArrayOutOfBound(index);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 263167;
|
||||
private void throwExceptionWhenArrayOutOfBound(int index) {
|
||||
String exceptionMsg = String.format(
|
||||
"Trying to access ResourceInformation for given index=%d. "
|
||||
+ "Acceptable index range is [0,%d), please check double check "
|
||||
+ "configured resources in resource-types.xml",
|
||||
index, ResourceUtils.getNumberOfKnownResourceTypes());
|
||||
|
||||
int result = (int) (939769357
|
||||
+ getMemorySize()); // prime * result = 939769357 initially
|
||||
result = prime * result + getVirtualCores();
|
||||
for (ResourceInformation entry : getResources()) {
|
||||
if (!entry.getName().equals(MEMORY) && !entry.getName().equals(VCORES)) {
|
||||
result = prime * result + entry.hashCode();
|
||||
}
|
||||
}
|
||||
return result;
|
||||
throw new ResourceNotFoundException(exceptionMsg);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -381,20 +383,15 @@ public abstract class Resource implements Comparable<Resource> {
|
|||
return false;
|
||||
}
|
||||
Resource other = (Resource) obj;
|
||||
if (getMemorySize() != other.getMemorySize()
|
||||
|| getVirtualCores() != other.getVirtualCores()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ResourceInformation[] myVectors = getResources();
|
||||
ResourceInformation[] otherVectors = other.getResources();
|
||||
|
||||
if (myVectors.length != otherVectors.length) {
|
||||
if (resources.length != otherVectors.length) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (int i = 0; i < myVectors.length; i++) {
|
||||
ResourceInformation a = myVectors[i];
|
||||
for (int i = 0; i < resources.length; i++) {
|
||||
ResourceInformation a = resources[i];
|
||||
ResourceInformation b = otherVectors[i];
|
||||
if ((a != b) && ((a == null) || !a.equals(b))) {
|
||||
return false;
|
||||
|
@ -403,64 +400,71 @@ public abstract class Resource implements Comparable<Resource> {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Resource other) {
|
||||
ResourceInformation[] otherResources = other.getResources();
|
||||
|
||||
int arrLenThis = this.resources.length;
|
||||
int arrLenOther = otherResources.length;
|
||||
|
||||
// compare memory and vcores first(in that order) to preserve
|
||||
// existing behaviour
|
||||
for (int i = 0; i < arrLenThis; i++) {
|
||||
ResourceInformation otherEntry;
|
||||
try {
|
||||
otherEntry = otherResources[i];
|
||||
} catch (ArrayIndexOutOfBoundsException e) {
|
||||
// For two vectors with different size and same prefix. Shorter vector
|
||||
// goes first.
|
||||
return 1;
|
||||
}
|
||||
ResourceInformation entry = resources[i];
|
||||
|
||||
long diff = entry.compareTo(otherEntry);
|
||||
if (diff > 0) {
|
||||
return 1;
|
||||
} else if (diff < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (arrLenThis < arrLenOther) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
sb.append("<memory:").append(getMemorySize()).append(", vCores:")
|
||||
.append(getVirtualCores());
|
||||
for (ResourceInformation entry : getResources()) {
|
||||
if (entry.getName().equals(MEMORY)
|
||||
&& entry.getUnits()
|
||||
.equals(ResourceInformation.MEMORY_MB.getUnits())) {
|
||||
|
||||
for (int i = 2; i < resources.length; i++) {
|
||||
ResourceInformation ri = resources[i];
|
||||
if (ri.getValue() == 0) {
|
||||
continue;
|
||||
}
|
||||
if (entry.getName().equals(VCORES)
|
||||
&& entry.getUnits()
|
||||
.equals(ResourceInformation.VCORES.getUnits())) {
|
||||
continue;
|
||||
}
|
||||
sb.append(", ").append(entry.getName()).append(": ")
|
||||
.append(entry.getValue())
|
||||
.append(entry.getUnits());
|
||||
sb.append(", ");
|
||||
sb.append(ri.getName()).append(": ")
|
||||
.append(ri.getValue());
|
||||
sb.append(ri.getUnits());
|
||||
}
|
||||
|
||||
sb.append(">");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Resource other) {
|
||||
ResourceInformation[] thisResources = this.getResources();
|
||||
ResourceInformation[] otherResources = other.getResources();
|
||||
|
||||
// compare memory and vcores first(in that order) to preserve
|
||||
// existing behaviour
|
||||
long diff = this.getMemorySize() - other.getMemorySize();
|
||||
if (diff == 0) {
|
||||
diff = this.getVirtualCores() - other.getVirtualCores();
|
||||
public int hashCode() {
|
||||
final int prime = 47;
|
||||
long result = 0;
|
||||
for (ResourceInformation entry : resources) {
|
||||
result = prime * result + entry.hashCode();
|
||||
}
|
||||
if (diff == 0) {
|
||||
diff = thisResources.length - otherResources.length;
|
||||
if (diff == 0) {
|
||||
int maxLength = ResourceUtils.getResourceTypesArray().length;
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
// For memory and vcores, we can skip the loop as it's already
|
||||
// compared.
|
||||
if (i < 2) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ResourceInformation entry = thisResources[i];
|
||||
ResourceInformation otherEntry = otherResources[i];
|
||||
if (entry.getName().equals(otherEntry.getName())) {
|
||||
diff = entry.compareTo(otherEntry);
|
||||
if (diff != 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return Long.compare(diff, 0);
|
||||
return (int) result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import org.apache.curator.shaded.com.google.common.reflect.ClassPath;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
||||
|
||||
|
@ -34,8 +36,8 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
|
|||
private long minimumAllocation;
|
||||
private long maximumAllocation;
|
||||
|
||||
private static final String MEMORY_URI = "memory-mb";
|
||||
private static final String VCORES_URI = "vcores";
|
||||
public static final String MEMORY_URI = "memory-mb";
|
||||
public static final String VCORES_URI = "vcores";
|
||||
|
||||
public static final ResourceInformation MEMORY_MB =
|
||||
ResourceInformation.newInstance(MEMORY_URI, "Mi");
|
||||
|
@ -83,6 +85,16 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
|
|||
this.units = rUnits;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checking if a unit included by KNOWN_UNITS is an expensive operation. This
|
||||
* can be avoided in critical path in RM.
|
||||
* @param rUnits units for the resource
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public void setUnitsWithoutValidation(String rUnits) {
|
||||
this.units = rUnits;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the resource type.
|
||||
*
|
||||
|
|
|
@ -18,19 +18,24 @@
|
|||
|
||||
package org.apache.hadoop.yarn.api.records.impl;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
|
||||
import java.util.Arrays;
|
||||
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_MB;
|
||||
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
|
||||
import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* <code>BaseResource</code> extends Resource to handle base resources such
|
||||
* <code>LightResource</code> extends Resource to handle base resources such
|
||||
* as memory and CPU.
|
||||
* TODO: We have a long term plan to use AbstractResource when additional
|
||||
* resource types are to be handled as well.
|
||||
* This will be used to speed up internal calculation to avoid creating
|
||||
* costly PB-backed Resource object: <code>ResourcePBImpl</code>
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
|
@ -54,48 +59,34 @@ import java.util.Arrays;
|
|||
*
|
||||
* @see Resource
|
||||
*/
|
||||
@Public
|
||||
@InterfaceAudience.Private
|
||||
@Unstable
|
||||
public class BaseResource extends Resource {
|
||||
public class LightWeightResource extends Resource {
|
||||
|
||||
private ResourceInformation memoryResInfo;
|
||||
private ResourceInformation vcoresResInfo;
|
||||
protected ResourceInformation[] resources = null;
|
||||
protected ResourceInformation[] readOnlyResources = null;
|
||||
|
||||
// Number of mandatory resources, this is added to avoid invoke
|
||||
// MandatoryResources.values().length, since values() internally will
|
||||
// copy array, etc.
|
||||
private static final int NUM_MANDATORY_RESOURCES = 2;
|
||||
|
||||
protected enum MandatoryResources {
|
||||
MEMORY(0), VCORES(1);
|
||||
|
||||
private final int id;
|
||||
|
||||
MandatoryResources(int id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public int getId() {
|
||||
return this.id;
|
||||
}
|
||||
}
|
||||
|
||||
public BaseResource() {
|
||||
// Base constructor.
|
||||
}
|
||||
|
||||
public BaseResource(long memory, long vcores) {
|
||||
this.memoryResInfo = ResourceInformation.newInstance(MEMORY,
|
||||
ResourceInformation.MEMORY_MB.getUnits(), memory);
|
||||
this.vcoresResInfo = ResourceInformation.newInstance(VCORES, "", vcores);
|
||||
public LightWeightResource(long memory, long vcores) {
|
||||
this.memoryResInfo = LightWeightResource.newDefaultInformation(MEMORY_URI,
|
||||
MEMORY_MB.getUnits(), memory);
|
||||
this.vcoresResInfo = LightWeightResource.newDefaultInformation(VCORES_URI,
|
||||
"", vcores);
|
||||
|
||||
resources = new ResourceInformation[NUM_MANDATORY_RESOURCES];
|
||||
readOnlyResources = new ResourceInformation[NUM_MANDATORY_RESOURCES];
|
||||
resources[MandatoryResources.MEMORY.id] = memoryResInfo;
|
||||
resources[MandatoryResources.VCORES.id] = vcoresResInfo;
|
||||
readOnlyResources = Arrays.copyOf(resources, resources.length);
|
||||
resources[MEMORY_INDEX] = memoryResInfo;
|
||||
resources[VCORES_INDEX] = vcoresResInfo;
|
||||
}
|
||||
|
||||
private static ResourceInformation newDefaultInformation(String name,
|
||||
String unit, long value) {
|
||||
ResourceInformation ri = new ResourceInformation();
|
||||
ri.setName(name);
|
||||
ri.setValue(value);
|
||||
ri.setResourceType(ResourceTypes.COUNTABLE);
|
||||
ri.setUnitsWithoutValidation(unit);
|
||||
ri.setMinimumAllocation(0);
|
||||
ri.setMaximumAllocation(Long.MAX_VALUE);
|
||||
return ri;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -131,7 +122,42 @@ public class BaseResource extends Resource {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ResourceInformation[] getResources() {
|
||||
return readOnlyResources;
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null || !(obj instanceof Resource)) {
|
||||
return false;
|
||||
}
|
||||
Resource other = (Resource) obj;
|
||||
if (getMemorySize() != other.getMemorySize()
|
||||
|| getVirtualCores() != other.getVirtualCores()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Resource other) {
|
||||
// compare memory and vcores first(in that order) to preserve
|
||||
// existing behaviour
|
||||
long diff = this.getMemorySize() - other.getMemorySize();
|
||||
if (diff == 0) {
|
||||
return this.getVirtualCores() - other.getVirtualCores();
|
||||
} else if (diff > 0){
|
||||
return 1;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 47;
|
||||
long result = prime + getMemorySize();
|
||||
result = prime * result + getVirtualCores();
|
||||
|
||||
return (int) result;
|
||||
}
|
||||
}
|
|
@ -244,13 +244,28 @@ public class ResourceUtils {
|
|||
minimumAllocation, maximumAllocation));
|
||||
}
|
||||
}
|
||||
|
||||
checkMandatoryResources(resourceInformationMap);
|
||||
addMandatoryResources(resourceInformationMap);
|
||||
|
||||
setMinimumAllocationForMandatoryResources(resourceInformationMap, conf);
|
||||
setMaximumAllocationForMandatoryResources(resourceInformationMap, conf);
|
||||
|
||||
initializeResourcesFromResourceInformationMap(resourceInformationMap);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is visible for testing, unit test can construct a
|
||||
* resourceInformationMap and pass it to this method to initialize multiple resources.
|
||||
* @param resourceInformationMap constructed resource information map.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static void initializeResourcesFromResourceInformationMap(
|
||||
Map<String, ResourceInformation> resourceInformationMap) {
|
||||
resourceTypes = Collections.unmodifiableMap(resourceInformationMap);
|
||||
updateKnownResources();
|
||||
updateResourceTypeIndex();
|
||||
initializedResources = true;
|
||||
}
|
||||
|
||||
private static void updateKnownResources() {
|
||||
|
@ -347,14 +362,12 @@ public class ResourceUtils {
|
|||
try {
|
||||
addResourcesFileToConf(resourceFile, conf);
|
||||
LOG.debug("Found " + resourceFile + ", adding to configuration");
|
||||
initializeResourcesMap(conf);
|
||||
initializedResources = true;
|
||||
} catch (FileNotFoundException fe) {
|
||||
LOG.info("Unable to find '" + resourceFile
|
||||
+ "'. Falling back to memory and vcores as resources.");
|
||||
initializeResourcesMap(conf);
|
||||
initializedResources = true;
|
||||
}
|
||||
initializeResourcesMap(conf);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -558,7 +571,7 @@ public class ResourceUtils {
|
|||
*/
|
||||
public static String getDefaultUnit(String resourceType) {
|
||||
ResourceInformation ri = getResourceTypes().get(resourceType);
|
||||
if (null != ri) {
|
||||
if (ri != null) {
|
||||
return ri.getUnits();
|
||||
}
|
||||
return "";
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.impl.BaseResource;
|
||||
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
|
@ -34,13 +33,12 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceInformationProto;
|
|||
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class ResourcePBImpl extends BaseResource {
|
||||
public class ResourcePBImpl extends Resource {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ResourcePBImpl.class);
|
||||
|
||||
|
@ -95,7 +93,7 @@ public class ResourcePBImpl extends BaseResource {
|
|||
@Override
|
||||
public long getMemorySize() {
|
||||
// memory should always be present
|
||||
ResourceInformation ri = resources[MandatoryResources.MEMORY.getId()];
|
||||
ResourceInformation ri = resources[MEMORY_INDEX];
|
||||
|
||||
if (ri.getUnits().isEmpty()) {
|
||||
return ri.getValue();
|
||||
|
@ -113,19 +111,19 @@ public class ResourcePBImpl extends BaseResource {
|
|||
@Override
|
||||
public void setMemorySize(long memory) {
|
||||
maybeInitBuilder();
|
||||
getResourceInformation(MEMORY).setValue(memory);
|
||||
getResourceInformation(ResourceInformation.MEMORY_URI).setValue(memory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getVirtualCores() {
|
||||
// vcores should always be present
|
||||
return (int) resources[MandatoryResources.VCORES.getId()].getValue();
|
||||
return (int) resources[VCORES_INDEX].getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setVirtualCores(int vCores) {
|
||||
maybeInitBuilder();
|
||||
getResourceInformation(VCORES).setValue(vCores);
|
||||
getResourceInformation(ResourceInformation.VCORES_URI).setValue(vCores);
|
||||
}
|
||||
|
||||
private void initResources() {
|
||||
|
@ -156,7 +154,6 @@ public class ResourcePBImpl extends BaseResource {
|
|||
resources[index].setValue(value);
|
||||
}
|
||||
}
|
||||
readOnlyResources = Arrays.copyOf(resources, resources.length);
|
||||
this.setMemorySize(p.getMemory());
|
||||
this.setVirtualCores(p.getVirtualCores());
|
||||
}
|
||||
|
@ -186,11 +183,6 @@ public class ResourcePBImpl extends BaseResource {
|
|||
getResourceInformation(resource).setValue(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceInformation[] getResources() {
|
||||
return super.getResources();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceInformation getResourceInformation(String resource)
|
||||
throws ResourceNotFoundException {
|
||||
|
@ -212,7 +204,6 @@ public class ResourcePBImpl extends BaseResource {
|
|||
}
|
||||
|
||||
resources = new ResourceInformation[types.length];
|
||||
readOnlyResources = new ResourceInformation[types.length];
|
||||
for (ResourceInformation entry : types) {
|
||||
int index = ResourceUtils.getResourceTypeIndex().get(entry.getName());
|
||||
resources[index] = ResourceInformation.newInstance(entry);
|
||||
|
|
|
@ -73,7 +73,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
boolean rhsGreater = false;
|
||||
int ret = 0;
|
||||
|
||||
int maxLength = ResourceUtils.getResourceTypesArray().length;
|
||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
ResourceInformation lhsResourceInformation = lhs
|
||||
.getResourceInformation(i);
|
||||
|
@ -111,10 +111,12 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
// resources and then look for which resource has the biggest
|
||||
// share overall.
|
||||
ResourceInformation[] clusterRes = clusterResource.getResources();
|
||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
|
||||
// If array creation shows up as a time sink, these arrays could be cached
|
||||
// because they're always the same length.
|
||||
double[] lhsShares = new double[clusterRes.length];
|
||||
double[] rhsShares = new double[clusterRes.length];
|
||||
double[] lhsShares = new double[maxLength];
|
||||
double[] rhsShares = new double[maxLength];
|
||||
double diff;
|
||||
|
||||
try {
|
||||
|
@ -124,10 +126,10 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
calculateShares(clusterRes, lhs, rhs, lhsShares, rhsShares, max);
|
||||
|
||||
diff = max[0] - max[1];
|
||||
} else if (clusterRes.length == 2) {
|
||||
} else if (maxLength == 2) {
|
||||
// Special case to handle the common scenario of only CPU and memory
|
||||
// so that we can optimize for performance
|
||||
diff = calculateSharesForMandatoryResources(clusterRes, lhs, rhs,
|
||||
diff = calculateSharesForTwoMandatoryResources(clusterRes, lhs, rhs,
|
||||
lhsShares, rhsShares);
|
||||
} else {
|
||||
calculateShares(clusterRes, lhs, rhs, lhsShares, rhsShares);
|
||||
|
@ -182,7 +184,8 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
ResourceInformation[] firstRes = first.getResources();
|
||||
ResourceInformation[] secondRes = second.getResources();
|
||||
|
||||
for (int i = 0; i < clusterRes.length; i++) {
|
||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
firstShares[i] = calculateShare(clusterRes[i], firstRes[i]);
|
||||
secondShares[i] = calculateShare(clusterRes[i], secondRes[i]);
|
||||
}
|
||||
|
@ -205,35 +208,27 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
* second resource, respectively
|
||||
* @throws NullPointerException if any parameter is null
|
||||
*/
|
||||
private int calculateSharesForMandatoryResources(
|
||||
private int calculateSharesForTwoMandatoryResources(
|
||||
ResourceInformation[] clusterRes, Resource first, Resource second,
|
||||
double[] firstShares, double[] secondShares) {
|
||||
ResourceInformation[] firstRes = first.getResources();
|
||||
ResourceInformation[] secondRes = second.getResources();
|
||||
firstShares[0] = calculateShare(clusterRes[0], firstRes[0]);
|
||||
secondShares[0] = calculateShare(clusterRes[0], secondRes[0]);
|
||||
firstShares[1] = calculateShare(clusterRes[1], firstRes[1]);
|
||||
secondShares[1] = calculateShare(clusterRes[1], secondRes[1]);
|
||||
|
||||
int firstDom = 0;
|
||||
int firstSub = 1;
|
||||
if (firstShares[1] > firstShares[0]) {
|
||||
firstDom = 1;
|
||||
firstSub = 0;
|
||||
}
|
||||
int secondDom = 0;
|
||||
int firstSub = 0;
|
||||
int secondSub = 0;
|
||||
|
||||
for (int i = 0; i < clusterRes.length; i++) {
|
||||
firstShares[i] = calculateShare(clusterRes[i], firstRes[i]);
|
||||
secondShares[i] = calculateShare(clusterRes[i], secondRes[i]);
|
||||
|
||||
if (firstShares[i] > firstShares[firstDom]) {
|
||||
firstDom = i;
|
||||
}
|
||||
|
||||
if (firstShares[i] < firstShares[firstSub]) {
|
||||
firstSub = i;
|
||||
}
|
||||
|
||||
if (secondShares[i] > secondShares[secondDom]) {
|
||||
secondDom = i;
|
||||
}
|
||||
|
||||
if (secondShares[i] < secondShares[secondSub]) {
|
||||
secondSub = i;
|
||||
}
|
||||
int secondSub = 1;
|
||||
if (secondShares[1] > secondShares[0]) {
|
||||
secondDom = 1;
|
||||
secondSub = 0;
|
||||
}
|
||||
|
||||
if (firstShares[firstDom] > secondShares[secondDom]) {
|
||||
|
@ -280,7 +275,8 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
max[0] = 0.0;
|
||||
max[1] = 0.0;
|
||||
|
||||
for (int i = 0; i < clusterRes.length; i++) {
|
||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
firstShares[i] = calculateShare(clusterRes[i], firstRes[i]);
|
||||
secondShares[i] = calculateShare(clusterRes[i], secondRes[i]);
|
||||
|
||||
|
@ -339,7 +335,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
public long computeAvailableContainers(Resource available,
|
||||
Resource required) {
|
||||
long min = Long.MAX_VALUE;
|
||||
int maxLength = ResourceUtils.getResourceTypesArray().length;
|
||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
ResourceInformation availableResource = available
|
||||
.getResourceInformation(i);
|
||||
|
@ -358,11 +354,12 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
@Override
|
||||
public float divide(Resource clusterResource,
|
||||
Resource numerator, Resource denominator) {
|
||||
int nKnownResourceTypes = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
ResourceInformation[] clusterRes = clusterResource.getResources();
|
||||
// We have to provide the calculateShares() method with somewhere to store
|
||||
// the shares. We don't actually need these shares afterwards.
|
||||
double[] numeratorShares = new double[clusterRes.length];
|
||||
double[] denominatorShares = new double[clusterRes.length];
|
||||
double[] numeratorShares = new double[nKnownResourceTypes];
|
||||
double[] denominatorShares = new double[nKnownResourceTypes];
|
||||
// We also have to provide a place for calculateShares() to store the max
|
||||
// shares so that we can use them.
|
||||
double[] max = new double[2];
|
||||
|
@ -386,7 +383,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
@Override
|
||||
public float ratio(Resource a, Resource b) {
|
||||
float ratio = 0.0f;
|
||||
int maxLength = ResourceUtils.getResourceTypesArray().length;
|
||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
ResourceInformation aResourceInformation = a.getResourceInformation(i);
|
||||
ResourceInformation bResourceInformation = b.getResourceInformation(i);
|
||||
|
@ -407,7 +404,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
|
||||
public Resource divideAndCeil(Resource numerator, long denominator) {
|
||||
Resource ret = Resource.newInstance(numerator);
|
||||
int maxLength = ResourceUtils.getResourceTypesArray().length;
|
||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
ResourceInformation resourceInformation = ret.getResourceInformation(i);
|
||||
resourceInformation
|
||||
|
@ -428,7 +425,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
public Resource normalize(Resource r, Resource minimumResource,
|
||||
Resource maximumResource, Resource stepFactor) {
|
||||
Resource ret = Resource.newInstance(r);
|
||||
int maxLength = ResourceUtils.getResourceTypesArray().length;
|
||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
ResourceInformation rResourceInformation = r.getResourceInformation(i);
|
||||
ResourceInformation minimumResourceInformation = minimumResource
|
||||
|
@ -474,7 +471,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
|
||||
private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) {
|
||||
Resource ret = Resource.newInstance(r);
|
||||
int maxLength = ResourceUtils.getResourceTypesArray().length;
|
||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
ResourceInformation rResourceInformation = r.getResourceInformation(i);
|
||||
ResourceInformation stepFactorResourceInformation = stepFactor
|
||||
|
@ -513,7 +510,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
private Resource multiplyAndNormalize(Resource r, double by,
|
||||
Resource stepFactor, boolean roundUp) {
|
||||
Resource ret = Resource.newInstance(r);
|
||||
int maxLength = ResourceUtils.getResourceTypesArray().length;
|
||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
ResourceInformation rResourceInformation = r.getResourceInformation(i);
|
||||
ResourceInformation stepFactorResourceInformation = stepFactor
|
||||
|
@ -542,7 +539,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
|
||||
@Override
|
||||
public boolean fitsIn(Resource cluster, Resource smaller, Resource bigger) {
|
||||
int maxLength = ResourceUtils.getResourceTypesArray().length;
|
||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
ResourceInformation sResourceInformation = smaller
|
||||
.getResourceInformation(i);
|
||||
|
|
|
@ -24,12 +24,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.impl.BaseResource;
|
||||
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
|
||||
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* Resources is a computation class which provides a set of apis to do
|
||||
* mathematical operations on Resource object.
|
||||
|
@ -45,9 +42,11 @@ public class Resources {
|
|||
* Helper class to create a resource with a fixed value for all resource
|
||||
* types. For example, a NONE resource which returns 0 for any resource type.
|
||||
*/
|
||||
static class FixedValueResource extends BaseResource {
|
||||
@InterfaceAudience.Private
|
||||
@Unstable
|
||||
static class FixedValueResource extends Resource {
|
||||
|
||||
private long resourceValue;
|
||||
private final long resourceValue;
|
||||
private String name;
|
||||
|
||||
/**
|
||||
|
@ -100,6 +99,19 @@ public class Resources {
|
|||
throw new RuntimeException(name + " cannot be modified!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResourceInformation(int index,
|
||||
ResourceInformation resourceInformation)
|
||||
throws ResourceNotFoundException {
|
||||
throw new RuntimeException(name + " cannot be modified!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResourceValue(int index, long value)
|
||||
throws ResourceNotFoundException {
|
||||
throw new RuntimeException(name + " cannot be modified!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResourceInformation(String resource,
|
||||
ResourceInformation resourceInformation)
|
||||
|
@ -117,20 +129,12 @@ public class Resources {
|
|||
ResourceInformation[] types = ResourceUtils.getResourceTypesArray();
|
||||
if (types != null) {
|
||||
resources = new ResourceInformation[types.length];
|
||||
readOnlyResources = new ResourceInformation[types.length];
|
||||
for (int index = 0; index < types.length; index++) {
|
||||
resources[index] = ResourceInformation.newInstance(types[index]);
|
||||
resources[index].setValue(resourceValue);
|
||||
|
||||
// this is a fix for getVirtualCores returning an int
|
||||
if (resourceValue > Integer.MAX_VALUE && ResourceInformation.VCORES
|
||||
.getName().equals(resources[index].getName())) {
|
||||
resources[index].setValue((long) Integer.MAX_VALUE);
|
||||
}
|
||||
}
|
||||
}
|
||||
readOnlyResources = Arrays.copyOf(resources, resources.length);
|
||||
}
|
||||
}
|
||||
|
||||
public static Resource createResource(int memory) {
|
||||
|
|
|
@ -37,6 +37,8 @@ import java.util.Map;
|
|||
* Test class to verify all resource utility methods.
|
||||
*/
|
||||
public class TestResourceUtils {
|
||||
public static final String TEST_CONF_RESET_RESOURCE_TYPES =
|
||||
"yarn.test.reset-resource-types";
|
||||
|
||||
static class ResourceFileInformation {
|
||||
String filename;
|
||||
|
|
|
@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSe
|
|||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
@ -151,7 +152,10 @@ public class MockRM extends ResourceManager {
|
|||
public MockRM(Configuration conf, RMStateStore store,
|
||||
boolean useNullRMNodeLabelsManager, boolean useRealElector) {
|
||||
super();
|
||||
if (conf.getBoolean(TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES,
|
||||
true)) {
|
||||
ResourceUtils.resetResourceTypes(conf);
|
||||
}
|
||||
this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
|
||||
this.useRealElector = useRealElector;
|
||||
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
|
||||
|
|
|
@ -4319,143 +4319,6 @@ public class TestCapacityScheduler {
|
|||
rm.stop();
|
||||
}
|
||||
|
||||
@Test (timeout = 300000)
|
||||
public void testUserLimitThroughput() throws Exception {
|
||||
// Since this is more of a performance unit test, only run if
|
||||
// RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
|
||||
Assume.assumeTrue(Boolean.valueOf(
|
||||
System.getProperty("RunUserLimitThroughput")));
|
||||
|
||||
CapacitySchedulerConfiguration csconf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
|
||||
csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
|
||||
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
|
||||
100.0f);
|
||||
csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
|
||||
csconf.setResourceComparator(DominantResourceCalculator.class);
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration(csconf);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
LeafQueue qb = (LeafQueue)cs.getQueue("default");
|
||||
|
||||
// For now make user limit large so we can activate all applications
|
||||
qb.setUserLimitFactor((float)100.0);
|
||||
qb.setupConfigurableCapacities();
|
||||
|
||||
SchedulerEvent addAppEvent;
|
||||
SchedulerEvent addAttemptEvent;
|
||||
Container container = mock(Container.class);
|
||||
ApplicationSubmissionContext submissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
|
||||
final int appCount = 100;
|
||||
ApplicationId[] appids = new ApplicationId[appCount];
|
||||
RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount];
|
||||
ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount];
|
||||
RMAppImpl[] apps = new RMAppImpl[appCount];
|
||||
RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount];
|
||||
for (int i=0; i<appCount; i++) {
|
||||
appids[i] = BuilderUtils.newApplicationId(100, i);
|
||||
appAttemptIds[i] =
|
||||
BuilderUtils.newApplicationAttemptId(appids[i], 1);
|
||||
|
||||
attemptMetrics[i] =
|
||||
new RMAppAttemptMetrics(appAttemptIds[i], rm.getRMContext());
|
||||
apps[i] = mock(RMAppImpl.class);
|
||||
when(apps[i].getApplicationId()).thenReturn(appids[i]);
|
||||
attempts[i] = mock(RMAppAttemptImpl.class);
|
||||
when(attempts[i].getMasterContainer()).thenReturn(container);
|
||||
when(attempts[i].getSubmissionContext()).thenReturn(submissionContext);
|
||||
when(attempts[i].getAppAttemptId()).thenReturn(appAttemptIds[i]);
|
||||
when(attempts[i].getRMAppAttemptMetrics()).thenReturn(attemptMetrics[i]);
|
||||
when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
|
||||
|
||||
rm.getRMContext().getRMApps().put(appids[i], apps[i]);
|
||||
addAppEvent =
|
||||
new AppAddedSchedulerEvent(appids[i], "default", "user1");
|
||||
cs.handle(addAppEvent);
|
||||
addAttemptEvent =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
|
||||
cs.handle(addAttemptEvent);
|
||||
}
|
||||
|
||||
// add nodes to cluster, so cluster has 20GB and 20 vcores
|
||||
Resource newResource = Resource.newInstance(10 * GB, 10);
|
||||
RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
|
||||
cs.handle(new NodeAddedSchedulerEvent(node));
|
||||
|
||||
Resource newResource2 = Resource.newInstance(10 * GB, 10);
|
||||
RMNode node2 = MockNodes.newNodeInfo(0, newResource2, 1, "127.0.0.2");
|
||||
cs.handle(new NodeAddedSchedulerEvent(node2));
|
||||
|
||||
Priority u0Priority = TestUtils.createMockPriority(1);
|
||||
RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount];
|
||||
for (int i=0;i<appCount;i++) {
|
||||
fiCaApps[i] =
|
||||
cs.getSchedulerApplications().get(apps[i].getApplicationId())
|
||||
.getCurrentAppAttempt();
|
||||
// allocate container for app2 with 1GB memory and 1 vcore
|
||||
fiCaApps[i].updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
|
||||
u0Priority, recordFactory)));
|
||||
}
|
||||
// Now force everything to be over user limit
|
||||
qb.setUserLimitFactor((float)0.0);
|
||||
|
||||
// Quiet the loggers while measuring throughput
|
||||
for (Enumeration<?> loggers=LogManager.getCurrentLoggers();
|
||||
loggers.hasMoreElements(); ) {
|
||||
Logger logger = (Logger) loggers.nextElement();
|
||||
logger.setLevel(Level.WARN);
|
||||
}
|
||||
final int topn = 20;
|
||||
final int iterations = 2000000;
|
||||
final int printInterval = 20000;
|
||||
final float numerator = 1000.0f * printInterval;
|
||||
PriorityQueue<Long> queue = new PriorityQueue<>(topn,
|
||||
Collections.reverseOrder());
|
||||
|
||||
long n = Time.monotonicNow();
|
||||
long timespent = 0;
|
||||
for (int i = 0; i < iterations; i+=2) {
|
||||
if (i > 0 && i % printInterval == 0){
|
||||
long ts = (Time.monotonicNow() - n);
|
||||
if (queue.size() < topn) {
|
||||
queue.offer(ts);
|
||||
} else {
|
||||
Long last = queue.peek();
|
||||
if (last > ts) {
|
||||
queue.poll();
|
||||
queue.offer(ts);
|
||||
}
|
||||
}
|
||||
System.out.println(i + " " + (numerator / ts));
|
||||
n= Time.monotonicNow();
|
||||
}
|
||||
cs.handle(new NodeUpdateSchedulerEvent(node));
|
||||
cs.handle(new NodeUpdateSchedulerEvent(node2));
|
||||
}
|
||||
timespent=0;
|
||||
int entries = queue.size();
|
||||
while(queue.size() > 0){
|
||||
long l = queue.poll();
|
||||
timespent += l;
|
||||
}
|
||||
System.out.println("Avg of fastest " + entries + ": "
|
||||
+ numerator / (timespent / entries));
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCSQueueBlocked() throws Exception {
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
|
|
|
@ -0,0 +1,265 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.PriorityQueue;
|
||||
|
||||
import static org.apache.hadoop.yarn.util.resource.TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestCapacitySchedulerPerf {
|
||||
private final int GB = 1024;
|
||||
|
||||
private String getResourceName(int idx) {
|
||||
return "resource-" + idx;
|
||||
}
|
||||
|
||||
private void testUserLimitThroughputWithNumberOfResourceTypes(
|
||||
int numOfResourceTypes)
|
||||
throws Exception {
|
||||
if (numOfResourceTypes > 2) {
|
||||
// Initialize resource map
|
||||
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||
|
||||
// Initialize mandatory resources
|
||||
riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB);
|
||||
riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES);
|
||||
|
||||
for (int i = 2; i < numOfResourceTypes; i++) {
|
||||
String resourceName = getResourceName(i);
|
||||
riMap.put(resourceName, ResourceInformation
|
||||
.newInstance(resourceName, "", 0, ResourceTypes.COUNTABLE, 0,
|
||||
Integer.MAX_VALUE));
|
||||
}
|
||||
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
}
|
||||
|
||||
// Since this is more of a performance unit test, only run if
|
||||
// RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
|
||||
Assume.assumeTrue(Boolean.valueOf(
|
||||
System.getProperty("RunCapacitySchedulerPerfTests")));
|
||||
|
||||
CapacitySchedulerConfiguration csconf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
|
||||
csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
|
||||
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
|
||||
100.0f);
|
||||
csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
|
||||
csconf.setResourceComparator(DominantResourceCalculator.class);
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration(csconf);
|
||||
// Don't reset resource types since we have already configured resource types
|
||||
conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
LeafQueue qb = (LeafQueue)cs.getQueue("default");
|
||||
|
||||
// For now make user limit large so we can activate all applications
|
||||
qb.setUserLimitFactor((float)100.0);
|
||||
qb.setupConfigurableCapacities();
|
||||
|
||||
SchedulerEvent addAppEvent;
|
||||
SchedulerEvent addAttemptEvent;
|
||||
Container container = mock(Container.class);
|
||||
ApplicationSubmissionContext submissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
|
||||
final int appCount = 100;
|
||||
ApplicationId[] appids = new ApplicationId[appCount];
|
||||
RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount];
|
||||
ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount];
|
||||
RMAppImpl[] apps = new RMAppImpl[appCount];
|
||||
RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount];
|
||||
for (int i=0; i<appCount; i++) {
|
||||
appids[i] = BuilderUtils.newApplicationId(100, i);
|
||||
appAttemptIds[i] =
|
||||
BuilderUtils.newApplicationAttemptId(appids[i], 1);
|
||||
|
||||
attemptMetrics[i] =
|
||||
new RMAppAttemptMetrics(appAttemptIds[i], rm.getRMContext());
|
||||
apps[i] = mock(RMAppImpl.class);
|
||||
when(apps[i].getApplicationId()).thenReturn(appids[i]);
|
||||
attempts[i] = mock(RMAppAttemptImpl.class);
|
||||
when(attempts[i].getMasterContainer()).thenReturn(container);
|
||||
when(attempts[i].getSubmissionContext()).thenReturn(submissionContext);
|
||||
when(attempts[i].getAppAttemptId()).thenReturn(appAttemptIds[i]);
|
||||
when(attempts[i].getRMAppAttemptMetrics()).thenReturn(attemptMetrics[i]);
|
||||
when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
|
||||
|
||||
rm.getRMContext().getRMApps().put(appids[i], apps[i]);
|
||||
addAppEvent =
|
||||
new AppAddedSchedulerEvent(appids[i], "default", "user1");
|
||||
cs.handle(addAppEvent);
|
||||
addAttemptEvent =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
|
||||
cs.handle(addAttemptEvent);
|
||||
}
|
||||
|
||||
// add nodes to cluster, so cluster has 20GB and 20 vcores
|
||||
Resource nodeResource = Resource.newInstance(10 * GB, 10);
|
||||
if (numOfResourceTypes > 2) {
|
||||
for (int i = 2; i < numOfResourceTypes; i++) {
|
||||
nodeResource.setResourceValue(getResourceName(i), 10);
|
||||
}
|
||||
}
|
||||
|
||||
RMNode node = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.1");
|
||||
cs.handle(new NodeAddedSchedulerEvent(node));
|
||||
|
||||
RMNode node2 = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.2");
|
||||
cs.handle(new NodeAddedSchedulerEvent(node2));
|
||||
|
||||
Priority u0Priority = TestUtils.createMockPriority(1);
|
||||
RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount];
|
||||
for (int i=0;i<appCount;i++) {
|
||||
fiCaApps[i] =
|
||||
cs.getSchedulerApplications().get(apps[i].getApplicationId())
|
||||
.getCurrentAppAttempt();
|
||||
|
||||
ResourceRequest resourceRequest = TestUtils.createResourceRequest(
|
||||
ResourceRequest.ANY, 1 * GB, 1, true, u0Priority, recordFactory);
|
||||
if (numOfResourceTypes > 2) {
|
||||
for (int j = 2; j < numOfResourceTypes; j++) {
|
||||
resourceRequest.getCapability().setResourceValue(getResourceName(j),
|
||||
10);
|
||||
}
|
||||
}
|
||||
|
||||
// allocate container for app2 with 1GB memory and 1 vcore
|
||||
fiCaApps[i].updateResourceRequests(
|
||||
Collections.singletonList(resourceRequest));
|
||||
}
|
||||
// Now force everything to be over user limit
|
||||
qb.setUserLimitFactor((float)0.0);
|
||||
|
||||
// Quiet the loggers while measuring throughput
|
||||
for (Enumeration<?> loggers = LogManager.getCurrentLoggers();
|
||||
loggers.hasMoreElements(); ) {
|
||||
Logger logger = (Logger) loggers.nextElement();
|
||||
logger.setLevel(Level.WARN);
|
||||
}
|
||||
final int topn = 20;
|
||||
final int iterations = 2000000;
|
||||
final int printInterval = 20000;
|
||||
final float numerator = 1000.0f * printInterval;
|
||||
PriorityQueue<Long> queue = new PriorityQueue<>(topn,
|
||||
Collections.reverseOrder());
|
||||
|
||||
long n = Time.monotonicNow();
|
||||
long timespent = 0;
|
||||
for (int i = 0; i < iterations; i+=2) {
|
||||
if (i > 0 && i % printInterval == 0){
|
||||
long ts = (Time.monotonicNow() - n);
|
||||
if (queue.size() < topn) {
|
||||
queue.offer(ts);
|
||||
} else {
|
||||
Long last = queue.peek();
|
||||
if (last > ts) {
|
||||
queue.poll();
|
||||
queue.offer(ts);
|
||||
}
|
||||
}
|
||||
System.out.println(i + " " + (numerator / ts));
|
||||
n= Time.monotonicNow();
|
||||
}
|
||||
cs.handle(new NodeUpdateSchedulerEvent(node));
|
||||
cs.handle(new NodeUpdateSchedulerEvent(node2));
|
||||
}
|
||||
timespent=0;
|
||||
int entries = queue.size();
|
||||
while(queue.size() > 0){
|
||||
long l = queue.poll();
|
||||
timespent += l;
|
||||
}
|
||||
System.out.println(
|
||||
"#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries
|
||||
+ ": " + numerator / (timespent / entries));
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testUserLimitThroughputForTwoResources() throws Exception {
|
||||
testUserLimitThroughputWithNumberOfResourceTypes(2);
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testUserLimitThroughputForThreeResources() throws Exception {
|
||||
testUserLimitThroughputWithNumberOfResourceTypes(3);
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testUserLimitThroughputForFourResources() throws Exception {
|
||||
testUserLimitThroughputWithNumberOfResourceTypes(4);
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testUserLimitThroughputForFiveResources() throws Exception {
|
||||
testUserLimitThroughputWithNumberOfResourceTypes(5);
|
||||
}
|
||||
}
|
|
@ -101,6 +101,8 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.yarn.util.resource.TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Embedded YARN minicluster for testcases that need to interact with a cluster.
|
||||
|
@ -252,7 +254,10 @@ public class MiniYARNCluster extends CompositeService {
|
|||
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
|
||||
failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
|
||||
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
|
||||
|
||||
if (conf.getBoolean(TEST_CONF_RESET_RESOURCE_TYPES, true)) {
|
||||
ResourceUtils.resetResourceTypes(conf);
|
||||
}
|
||||
|
||||
if (useRpc && !useFixedPorts) {
|
||||
throw new YarnRuntimeException("Invalid configuration!" +
|
||||
|
|
Loading…
Reference in New Issue