YARN-5587. [Partial backport] Add support for resource profiles. (vvasudev via asuresh)
(cherry picked from commit 6708ac3301
)
This commit is contained in:
parent
f99d603aaf
commit
6db8c36ba9
|
@ -153,6 +153,10 @@
|
||||||
<Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceRetentionSet$LRUComparator" />
|
<Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceRetentionSet$LRUComparator" />
|
||||||
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
|
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
|
||||||
</Match>
|
</Match>
|
||||||
|
<Match>
|
||||||
|
<Class name="org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl$ProfileCapabilityComparator" />
|
||||||
|
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
|
||||||
|
</Match>
|
||||||
<Match>
|
<Match>
|
||||||
<Class name="org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl" />
|
<Class name="org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl" />
|
||||||
<Field name="builder" />
|
<Field name="builder" />
|
||||||
|
|
|
@ -19,7 +19,9 @@
|
||||||
package org.apache.hadoop.yarn.api.records;
|
package org.apache.hadoop.yarn.api.records;
|
||||||
|
|
||||||
import org.apache.commons.lang.NotImplementedException;
|
import org.apache.commons.lang.NotImplementedException;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
|
@ -101,6 +103,18 @@ public abstract class Resource implements Comparable<Resource> {
|
||||||
return new SimpleResource(memory, vCores);
|
return new SimpleResource(memory, vCores);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public static Resource newInstance(Resource resource) {
|
||||||
|
Resource ret = Resource.newInstance(0, 0);
|
||||||
|
for (Map.Entry<String, ResourceInformation> entry : resource.getResources()
|
||||||
|
.entrySet()) {
|
||||||
|
ret.setResourceInformation(entry.getKey(),
|
||||||
|
ResourceInformation.newInstance(entry.getValue()));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method is DEPRECATED:
|
* This method is DEPRECATED:
|
||||||
* Use {@link Resource#getMemorySize()} instead
|
* Use {@link Resource#getMemorySize()} instead
|
||||||
|
|
|
@ -31,6 +31,8 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
|
||||||
private String units;
|
private String units;
|
||||||
private ResourceTypes resourceType;
|
private ResourceTypes resourceType;
|
||||||
private Long value;
|
private Long value;
|
||||||
|
private Long minimumAllocation;
|
||||||
|
private Long maximumAllocation;
|
||||||
|
|
||||||
private static final String MEMORY_URI = "memory-mb";
|
private static final String MEMORY_URI = "memory-mb";
|
||||||
private static final String VCORES_URI = "vcores";
|
private static final String VCORES_URI = "vcores";
|
||||||
|
@ -117,6 +119,42 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
|
||||||
this.value = rValue;
|
this.value = rValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the minimum allocation for the resource.
|
||||||
|
*
|
||||||
|
* @return the minimum allocation for the resource
|
||||||
|
*/
|
||||||
|
public Long getMinimumAllocation() {
|
||||||
|
return minimumAllocation;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the minimum allocation for the resource.
|
||||||
|
*
|
||||||
|
* @param minimumAllocation the minimum allocation for the resource
|
||||||
|
*/
|
||||||
|
public void setMinimumAllocation(Long minimumAllocation) {
|
||||||
|
this.minimumAllocation = minimumAllocation;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the maximum allocation for the resource.
|
||||||
|
*
|
||||||
|
* @return the maximum allocation for the resource
|
||||||
|
*/
|
||||||
|
public Long getMaximumAllocation() {
|
||||||
|
return maximumAllocation;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the maximum allocation for the resource.
|
||||||
|
*
|
||||||
|
* @param maximumAllocation the maximum allocation for the resource
|
||||||
|
*/
|
||||||
|
public void setMaximumAllocation(Long maximumAllocation) {
|
||||||
|
this.maximumAllocation = maximumAllocation;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new instance of ResourceInformation from another object.
|
* Create a new instance of ResourceInformation from another object.
|
||||||
*
|
*
|
||||||
|
@ -129,33 +167,41 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
|
||||||
ret.setResourceType(other.getResourceType());
|
ret.setResourceType(other.getResourceType());
|
||||||
ret.setUnits(other.getUnits());
|
ret.setUnits(other.getUnits());
|
||||||
ret.setValue(other.getValue());
|
ret.setValue(other.getValue());
|
||||||
|
ret.setMinimumAllocation(other.getMinimumAllocation());
|
||||||
|
ret.setMaximumAllocation(other.getMaximumAllocation());
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ResourceInformation newInstance(String name, String units,
|
public static ResourceInformation newInstance(String name, String units,
|
||||||
Long value, ResourceTypes type) {
|
Long value, ResourceTypes type, Long minimumAllocation,
|
||||||
|
Long maximumAllocation) {
|
||||||
ResourceInformation ret = new ResourceInformation();
|
ResourceInformation ret = new ResourceInformation();
|
||||||
ret.setName(name);
|
ret.setName(name);
|
||||||
ret.setResourceType(type);
|
ret.setResourceType(type);
|
||||||
ret.setUnits(units);
|
ret.setUnits(units);
|
||||||
ret.setValue(value);
|
ret.setValue(value);
|
||||||
|
ret.setMinimumAllocation(minimumAllocation);
|
||||||
|
ret.setMaximumAllocation(maximumAllocation);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ResourceInformation newInstance(String name, String units,
|
public static ResourceInformation newInstance(String name, String units,
|
||||||
Long value) {
|
Long value) {
|
||||||
return ResourceInformation
|
return ResourceInformation
|
||||||
.newInstance(name, units, value, ResourceTypes.COUNTABLE);
|
.newInstance(name, units, value, ResourceTypes.COUNTABLE, 0L,
|
||||||
|
Long.MAX_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ResourceInformation newInstance(String name, String units) {
|
public static ResourceInformation newInstance(String name, String units) {
|
||||||
return ResourceInformation
|
return ResourceInformation
|
||||||
.newInstance(name, units, 0L, ResourceTypes.COUNTABLE);
|
.newInstance(name, units, 0L, ResourceTypes.COUNTABLE, 0L,
|
||||||
|
Long.MAX_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ResourceInformation newInstance(String name, Long value) {
|
public static ResourceInformation newInstance(String name, Long value) {
|
||||||
return ResourceInformation
|
return ResourceInformation
|
||||||
.newInstance(name, "", value, ResourceTypes.COUNTABLE);
|
.newInstance(name, "", value, ResourceTypes.COUNTABLE, 0L,
|
||||||
|
Long.MAX_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ResourceInformation newInstance(String name) {
|
public static ResourceInformation newInstance(String name) {
|
||||||
|
@ -165,7 +211,8 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "name: " + this.name + ", units: " + this.units + ", type: "
|
return "name: " + this.name + ", units: " + this.units + ", type: "
|
||||||
+ resourceType + ", value: " + value;
|
+ resourceType + ", value: " + value + ", minimum allocation: "
|
||||||
|
+ minimumAllocation + ", maximum allocation: " + maximumAllocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getShorthandRepresentation() {
|
public String getShorthandRepresentation() {
|
||||||
|
|
|
@ -150,12 +150,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
static boolean canFit(Resource arg0, Resource arg1) {
|
static boolean canFit(Resource arg0, Resource arg1) {
|
||||||
long mem0 = arg0.getMemorySize();
|
return Resources.fitsIn(arg0, arg1);
|
||||||
long mem1 = arg1.getMemorySize();
|
|
||||||
long cpu0 = arg0.getVirtualCores();
|
|
||||||
long cpu1 = arg1.getVirtualCores();
|
|
||||||
|
|
||||||
return (mem0 <= mem1 && cpu0 <= cpu1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Map<Long, RemoteRequestsTable<T>> remoteRequests =
|
private final Map<Long, RemoteRequestsTable<T>> remoteRequests =
|
||||||
|
|
|
@ -144,8 +144,8 @@ public class ResourcePBImpl extends Resource {
|
||||||
ResourceTypes.COUNTABLE;
|
ResourceTypes.COUNTABLE;
|
||||||
String units = entry.hasUnits() ? entry.getUnits() : "";
|
String units = entry.hasUnits() ? entry.getUnits() : "";
|
||||||
Long value = entry.hasValue() ? entry.getValue() : 0L;
|
Long value = entry.hasValue() ? entry.getValue() : 0L;
|
||||||
ResourceInformation ri =
|
ResourceInformation ri = ResourceInformation
|
||||||
ResourceInformation.newInstance(entry.getKey(), units, value, type);
|
.newInstance(entry.getKey(), units, value, type, 0L, Long.MAX_VALUE);
|
||||||
if (resources.containsKey(ri.getName())) {
|
if (resources.containsKey(ri.getName())) {
|
||||||
resources.get(ri.getName()).setResourceType(ri.getResourceType());
|
resources.get(ri.getName()).setResourceType(ri.getResourceType());
|
||||||
resources.get(ri.getName()).setUnits(ri.getUnits());
|
resources.get(ri.getName()).setUnits(ri.getUnits());
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
|
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
|
||||||
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
|
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
|
||||||
|
@ -51,6 +52,8 @@ public class ResourceUtils {
|
||||||
|
|
||||||
public static final String UNITS = ".units";
|
public static final String UNITS = ".units";
|
||||||
public static final String TYPE = ".type";
|
public static final String TYPE = ".type";
|
||||||
|
public static final String MINIMUM_ALLOCATION = ".minimum-allocation";
|
||||||
|
public static final String MAXIMUM_ALLOCATION = ".maximum-allocation";
|
||||||
|
|
||||||
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
|
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
|
||||||
private static final String VCORES = ResourceInformation.VCORES.getName();
|
private static final String VCORES = ResourceInformation.VCORES.getName();
|
||||||
|
@ -122,6 +125,86 @@ public class ResourceUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void setMinimumAllocationForMandatoryResources(
|
||||||
|
Map<String, ResourceInformation> res, Configuration conf) {
|
||||||
|
String[][] resourceTypesKeys =
|
||||||
|
{
|
||||||
|
{ ResourceInformation.MEMORY_MB.getName(),
|
||||||
|
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||||
|
String.valueOf(
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
|
||||||
|
ResourceInformation.MEMORY_MB.getName()
|
||||||
|
},
|
||||||
|
{ ResourceInformation.VCORES.getName(),
|
||||||
|
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||||
|
String.valueOf(
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES),
|
||||||
|
ResourceInformation.VCORES.getName()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
for (String[] arr : resourceTypesKeys) {
|
||||||
|
String resourceTypesKey =
|
||||||
|
YarnConfiguration.RESOURCE_TYPES + "." + arr[0] + MINIMUM_ALLOCATION;
|
||||||
|
long minimumResourceTypes = conf.getLong(resourceTypesKey, -1);
|
||||||
|
long minimumConf = conf.getLong(arr[1], -1);
|
||||||
|
long minimum;
|
||||||
|
if (minimumResourceTypes != -1) {
|
||||||
|
minimum = minimumResourceTypes;
|
||||||
|
if (minimumConf != -1) {
|
||||||
|
LOG.warn("Using minimum allocation for memory specified in "
|
||||||
|
+ "resource-types config file with key "
|
||||||
|
+ minimumResourceTypes + ", ignoring minimum specified using "
|
||||||
|
+ arr[1]);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
minimum = conf.getLong(arr[1], Long.parseLong(arr[2]));
|
||||||
|
}
|
||||||
|
ResourceInformation ri = res.get(arr[3]);
|
||||||
|
ri.setMinimumAllocation(minimum);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setMaximumAllocationForMandatoryResources(
|
||||||
|
Map<String, ResourceInformation> res, Configuration conf) {
|
||||||
|
String[][] resourceTypesKeys =
|
||||||
|
{
|
||||||
|
{
|
||||||
|
ResourceInformation.MEMORY_MB.getName(),
|
||||||
|
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||||
|
String.valueOf(
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
|
||||||
|
ResourceInformation.MEMORY_MB.getName()
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ResourceInformation.VCORES.getName(),
|
||||||
|
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||||
|
String.valueOf(
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES),
|
||||||
|
ResourceInformation.VCORES.getName()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
for (String[] arr : resourceTypesKeys) {
|
||||||
|
String resourceTypesKey =
|
||||||
|
YarnConfiguration.RESOURCE_TYPES + "." + arr[0] + MAXIMUM_ALLOCATION;
|
||||||
|
long maximumResourceTypes = conf.getLong(resourceTypesKey, -1);
|
||||||
|
long maximumConf = conf.getLong(arr[1], -1);
|
||||||
|
long maximum;
|
||||||
|
if (maximumResourceTypes != -1) {
|
||||||
|
maximum = maximumResourceTypes;
|
||||||
|
if (maximumConf != -1) {
|
||||||
|
LOG.warn("Using maximum allocation for memory specified in "
|
||||||
|
+ "resource-types config file with key "
|
||||||
|
+ maximumResourceTypes + ", ignoring maximum specified using "
|
||||||
|
+ arr[1]);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
maximum = conf.getLong(arr[1], Long.parseLong(arr[2]));
|
||||||
|
}
|
||||||
|
ResourceInformation ri = res.get(arr[3]);
|
||||||
|
ri.setMaximumAllocation(maximum);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static void initializeResourcesMap(Configuration conf,
|
static void initializeResourcesMap(Configuration conf,
|
||||||
Map<String, ResourceInformation> resourceInformationMap) {
|
Map<String, ResourceInformation> resourceInformationMap) {
|
||||||
|
@ -135,6 +218,12 @@ public class ResourceUtils {
|
||||||
String resourceTypeName = conf.get(
|
String resourceTypeName = conf.get(
|
||||||
YarnConfiguration.RESOURCE_TYPES + "." + resourceName + TYPE,
|
YarnConfiguration.RESOURCE_TYPES + "." + resourceName + TYPE,
|
||||||
ResourceTypes.COUNTABLE.toString());
|
ResourceTypes.COUNTABLE.toString());
|
||||||
|
Long minimumAllocation = conf.getLong(
|
||||||
|
YarnConfiguration.RESOURCE_TYPES + "." + resourceName
|
||||||
|
+ MINIMUM_ALLOCATION, 0L);
|
||||||
|
Long maximumAllocation = conf.getLong(
|
||||||
|
YarnConfiguration.RESOURCE_TYPES + "." + resourceName
|
||||||
|
+ MAXIMUM_ALLOCATION, Long.MAX_VALUE);
|
||||||
if (resourceName == null || resourceName.isEmpty()
|
if (resourceName == null || resourceName.isEmpty()
|
||||||
|| resourceUnits == null || resourceTypeName == null) {
|
|| resourceUnits == null || resourceTypeName == null) {
|
||||||
throw new YarnRuntimeException(
|
throw new YarnRuntimeException(
|
||||||
|
@ -154,11 +243,14 @@ public class ResourceUtils {
|
||||||
"Error in config, key '" + resourceName + "' specified twice");
|
"Error in config, key '" + resourceName + "' specified twice");
|
||||||
}
|
}
|
||||||
resourceInformationMap.put(resourceName, ResourceInformation
|
resourceInformationMap.put(resourceName, ResourceInformation
|
||||||
.newInstance(resourceName, resourceUnits, 0L, resourceType));
|
.newInstance(resourceName, resourceUnits, 0L, resourceType,
|
||||||
|
minimumAllocation, maximumAllocation));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
checkMandatatoryResources(resourceInformationMap);
|
checkMandatatoryResources(resourceInformationMap);
|
||||||
addManadtoryResources(resourceInformationMap);
|
addManadtoryResources(resourceInformationMap);
|
||||||
|
setMinimumAllocationForMandatoryResources(resourceInformationMap, conf);
|
||||||
|
setMaximumAllocationForMandatoryResources(resourceInformationMap, conf);
|
||||||
readOnlyResources = Collections.unmodifiableMap(resourceInformationMap);
|
readOnlyResources = Collections.unmodifiableMap(resourceInformationMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,6 +264,12 @@ public class ResourceUtils {
|
||||||
YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE);
|
YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Map<String, ResourceInformation> getResourceTypes(
|
||||||
|
Configuration conf) {
|
||||||
|
return getResourceTypes(conf,
|
||||||
|
YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE);
|
||||||
|
}
|
||||||
|
|
||||||
private static Map<String, ResourceInformation> getResourceTypes(
|
private static Map<String, ResourceInformation> getResourceTypes(
|
||||||
Configuration conf, String resourceFile) {
|
Configuration conf, String resourceFile) {
|
||||||
if (lock == null) {
|
if (lock == null) {
|
||||||
|
@ -205,6 +303,12 @@ public class ResourceUtils {
|
||||||
|
|
||||||
ConfigurationProvider provider =
|
ConfigurationProvider provider =
|
||||||
ConfigurationProviderFactory.getConfigurationProvider(conf);
|
ConfigurationProviderFactory.getConfigurationProvider(conf);
|
||||||
|
try {
|
||||||
|
provider.init(conf);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
|
||||||
InputStream ris = provider.getConfigurationInputStream(conf, resourceFile);
|
InputStream ris = provider.getConfigurationInputStream(conf, resourceFile);
|
||||||
if (ris == null) {
|
if (ris == null) {
|
||||||
if (conf.getResource(resourceFile) == null) {
|
if (conf.getResource(resourceFile) == null) {
|
||||||
|
@ -241,6 +345,12 @@ public class ResourceUtils {
|
||||||
lock = null;
|
lock = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static void resetResourceTypes(Configuration conf) {
|
||||||
|
lock = null;
|
||||||
|
getResourceTypes(conf);
|
||||||
|
}
|
||||||
|
|
||||||
public static String getUnits(String resourceValue) {
|
public static String getUnits(String resourceValue) {
|
||||||
String units;
|
String units;
|
||||||
for (int i = 0; i < resourceValue.length(); i++) {
|
for (int i = 0; i < resourceValue.length(); i++) {
|
||||||
|
@ -326,4 +436,53 @@ public class ResourceUtils {
|
||||||
nodeLock = null;
|
nodeLock = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Resource getResourceTypesMinimumAllocation() {
|
||||||
|
Map<String, ResourceInformation> resourceTypes = getResourceTypes();
|
||||||
|
Resource ret = Resource.newInstance(0, 0);
|
||||||
|
for (Map.Entry<String, ResourceInformation> entry : resourceTypes
|
||||||
|
.entrySet()) {
|
||||||
|
String name = entry.getKey();
|
||||||
|
if (name.equals(ResourceInformation.MEMORY_MB.getName())) {
|
||||||
|
ret.setMemorySize(entry.getValue().getMinimumAllocation());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (name.equals(ResourceInformation.VCORES.getName())) {
|
||||||
|
Long tmp = entry.getValue().getMinimumAllocation();
|
||||||
|
if (tmp > Integer.MAX_VALUE) {
|
||||||
|
tmp = (long) Integer.MAX_VALUE;
|
||||||
|
}
|
||||||
|
ret.setVirtualCores(tmp.intValue());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
ret.setResourceValue(name, entry.getValue().getMinimumAllocation());
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a Resource object with for the maximum allocation possible.
|
||||||
|
* @return a Resource object with the maximum allocation for the scheduler
|
||||||
|
*/
|
||||||
|
public static Resource getResourceTypesMaximumAllocation() {
|
||||||
|
Map<String, ResourceInformation> resourceTypes = getResourceTypes();
|
||||||
|
Resource ret = Resource.newInstance(0, 0);
|
||||||
|
for (Map.Entry<String, ResourceInformation> entry : resourceTypes
|
||||||
|
.entrySet()) {
|
||||||
|
String name = entry.getKey();
|
||||||
|
if (name.equals(ResourceInformation.MEMORY_MB.getName())) {
|
||||||
|
ret.setMemorySize(entry.getValue().getMaximumAllocation());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (name.equals(ResourceInformation.VCORES.getName())) {
|
||||||
|
Long tmp = entry.getValue().getMaximumAllocation();
|
||||||
|
if (tmp > Integer.MAX_VALUE) {
|
||||||
|
tmp = (long) Integer.MAX_VALUE;
|
||||||
|
}
|
||||||
|
ret.setVirtualCores(tmp.intValue());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
ret.setResourceValue(name, entry.getValue().getMaximumAllocation());
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,12 +75,12 @@ public class Resources {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
public void setMemory(int memory) {
|
public void setMemory(int memory) {
|
||||||
throw new RuntimeException(name + " cannot be modified!");
|
throw new RuntimeException(name + " cannot be modified!");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public void setMemorySize(long memory) {
|
public void setMemorySize(long memory) {
|
||||||
throw new RuntimeException(name + " cannot be modified!");
|
throw new RuntimeException(name + " cannot be modified!");
|
||||||
}
|
}
|
||||||
|
@ -193,13 +193,7 @@ public class Resources {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Resource clone(Resource res) {
|
public static Resource clone(Resource res) {
|
||||||
Resource ret = Resource.newInstance(0, 0);
|
return Resource.newInstance(res);
|
||||||
for (Map.Entry<String, ResourceInformation> entry : res.getResources()
|
|
||||||
.entrySet()) {
|
|
||||||
ret.setResourceInformation(entry.getKey(),
|
|
||||||
ResourceInformation.newInstance(entry.getValue()));
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Resource addTo(Resource lhs, Resource rhs) {
|
public static Resource addTo(Resource lhs, Resource rhs) {
|
||||||
|
|
|
@ -96,6 +96,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.server.utils.Lock;
|
import org.apache.hadoop.yarn.server.utils.Lock;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.SystemClock;
|
import org.apache.hadoop.yarn.util.SystemClock;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -1327,8 +1328,51 @@ public abstract class AbstractYarnScheduler
|
||||||
* @param container Container.
|
* @param container Container.
|
||||||
*/
|
*/
|
||||||
public void asyncContainerRelease(RMContainer container) {
|
public void asyncContainerRelease(RMContainer container) {
|
||||||
this.rmContext.getDispatcher().getEventHandler()
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
.handle(new ReleaseContainerEvent(container));
|
new ReleaseContainerEvent(container));
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Get a Resource object with for the minimum allocation possible. If resource
|
||||||
|
* profiles are enabled then the 'minimum' resource profile will be used. If
|
||||||
|
* they are not enabled, use the minimums specified in the config files.
|
||||||
|
*
|
||||||
|
* @return a Resource object with the minimum allocation for the scheduler
|
||||||
|
*/
|
||||||
|
public Resource getMinimumAllocation() {
|
||||||
|
boolean profilesEnabled = getConfig()
|
||||||
|
.getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
|
||||||
|
Resource ret;
|
||||||
|
if (!profilesEnabled) {
|
||||||
|
ret = ResourceUtils.getResourceTypesMinimumAllocation();
|
||||||
|
} else {
|
||||||
|
ret = rmContext.getResourceProfilesManager().getMinimumProfile();
|
||||||
|
}
|
||||||
|
LOG.info("Minimum allocation = " + ret);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a Resource object with for the maximum allocation possible. If resource
|
||||||
|
* profiles are enabled then the 'maximum' resource profile will be used. If
|
||||||
|
* they are not enabled, use the maximums specified in the config files.
|
||||||
|
*
|
||||||
|
* @return a Resource object with the maximum allocation for the scheduler
|
||||||
|
*/
|
||||||
|
|
||||||
|
public Resource getMaximumAllocation() {
|
||||||
|
boolean profilesEnabled = getConfig()
|
||||||
|
.getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
|
||||||
|
Resource ret;
|
||||||
|
if (!profilesEnabled) {
|
||||||
|
ret = ResourceUtils.getResourceTypesMaximumAllocation();
|
||||||
|
} else {
|
||||||
|
ret = rmContext.getResourceProfilesManager().getMaximumProfile();
|
||||||
|
}
|
||||||
|
LOG.info("Maximum allocation = " + ret);
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -222,8 +222,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
||||||
|
|
||||||
return Resources.createResource(
|
return Resources.createResource(
|
||||||
Math.min(configuredMaxAllocation.getMemorySize(), maxNodeMemory),
|
Math.min(configuredMaxAllocation.getMemorySize(), maxNodeMemory),
|
||||||
Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores)
|
Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores));
|
||||||
);
|
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
|
@ -318,8 +318,8 @@ public class CapacityScheduler extends
|
||||||
this.csConfProvider.init(configuration);
|
this.csConfProvider.init(configuration);
|
||||||
this.conf = this.csConfProvider.loadConfiguration(configuration);
|
this.conf = this.csConfProvider.loadConfiguration(configuration);
|
||||||
validateConf(this.conf);
|
validateConf(this.conf);
|
||||||
this.minimumAllocation = this.conf.getMinimumAllocation();
|
this.minimumAllocation = super.getMinimumAllocation();
|
||||||
initMaximumResourceCapability(this.conf.getMaximumAllocation());
|
initMaximumResourceCapability(super.getMaximumAllocation());
|
||||||
this.calculator = this.conf.getResourceCalculator();
|
this.calculator = this.conf.getResourceCalculator();
|
||||||
this.usePortForNodeName = this.conf.getUsePortForNodeName();
|
this.usePortForNodeName = this.conf.getUsePortForNodeName();
|
||||||
this.applications = new ConcurrentHashMap<>();
|
this.applications = new ConcurrentHashMap<>();
|
||||||
|
|
|
@ -1282,8 +1282,8 @@ public class FairScheduler extends
|
||||||
this.conf = new FairSchedulerConfiguration(conf);
|
this.conf = new FairSchedulerConfiguration(conf);
|
||||||
validateConf(this.conf);
|
validateConf(this.conf);
|
||||||
authorizer = YarnAuthorizationProvider.getInstance(conf);
|
authorizer = YarnAuthorizationProvider.getInstance(conf);
|
||||||
minimumAllocation = this.conf.getMinimumAllocation();
|
minimumAllocation = super.getMinimumAllocation();
|
||||||
initMaximumResourceCapability(this.conf.getMaximumAllocation());
|
initMaximumResourceCapability(super.getMaximumAllocation());
|
||||||
incrAllocation = this.conf.getIncrementAllocation();
|
incrAllocation = this.conf.getIncrementAllocation();
|
||||||
updateReservationThreshold();
|
updateReservationThreshold();
|
||||||
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
|
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
|
||||||
|
|
|
@ -241,17 +241,8 @@ public class FifoScheduler extends
|
||||||
//Use ConcurrentSkipListMap because applications need to be ordered
|
//Use ConcurrentSkipListMap because applications need to be ordered
|
||||||
this.applications =
|
this.applications =
|
||||||
new ConcurrentSkipListMap<>();
|
new ConcurrentSkipListMap<>();
|
||||||
this.minimumAllocation =
|
this.minimumAllocation = super.getMinimumAllocation();
|
||||||
Resources.createResource(conf.getInt(
|
initMaximumResourceCapability(super.getMaximumAllocation());
|
||||||
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
|
|
||||||
initMaximumResourceCapability(
|
|
||||||
Resources.createResource(conf.getInt(
|
|
||||||
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
|
|
||||||
conf.getInt(
|
|
||||||
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)));
|
|
||||||
this.usePortForNodeName = conf.getBoolean(
|
this.usePortForNodeName = conf.getBoolean(
|
||||||
YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
|
YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
|
||||||
|
|
|
@ -104,6 +104,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManag
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
@ -150,6 +151,7 @@ public class MockRM extends ResourceManager {
|
||||||
public MockRM(Configuration conf, RMStateStore store,
|
public MockRM(Configuration conf, RMStateStore store,
|
||||||
boolean useNullRMNodeLabelsManager, boolean useRealElector) {
|
boolean useNullRMNodeLabelsManager, boolean useRealElector) {
|
||||||
super();
|
super();
|
||||||
|
ResourceUtils.resetResourceTypes(conf);
|
||||||
this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
|
this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
|
||||||
this.useRealElector = useRealElector;
|
this.useRealElector = useRealElector;
|
||||||
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
|
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
|
||||||
|
|
|
@ -247,6 +247,7 @@ public class TestAppManager{
|
||||||
ResourceScheduler scheduler = mockResourceScheduler();
|
ResourceScheduler scheduler = mockResourceScheduler();
|
||||||
((RMContextImpl)rmContext).setScheduler(scheduler);
|
((RMContextImpl)rmContext).setScheduler(scheduler);
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
((RMContextImpl) rmContext).setYarnConfiguration(conf);
|
||||||
ApplicationMasterService masterService =
|
ApplicationMasterService masterService =
|
||||||
new ApplicationMasterService(rmContext, scheduler);
|
new ApplicationMasterService(rmContext, scheduler);
|
||||||
appMonitor = new TestRMAppManager(rmContext,
|
appMonitor = new TestRMAppManager(rmContext,
|
||||||
|
|
|
@ -116,6 +116,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.Dom
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -215,6 +216,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
|
||||||
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
|
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
|
||||||
128);
|
128);
|
||||||
|
ResourceUtils.resetResourceTypes(conf);
|
||||||
scheduler.init(conf);
|
scheduler.init(conf);
|
||||||
scheduler.start();
|
scheduler.start();
|
||||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
@ -243,6 +245,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
|
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
|
||||||
conf.setInt(
|
conf.setInt(
|
||||||
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
|
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
|
||||||
|
ResourceUtils.resetResourceTypes(conf);
|
||||||
scheduler.init(conf);
|
scheduler.init(conf);
|
||||||
scheduler.reinitialize(conf, null);
|
scheduler.reinitialize(conf, null);
|
||||||
Assert.assertEquals(256, scheduler.getMinimumResourceCapability().getMemorySize());
|
Assert.assertEquals(256, scheduler.getMinimumResourceCapability().getMemorySize());
|
||||||
|
@ -260,6 +263,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
|
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
|
||||||
conf.setInt(
|
conf.setInt(
|
||||||
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
|
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
|
||||||
|
ResourceUtils.resetResourceTypes(conf);
|
||||||
scheduler.init(conf);
|
scheduler.init(conf);
|
||||||
scheduler.reinitialize(conf, null);
|
scheduler.reinitialize(conf, null);
|
||||||
Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getMemorySize());
|
Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getMemorySize());
|
||||||
|
|
|
@ -96,6 +96,7 @@ import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
|
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
|
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
|
||||||
|
@ -260,6 +261,7 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
|
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
|
||||||
failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
|
failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
|
||||||
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
|
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
|
||||||
|
ResourceUtils.resetResourceTypes(conf);
|
||||||
|
|
||||||
if (useRpc && !useFixedPorts) {
|
if (useRpc && !useFixedPorts) {
|
||||||
throw new YarnRuntimeException("Invalid configuration!" +
|
throw new YarnRuntimeException("Invalid configuration!" +
|
||||||
|
|
Loading…
Reference in New Issue