diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index a04f57b2704..92f21ab5eb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -265,6 +265,8 @@ public class ApplicationMaster { private ExecutionType containerType = ExecutionType.GUARANTEED; // Whether to automatically promote opportunistic containers. private boolean autoPromoteContainers = false; + // Whether to enforce execution type of the containers. + private boolean enforceExecType = false; // Resource profile for the container private String containerResourceProfile = ""; @@ -456,6 +458,8 @@ public class ApplicationMaster { opts.addOption("promote_opportunistic_after_start", false, "Flag to indicate whether to automatically promote opportunistic" + " containers to guaranteed."); + opts.addOption("enforce_execution_type", false, + "Flag to indicate whether to enforce execution type of containers"); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, @@ -647,6 +651,9 @@ public class ApplicationMaster { if (cliParser.hasOption("promote_opportunistic_after_start")) { autoPromoteContainers = true; } + if (cliParser.hasOption("enforce_execution_type")) { + enforceExecType = true; + } containerMemory = Integer.parseInt(cliParser.getOptionValue( "container_memory", "-1")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( @@ -1502,7 +1509,7 @@ public class ApplicationMaster { ContainerRequest request = new ContainerRequest( getTaskResourceCapability(), null, null, pri, 0, true, null, - ExecutionTypeRequest.newInstance(containerType), + ExecutionTypeRequest.newInstance(containerType, enforceExecType), containerResourceProfile); LOG.info("Requested container ask: " + request.toString()); return request; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index ecbe288d6b4..3a967701773 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -189,6 +189,8 @@ public class Client { private ExecutionType containerType = ExecutionType.GUARANTEED; // Whether to auto promote opportunistic containers private boolean autoPromoteContainers = false; + // Whether to enforce execution type of containers + private boolean enforceExecType = false; // Placement specification private String placementSpec = ""; @@ -332,6 +334,8 @@ public class Client { opts.addOption("promote_opportunistic_after_start", false, "Flag to indicate whether to automatically promote opportunistic" + " containers to guaranteed."); + opts.addOption("enforce_execution_type", false, + "Flag to indicate whether to enforce execution type of containers"); opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("keep_containers_across_application_attempts", false, "Flag to indicate whether to keep containers across application " @@ -525,6 +529,9 @@ public class Client { if (cliParser.hasOption("promote_opportunistic_after_start")) { autoPromoteContainers = true; } + if (cliParser.hasOption("enforce_execution_type")) { + enforceExecType = true; + } containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "-1")); containerVirtualCores = @@ -877,6 +884,9 @@ public class Client { if (autoPromoteContainers) { vargs.add("--promote_opportunistic_after_start"); } + if (enforceExecType) { + vargs.add("--enforce_execution_type"); + } if (containerMemory > 0) { vargs.add("--container_memory " + String.valueOf(containerMemory)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index b41fea6dc50..3bca1ca64f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -38,6 +38,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Supplier; import org.apache.commons.cli.MissingArgumentException; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; @@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -173,6 +176,8 @@ public class TestDistributedShell { true); conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true); + conf.setBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); // ATS version specific settings if (timelineVersion == 1.0f) { @@ -1468,6 +1473,97 @@ public class TestDistributedShell { } } + @Test + @TimelineVersion(2.0f) + public void testDSShellWithEnforceExecutionType() throws Exception { + Client client = new Client(new Configuration(yarnCluster.getConfig())); + try { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--shell_command", + "date", + "--container_type", + "OPPORTUNISTIC", + "--enforce_execution_type" + }; + client.init(args); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + result.set(client.run()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(yarnCluster.getConfig())); + yarnClient.start(); + waitForContainersLaunch(yarnClient, 2); + List apps = yarnClient.getApplications(); + ApplicationReport appReport = apps.get(0); + ApplicationId appId = appReport.getApplicationId(); + List appAttempts = + yarnClient.getApplicationAttempts(appId); + ApplicationAttemptReport appAttemptReport = appAttempts.get(0); + ApplicationAttemptId appAttemptId = + appAttemptReport.getApplicationAttemptId(); + List containers = + yarnClient.getContainers(appAttemptId); + // we should get two containers. + Assert.assertEquals(2, containers.size()); + ContainerId amContainerId = appAttemptReport.getAMContainerId(); + for (ContainerReport container : containers) { + if (!container.getContainerId().equals(amContainerId)) { + Assert.assertEquals(container.getExecutionType(), + ExecutionType.OPPORTUNISTIC); + } + } + } catch (Exception e) { + Assert.fail("Job execution with enforce execution type failed."); + } + } + + private void waitForContainersLaunch(YarnClient client, + int nContainers) throws Exception { + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + try { + List apps = client.getApplications(); + if (apps == null || apps.isEmpty()) { + return false; + } + ApplicationId appId = apps.get(0).getApplicationId(); + List appAttempts = + client.getApplicationAttempts(appId); + if (appAttempts == null || appAttempts.isEmpty()) { + return false; + } + ApplicationAttemptId attemptId = + appAttempts.get(0).getApplicationAttemptId(); + List containers = client.getContainers(attemptId); + return (containers.size() == nContainers); + } catch (Exception e) { + return false; + } + } + }, 10, 60000); + } + @Test @TimelineVersion(2.0f) public void testDistributedShellWithResources() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm index 272c9328737..b1eea9ed27d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm @@ -84,6 +84,7 @@ $ yarn org.apache.hadoop.yarn.applications.distributedshell.Client -jar share/ha ``` By change the value of `container_type` to `OPPORTUNISTIC` or `GUARANTEED` in the above command, we can specify the tasks to be running in opportunistic or guaranteed containers. The default type is `GUARANTEED`. By adding flag `-promote_opportunistic_after_start` to the above command, application master will attempt to promote all opportunistic containers to guaranteed once they are started. +By adding flag '-enforce_execution_type' to the above command, scheduler will honor execution type of the containers. $H3 Opportunistic Containers in Web UI