From 928964102029e96406f5482e8900802f38164501 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Mon, 18 Dec 2017 10:07:16 +0800 Subject: [PATCH] YARN-7617. Add a flag in distributed shell to automatically PROMOTE opportunistic containers to guaranteed once they are started. Contributed by Weiwei Yang. --- .../distributedshell/ApplicationMaster.java | 49 ++++++++++++++++++- .../applications/distributedshell/Client.java | 11 +++++ .../markdown/OpportunisticContainers.md.vm | 2 +- 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 926de50ceb1..b3fa0ff3c0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -93,6 +93,8 @@ import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; @@ -247,6 +249,8 @@ public enum DSEntity { // Execution type of the containers. // Default GUARANTEED. private ExecutionType containerType = ExecutionType.GUARANTEED; + // Whether to automatically promote opportunistic containers. + private boolean autoPromoteContainers = false; // Resource profile for the container private String containerResourceProfile = ""; @@ -420,6 +424,9 @@ public boolean init(String[] args) throws ParseException, IOException { "Environment for shell script. Specified as env_key=env_val pairs"); opts.addOption("container_type", true, "Container execution type, GUARANTEED or OPPORTUNISTIC"); + opts.addOption("promote_opportunistic_after_start", false, + "Flag to indicate whether to automatically promote opportunistic" + + " containers to guaranteed."); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, @@ -576,6 +583,9 @@ public boolean init(String[] args) throws ParseException, IOException { } containerType = ExecutionType.valueOf(containerTypeStr); } + if (cliParser.hasOption("promote_opportunistic_after_start")) { + autoPromoteContainers = true; + } containerMemory = Integer.parseInt(cliParser.getOptionValue( "container_memory", "-1")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( @@ -977,7 +987,24 @@ public void onContainersAllocated(List allocatedContainers) { @Override public void onContainersUpdated( - List containers) {} + List containers) { + for (UpdatedContainer container : containers) { + LOG.info("Container {} updated, updateType={}, resource={}, " + + "execType={}", + container.getContainer().getId(), + container.getUpdateType().toString(), + container.getContainer().getResource().toString(), + container.getContainer().getExecutionType()); + + // TODO Remove this line with finalized updateContainer API. + // Currently nm client needs to notify the NM to update container + // execution type via NMClient#updateContainerResource() or + // NMClientAsync#updateContainerResourceAsync() when + // auto-update.containers is disabled, but this API is + // under evolving and will need to be replaced by a proper new API. + nmClientAsync.updateContainerResourceAsync(container.getContainer()); + } + } @Override public void onShutdownRequest() { @@ -1004,7 +1031,7 @@ public void onError(Throwable e) { } @VisibleForTesting - static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler { + class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler { private ConcurrentMap containers = new ConcurrentHashMap(); @@ -1033,6 +1060,24 @@ public void onContainerStatusReceived(ContainerId containerId, LOG.debug("Container Status: id=" + containerId + ", status=" + containerStatus); } + + // If promote_opportunistic_after_start is set, automatically promote + // opportunistic containers to guaranteed. + if (autoPromoteContainers) { + if (containerStatus.getState() == ContainerState.RUNNING) { + Container container = containers.get(containerId); + if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + // Promote container + LOG.info("Promoting container {} to {}", container.getId(), + container.getExecutionType()); + UpdateContainerRequest updateRequest = UpdateContainerRequest + .newInstance(container.getVersion(), container.getId(), + ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null, + ExecutionType.GUARANTEED); + amRMClient.requestContainerUpdate(container, updateRequest); + } + } + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 16bf0fd82e5..e299acc8fe0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -175,6 +175,8 @@ public class Client { private String nodeLabelExpression = null; // Container type, default GUARANTEED. private ExecutionType containerType = ExecutionType.GUARANTEED; + // Whether to auto promote opportunistic containers + private boolean autoPromoteContainers = false; // log4j.properties file // if available, add to local resources and set into classpath @@ -292,6 +294,9 @@ public Client(Configuration conf) throws Exception { opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); opts.addOption("container_resource_profile", true, "Resource profile for the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); + opts.addOption("promote_opportunistic_after_start", false, + "Flag to indicate whether to automatically promote opportunistic" + + " containers to guaranteed."); opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("keep_containers_across_application_attempts", false, "Flag to indicate whether to keep containers across application attempts." + @@ -449,6 +454,9 @@ public boolean init(String[] args) throws ParseException { } containerType = ExecutionType.valueOf(containerTypeStr); } + if (cliParser.hasOption("promote_opportunistic_after_start")) { + autoPromoteContainers = true; + } containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "-1")); containerVirtualCores = @@ -759,6 +767,9 @@ public boolean run() throws IOException, YarnException { if (containerType != null) { vargs.add("--container_type " + String.valueOf(containerType)); } + if (autoPromoteContainers) { + vargs.add("--promote_opportunistic_after_start"); + } if (containerMemory > 0) { vargs.add("--container_memory " + String.valueOf(containerMemory)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm index 7882b876219..f1c75aecafa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm @@ -83,7 +83,7 @@ Another sample job is the distributed shell, it allows us to run a given shell c $ yarn org.apache.hadoop.yarn.applications.distributedshell.Client -jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-${project.version}.jar.jar -shell_command sleep -shell_args 10 -num_containers 10 -container_type OPPORTUNISTIC ``` -By change the value of `container_type` to `OPPORTUNISTIC` or `GUARANTEED` in the above command, we can specify the tasks to be running in opportunistic or guaranteed containers. The default type is `GUARANTEED`. +By change the value of `container_type` to `OPPORTUNISTIC` or `GUARANTEED` in the above command, we can specify the tasks to be running in opportunistic or guaranteed containers. The default type is `GUARANTEED`. By adding flag `-promote_opportunistic_after_start` to the above command, application master will attempt to promote all opportunistic containers to guaranteed once they are started. $H3 Opportunistic Containers in Web UI