YARN-7242. Support to specify values of different resource types in DistributedShell for easier testing. Contributed by Gergely Novák.

This commit is contained in:
Sunil G 2018-01-08 11:59:06 +05:30
parent c2d6fa3656
commit 01f3f2167e
3 changed files with 339 additions and 40 deletions

View File

@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.TimelineServiceHelper; import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
@ -244,6 +245,9 @@ public class ApplicationMaster {
// VirtualCores to request for the container on which the shell command will run // VirtualCores to request for the container on which the shell command will run
private static final int DEFAULT_CONTAINER_VCORES = 1; private static final int DEFAULT_CONTAINER_VCORES = 1;
private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
// All other resources to request for the container
// on which the shell command will run
private Map<String, Long> containerResources = new HashMap<>();
// Priority of the request // Priority of the request
private int requestPriority; private int requestPriority;
// Execution type of the containers. // Execution type of the containers.
@ -431,6 +435,10 @@ public class ApplicationMaster {
"Amount of memory in MB to be requested to run the shell command"); "Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true, opts.addOption("container_vcores", true,
"Amount of virtual cores to be requested to run the shell command"); "Amount of virtual cores to be requested to run the shell command");
opts.addOption("container_resources", true,
"Amount of resources to be requested to run the shell command. " +
"Specified as resource type=value pairs separated by commas. " +
"E.g. -container_resources memory-mb=512,vcores=1");
opts.addOption("container_resource_profile", true, opts.addOption("container_resource_profile", true,
"Resource profile to be requested to run the shell command"); "Resource profile to be requested to run the shell command");
opts.addOption("num_containers", true, opts.addOption("num_containers", true,
@ -590,6 +598,14 @@ public class ApplicationMaster {
"container_memory", "-1")); "container_memory", "-1"));
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
"container_vcores", "-1")); "container_vcores", "-1"));
containerResources = new HashMap<>();
if (cliParser.hasOption("container_resources")) {
Map<String, Long> resources = Client.parseResourcesString(
cliParser.getOptionValue("container_resources"));
for (Map.Entry<String, Long> entry : resources.entrySet()) {
containerResources.put(entry.getKey(), entry.getValue());
}
}
containerResourceProfile = containerResourceProfile =
cliParser.getOptionValue("container_resource_profile", ""); cliParser.getOptionValue("container_resource_profile", "");
numTotalContainers = Integer.parseInt(cliParser.getOptionValue( numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
@ -711,6 +727,7 @@ public class ApplicationMaster {
.registerApplicationMaster(appMasterHostname, appMasterRpcPort, .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl); appMasterTrackingUrl);
resourceProfiles = response.getResourceProfiles(); resourceProfiles = response.getResourceProfiles();
ResourceUtils.reinitializeResources(response.getResourceTypes());
// Dump out information about cluster capability as seen by the // Dump out information about cluster capability as seen by the
// resource manager // resource manager
long maxMem = response.getMaximumResourceCapability().getMemorySize(); long maxMem = response.getMaximumResourceCapability().getMemorySize();
@ -1593,6 +1610,9 @@ public class ApplicationMaster {
containerVirtualCores; containerVirtualCores;
resourceCapability.setMemorySize(containerMemory); resourceCapability.setMemorySize(containerMemory);
resourceCapability.setVirtualCores(containerVirtualCores); resourceCapability.setVirtualCores(containerVirtualCores);
for (Map.Entry<String, Long> entry : containerResources.entrySet()) {
resourceCapability.setResourceValue(entry.getKey(), entry.getValue());
}
} }
String profileName = containerResourceProfile; String profileName = containerResourceProfile;

View File

@ -29,6 +29,7 @@ import java.util.Set;
import java.util.Vector; import java.util.Vector;
import java.util.Arrays; import java.util.Arrays;
import com.google.common.base.Joiner;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.HelpFormatter;
@ -70,7 +71,9 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -81,8 +84,11 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.client.util.YarnClientUtils; import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException; import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -144,7 +150,8 @@ public class Client {
private long amMemory = DEFAULT_AM_MEMORY; private long amMemory = DEFAULT_AM_MEMORY;
// Amt. of virtual core resource to request for to run the App Master // Amt. of virtual core resource to request for to run the App Master
private int amVCores = DEFAULT_AM_VCORES; private int amVCores = DEFAULT_AM_VCORES;
// Amount of resources to request to run the App Master
private Map<String, Long> amResources = new HashMap<>();
// AM resource profile // AM resource profile
private String amResourceProfile = ""; private String amResourceProfile = "";
@ -168,6 +175,9 @@ public class Client {
private long containerMemory = DEFAULT_CONTAINER_MEMORY; private long containerMemory = DEFAULT_CONTAINER_MEMORY;
// Amt. of virtual cores to request for container in which shell script will be executed // Amt. of virtual cores to request for container in which shell script will be executed
private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
// Amt. of resources to request for container
// in which shell script will be executed
private Map<String, Long> containerResources = new HashMap<>();
// container resource profile // container resource profile
private String containerResourceProfile = ""; private String containerResourceProfile = "";
// No. of containers in which the shell script needs to be executed // No. of containers in which the shell script needs to be executed
@ -265,6 +275,8 @@ public class Client {
Client(String appMasterMainClass, Configuration conf) { Client(String appMasterMainClass, Configuration conf) {
this.conf = conf; this.conf = conf;
this.conf.setBoolean(
YarnConfiguration.YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER, true);
this.appMasterMainClass = appMasterMainClass; this.appMasterMainClass = appMasterMainClass;
yarnClient = YarnClient.createYarnClient(); yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf); yarnClient.init(conf);
@ -274,7 +286,12 @@ public class Client {
opts.addOption("queue", true, "RM Queue in which this application is to be submitted"); opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
opts.addOption("timeout", true, "Application timeout in milliseconds"); opts.addOption("timeout", true, "Application timeout in milliseconds");
opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master"); opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master"); opts.addOption("master_vcores", true, "Amount of virtual cores " +
"to be requested to run the application master");
opts.addOption("master_resources", true, "Amount of resources " +
"to be requested to run the application master. " +
"Specified as resource type=value pairs separated by commas." +
"E.g. -master_resources memory-mb=512,vcores=2");
opts.addOption("jar", true, "Jar file containing the application master"); opts.addOption("jar", true, "Jar file containing the application master");
opts.addOption("master_resource_profile", true, "Resource profile for the application master"); opts.addOption("master_resource_profile", true, "Resource profile for the application master");
opts.addOption("shell_command", true, "Shell command to be executed by " + opts.addOption("shell_command", true, "Shell command to be executed by " +
@ -290,8 +307,14 @@ public class Client {
opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
opts.addOption("container_type", true, opts.addOption("container_type", true,
"Container execution type, GUARANTEED or OPPORTUNISTIC"); "Container execution type, GUARANTEED or OPPORTUNISTIC");
opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_memory", true, "Amount of memory in MB " +
opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); "to be requested to run the shell command");
opts.addOption("container_vcores", true, "Amount of virtual cores " +
"to be requested to run the shell command");
opts.addOption("container_resources", true, "Amount of resources " +
"to be requested to run the shell command. " +
"Specified as resource type=value pairs separated by commas. " +
"E.g. -container_resources memory-mb=256,vcores=1");
opts.addOption("container_resource_profile", true, "Resource profile for the shell command"); opts.addOption("container_resource_profile", true, "Resource profile for the shell command");
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
opts.addOption("promote_opportunistic_after_start", false, opts.addOption("promote_opportunistic_after_start", false,
@ -403,6 +426,19 @@ public class Client {
Integer.parseInt(cliParser.getOptionValue("master_memory", "-1")); Integer.parseInt(cliParser.getOptionValue("master_memory", "-1"));
amVCores = amVCores =
Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1")); Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1"));
if (cliParser.hasOption("master_resources")) {
Map<String, Long> masterResources =
parseResourcesString(cliParser.getOptionValue("master_resources"));
for (Map.Entry<String, Long> entry : masterResources.entrySet()) {
if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) {
amMemory = entry.getValue();
} else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) {
amVCores = entry.getValue().intValue();
} else {
amResources.put(entry.getKey(), entry.getValue());
}
}
}
amResourceProfile = cliParser.getOptionValue("master_resource_profile", ""); amResourceProfile = cliParser.getOptionValue("master_resource_profile", "");
if (!cliParser.hasOption("jar")) { if (!cliParser.hasOption("jar")) {
@ -461,6 +497,19 @@ public class Client {
Integer.parseInt(cliParser.getOptionValue("container_memory", "-1")); Integer.parseInt(cliParser.getOptionValue("container_memory", "-1"));
containerVirtualCores = containerVirtualCores =
Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1")); Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1"));
if (cliParser.hasOption("container_resources")) {
Map<String, Long> resources =
parseResourcesString(cliParser.getOptionValue("container_resources"));
for (Map.Entry<String, Long> entry : resources.entrySet()) {
if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) {
containerMemory = entry.getValue();
} else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) {
containerVirtualCores = entry.getValue().intValue();
} else {
containerResources.put(entry.getKey(), entry.getValue());
}
}
}
containerResourceProfile = containerResourceProfile =
cliParser.getOptionValue("container_resource_profile", ""); cliParser.getOptionValue("container_resource_profile", "");
numContainers = numContainers =
@ -637,9 +686,9 @@ public class Client {
// Set up resource type requirements // Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and // For now, both memory and vcores are supported, so we set memory and
// vcores requirements // vcores requirements
setAMResourceCapability(appContext, amMemory, amVCores, amResourceProfile, List<ResourceTypeInfo> resourceTypes = yarnClient.getResourceTypeInfo();
amPriority, profiles); setAMResourceCapability(appContext, profiles, resourceTypes);
setContainerResources(containerMemory, containerVirtualCores, profiles); setContainerResources(profiles, resourceTypes);
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName); appContext.setApplicationName(appName);
@ -776,6 +825,10 @@ public class Client {
if (containerVirtualCores > 0) { if (containerVirtualCores > 0) {
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
} }
if (!containerResources.isEmpty()) {
Joiner.MapJoiner joiner = Joiner.on(',').withKeyValueSeparator("=");
vargs.add("--container_resources " + joiner.join(containerResources));
}
if (containerResourceProfile != null && !containerResourceProfile if (containerResourceProfile != null && !containerResourceProfile
.isEmpty()) { .isEmpty()) {
vargs.add("--container_resource_profile " + containerResourceProfile); vargs.add("--container_resource_profile " + containerResourceProfile);
@ -1011,25 +1064,26 @@ public class Client {
} }
private void setAMResourceCapability(ApplicationSubmissionContext appContext, private void setAMResourceCapability(ApplicationSubmissionContext appContext,
long memory, int vcores, String profile, int priority, Map<String, Resource> profiles, List<ResourceTypeInfo> resourceTypes)
Map<String, Resource> profiles) throws IllegalArgumentException { throws IllegalArgumentException, IOException, YarnException {
if (memory < -1 || memory == 0) { if (amMemory < -1 || amMemory == 0) {
throw new IllegalArgumentException("Invalid memory specified for" throw new IllegalArgumentException("Invalid memory specified for"
+ " application master, exiting. Specified memory=" + memory); + " application master, exiting. Specified memory=" + amMemory);
} }
if (vcores < -1 || vcores == 0) { if (amVCores < -1 || amVCores == 0) {
throw new IllegalArgumentException("Invalid virtual cores specified for" throw new IllegalArgumentException("Invalid virtual cores specified for"
+ " application master, exiting. Specified virtual cores=" + vcores); + " application master, exiting. " +
"Specified virtual cores=" + amVCores);
} }
String tmp = profile; String tmp = amResourceProfile;
if (profile.isEmpty()) { if (amResourceProfile.isEmpty()) {
tmp = "default"; tmp = "default";
} }
if (appContext.getAMContainerResourceRequests() == null) { if (appContext.getAMContainerResourceRequests() == null) {
List<ResourceRequest> amResourceRequests = new ArrayList<ResourceRequest>(); List<ResourceRequest> amResourceRequests = new ArrayList<ResourceRequest>();
amResourceRequests amResourceRequests
.add(ResourceRequest.newInstance(Priority.newInstance(priority), "*", .add(ResourceRequest.newInstance(Priority.newInstance(amPriority),
Resources.clone(Resources.none()), 1)); "*", Resources.clone(Resources.none()), 1));
appContext.setAMContainerResourceRequests(amResourceRequests); appContext.setAMContainerResourceRequests(amResourceRequests);
} }
@ -1038,36 +1092,90 @@ public class Client {
appContext.getAMContainerResourceRequests().get(0).setProfileCapability( appContext.getAMContainerResourceRequests().get(0).setProfileCapability(
ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0))); ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0)));
} }
Resource capability = Resource.newInstance(0, 0); Resource capability = Resource.newInstance(0, 0);
validateResourceTypes(amResources.keySet(), resourceTypes);
for (Map.Entry<String, Long> entry : amResources.entrySet()) {
capability.setResourceValue(entry.getKey(), entry.getValue());
}
// set amMemory because it's used to set Xmx param // set amMemory because it's used to set Xmx param
if (profiles == null) { if (amMemory == -1) {
amMemory = memory == -1 ? DEFAULT_AM_MEMORY : memory; amMemory = (profiles == null) ? DEFAULT_AM_MEMORY :
amVCores = vcores == -1 ? DEFAULT_AM_VCORES : vcores; profiles.get(tmp).getMemorySize();
}
if (amVCores == -1) {
amVCores = (profiles == null) ? DEFAULT_AM_VCORES :
profiles.get(tmp).getVirtualCores();
}
capability.setMemorySize(amMemory); capability.setMemorySize(amMemory);
capability.setVirtualCores(amVCores); capability.setVirtualCores(amVCores);
} else {
amMemory = memory == -1 ? profiles.get(tmp).getMemorySize() : memory;
amVCores = vcores == -1 ? profiles.get(tmp).getVirtualCores() : vcores;
capability.setMemorySize(memory);
capability.setVirtualCores(vcores);
}
appContext.getAMContainerResourceRequests().get(0).getProfileCapability() appContext.getAMContainerResourceRequests().get(0).getProfileCapability()
.setProfileCapabilityOverride(capability); .setProfileCapabilityOverride(capability);
} }
private void setContainerResources(long memory, int vcores, private void setContainerResources(Map<String, Resource> profiles,
Map<String, Resource> profiles) throws IllegalArgumentException { List<ResourceTypeInfo> resourceTypes) throws IllegalArgumentException {
if (memory < -1 || memory == 0) { if (containerMemory < -1 || containerMemory == 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException("Container memory '" +
"Container memory '" + memory + "' has to be greated than 0"); containerMemory + "' has to be greated than 0");
} }
if (vcores < -1 || vcores == 0) { if (containerVirtualCores < -1 || containerVirtualCores == 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException("Container vcores '" +
"Container vcores '" + vcores + "' has to be greated than 0"); containerVirtualCores + "' has to be greated than 0");
} }
validateResourceTypes(containerResources.keySet(), resourceTypes);
if (profiles == null) { if (profiles == null) {
containerMemory = memory == -1 ? DEFAULT_CONTAINER_MEMORY : memory; containerMemory = containerMemory == -1 ?
containerVirtualCores = vcores == -1 ? DEFAULT_CONTAINER_VCORES : vcores; DEFAULT_CONTAINER_MEMORY : containerMemory;
containerVirtualCores = containerVirtualCores == -1 ?
DEFAULT_CONTAINER_VCORES : containerVirtualCores;
}
}
private void validateResourceTypes(Iterable<String> resourceNames,
List<ResourceTypeInfo> resourceTypes) {
for (String resourceName : resourceNames) {
if (!resourceTypes.stream().anyMatch(e ->
e.getName().equals(resourceName))) {
throw new ResourceNotFoundException("Unknown resource: " +
resourceName);
} }
} }
} }
static Map<String, Long> parseResourcesString(String resourcesStr) {
Map<String, Long> resources = new HashMap<>();
// Ignore the grouping "[]"
if (resourcesStr.startsWith("[")) {
resourcesStr = resourcesStr.substring(1);
}
if (resourcesStr.endsWith("]")) {
resourcesStr = resourcesStr.substring(0, resourcesStr.length());
}
for (String resource : resourcesStr.trim().split(",")) {
resource = resource.trim();
if (!resource.matches("^[^=]+=\\d+\\s?\\w*$")) {
throw new IllegalArgumentException("\"" + resource + "\" is not a " +
"valid resource type/amount pair. " +
"Please provide key=amount pairs separated by commas.");
}
String[] splits = resource.split("=");
String key = splits[0], value = splits[1];
String units = ResourceUtils.getUnits(value);
String valueWithoutUnit = value.substring(
0, value.length() - units.length()).trim();
Long resourceValue = Long.valueOf(valueWithoutUnit);
if (!units.isEmpty()) {
resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue);
}
if (key.equals("memory")) {
key = ResourceInformation.MEMORY_URI;
}
resources.put(key, resourceValue);
}
return resources;
}
}

View File

@ -38,6 +38,7 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -54,11 +55,15 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@ -71,11 +76,11 @@ import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.client.api.impl.TimelineWriter; import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.timeline.NameValuePair; import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils; import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
import org.apache.hadoop.yarn.server.timeline.TimelineVersion; import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
@ -89,6 +94,7 @@ import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -112,6 +118,7 @@ public class TestDistributedShell {
private static final int NUM_NMS = 1; private static final int NUM_NMS = 1;
private static final float DEFAULT_TIMELINE_VERSION = 1.0f; private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector"; private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
private static final int MIN_ALLOCATION_MB = 128;
protected final static String APPMASTER_JAR = protected final static String APPMASTER_JAR =
JarFinder.getJar(ApplicationMaster.class); JarFinder.getJar(ApplicationMaster.class);
@ -138,7 +145,8 @@ public class TestDistributedShell {
LOG.info("Starting up YARN cluster"); LOG.info("Starting up YARN cluster");
conf = new YarnConfiguration(); conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
MIN_ALLOCATION_MB);
// reduce the teardown waiting time // reduce the teardown waiting time
conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000); conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
conf.set("yarn.log.dir", "target"); conf.set("yarn.log.dir", "target");
@ -1433,4 +1441,167 @@ public class TestDistributedShell {
Assert.fail("Job execution with opportunistic containers failed."); Assert.fail("Job execution with opportunistic containers failed.");
} }
} }
@Test
@TimelineVersion(2.0f)
public void testDistributedShellWithResources() throws Exception {
doTestDistributedShellWithResources(false);
}
@Test
@TimelineVersion(2.0f)
public void testDistributedShellWithResourcesWithLargeContainers()
throws Exception {
doTestDistributedShellWithResources(true);
}
public void doTestDistributedShellWithResources(boolean largeContainers)
throws Exception {
Resource clusterResource = yarnCluster.getResourceManager()
.getResourceScheduler().getClusterResource();
String masterMemoryString = "1 Gi";
String containerMemoryString = "512 Mi";
long masterMemory = 1024;
long containerMemory = 512;
Assume.assumeTrue("The cluster doesn't have enough memory for this test",
clusterResource.getMemorySize() >= masterMemory + containerMemory);
Assume.assumeTrue("The cluster doesn't have enough cores for this test",
clusterResource.getVirtualCores() >= 2);
if (largeContainers) {
masterMemory = clusterResource.getMemorySize() * 2 / 3;
masterMemory = masterMemory - masterMemory % MIN_ALLOCATION_MB;
masterMemoryString = masterMemory + "Mi";
containerMemory = clusterResource.getMemorySize() / 3;
containerMemory = containerMemory - containerMemory % MIN_ALLOCATION_MB;
containerMemoryString = String.valueOf(containerMemory);
}
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"2",
"--shell_command",
Shell.WINDOWS ? "dir" : "ls",
"--master_resources",
"memory=" + masterMemoryString + ",vcores=1",
"--container_resources",
"memory=" + containerMemoryString + ",vcores=1",
};
LOG.info("Initializing DS Client");
Client client = new Client(new Configuration(yarnCluster.getConfig()));
Assert.assertTrue(client.init(args));
LOG.info("Running DS Client");
final AtomicBoolean result = new AtomicBoolean(false);
Thread t = new Thread() {
public void run() {
try {
result.set(client.run());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
t.start();
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(new Configuration(yarnCluster.getConfig()));
yarnClient.start();
while (true) {
List<ApplicationReport> apps = yarnClient.getApplications();
if (apps.isEmpty()) {
Thread.sleep(10);
continue;
}
ApplicationReport appReport = apps.get(0);
ApplicationId appId = appReport.getApplicationId();
List<ApplicationAttemptReport> appAttempts =
yarnClient.getApplicationAttempts(appId);
if (appAttempts.isEmpty()) {
Thread.sleep(10);
continue;
}
ApplicationAttemptReport appAttemptReport = appAttempts.get(0);
ContainerId amContainerId = appAttemptReport.getAMContainerId();
if (amContainerId == null) {
Thread.sleep(10);
continue;
}
ContainerReport report = yarnClient.getContainerReport(amContainerId);
Resource masterResource = report.getAllocatedResource();
Assert.assertEquals(masterMemory, masterResource.getMemorySize());
Assert.assertEquals(1, masterResource.getVirtualCores());
List<ContainerReport> containers =
yarnClient.getContainers(appAttemptReport.getApplicationAttemptId());
if (containers.size() < 2) {
Thread.sleep(10);
continue;
}
for (ContainerReport container : containers) {
if (!container.getContainerId().equals(amContainerId)) {
Resource containerResource = container.getAllocatedResource();
Assert.assertEquals(containerMemory,
containerResource.getMemorySize());
Assert.assertEquals(1, containerResource.getVirtualCores());
}
}
return;
}
}
@Test(expected=IllegalArgumentException.class)
public void testDistributedShellAMResourcesWithIllegalArguments()
throws Exception {
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"1",
"--shell_command",
Shell.WINDOWS ? "dir" : "ls",
"--master_resources",
"memory-mb=invalid"
};
Client client = new Client(new Configuration(yarnCluster.getConfig()));
client.init(args);
}
@Test(expected=MissingArgumentException.class)
public void testDistributedShellAMResourcesWithMissingArgumentValue()
throws Exception {
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"1",
"--shell_command",
Shell.WINDOWS ? "dir" : "ls",
"--master_resources"
};
Client client = new Client(new Configuration(yarnCluster.getConfig()));
client.init(args);
}
@Test(expected=ResourceNotFoundException.class)
public void testDistributedShellAMResourcesWithUnknownResource()
throws Exception {
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"1",
"--shell_command",
Shell.WINDOWS ? "dir" : "ls",
"--master_resources",
"unknown-resource=5"
};
Client client = new Client(new Configuration(yarnCluster.getConfig()));
client.init(args);
client.run();
}
} }