YARN-6909. Use LightWeightedResource when number of resource types more than two. (Sunil G via wangda)

Change-Id: I90e021c5dea7abd9ec6bd73b2287c8adebe14595
This commit is contained in:
Wangda Tan 2017-11-09 14:51:15 -08:00
parent 1883a00249
commit dd07038ffa
4 changed files with 141 additions and 91 deletions

View File

@ -28,9 +28,9 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
/**
@ -75,34 +75,27 @@ public abstract class Resource implements Comparable<Resource> {
@Public
@Stable
public static Resource newInstance(int memory, int vCores) {
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
Resource ret = Records.newRecord(Resource.class);
ret.setMemorySize(memory);
ret.setVirtualCores(vCores);
return ret;
}
return new LightWeightResource(memory, vCores);
}
@Public
@Stable
public static Resource newInstance(long memory, int vCores) {
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
Resource ret = Records.newRecord(Resource.class);
ret.setMemorySize(memory);
ret.setVirtualCores(vCores);
return ret;
}
return new LightWeightResource(memory, vCores);
}
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static Resource newInstance(Resource resource) {
Resource ret = Resource.newInstance(resource.getMemorySize(),
resource.getVirtualCores());
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
Resource.copy(resource, ret);
Resource ret;
int numberOfKnownResourceTypes = ResourceUtils
.getNumberOfKnownResourceTypes();
if (numberOfKnownResourceTypes > 2) {
ret = new LightWeightResource(resource.getMemorySize(),
resource.getVirtualCores(), resource.getResources());
} else {
ret = new LightWeightResource(resource.getMemorySize(),
resource.getVirtualCores());
}
return ret;
}
@ -411,7 +404,7 @@ public abstract class Resource implements Comparable<Resource> {
int arrLenOther = otherResources.length;
// compare memory and vcores first(in that order) to preserve
// existing behaviour
// existing behavior.
for (int i = 0; i < arrLenThis; i++) {
ResourceInformation otherEntry;
try {
@ -483,4 +476,23 @@ public abstract class Resource implements Comparable<Resource> {
}
return Long.valueOf(value).intValue();
}
/**
* Create ResourceInformation with basic fields.
* @param name Resource Type Name
* @param unit Default unit of provided resource type
* @param value Value associated with giveb resource
* @return ResourceInformation object
*/
protected static ResourceInformation newDefaultInformation(String name,
String unit, long value) {
ResourceInformation ri = new ResourceInformation();
ri.setName(name);
ri.setValue(value);
ri.setResourceType(ResourceTypes.COUNTABLE);
ri.setUnitsWithoutValidation(unit);
ri.setMinimumAllocation(0);
ri.setMaximumAllocation(Long.MAX_VALUE);
return ri;
}
}

View File

@ -23,14 +23,13 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_MB;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.*;
/**
* <p>
* <code>LightResource</code> extends Resource to handle base resources such
* <code>LightWeightResource</code> extends Resource to handle base resources such
* as memory and CPU.
* TODO: We have a long term plan to use AbstractResource when additional
* resource types are to be handled as well.
@ -66,27 +65,42 @@ public class LightWeightResource extends Resource {
private ResourceInformation memoryResInfo;
private ResourceInformation vcoresResInfo;
public LightWeightResource(long memory, long vcores) {
this.memoryResInfo = LightWeightResource.newDefaultInformation(MEMORY_URI,
MEMORY_MB.getUnits(), memory);
this.vcoresResInfo = LightWeightResource.newDefaultInformation(VCORES_URI,
"", vcores);
public LightWeightResource(long memory, int vcores) {
int numberOfKnownResourceTypes = ResourceUtils
.getNumberOfKnownResourceTypes();
initResourceInformations(memory, vcores, numberOfKnownResourceTypes);
resources = new ResourceInformation[NUM_MANDATORY_RESOURCES];
resources[MEMORY_INDEX] = memoryResInfo;
resources[VCORES_INDEX] = vcoresResInfo;
if (numberOfKnownResourceTypes > 2) {
ResourceInformation[] types = ResourceUtils.getResourceTypesArray();
for (int i = 2; i < numberOfKnownResourceTypes; i++) {
resources[i] = new ResourceInformation();
ResourceInformation.copy(types[i], resources[i]);
}
}
}
private static ResourceInformation newDefaultInformation(String name,
String unit, long value) {
ResourceInformation ri = new ResourceInformation();
ri.setName(name);
ri.setValue(value);
ri.setResourceType(ResourceTypes.COUNTABLE);
ri.setUnitsWithoutValidation(unit);
ri.setMinimumAllocation(0);
ri.setMaximumAllocation(Long.MAX_VALUE);
return ri;
public LightWeightResource(long memory, int vcores,
ResourceInformation[] source) {
int numberOfKnownResourceTypes = ResourceUtils
.getNumberOfKnownResourceTypes();
initResourceInformations(memory, vcores, numberOfKnownResourceTypes);
for (int i = 2; i < numberOfKnownResourceTypes; i++) {
resources[i] = new ResourceInformation();
ResourceInformation.copy(source[i], resources[i]);
}
}
private void initResourceInformations(long memory, int vcores,
int numberOfKnownResourceTypes) {
this.memoryResInfo = newDefaultInformation(MEMORY_URI, MEMORY_MB.getUnits(),
memory);
this.vcoresResInfo = newDefaultInformation(VCORES_URI, VCORES.getUnits(),
vcores);
resources = new ResourceInformation[numberOfKnownResourceTypes];
resources[MEMORY_INDEX] = memoryResInfo;
resources[VCORES_INDEX] = vcoresResInfo;
}
@Override
@ -135,21 +149,41 @@ public class LightWeightResource extends Resource {
return false;
}
if (resources.length > 2) {
ResourceInformation[] otherVectors = other.getResources();
if (resources.length != otherVectors.length) {
return false;
}
for (int i = 2; i < resources.length; i++) {
ResourceInformation a = resources[i];
ResourceInformation b = otherVectors[i];
if ((a != b) && ((a == null) || !a.equals(b))) {
return false;
}
}
}
return true;
}
@Override
public int compareTo(Resource other) {
// compare memory and vcores first(in that order) to preserve
// existing behaviour
long diff = this.getMemorySize() - other.getMemorySize();
if (diff == 0) {
return this.getVirtualCores() - other.getVirtualCores();
} else if (diff > 0){
return 1;
} else {
return -1;
// existing behavior.
if (resources.length <= 2) {
long diff = this.getMemorySize() - other.getMemorySize();
if (diff == 0) {
return this.getVirtualCores() - other.getVirtualCores();
} else if (diff > 0) {
return 1;
} else {
return -1;
}
}
return super.compareTo(other);
}
@Override

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.util.Map;
@Private
@Unstable
public class ResourcePBImpl extends Resource {
@ -50,11 +49,14 @@ public class ResourcePBImpl extends Resource {
static ResourceProto getProto(Resource r) {
final ResourcePBImpl pb;
if (r instanceof ResourcePBImpl) {
pb = (ResourcePBImpl)r;
pb = (ResourcePBImpl) r;
} else {
pb = new ResourcePBImpl();
pb.setMemorySize(r.getMemorySize());
pb.setVirtualCores(r.getVirtualCores());
for(ResourceInformation res : r.getResources()) {
pb.setResourceInformation(res.getName(), res);
}
}
return pb.getProto();
}
@ -111,7 +113,7 @@ public class ResourcePBImpl extends Resource {
@Override
public void setMemorySize(long memory) {
maybeInitBuilder();
getResourceInformation(ResourceInformation.MEMORY_URI).setValue(memory);
resources[MEMORY_INDEX].setValue(memory);
}
@Override
@ -123,7 +125,7 @@ public class ResourcePBImpl extends Resource {
@Override
public void setVirtualCores(int vCores) {
maybeInitBuilder();
getResourceInformation(ResourceInformation.VCORES_URI).setValue(vCores);
resources[VCORES_INDEX].setValue(vCores);
}
private void initResources() {
@ -131,31 +133,51 @@ public class ResourcePBImpl extends Resource {
return;
}
ResourceProtoOrBuilder p = viaProto ? proto : builder;
initResourcesMap();
ResourceInformation[] types = ResourceUtils.getResourceTypesArray();
Map<String, Integer> indexMap = ResourceUtils.getResourceTypeIndex();
for (ResourceInformationProto entry : p.getResourceValueMapList()) {
ResourceTypes type =
entry.hasType() ? ProtoUtils.convertFromProtoFormat(entry.getType()) :
ResourceTypes.COUNTABLE;
resources = new ResourceInformation[types.length];
// When unit not specified in proto, use the default unit.
String units =
entry.hasUnits() ? entry.getUnits() : ResourceUtils.getDefaultUnit(
entry.getKey());
long value = entry.hasValue() ? entry.getValue() : 0L;
ResourceInformation ri = ResourceInformation
.newInstance(entry.getKey(), units, value, type, 0L, Long.MAX_VALUE);
for (ResourceInformationProto entry : p.getResourceValueMapList()) {
Integer index = indexMap.get(entry.getKey());
if (index == null) {
LOG.warn("Got unknown resource type: " + ri.getName() + "; skipping");
LOG.warn("Got unknown resource type: " + entry.getKey() + "; skipping");
} else {
resources[index].setResourceType(ri.getResourceType());
resources[index].setUnits(ri.getUnits());
resources[index].setValue(value);
resources[index] = newDefaultInformation(types[index], entry);
}
}
resources[MEMORY_INDEX] = ResourceInformation
.newInstance(ResourceInformation.MEMORY_MB);
resources[VCORES_INDEX] = ResourceInformation
.newInstance(ResourceInformation.VCORES);
this.setMemorySize(p.getMemory());
this.setVirtualCores(p.getVirtualCores());
// Update missing resource information on respective index.
updateResourceInformationMap(types);
}
private void updateResourceInformationMap(ResourceInformation[] types) {
for (int i = 0; i < types.length; i++) {
if (resources[i] == null) {
resources[i] = ResourceInformation.newInstance(types[i]);
}
}
}
private static ResourceInformation newDefaultInformation(
ResourceInformation resourceInformation, ResourceInformationProto entry) {
ResourceInformation ri = new ResourceInformation();
ri.setName(resourceInformation.getName());
ri.setMinimumAllocation(resourceInformation.getMinimumAllocation());
ri.setMaximumAllocation(resourceInformation.getMaximumAllocation());
ri.setResourceType(entry.hasType()
? ProtoUtils.convertFromProtoFormat(entry.getType())
: ResourceTypes.COUNTABLE);
ri.setUnits(
entry.hasUnits() ? entry.getUnits() : resourceInformation.getUnits());
ri.setValue(entry.hasValue() ? entry.getValue() : 0L);
return ri;
}
@Override
@ -166,10 +188,8 @@ public class ResourcePBImpl extends Resource {
throw new IllegalArgumentException(
"resource and/or resourceInformation cannot be null");
}
if (!resource.equals(resourceInformation.getName())) {
resourceInformation.setName(resource);
}
ResourceInformation storedResourceInfo = getResourceInformation(resource);
ResourceInformation storedResourceInfo = super.getResourceInformation(
resource);
ResourceInformation.copy(resourceInformation, storedResourceInfo);
}
@ -195,25 +215,9 @@ public class ResourcePBImpl extends Resource {
return super.getResourceValue(resource);
}
private void initResourcesMap() {
if (resources == null) {
ResourceInformation[] types = ResourceUtils.getResourceTypesArray();
if (types == null) {
throw new YarnRuntimeException(
"Got null return value from ResourceUtils.getResourceTypes()");
}
resources = new ResourceInformation[types.length];
for (ResourceInformation entry : types) {
int index = ResourceUtils.getResourceTypeIndex().get(entry.getName());
resources[index] = ResourceInformation.newInstance(entry);
}
}
}
synchronized private void mergeLocalToBuilder() {
builder.clearResourceValueMap();
if(resources != null && resources.length != 0) {
if (resources != null && resources.length != 0) {
for (ResourceInformation resInfo : resources) {
ResourceInformationProto.Builder e = ResourceInformationProto
.newBuilder();
@ -236,4 +240,4 @@ public class ResourcePBImpl extends Resource {
proto = builder.build();
viaProto = true;
}
}
}

View File

@ -55,7 +55,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
private Map<String, N> nodeNameToNodeMap = new HashMap<>();
private Map<String, List<N>> nodesPerRack = new HashMap<>();
private final Resource clusterCapacity = Resources.clone(Resources.none());
private Resource clusterCapacity = Resources.createResource(0, 0);
private volatile Resource staleClusterCapacity =
Resources.clone(Resources.none());