YARN-6909. Use LightWeightedResource when number of resource types more than two. (Sunil G via wangda)

Change-Id: I90e021c5dea7abd9ec6bd73b2287c8adebe14595
This commit is contained in:
Wangda Tan 2017-11-09 14:51:15 -08:00 committed by Jonathan Hung
parent 87ad52f076
commit 056b54c62c
4 changed files with 141 additions and 91 deletions

View File

@ -31,9 +31,9 @@
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.impl.LightWeightResource; import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
/** /**
@ -76,34 +76,27 @@ public abstract class Resource implements Comparable<Resource> {
@Public @Public
@Stable @Stable
public static Resource newInstance(int memory, int vCores) { public static Resource newInstance(int memory, int vCores) {
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
Resource ret = Records.newRecord(Resource.class);
ret.setMemorySize(memory);
ret.setVirtualCores(vCores);
return ret;
}
return new LightWeightResource(memory, vCores); return new LightWeightResource(memory, vCores);
} }
@Public @Public
@Stable @Stable
public static Resource newInstance(long memory, int vCores) { public static Resource newInstance(long memory, int vCores) {
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
Resource ret = Records.newRecord(Resource.class);
ret.setMemorySize(memory);
ret.setVirtualCores(vCores);
return ret;
}
return new LightWeightResource(memory, vCores); return new LightWeightResource(memory, vCores);
} }
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public static Resource newInstance(Resource resource) { public static Resource newInstance(Resource resource) {
Resource ret = Resource.newInstance(resource.getMemorySize(), Resource ret;
resource.getVirtualCores()); int numberOfKnownResourceTypes = ResourceUtils
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { .getNumberOfKnownResourceTypes();
Resource.copy(resource, ret); if (numberOfKnownResourceTypes > 2) {
ret = new LightWeightResource(resource.getMemorySize(),
resource.getVirtualCores(), resource.getResources());
} else {
ret = new LightWeightResource(resource.getMemorySize(),
resource.getVirtualCores());
} }
return ret; return ret;
} }
@ -428,7 +421,7 @@ public int compareTo(Resource other) {
int arrLenOther = otherResources.length; int arrLenOther = otherResources.length;
// compare memory and vcores first(in that order) to preserve // compare memory and vcores first(in that order) to preserve
// existing behaviour // existing behavior.
for (int i = 0; i < arrLenThis; i++) { for (int i = 0; i < arrLenThis; i++) {
ResourceInformation otherEntry; ResourceInformation otherEntry;
try { try {
@ -500,4 +493,23 @@ protected static int castToIntSafely(long value) {
} }
return Long.valueOf(value).intValue(); return Long.valueOf(value).intValue();
} }
/**
* Create ResourceInformation with basic fields.
* @param name Resource Type Name
* @param unit Default unit of provided resource type
* @param value Value associated with giveb resource
* @return ResourceInformation object
*/
protected static ResourceInformation newDefaultInformation(String name,
String unit, long value) {
ResourceInformation ri = new ResourceInformation();
ri.setName(name);
ri.setValue(value);
ri.setResourceType(ResourceTypes.COUNTABLE);
ri.setUnitsWithoutValidation(unit);
ri.setMinimumAllocation(0);
ri.setMaximumAllocation(Long.MAX_VALUE);
return ri;
}
} }

View File

@ -23,14 +23,13 @@
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_MB; import static org.apache.hadoop.yarn.api.records.ResourceInformation.*;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
/** /**
* <p> * <p>
* <code>LightResource</code> extends Resource to handle base resources such * <code>LightWeightResource</code> extends Resource to handle base resources such
* as memory and CPU. * as memory and CPU.
* TODO: We have a long term plan to use AbstractResource when additional * TODO: We have a long term plan to use AbstractResource when additional
* resource types are to be handled as well. * resource types are to be handled as well.
@ -66,27 +65,42 @@ public class LightWeightResource extends Resource {
private ResourceInformation memoryResInfo; private ResourceInformation memoryResInfo;
private ResourceInformation vcoresResInfo; private ResourceInformation vcoresResInfo;
public LightWeightResource(long memory, long vcores) { public LightWeightResource(long memory, int vcores) {
this.memoryResInfo = LightWeightResource.newDefaultInformation(MEMORY_URI, int numberOfKnownResourceTypes = ResourceUtils
MEMORY_MB.getUnits(), memory); .getNumberOfKnownResourceTypes();
this.vcoresResInfo = LightWeightResource.newDefaultInformation(VCORES_URI, initResourceInformations(memory, vcores, numberOfKnownResourceTypes);
"", vcores);
resources = new ResourceInformation[NUM_MANDATORY_RESOURCES]; if (numberOfKnownResourceTypes > 2) {
resources[MEMORY_INDEX] = memoryResInfo; ResourceInformation[] types = ResourceUtils.getResourceTypesArray();
resources[VCORES_INDEX] = vcoresResInfo; for (int i = 2; i < numberOfKnownResourceTypes; i++) {
resources[i] = new ResourceInformation();
ResourceInformation.copy(types[i], resources[i]);
}
}
} }
private static ResourceInformation newDefaultInformation(String name, public LightWeightResource(long memory, int vcores,
String unit, long value) { ResourceInformation[] source) {
ResourceInformation ri = new ResourceInformation(); int numberOfKnownResourceTypes = ResourceUtils
ri.setName(name); .getNumberOfKnownResourceTypes();
ri.setValue(value); initResourceInformations(memory, vcores, numberOfKnownResourceTypes);
ri.setResourceType(ResourceTypes.COUNTABLE);
ri.setUnitsWithoutValidation(unit); for (int i = 2; i < numberOfKnownResourceTypes; i++) {
ri.setMinimumAllocation(0); resources[i] = new ResourceInformation();
ri.setMaximumAllocation(Long.MAX_VALUE); ResourceInformation.copy(source[i], resources[i]);
return ri; }
}
private void initResourceInformations(long memory, int vcores,
int numberOfKnownResourceTypes) {
this.memoryResInfo = newDefaultInformation(MEMORY_URI, MEMORY_MB.getUnits(),
memory);
this.vcoresResInfo = newDefaultInformation(VCORES_URI, VCORES.getUnits(),
vcores);
resources = new ResourceInformation[numberOfKnownResourceTypes];
resources[MEMORY_INDEX] = memoryResInfo;
resources[VCORES_INDEX] = vcoresResInfo;
} }
@Override @Override
@ -135,21 +149,41 @@ public boolean equals(Object obj) {
return false; return false;
} }
if (resources.length > 2) {
ResourceInformation[] otherVectors = other.getResources();
if (resources.length != otherVectors.length) {
return false;
}
for (int i = 2; i < resources.length; i++) {
ResourceInformation a = resources[i];
ResourceInformation b = otherVectors[i];
if ((a != b) && ((a == null) || !a.equals(b))) {
return false;
}
}
}
return true; return true;
} }
@Override @Override
public int compareTo(Resource other) { public int compareTo(Resource other) {
// compare memory and vcores first(in that order) to preserve // compare memory and vcores first(in that order) to preserve
// existing behaviour // existing behavior.
long diff = this.getMemorySize() - other.getMemorySize(); if (resources.length <= 2) {
if (diff == 0) { long diff = this.getMemorySize() - other.getMemorySize();
return this.getVirtualCores() - other.getVirtualCores(); if (diff == 0) {
} else if (diff > 0){ return this.getVirtualCores() - other.getVirtualCores();
return 1; } else if (diff > 0) {
} else { return 1;
return -1; } else {
return -1;
}
} }
return super.compareTo(other);
} }
@Override @Override

View File

@ -35,7 +35,6 @@
import java.util.Map; import java.util.Map;
@Private @Private
@Unstable @Unstable
public class ResourcePBImpl extends Resource { public class ResourcePBImpl extends Resource {
@ -50,11 +49,14 @@ public class ResourcePBImpl extends Resource {
static ResourceProto getProto(Resource r) { static ResourceProto getProto(Resource r) {
final ResourcePBImpl pb; final ResourcePBImpl pb;
if (r instanceof ResourcePBImpl) { if (r instanceof ResourcePBImpl) {
pb = (ResourcePBImpl)r; pb = (ResourcePBImpl) r;
} else { } else {
pb = new ResourcePBImpl(); pb = new ResourcePBImpl();
pb.setMemorySize(r.getMemorySize()); pb.setMemorySize(r.getMemorySize());
pb.setVirtualCores(r.getVirtualCores()); pb.setVirtualCores(r.getVirtualCores());
for(ResourceInformation res : r.getResources()) {
pb.setResourceInformation(res.getName(), res);
}
} }
return pb.getProto(); return pb.getProto();
} }
@ -111,7 +113,7 @@ public void setMemory(int memory) {
@Override @Override
public void setMemorySize(long memory) { public void setMemorySize(long memory) {
maybeInitBuilder(); maybeInitBuilder();
getResourceInformation(ResourceInformation.MEMORY_URI).setValue(memory); resources[MEMORY_INDEX].setValue(memory);
} }
@Override @Override
@ -123,7 +125,7 @@ public int getVirtualCores() {
@Override @Override
public void setVirtualCores(int vCores) { public void setVirtualCores(int vCores) {
maybeInitBuilder(); maybeInitBuilder();
getResourceInformation(ResourceInformation.VCORES_URI).setValue(vCores); resources[VCORES_INDEX].setValue(vCores);
} }
private void initResources() { private void initResources() {
@ -131,31 +133,51 @@ private void initResources() {
return; return;
} }
ResourceProtoOrBuilder p = viaProto ? proto : builder; ResourceProtoOrBuilder p = viaProto ? proto : builder;
initResourcesMap(); ResourceInformation[] types = ResourceUtils.getResourceTypesArray();
Map<String, Integer> indexMap = ResourceUtils.getResourceTypeIndex(); Map<String, Integer> indexMap = ResourceUtils.getResourceTypeIndex();
for (ResourceInformationProto entry : p.getResourceValueMapList()) { resources = new ResourceInformation[types.length];
ResourceTypes type =
entry.hasType() ? ProtoUtils.convertFromProtoFormat(entry.getType()) :
ResourceTypes.COUNTABLE;
// When unit not specified in proto, use the default unit. for (ResourceInformationProto entry : p.getResourceValueMapList()) {
String units =
entry.hasUnits() ? entry.getUnits() : ResourceUtils.getDefaultUnit(
entry.getKey());
long value = entry.hasValue() ? entry.getValue() : 0L;
ResourceInformation ri = ResourceInformation
.newInstance(entry.getKey(), units, value, type, 0L, Long.MAX_VALUE);
Integer index = indexMap.get(entry.getKey()); Integer index = indexMap.get(entry.getKey());
if (index == null) { if (index == null) {
LOG.warn("Got unknown resource type: " + ri.getName() + "; skipping"); LOG.warn("Got unknown resource type: " + entry.getKey() + "; skipping");
} else { } else {
resources[index].setResourceType(ri.getResourceType()); resources[index] = newDefaultInformation(types[index], entry);
resources[index].setUnits(ri.getUnits());
resources[index].setValue(value);
} }
} }
resources[MEMORY_INDEX] = ResourceInformation
.newInstance(ResourceInformation.MEMORY_MB);
resources[VCORES_INDEX] = ResourceInformation
.newInstance(ResourceInformation.VCORES);
this.setMemorySize(p.getMemory()); this.setMemorySize(p.getMemory());
this.setVirtualCores(p.getVirtualCores()); this.setVirtualCores(p.getVirtualCores());
// Update missing resource information on respective index.
updateResourceInformationMap(types);
}
private void updateResourceInformationMap(ResourceInformation[] types) {
for (int i = 0; i < types.length; i++) {
if (resources[i] == null) {
resources[i] = ResourceInformation.newInstance(types[i]);
}
}
}
private static ResourceInformation newDefaultInformation(
ResourceInformation resourceInformation, ResourceInformationProto entry) {
ResourceInformation ri = new ResourceInformation();
ri.setName(resourceInformation.getName());
ri.setMinimumAllocation(resourceInformation.getMinimumAllocation());
ri.setMaximumAllocation(resourceInformation.getMaximumAllocation());
ri.setResourceType(entry.hasType()
? ProtoUtils.convertFromProtoFormat(entry.getType())
: ResourceTypes.COUNTABLE);
ri.setUnits(
entry.hasUnits() ? entry.getUnits() : resourceInformation.getUnits());
ri.setValue(entry.hasValue() ? entry.getValue() : 0L);
return ri;
} }
@Override @Override
@ -166,10 +188,8 @@ public void setResourceInformation(String resource,
throw new IllegalArgumentException( throw new IllegalArgumentException(
"resource and/or resourceInformation cannot be null"); "resource and/or resourceInformation cannot be null");
} }
if (!resource.equals(resourceInformation.getName())) { ResourceInformation storedResourceInfo = super.getResourceInformation(
resourceInformation.setName(resource); resource);
}
ResourceInformation storedResourceInfo = getResourceInformation(resource);
ResourceInformation.copy(resourceInformation, storedResourceInfo); ResourceInformation.copy(resourceInformation, storedResourceInfo);
} }
@ -195,25 +215,9 @@ public long getResourceValue(String resource)
return super.getResourceValue(resource); return super.getResourceValue(resource);
} }
private void initResourcesMap() {
if (resources == null) {
ResourceInformation[] types = ResourceUtils.getResourceTypesArray();
if (types == null) {
throw new YarnRuntimeException(
"Got null return value from ResourceUtils.getResourceTypes()");
}
resources = new ResourceInformation[types.length];
for (ResourceInformation entry : types) {
int index = ResourceUtils.getResourceTypeIndex().get(entry.getName());
resources[index] = ResourceInformation.newInstance(entry);
}
}
}
synchronized private void mergeLocalToBuilder() { synchronized private void mergeLocalToBuilder() {
builder.clearResourceValueMap(); builder.clearResourceValueMap();
if(resources != null && resources.length != 0) { if (resources != null && resources.length != 0) {
for (ResourceInformation resInfo : resources) { for (ResourceInformation resInfo : resources) {
ResourceInformationProto.Builder e = ResourceInformationProto ResourceInformationProto.Builder e = ResourceInformationProto
.newBuilder(); .newBuilder();
@ -236,4 +240,4 @@ private void mergeLocalToProto() {
proto = builder.build(); proto = builder.build();
viaProto = true; viaProto = true;
} }
} }

View File

@ -56,7 +56,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
private Map<String, N> nodeNameToNodeMap = new HashMap<>(); private Map<String, N> nodeNameToNodeMap = new HashMap<>();
private Map<String, List<N>> nodesPerRack = new HashMap<>(); private Map<String, List<N>> nodesPerRack = new HashMap<>();
private Resource clusterCapacity = Resources.clone(Resources.none()); private Resource clusterCapacity = Resources.createResource(0, 0);
private Resource staleClusterCapacity = null; private Resource staleClusterCapacity = null;
// Max allocation // Max allocation