YARN-4830. Add support for resource types in the nodemanager. Contributed by Varun Vasudev.
(cherry picked from commit 759114b006
)
This commit is contained in:
parent
92ca475fdc
commit
16beac12ad
|
@ -308,7 +308,8 @@ public abstract class Resource implements Comparable<Resource> {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (entry.getKey().equals(ResourceInformation.VCORES.getName())
|
if (entry.getKey().equals(ResourceInformation.VCORES.getName())
|
||||||
&& entry.getValue().getUnits().equals("")) {
|
&& entry.getValue().getUnits()
|
||||||
|
.equals(ResourceInformation.VCORES.getUnits())) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
sb.append(", ").append(entry.getKey()).append(": ")
|
sb.append(", ").append(entry.getKey()).append(": ")
|
||||||
|
|
|
@ -64,6 +64,10 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String RESOURCE_TYPES_CONFIGURATION_FILE =
|
public static final String RESOURCE_TYPES_CONFIGURATION_FILE =
|
||||||
"resource-types.xml";
|
"resource-types.xml";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String NODE_RESOURCES_CONFIGURATION_FILE =
|
||||||
|
"node-resources.xml";
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public static final List<String> RM_CONFIGURATION_FILES =
|
public static final List<String> RM_CONFIGURATION_FILES =
|
||||||
Collections.unmodifiableList(Arrays.asList(
|
Collections.unmodifiableList(Arrays.asList(
|
||||||
|
@ -74,6 +78,16 @@ public class YarnConfiguration extends Configuration {
|
||||||
YARN_SITE_CONFIGURATION_FILE,
|
YARN_SITE_CONFIGURATION_FILE,
|
||||||
CORE_SITE_CONFIGURATION_FILE));
|
CORE_SITE_CONFIGURATION_FILE));
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final List<String> NM_CONFIGURATION_FILES =
|
||||||
|
Collections.unmodifiableList(Arrays.asList(
|
||||||
|
NODE_RESOURCES_CONFIGURATION_FILE,
|
||||||
|
DR_CONFIGURATION_FILE,
|
||||||
|
CS_CONFIGURATION_FILE,
|
||||||
|
HADOOP_POLICY_CONFIGURATION_FILE,
|
||||||
|
YARN_SITE_CONFIGURATION_FILE,
|
||||||
|
CORE_SITE_CONFIGURATION_FILE));
|
||||||
|
|
||||||
@Evolving
|
@Evolving
|
||||||
public static final int APPLICATION_MAX_TAGS = 10;
|
public static final int APPLICATION_MAX_TAGS = 10;
|
||||||
|
|
||||||
|
@ -112,12 +126,15 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String YARN_PREFIX = "yarn.";
|
public static final String YARN_PREFIX = "yarn.";
|
||||||
|
|
||||||
/////////////////////////////
|
/////////////////////////////
|
||||||
// Scheduler resource types configs
|
// Resource types configs
|
||||||
////////////////////////////
|
////////////////////////////
|
||||||
|
|
||||||
public static final String RESOURCE_TYPES =
|
public static final String RESOURCE_TYPES =
|
||||||
YarnConfiguration.YARN_PREFIX + "resource-types";
|
YarnConfiguration.YARN_PREFIX + "resource-types";
|
||||||
|
|
||||||
|
public static final String NM_RESOURCES_PREFIX =
|
||||||
|
YarnConfiguration.NM_PREFIX + "resource-type.";
|
||||||
|
|
||||||
/** Delay before deleting resource to ease debugging of NM issues */
|
/** Delay before deleting resource to ease debugging of NM issues */
|
||||||
public static final String DEBUG_NM_DELETE_DELAY_SEC =
|
public static final String DEBUG_NM_DELETE_DELAY_SEC =
|
||||||
YarnConfiguration.NM_PREFIX + "delete.debug-delay-sec";
|
YarnConfiguration.NM_PREFIX + "delete.debug-delay-sec";
|
||||||
|
|
|
@ -51,7 +51,8 @@ public class FileSystemBasedConfigurationProvider
|
||||||
"Illegal argument! The parameter should not be null or empty");
|
"Illegal argument! The parameter should not be null or empty");
|
||||||
}
|
}
|
||||||
Path filePath;
|
Path filePath;
|
||||||
if (YarnConfiguration.RM_CONFIGURATION_FILES.contains(name)) {
|
if (YarnConfiguration.RM_CONFIGURATION_FILES.contains(name) ||
|
||||||
|
YarnConfiguration.NM_CONFIGURATION_FILES.contains(name)) {
|
||||||
filePath = new Path(this.configDir, name);
|
filePath = new Path(this.configDir, name);
|
||||||
if (!fs.exists(filePath)) {
|
if (!fs.exists(filePath)) {
|
||||||
LOG.info(filePath + " not found");
|
LOG.info(filePath + " not found");
|
||||||
|
|
|
@ -39,7 +39,8 @@ public class LocalConfigurationProvider extends ConfigurationProvider {
|
||||||
if (name == null || name.isEmpty()) {
|
if (name == null || name.isEmpty()) {
|
||||||
throw new YarnException(
|
throw new YarnException(
|
||||||
"Illegal argument! The parameter should not be null or empty");
|
"Illegal argument! The parameter should not be null or empty");
|
||||||
} else if (YarnConfiguration.RM_CONFIGURATION_FILES.contains(name)) {
|
} else if (YarnConfiguration.RM_CONFIGURATION_FILES.contains(name) ||
|
||||||
|
YarnConfiguration.NM_CONFIGURATION_FILES.contains(name)) {
|
||||||
return bootstrapConf.getConfResourceAsInputStream(name);
|
return bootstrapConf.getConfResourceAsInputStream(name);
|
||||||
}
|
}
|
||||||
return new FileInputStream(name);
|
return new FileInputStream(name);
|
||||||
|
|
|
@ -18,7 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||||
|
@ -38,6 +39,8 @@ import java.util.*;
|
||||||
@Unstable
|
@Unstable
|
||||||
public class ResourcePBImpl extends Resource {
|
public class ResourcePBImpl extends Resource {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(ResourcePBImpl.class);
|
||||||
|
|
||||||
ResourceProto proto = ResourceProto.getDefaultInstance();
|
ResourceProto proto = ResourceProto.getDefaultInstance();
|
||||||
ResourceProto.Builder builder = null;
|
ResourceProto.Builder builder = null;
|
||||||
boolean viaProto = false;
|
boolean viaProto = false;
|
||||||
|
@ -92,10 +95,12 @@ public class ResourcePBImpl extends Resource {
|
||||||
@Override
|
@Override
|
||||||
public long getMemorySize() {
|
public long getMemorySize() {
|
||||||
// memory should always be present
|
// memory should always be present
|
||||||
initResourcesMap();
|
initResources();
|
||||||
ResourceInformation ri =
|
ResourceInformation ri =
|
||||||
this.getResourceInformation(ResourceInformation.MEMORY_MB.getName());
|
this.getResourceInformation(ResourceInformation.MEMORY_MB.getName());
|
||||||
return UnitsConversionUtil.convert(ri.getUnits(), "Mi", ri.getValue());
|
return UnitsConversionUtil
|
||||||
|
.convert(ri.getUnits(), ResourceInformation.MEMORY_MB.getUnits(),
|
||||||
|
ri.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -115,21 +120,16 @@ public class ResourcePBImpl extends Resource {
|
||||||
@Override
|
@Override
|
||||||
public int getVirtualCores() {
|
public int getVirtualCores() {
|
||||||
// vcores should always be present
|
// vcores should always be present
|
||||||
initResourcesMap();
|
initResources();
|
||||||
return this.getResourceValue(ResourceInformation.VCORES.getName())
|
return this.getResourceValue(ResourceInformation.VCORES.getName())
|
||||||
.intValue();
|
.intValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setVirtualCores(int vCores) {
|
public void setVirtualCores(int vCores) {
|
||||||
try {
|
setResourceInformation(ResourceInformation.VCORES.getName(),
|
||||||
setResourceValue(ResourceInformation.VCORES.getName(),
|
ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
|
||||||
Long.valueOf(vCores));
|
ResourceInformation.VCORES.getUnits(), (long) vCores));
|
||||||
} catch (ResourceNotFoundException re) {
|
|
||||||
this.setResourceInformation(ResourceInformation.VCORES.getName(),
|
|
||||||
ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
|
|
||||||
(long) vCores));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initResources() {
|
private void initResources() {
|
||||||
|
@ -146,14 +146,16 @@ public class ResourcePBImpl extends Resource {
|
||||||
Long value = entry.hasValue() ? entry.getValue() : 0L;
|
Long value = entry.hasValue() ? entry.getValue() : 0L;
|
||||||
ResourceInformation ri =
|
ResourceInformation ri =
|
||||||
ResourceInformation.newInstance(entry.getKey(), units, value, type);
|
ResourceInformation.newInstance(entry.getKey(), units, value, type);
|
||||||
resources.put(ri.getName(), ri);
|
if (resources.containsKey(ri.getName())) {
|
||||||
}
|
resources.get(ri.getName()).setResourceType(ri.getResourceType());
|
||||||
if(this.getMemory() != p.getMemory()) {
|
resources.get(ri.getName()).setUnits(ri.getUnits());
|
||||||
setMemorySize(p.getMemory());
|
resources.get(ri.getName()).setValue(value);
|
||||||
}
|
} else {
|
||||||
if(this.getVirtualCores() != p.getVirtualCores()) {
|
LOG.warn("Got unknown resource type: " + ri.getName() + "; skipping");
|
||||||
setVirtualCores(p.getVirtualCores());
|
}
|
||||||
}
|
}
|
||||||
|
this.setMemorySize(p.getMemory());
|
||||||
|
this.setVirtualCores(p.getVirtualCores());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -167,7 +169,7 @@ public class ResourcePBImpl extends Resource {
|
||||||
if (!resource.equals(resourceInformation.getName())) {
|
if (!resource.equals(resourceInformation.getName())) {
|
||||||
resourceInformation.setName(resource);
|
resourceInformation.setName(resource);
|
||||||
}
|
}
|
||||||
initResourcesMap();
|
initResources();
|
||||||
resources.put(resource, resourceInformation);
|
resources.put(resource, resourceInformation);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -175,6 +177,7 @@ public class ResourcePBImpl extends Resource {
|
||||||
public void setResourceValue(String resource, Long value)
|
public void setResourceValue(String resource, Long value)
|
||||||
throws ResourceNotFoundException {
|
throws ResourceNotFoundException {
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
|
initResources();
|
||||||
if (resource == null) {
|
if (resource == null) {
|
||||||
throw new IllegalArgumentException("resource type object cannot be null");
|
throw new IllegalArgumentException("resource type object cannot be null");
|
||||||
}
|
}
|
||||||
|
@ -182,9 +185,7 @@ public class ResourcePBImpl extends Resource {
|
||||||
throw new ResourceNotFoundException(
|
throw new ResourceNotFoundException(
|
||||||
"Resource " + resource + " not found");
|
"Resource " + resource + " not found");
|
||||||
}
|
}
|
||||||
ResourceInformation ri = resources.get(resource);
|
resources.get(resource).setValue(value);
|
||||||
ri.setValue(value);
|
|
||||||
resources.put(resource, ri);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -229,8 +230,10 @@ public class ResourcePBImpl extends Resource {
|
||||||
synchronized private void mergeLocalToBuilder() {
|
synchronized private void mergeLocalToBuilder() {
|
||||||
builder.clearResourceValueMap();
|
builder.clearResourceValueMap();
|
||||||
if (resources != null && !resources.isEmpty()) {
|
if (resources != null && !resources.isEmpty()) {
|
||||||
for (Map.Entry<String, ResourceInformation> entry : resources.entrySet()) {
|
for (Map.Entry<String, ResourceInformation> entry :
|
||||||
ResourceInformationProto.Builder e = ResourceInformationProto.newBuilder();
|
resources.entrySet()) {
|
||||||
|
ResourceInformationProto.Builder e =
|
||||||
|
ResourceInformationProto.newBuilder();
|
||||||
e.setKey(entry.getKey());
|
e.setKey(entry.getKey());
|
||||||
e.setUnits(entry.getValue().getUnits());
|
e.setUnits(entry.getValue().getUnits());
|
||||||
e.setType(
|
e.setType(
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.util.resource;
|
package org.apache.hadoop.yarn.util.resource;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -51,15 +52,21 @@ public class ResourceUtils {
|
||||||
public static final String UNITS = ".units";
|
public static final String UNITS = ".units";
|
||||||
public static final String TYPE = ".type";
|
public static final String TYPE = ".type";
|
||||||
|
|
||||||
|
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
|
||||||
|
private static final String VCORES = ResourceInformation.VCORES.getName();
|
||||||
|
|
||||||
private static final Set<String> DISALLOWED_NAMES = new HashSet<>();
|
private static final Set<String> DISALLOWED_NAMES = new HashSet<>();
|
||||||
static {
|
static {
|
||||||
DISALLOWED_NAMES.add("memory");
|
DISALLOWED_NAMES.add("memory");
|
||||||
DISALLOWED_NAMES.add(ResourceInformation.MEMORY_MB.getName());
|
DISALLOWED_NAMES.add(MEMORY);
|
||||||
DISALLOWED_NAMES.add(ResourceInformation.VCORES.getName());
|
DISALLOWED_NAMES.add(VCORES);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static volatile Object lock;
|
private static volatile Object lock;
|
||||||
private static Map<String, ResourceInformation> readOnlyResources;
|
private static Map<String, ResourceInformation> readOnlyResources;
|
||||||
|
private static volatile Object nodeLock;
|
||||||
|
private static Map<String, ResourceInformation> readOnlyNodeResources;
|
||||||
|
|
||||||
|
|
||||||
static final Log LOG = LogFactory.getLog(ResourceUtils.class);
|
static final Log LOG = LogFactory.getLog(ResourceUtils.class);
|
||||||
|
|
||||||
|
@ -69,22 +76,20 @@ public class ResourceUtils {
|
||||||
private static void checkMandatatoryResources(
|
private static void checkMandatatoryResources(
|
||||||
Map<String, ResourceInformation> resourceInformationMap)
|
Map<String, ResourceInformation> resourceInformationMap)
|
||||||
throws YarnRuntimeException {
|
throws YarnRuntimeException {
|
||||||
String memory = ResourceInformation.MEMORY_MB.getName();
|
if (resourceInformationMap.containsKey(MEMORY)) {
|
||||||
String vcores = ResourceInformation.VCORES.getName();
|
ResourceInformation memInfo = resourceInformationMap.get(MEMORY);
|
||||||
if (resourceInformationMap.containsKey(memory)) {
|
|
||||||
ResourceInformation memInfo = resourceInformationMap.get(memory);
|
|
||||||
String memUnits = ResourceInformation.MEMORY_MB.getUnits();
|
String memUnits = ResourceInformation.MEMORY_MB.getUnits();
|
||||||
ResourceTypes memType = ResourceInformation.MEMORY_MB.getResourceType();
|
ResourceTypes memType = ResourceInformation.MEMORY_MB.getResourceType();
|
||||||
if (!memInfo.getUnits().equals(memUnits) || !memInfo.getResourceType()
|
if (!memInfo.getUnits().equals(memUnits) || !memInfo.getResourceType()
|
||||||
.equals(memType)) {
|
.equals(memType)) {
|
||||||
throw new YarnRuntimeException(
|
throw new YarnRuntimeException(
|
||||||
"Attempt to re-define mandatory resource 'memory-mb'. It can only"
|
"Attempt to re-define mandatory resource 'memory-mb'. It can only"
|
||||||
+ " be of type 'COUNTABLE' and have units 'M'.");
|
+ " be of type 'COUNTABLE' and have units 'Mi'.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (resourceInformationMap.containsKey(vcores)) {
|
if (resourceInformationMap.containsKey(VCORES)) {
|
||||||
ResourceInformation vcoreInfo = resourceInformationMap.get(vcores);
|
ResourceInformation vcoreInfo = resourceInformationMap.get(VCORES);
|
||||||
String vcoreUnits = ResourceInformation.VCORES.getUnits();
|
String vcoreUnits = ResourceInformation.VCORES.getUnits();
|
||||||
ResourceTypes vcoreType = ResourceInformation.VCORES.getResourceType();
|
ResourceTypes vcoreType = ResourceInformation.VCORES.getResourceType();
|
||||||
if (!vcoreInfo.getUnits().equals(vcoreUnits) || !vcoreInfo
|
if (!vcoreInfo.getUnits().equals(vcoreUnits) || !vcoreInfo
|
||||||
|
@ -99,21 +104,21 @@ public class ResourceUtils {
|
||||||
private static void addManadtoryResources(
|
private static void addManadtoryResources(
|
||||||
Map<String, ResourceInformation> res) {
|
Map<String, ResourceInformation> res) {
|
||||||
ResourceInformation ri;
|
ResourceInformation ri;
|
||||||
if (!res.containsKey(ResourceInformation.MEMORY_MB.getName())) {
|
if (!res.containsKey(MEMORY)) {
|
||||||
LOG.info("Adding resource type - name = " + ResourceInformation.MEMORY_MB
|
LOG.info("Adding resource type - name = " + MEMORY + ", units = "
|
||||||
.getName() + ", units = " + ResourceInformation.MEMORY_MB.getUnits()
|
+ ResourceInformation.MEMORY_MB.getUnits() + ", type = "
|
||||||
+ ", type = " + ResourceTypes.COUNTABLE);
|
+ ResourceTypes.COUNTABLE);
|
||||||
ri = ResourceInformation
|
ri = ResourceInformation
|
||||||
.newInstance(ResourceInformation.MEMORY_MB.getName(),
|
.newInstance(MEMORY,
|
||||||
ResourceInformation.MEMORY_MB.getUnits());
|
ResourceInformation.MEMORY_MB.getUnits());
|
||||||
res.put(ResourceInformation.MEMORY_MB.getName(), ri);
|
res.put(MEMORY, ri);
|
||||||
}
|
}
|
||||||
if (!res.containsKey(ResourceInformation.VCORES.getName())) {
|
if (!res.containsKey(VCORES)) {
|
||||||
LOG.info("Adding resource type - name = " + ResourceInformation.VCORES
|
LOG.info("Adding resource type - name = " + VCORES + ", units = , type = "
|
||||||
.getName() + ", units = , type = " + ResourceTypes.COUNTABLE);
|
+ ResourceTypes.COUNTABLE);
|
||||||
ri =
|
ri =
|
||||||
ResourceInformation.newInstance(ResourceInformation.VCORES.getName());
|
ResourceInformation.newInstance(VCORES);
|
||||||
res.put(ResourceInformation.VCORES.getName(), ri);
|
res.put(VCORES, ri);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,6 +127,7 @@ public class ResourceUtils {
|
||||||
Map<String, ResourceInformation> resourceInformationMap) {
|
Map<String, ResourceInformation> resourceInformationMap) {
|
||||||
|
|
||||||
String[] resourceNames = conf.getStrings(YarnConfiguration.RESOURCE_TYPES);
|
String[] resourceNames = conf.getStrings(YarnConfiguration.RESOURCE_TYPES);
|
||||||
|
|
||||||
if (resourceNames != null && resourceNames.length != 0) {
|
if (resourceNames != null && resourceNames.length != 0) {
|
||||||
for (String resourceName : resourceNames) {
|
for (String resourceName : resourceNames) {
|
||||||
String resourceUnits = conf.get(
|
String resourceUnits = conf.get(
|
||||||
|
@ -178,25 +184,13 @@ public class ResourceUtils {
|
||||||
conf = new YarnConfiguration();
|
conf = new YarnConfiguration();
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
InputStream ris = getConfInputStream(resourceFile, conf);
|
addResourcesFileToConf(resourceFile, conf);
|
||||||
LOG.debug("Found " + resourceFile + ", adding to configuration");
|
LOG.debug("Found " + resourceFile + ", adding to configuration");
|
||||||
conf.addResource(ris);
|
|
||||||
initializeResourcesMap(conf, resources);
|
initializeResourcesMap(conf, resources);
|
||||||
return resources;
|
|
||||||
} catch (FileNotFoundException fe) {
|
} catch (FileNotFoundException fe) {
|
||||||
LOG.info("Unable to find '" + resourceFile
|
LOG.info("Unable to find '" + resourceFile
|
||||||
+ "'. Falling back to memory and vcores as resources", fe);
|
+ "'. Falling back to memory and vcores as resources", fe);
|
||||||
initializeResourcesMap(conf, resources);
|
initializeResourcesMap(conf, resources);
|
||||||
} catch (IOException ie) {
|
|
||||||
LOG.fatal(
|
|
||||||
"Exception trying to read resource types configuration '"
|
|
||||||
+ resourceFile + "'.", ie);
|
|
||||||
throw new YarnRuntimeException(ie);
|
|
||||||
} catch (YarnException ye) {
|
|
||||||
LOG.fatal(
|
|
||||||
"YARN Exception trying to read resource types configuration '"
|
|
||||||
+ resourceFile + "'.", ye);
|
|
||||||
throw new YarnRuntimeException(ye);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -205,8 +199,8 @@ public class ResourceUtils {
|
||||||
return readOnlyResources;
|
return readOnlyResources;
|
||||||
}
|
}
|
||||||
|
|
||||||
static InputStream getConfInputStream(String resourceFile, Configuration conf)
|
private static InputStream getConfInputStream(String resourceFile,
|
||||||
throws IOException, YarnException {
|
Configuration conf) throws IOException, YarnException {
|
||||||
|
|
||||||
ConfigurationProvider provider =
|
ConfigurationProvider provider =
|
||||||
ConfigurationProviderFactory.getConfigurationProvider(conf);
|
ConfigurationProviderFactory.getConfigurationProvider(conf);
|
||||||
|
@ -222,8 +216,112 @@ public class ResourceUtils {
|
||||||
return ris;
|
return ris;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void addResourcesFileToConf(String resourceFile,
|
||||||
|
Configuration conf) throws FileNotFoundException {
|
||||||
|
try {
|
||||||
|
InputStream ris = getConfInputStream(resourceFile, conf);
|
||||||
|
LOG.debug("Found " + resourceFile + ", adding to configuration");
|
||||||
|
conf.addResource(ris);
|
||||||
|
} catch (FileNotFoundException fe) {
|
||||||
|
throw fe;
|
||||||
|
} catch (IOException ie) {
|
||||||
|
LOG.fatal("Exception trying to read resource types configuration '"
|
||||||
|
+ resourceFile + "'.", ie);
|
||||||
|
throw new YarnRuntimeException(ie);
|
||||||
|
} catch (YarnException ye) {
|
||||||
|
LOG.fatal("YARN Exception trying to read resource types configuration '"
|
||||||
|
+ resourceFile + "'.", ye);
|
||||||
|
throw new YarnRuntimeException(ye);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static void resetResourceTypes() {
|
static void resetResourceTypes() {
|
||||||
lock = null;
|
lock = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String getUnits(String resourceValue) {
|
||||||
|
String units;
|
||||||
|
for (int i = 0; i < resourceValue.length(); i++) {
|
||||||
|
if (Character.isAlphabetic(resourceValue.charAt(i))) {
|
||||||
|
units = resourceValue.substring(i);
|
||||||
|
if (StringUtils.isAlpha(units)) {
|
||||||
|
return units;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Function to get the resources for a node. This function will look at the
|
||||||
|
* file {@link YarnConfiguration#NODE_RESOURCES_CONFIGURATION_FILE} to
|
||||||
|
* determine the node resources.
|
||||||
|
*
|
||||||
|
* @param conf configuration file
|
||||||
|
* @return a map to resource name to the ResourceInformation object. The map
|
||||||
|
* is guaranteed to have entries for memory and vcores
|
||||||
|
*/
|
||||||
|
public static Map<String, ResourceInformation> getNodeResourceInformation(
|
||||||
|
Configuration conf) {
|
||||||
|
if (nodeLock == null) {
|
||||||
|
synchronized (ResourceUtils.class) {
|
||||||
|
if (nodeLock == null) {
|
||||||
|
synchronized (ResourceUtils.class) {
|
||||||
|
nodeLock = new Object();
|
||||||
|
Map<String, ResourceInformation> nodeResources =
|
||||||
|
initializeNodeResourceInformation(conf);
|
||||||
|
addManadtoryResources(nodeResources);
|
||||||
|
checkMandatatoryResources(nodeResources);
|
||||||
|
readOnlyNodeResources = Collections.unmodifiableMap(nodeResources);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return readOnlyNodeResources;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Map<String, ResourceInformation>
|
||||||
|
initializeNodeResourceInformation(Configuration conf) {
|
||||||
|
Map<String, ResourceInformation> nodeResources = new HashMap<>();
|
||||||
|
try {
|
||||||
|
addResourcesFileToConf(
|
||||||
|
YarnConfiguration.NODE_RESOURCES_CONFIGURATION_FILE, conf);
|
||||||
|
for (Map.Entry<String, String> entry : conf) {
|
||||||
|
String key = entry.getKey();
|
||||||
|
String value = entry.getValue();
|
||||||
|
if (key.startsWith(YarnConfiguration.NM_RESOURCES_PREFIX)) {
|
||||||
|
addResourceInformation(key, value, nodeResources);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (FileNotFoundException fe) {
|
||||||
|
LOG.info("Couldn't find node resources file");
|
||||||
|
}
|
||||||
|
return nodeResources;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void addResourceInformation(String prop, String value,
|
||||||
|
Map<String, ResourceInformation> nodeResources) {
|
||||||
|
String[] parts = prop.split("\\.");
|
||||||
|
LOG.info("Found resource entry " + prop);
|
||||||
|
if (parts.length == 4) {
|
||||||
|
String resourceType = parts[3];
|
||||||
|
if (!nodeResources.containsKey(resourceType)) {
|
||||||
|
nodeResources
|
||||||
|
.put(resourceType, ResourceInformation.newInstance(resourceType));
|
||||||
|
}
|
||||||
|
String units = getUnits(value);
|
||||||
|
Long resourceValue =
|
||||||
|
Long.valueOf(value.substring(0, value.length() - units.length()));
|
||||||
|
nodeResources.get(resourceType).setValue(resourceValue);
|
||||||
|
nodeResources.get(resourceType).setUnits(units);
|
||||||
|
LOG.debug("Setting value for resource type " + resourceType + " to "
|
||||||
|
+ resourceValue + " with units " + units);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
synchronized public static void resetNodeResources() {
|
||||||
|
nodeLock = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,9 +21,9 @@ package org.apache.hadoop.yarn.util.resource;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.util.SystemClock;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -245,4 +245,31 @@ public class TestResourceUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetResourceInformation() throws Exception {
|
||||||
|
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
Map<String, Resource> testRun = new HashMap<>();
|
||||||
|
// testRun.put("node-resources-1.xml", Resource.newInstance(1024, 1));
|
||||||
|
Resource test3Resources = Resource.newInstance(1024, 1);
|
||||||
|
test3Resources.setResourceInformation("resource1",
|
||||||
|
ResourceInformation.newInstance("resource1", "Gi", 5L));
|
||||||
|
test3Resources.setResourceInformation("resource2",
|
||||||
|
ResourceInformation.newInstance("resource2", "m", 2L));
|
||||||
|
testRun.put("node-resources-2.xml", test3Resources);
|
||||||
|
|
||||||
|
for (Map.Entry<String, Resource> entry : testRun.entrySet()) {
|
||||||
|
String resourceFile = entry.getKey();
|
||||||
|
ResourceUtils.resetNodeResources();
|
||||||
|
File dest;
|
||||||
|
File source =
|
||||||
|
new File(conf.getClassLoader().getResource(resourceFile).getFile());
|
||||||
|
dest = new File(source.getParent(), "node-resources.xml");
|
||||||
|
FileUtils.copyFile(source, dest);
|
||||||
|
Map<String, ResourceInformation> actual =
|
||||||
|
ResourceUtils.getNodeResourceInformation(conf);
|
||||||
|
Assert.assertEquals(entry.getValue().getResources(), actual);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||||
|
<!--
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License. See accompanying LICENSE file.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<configuration>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.nodemanager.resource.memory-mb</name>
|
||||||
|
<value>1024</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.nodemanager.resource.vcores</name>
|
||||||
|
<value>1</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
</configuration>
|
|
@ -0,0 +1,39 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||||
|
<!--
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License. See accompanying LICENSE file.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<configuration>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.nodemanager.resource-type.memory-mb</name>
|
||||||
|
<value>1024Mi</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.nodemanager.resource-type.vcores</name>
|
||||||
|
<value>1</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.nodemanager.resource-type.resource1</name>
|
||||||
|
<value>5Gi</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.nodemanager.resource-type.resource2</name>
|
||||||
|
<value>2m</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
</configuration>
|
|
@ -174,18 +174,19 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
int memoryMb = NodeManagerHardwareUtils.getContainerMemoryMB(conf);
|
this.totalResource = NodeManagerHardwareUtils.getNodeResources(conf);
|
||||||
|
int memoryMb = totalResource.getMemory();
|
||||||
float vMemToPMem =
|
float vMemToPMem =
|
||||||
conf.getFloat(
|
conf.getFloat(
|
||||||
YarnConfiguration.NM_VMEM_PMEM_RATIO,
|
YarnConfiguration.NM_VMEM_PMEM_RATIO,
|
||||||
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
|
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
|
||||||
int virtualMemoryMb = (int)Math.ceil(memoryMb * vMemToPMem);
|
int virtualMemoryMb = (int)Math.ceil(memoryMb * vMemToPMem);
|
||||||
|
|
||||||
int virtualCores = NodeManagerHardwareUtils.getVCores(conf);
|
int virtualCores = totalResource.getVirtualCores();
|
||||||
LOG.info("Nodemanager resources: memory set to " + memoryMb + "MB.");
|
LOG.info("Nodemanager resources: memory set to " + memoryMb + "MB.");
|
||||||
LOG.info("Nodemanager resources: vcores set to " + virtualCores + ".");
|
LOG.info("Nodemanager resources: vcores set to " + virtualCores + ".");
|
||||||
|
LOG.info("Nodemanager resources: " + totalResource);
|
||||||
|
|
||||||
this.totalResource = Resource.newInstance(memoryMb, virtualCores);
|
|
||||||
metrics.addResource(totalResource);
|
metrics.addResource(totalResource);
|
||||||
|
|
||||||
// Get actual node physical resources
|
// Get actual node physical resources
|
||||||
|
|
|
@ -21,10 +21,16 @@ package org.apache.hadoop.yarn.server.nodemanager.util;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class to determine hardware related characteristics such as the
|
* Helper class to determine hardware related characteristics such as the
|
||||||
|
@ -332,4 +338,50 @@ public class NodeManagerHardwareUtils {
|
||||||
}
|
}
|
||||||
return memoryMb;
|
return memoryMb;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the resources for the node.
|
||||||
|
* @param configuration configuration file
|
||||||
|
* @return the resources for the node
|
||||||
|
*/
|
||||||
|
public static Resource getNodeResources(Configuration configuration) {
|
||||||
|
Configuration conf = new Configuration(configuration);
|
||||||
|
String memory = ResourceInformation.MEMORY_MB.getName();
|
||||||
|
String vcores = ResourceInformation.VCORES.getName();
|
||||||
|
|
||||||
|
Resource ret = Resource.newInstance(0, 0);
|
||||||
|
Map<String, ResourceInformation> resourceInformation =
|
||||||
|
ResourceUtils.getNodeResourceInformation(conf);
|
||||||
|
for (Map.Entry<String, ResourceInformation> entry : resourceInformation
|
||||||
|
.entrySet()) {
|
||||||
|
ret.setResourceInformation(entry.getKey(), entry.getValue());
|
||||||
|
LOG.debug("Setting key " + entry.getKey() + " to " + entry.getValue());
|
||||||
|
}
|
||||||
|
if (resourceInformation.containsKey(memory)) {
|
||||||
|
Long value = resourceInformation.get(memory).getValue();
|
||||||
|
if (value > Integer.MAX_VALUE) {
|
||||||
|
throw new YarnRuntimeException("Value '" + value
|
||||||
|
+ "' for resource memory is more than the maximum for an integer.");
|
||||||
|
}
|
||||||
|
ResourceInformation memResInfo = resourceInformation.get(memory);
|
||||||
|
if(memResInfo.getValue() == 0) {
|
||||||
|
ret.setMemory(getContainerMemoryMB(conf));
|
||||||
|
LOG.debug("Set memory to " + ret.getMemory());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (resourceInformation.containsKey(vcores)) {
|
||||||
|
Long value = resourceInformation.get(vcores).getValue();
|
||||||
|
if (value > Integer.MAX_VALUE) {
|
||||||
|
throw new YarnRuntimeException("Value '" + value
|
||||||
|
+ "' for resource vcores is more than the maximum for an integer.");
|
||||||
|
}
|
||||||
|
ResourceInformation vcoresResInfo = resourceInformation.get(vcores);
|
||||||
|
if(vcoresResInfo.getValue() == 0) {
|
||||||
|
ret.setVirtualCores(getVCores(conf));
|
||||||
|
LOG.debug("Set vcores to " + ret.getVirtualCores());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.debug("Node resource information map is " + ret);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -376,10 +376,11 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
// Check if this node has minimum allocations
|
// Check if this node has minimum allocations
|
||||||
if (capability.getMemorySize() < minAllocMb
|
if (capability.getMemorySize() < minAllocMb
|
||||||
|| capability.getVirtualCores() < minAllocVcores) {
|
|| capability.getVirtualCores() < minAllocVcores) {
|
||||||
String message =
|
String message = "NodeManager from " + host
|
||||||
"NodeManager from " + host
|
+ " doesn't satisfy minimum allocations, Sending SHUTDOWN"
|
||||||
+ " doesn't satisfy minimum allocations, Sending SHUTDOWN"
|
+ " signal to the NodeManager. Node capabilities are " + capability
|
||||||
+ " signal to the NodeManager.";
|
+ "; minimums are " + minAllocMb + "mb and " + minAllocVcores
|
||||||
|
+ " vcores";
|
||||||
LOG.info(message);
|
LOG.info(message);
|
||||||
response.setDiagnosticsMessage(message);
|
response.setDiagnosticsMessage(message);
|
||||||
response.setNodeAction(NodeAction.SHUTDOWN);
|
response.setNodeAction(NodeAction.SHUTDOWN);
|
||||||
|
|
Loading…
Reference in New Issue