diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index b3fa0ff3c0d..85496d06198 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.TimelineServiceHelper; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.log4j.LogManager; @@ -244,6 +245,9 @@ public enum DSEntity { // VirtualCores to request for the container on which the shell command will run private static final int DEFAULT_CONTAINER_VCORES = 1; private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; + // All other resources to request for the container + // on which the shell command will run + private Map containerResources = new HashMap<>(); // Priority of the request private int requestPriority; // Execution type of the containers. @@ -431,6 +435,10 @@ public boolean init(String[] args) throws ParseException, IOException { "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); + opts.addOption("container_resources", true, + "Amount of resources to be requested to run the shell command. " + + "Specified as resource type=value pairs separated by commas. " + + "E.g. -container_resources memory-mb=512,vcores=1"); opts.addOption("container_resource_profile", true, "Resource profile to be requested to run the shell command"); opts.addOption("num_containers", true, @@ -590,6 +598,14 @@ public boolean init(String[] args) throws ParseException, IOException { "container_memory", "-1")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( "container_vcores", "-1")); + containerResources = new HashMap<>(); + if (cliParser.hasOption("container_resources")) { + Map resources = Client.parseResourcesString( + cliParser.getOptionValue("container_resources")); + for (Map.Entry entry : resources.entrySet()) { + containerResources.put(entry.getKey(), entry.getValue()); + } + } containerResourceProfile = cliParser.getOptionValue("container_resource_profile", ""); numTotalContainers = Integer.parseInt(cliParser.getOptionValue( @@ -711,6 +727,7 @@ public void run() throws YarnException, IOException, InterruptedException { .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl); resourceProfiles = response.getResourceProfiles(); + ResourceUtils.reinitializeResources(response.getResourceTypes()); // Dump out information about cluster capability as seen by the // resource manager long maxMem = response.getMaximumResourceCapability().getMemorySize(); @@ -1593,6 +1610,9 @@ private ProfileCapability createProfileCapability() containerVirtualCores; resourceCapability.setMemorySize(containerMemory); resourceCapability.setVirtualCores(containerVirtualCores); + for (Map.Entry entry : containerResources.entrySet()) { + resourceCapability.setResourceValue(entry.getKey(), entry.getValue()); + } } String profileName = containerResourceProfile; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index e299acc8fe0..ef635d33b94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -29,6 +29,7 @@ import java.util.Vector; import java.util.Arrays; +import com.google.common.base.Joiner; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -70,7 +71,9 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; @@ -81,8 +84,11 @@ import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.client.util.YarnClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.slf4j.Logger; @@ -144,7 +150,8 @@ public class Client { private long amMemory = DEFAULT_AM_MEMORY; // Amt. of virtual core resource to request for to run the App Master private int amVCores = DEFAULT_AM_VCORES; - + // Amount of resources to request to run the App Master + private Map amResources = new HashMap<>(); // AM resource profile private String amResourceProfile = ""; @@ -168,6 +175,9 @@ public class Client { private long containerMemory = DEFAULT_CONTAINER_MEMORY; // Amt. of virtual cores to request for container in which shell script will be executed private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; + // Amt. of resources to request for container + // in which shell script will be executed + private Map containerResources = new HashMap<>(); // container resource profile private String containerResourceProfile = ""; // No. of containers in which the shell script needs to be executed @@ -265,6 +275,8 @@ public Client(Configuration conf) throws Exception { Client(String appMasterMainClass, Configuration conf) { this.conf = conf; + this.conf.setBoolean( + YarnConfiguration.YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER, true); this.appMasterMainClass = appMasterMainClass; yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); @@ -274,7 +286,12 @@ public Client(Configuration conf) throws Exception { opts.addOption("queue", true, "RM Queue in which this application is to be submitted"); opts.addOption("timeout", true, "Application timeout in milliseconds"); opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master"); - opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master"); + opts.addOption("master_vcores", true, "Amount of virtual cores " + + "to be requested to run the application master"); + opts.addOption("master_resources", true, "Amount of resources " + + "to be requested to run the application master. " + + "Specified as resource type=value pairs separated by commas." + + "E.g. -master_resources memory-mb=512,vcores=2"); opts.addOption("jar", true, "Jar file containing the application master"); opts.addOption("master_resource_profile", true, "Resource profile for the application master"); opts.addOption("shell_command", true, "Shell command to be executed by " + @@ -290,8 +307,14 @@ public Client(Configuration conf) throws Exception { opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); opts.addOption("container_type", true, "Container execution type, GUARANTEED or OPPORTUNISTIC"); - opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); - opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); + opts.addOption("container_memory", true, "Amount of memory in MB " + + "to be requested to run the shell command"); + opts.addOption("container_vcores", true, "Amount of virtual cores " + + "to be requested to run the shell command"); + opts.addOption("container_resources", true, "Amount of resources " + + "to be requested to run the shell command. " + + "Specified as resource type=value pairs separated by commas. " + + "E.g. -container_resources memory-mb=256,vcores=1"); opts.addOption("container_resource_profile", true, "Resource profile for the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("promote_opportunistic_after_start", false, @@ -403,6 +426,19 @@ public boolean init(String[] args) throws ParseException { Integer.parseInt(cliParser.getOptionValue("master_memory", "-1")); amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1")); + if (cliParser.hasOption("master_resources")) { + Map masterResources = + parseResourcesString(cliParser.getOptionValue("master_resources")); + for (Map.Entry entry : masterResources.entrySet()) { + if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) { + amMemory = entry.getValue(); + } else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) { + amVCores = entry.getValue().intValue(); + } else { + amResources.put(entry.getKey(), entry.getValue()); + } + } + } amResourceProfile = cliParser.getOptionValue("master_resource_profile", ""); if (!cliParser.hasOption("jar")) { @@ -461,6 +497,19 @@ public boolean init(String[] args) throws ParseException { Integer.parseInt(cliParser.getOptionValue("container_memory", "-1")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1")); + if (cliParser.hasOption("container_resources")) { + Map resources = + parseResourcesString(cliParser.getOptionValue("container_resources")); + for (Map.Entry entry : resources.entrySet()) { + if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) { + containerMemory = entry.getValue(); + } else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) { + containerVirtualCores = entry.getValue().intValue(); + } else { + containerResources.put(entry.getKey(), entry.getValue()); + } + } + } containerResourceProfile = cliParser.getOptionValue("container_resource_profile", ""); numContainers = @@ -637,9 +686,9 @@ public boolean run() throws IOException, YarnException { // Set up resource type requirements // For now, both memory and vcores are supported, so we set memory and // vcores requirements - setAMResourceCapability(appContext, amMemory, amVCores, amResourceProfile, - amPriority, profiles); - setContainerResources(containerMemory, containerVirtualCores, profiles); + List resourceTypes = yarnClient.getResourceTypeInfo(); + setAMResourceCapability(appContext, profiles, resourceTypes); + setContainerResources(profiles, resourceTypes); appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setApplicationName(appName); @@ -776,6 +825,10 @@ public boolean run() throws IOException, YarnException { if (containerVirtualCores > 0) { vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); } + if (!containerResources.isEmpty()) { + Joiner.MapJoiner joiner = Joiner.on(',').withKeyValueSeparator("="); + vargs.add("--container_resources " + joiner.join(containerResources)); + } if (containerResourceProfile != null && !containerResourceProfile .isEmpty()) { vargs.add("--container_resource_profile " + containerResourceProfile); @@ -1011,25 +1064,26 @@ private void prepareTimelineDomain() { } private void setAMResourceCapability(ApplicationSubmissionContext appContext, - long memory, int vcores, String profile, int priority, - Map profiles) throws IllegalArgumentException { - if (memory < -1 || memory == 0) { + Map profiles, List resourceTypes) + throws IllegalArgumentException, IOException, YarnException { + if (amMemory < -1 || amMemory == 0) { throw new IllegalArgumentException("Invalid memory specified for" - + " application master, exiting. Specified memory=" + memory); + + " application master, exiting. Specified memory=" + amMemory); } - if (vcores < -1 || vcores == 0) { + if (amVCores < -1 || amVCores == 0) { throw new IllegalArgumentException("Invalid virtual cores specified for" - + " application master, exiting. Specified virtual cores=" + vcores); + + " application master, exiting. " + + "Specified virtual cores=" + amVCores); } - String tmp = profile; - if (profile.isEmpty()) { + String tmp = amResourceProfile; + if (amResourceProfile.isEmpty()) { tmp = "default"; } if (appContext.getAMContainerResourceRequests() == null) { List amResourceRequests = new ArrayList(); amResourceRequests - .add(ResourceRequest.newInstance(Priority.newInstance(priority), "*", - Resources.clone(Resources.none()), 1)); + .add(ResourceRequest.newInstance(Priority.newInstance(amPriority), + "*", Resources.clone(Resources.none()), 1)); appContext.setAMContainerResourceRequests(amResourceRequests); } @@ -1038,36 +1092,90 @@ private void setAMResourceCapability(ApplicationSubmissionContext appContext, appContext.getAMContainerResourceRequests().get(0).setProfileCapability( ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0))); } + Resource capability = Resource.newInstance(0, 0); - // set amMemory because it's used to set Xmx param - if (profiles == null) { - amMemory = memory == -1 ? DEFAULT_AM_MEMORY : memory; - amVCores = vcores == -1 ? DEFAULT_AM_VCORES : vcores; - capability.setMemorySize(amMemory); - capability.setVirtualCores(amVCores); - } else { - amMemory = memory == -1 ? profiles.get(tmp).getMemorySize() : memory; - amVCores = vcores == -1 ? profiles.get(tmp).getVirtualCores() : vcores; - capability.setMemorySize(memory); - capability.setVirtualCores(vcores); + + validateResourceTypes(amResources.keySet(), resourceTypes); + for (Map.Entry entry : amResources.entrySet()) { + capability.setResourceValue(entry.getKey(), entry.getValue()); } + // set amMemory because it's used to set Xmx param + if (amMemory == -1) { + amMemory = (profiles == null) ? DEFAULT_AM_MEMORY : + profiles.get(tmp).getMemorySize(); + } + if (amVCores == -1) { + amVCores = (profiles == null) ? DEFAULT_AM_VCORES : + profiles.get(tmp).getVirtualCores(); + } + capability.setMemorySize(amMemory); + capability.setVirtualCores(amVCores); appContext.getAMContainerResourceRequests().get(0).getProfileCapability() .setProfileCapabilityOverride(capability); } - private void setContainerResources(long memory, int vcores, - Map profiles) throws IllegalArgumentException { - if (memory < -1 || memory == 0) { - throw new IllegalArgumentException( - "Container memory '" + memory + "' has to be greated than 0"); + private void setContainerResources(Map profiles, + List resourceTypes) throws IllegalArgumentException { + if (containerMemory < -1 || containerMemory == 0) { + throw new IllegalArgumentException("Container memory '" + + containerMemory + "' has to be greated than 0"); } - if (vcores < -1 || vcores == 0) { - throw new IllegalArgumentException( - "Container vcores '" + vcores + "' has to be greated than 0"); + if (containerVirtualCores < -1 || containerVirtualCores == 0) { + throw new IllegalArgumentException("Container vcores '" + + containerVirtualCores + "' has to be greated than 0"); } + validateResourceTypes(containerResources.keySet(), resourceTypes); if (profiles == null) { - containerMemory = memory == -1 ? DEFAULT_CONTAINER_MEMORY : memory; - containerVirtualCores = vcores == -1 ? DEFAULT_CONTAINER_VCORES : vcores; + containerMemory = containerMemory == -1 ? + DEFAULT_CONTAINER_MEMORY : containerMemory; + containerVirtualCores = containerVirtualCores == -1 ? + DEFAULT_CONTAINER_VCORES : containerVirtualCores; } } + + private void validateResourceTypes(Iterable resourceNames, + List resourceTypes) { + for (String resourceName : resourceNames) { + if (!resourceTypes.stream().anyMatch(e -> + e.getName().equals(resourceName))) { + throw new ResourceNotFoundException("Unknown resource: " + + resourceName); + } + } + } + + static Map parseResourcesString(String resourcesStr) { + Map resources = new HashMap<>(); + + // Ignore the grouping "[]" + if (resourcesStr.startsWith("[")) { + resourcesStr = resourcesStr.substring(1); + } + if (resourcesStr.endsWith("]")) { + resourcesStr = resourcesStr.substring(0, resourcesStr.length()); + } + + for (String resource : resourcesStr.trim().split(",")) { + resource = resource.trim(); + if (!resource.matches("^[^=]+=\\d+\\s?\\w*$")) { + throw new IllegalArgumentException("\"" + resource + "\" is not a " + + "valid resource type/amount pair. " + + "Please provide key=amount pairs separated by commas."); + } + String[] splits = resource.split("="); + String key = splits[0], value = splits[1]; + String units = ResourceUtils.getUnits(value); + String valueWithoutUnit = value.substring( + 0, value.length() - units.length()).trim(); + Long resourceValue = Long.valueOf(valueWithoutUnit); + if (!units.isEmpty()) { + resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue); + } + if (key.equals("memory")) { + key = ResourceInformation.MEMORY_URI; + } + resources.put(key, resourceValue); + } + return resources; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index d6bb8d6444a..667b60d21ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.cli.MissingArgumentException; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -54,11 +55,15 @@ import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -71,11 +76,11 @@ import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.client.api.impl.TimelineWriter; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.timeline.NameValuePair; import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils; import org.apache.hadoop.yarn.server.timeline.TimelineVersion; @@ -89,6 +94,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -112,6 +118,7 @@ public class TestDistributedShell { private static final int NUM_NMS = 1; private static final float DEFAULT_TIMELINE_VERSION = 1.0f; private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector"; + private static final int MIN_ALLOCATION_MB = 128; protected final static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class); @@ -138,7 +145,8 @@ private void setupInternal(int numNodeManager, float timelineVersion) LOG.info("Starting up YARN cluster"); conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + MIN_ALLOCATION_MB); // reduce the teardown waiting time conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000); conf.set("yarn.log.dir", "target"); @@ -1433,4 +1441,167 @@ public void testDSShellWithOpportunisticContainers() throws Exception { Assert.fail("Job execution with opportunistic containers failed."); } } + + @Test + @TimelineVersion(2.0f) + public void testDistributedShellWithResources() throws Exception { + doTestDistributedShellWithResources(false); + } + + @Test + @TimelineVersion(2.0f) + public void testDistributedShellWithResourcesWithLargeContainers() + throws Exception { + doTestDistributedShellWithResources(true); + } + + public void doTestDistributedShellWithResources(boolean largeContainers) + throws Exception { + Resource clusterResource = yarnCluster.getResourceManager() + .getResourceScheduler().getClusterResource(); + String masterMemoryString = "1 Gi"; + String containerMemoryString = "512 Mi"; + long masterMemory = 1024; + long containerMemory = 512; + Assume.assumeTrue("The cluster doesn't have enough memory for this test", + clusterResource.getMemorySize() >= masterMemory + containerMemory); + Assume.assumeTrue("The cluster doesn't have enough cores for this test", + clusterResource.getVirtualCores() >= 2); + if (largeContainers) { + masterMemory = clusterResource.getMemorySize() * 2 / 3; + masterMemory = masterMemory - masterMemory % MIN_ALLOCATION_MB; + masterMemoryString = masterMemory + "Mi"; + containerMemory = clusterResource.getMemorySize() / 3; + containerMemory = containerMemory - containerMemory % MIN_ALLOCATION_MB; + containerMemoryString = String.valueOf(containerMemory); + } + + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_resources", + "memory=" + masterMemoryString + ",vcores=1", + "--container_resources", + "memory=" + containerMemoryString + ",vcores=1", + }; + + LOG.info("Initializing DS Client"); + Client client = new Client(new Configuration(yarnCluster.getConfig())); + Assert.assertTrue(client.init(args)); + LOG.info("Running DS Client"); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + result.set(client.run()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(yarnCluster.getConfig())); + yarnClient.start(); + + while (true) { + List apps = yarnClient.getApplications(); + if (apps.isEmpty()) { + Thread.sleep(10); + continue; + } + ApplicationReport appReport = apps.get(0); + ApplicationId appId = appReport.getApplicationId(); + List appAttempts = + yarnClient.getApplicationAttempts(appId); + if (appAttempts.isEmpty()) { + Thread.sleep(10); + continue; + } + ApplicationAttemptReport appAttemptReport = appAttempts.get(0); + ContainerId amContainerId = appAttemptReport.getAMContainerId(); + + if (amContainerId == null) { + Thread.sleep(10); + continue; + } + ContainerReport report = yarnClient.getContainerReport(amContainerId); + Resource masterResource = report.getAllocatedResource(); + Assert.assertEquals(masterMemory, masterResource.getMemorySize()); + Assert.assertEquals(1, masterResource.getVirtualCores()); + + List containers = + yarnClient.getContainers(appAttemptReport.getApplicationAttemptId()); + if (containers.size() < 2) { + Thread.sleep(10); + continue; + } + for (ContainerReport container : containers) { + if (!container.getContainerId().equals(amContainerId)) { + Resource containerResource = container.getAllocatedResource(); + Assert.assertEquals(containerMemory, + containerResource.getMemorySize()); + Assert.assertEquals(1, containerResource.getVirtualCores()); + } + } + + return; + } + } + + @Test(expected=IllegalArgumentException.class) + public void testDistributedShellAMResourcesWithIllegalArguments() + throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_resources", + "memory-mb=invalid" + }; + Client client = new Client(new Configuration(yarnCluster.getConfig())); + client.init(args); + } + + @Test(expected=MissingArgumentException.class) + public void testDistributedShellAMResourcesWithMissingArgumentValue() + throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_resources" + }; + Client client = new Client(new Configuration(yarnCluster.getConfig())); + client.init(args); + } + + @Test(expected=ResourceNotFoundException.class) + public void testDistributedShellAMResourcesWithUnknownResource() + throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_resources", + "unknown-resource=5" + }; + Client client = new Client(new Configuration(yarnCluster.getConfig())); + client.init(args); + client.run(); + } }