YARN-7617. Add a flag in distributed shell to automatically PROMOTE opportunistic containers to guaranteed once they are started. Contributed by Weiwei Yang.

(cherry picked from commit 9289641020)
This commit is contained in:
Weiwei Yang 2017-12-18 10:07:16 +08:00
parent 90636cf127
commit de2156dc90
3 changed files with 60 additions and 4 deletions

View File

@ -92,6 +92,8 @@ import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
@ -243,6 +245,8 @@ public class ApplicationMaster {
// Execution type of the containers.
// Default GUARANTEED.
private ExecutionType containerType = ExecutionType.GUARANTEED;
// Whether to automatically promote opportunistic containers.
private boolean autoPromoteContainers = false;
// Counter for completed containers ( complete denotes successful or failed )
private AtomicInteger numCompletedContainers = new AtomicInteger();
@ -412,6 +416,9 @@ public class ApplicationMaster {
"Environment for shell script. Specified as env_key=env_val pairs");
opts.addOption("container_type", true,
"Container execution type, GUARANTEED or OPPORTUNISTIC");
opts.addOption("promote_opportunistic_after_start", false,
"Flag to indicate whether to automatically promote opportunistic"
+ " containers to guaranteed.");
opts.addOption("container_memory", true,
"Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true,
@ -566,6 +573,9 @@ public class ApplicationMaster {
}
containerType = ExecutionType.valueOf(containerTypeStr);
}
if (cliParser.hasOption("promote_opportunistic_after_start")) {
autoPromoteContainers = true;
}
containerMemory = Integer.parseInt(cliParser.getOptionValue(
"container_memory", "10"));
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
@ -964,7 +974,24 @@ public class ApplicationMaster {
@Override
public void onContainersUpdated(
List<UpdatedContainer> containers) {}
List<UpdatedContainer> containers) {
for (UpdatedContainer container : containers) {
LOG.info("Container {} updated, updateType={}, resource={}, "
+ "execType={}",
container.getContainer().getId(),
container.getUpdateType().toString(),
container.getContainer().getResource().toString(),
container.getContainer().getExecutionType());
// TODO Remove this line with finalized updateContainer API.
// Currently nm client needs to notify the NM to update container
// execution type via NMClient#updateContainerResource() or
// NMClientAsync#updateContainerResourceAsync() when
// auto-update.containers is disabled, but this API is
// still evolving and will need to be replaced by a proper new API.
nmClientAsync.updateContainerResourceAsync(container.getContainer());
}
}
@Override
public void onShutdownRequest() {
@ -991,7 +1018,7 @@ public class ApplicationMaster {
}
@VisibleForTesting
static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
private ConcurrentMap<ContainerId, Container> containers =
new ConcurrentHashMap<ContainerId, Container>();
@ -1020,6 +1047,24 @@ public class ApplicationMaster {
LOG.debug("Container Status: id=" + containerId + ", status=" +
containerStatus);
}
// If promote_opportunistic_after_start is set, automatically promote
// opportunistic containers to guaranteed.
if (autoPromoteContainers) {
if (containerStatus.getState() == ContainerState.RUNNING) {
Container container = containers.get(containerId);
if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
// Promote container
LOG.info("Promoting container {} to {}", container.getId(),
container.getExecutionType());
UpdateContainerRequest updateRequest = UpdateContainerRequest
.newInstance(container.getVersion(), container.getId(),
ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null,
ExecutionType.GUARANTEED);
amRMClient.requestContainerUpdate(container, updateRequest);
}
}
}
}
@Override

View File

@ -161,6 +161,8 @@ public class Client {
private String nodeLabelExpression = null;
// Container type, default GUARANTEED.
private ExecutionType containerType = ExecutionType.GUARANTEED;
// Whether to auto promote opportunistic containers
private boolean autoPromoteContainers = false;
// log4j.properties file
// if available, add to local resources and set into classpath
@ -276,6 +278,9 @@ public class Client {
opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
opts.addOption("promote_opportunistic_after_start", false,
"Flag to indicate whether to automatically promote opportunistic"
+ " containers to guaranteed.");
opts.addOption("log_properties", true, "log4j.properties file");
opts.addOption("keep_containers_across_application_attempts", false,
"Flag to indicate whether to keep containers across application attempts." +
@ -452,7 +457,10 @@ public class Client {
}
containerType = ExecutionType.valueOf(containerTypeStr);
}
if (cliParser.hasOption("promote_opportunistic_after_start")) {
autoPromoteContainers = true;
}
nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
@ -718,6 +726,9 @@ public class Client {
if (containerType != null) {
vargs.add("--container_type " + String.valueOf(containerType));
}
if (autoPromoteContainers) {
vargs.add("--promote_opportunistic_after_start");
}
vargs.add("--num_containers " + String.valueOf(numContainers));
if (null != nodeLabelExpression) {
appContext.setNodeLabelExpression(nodeLabelExpression);

View File

@ -83,7 +83,7 @@ Another sample job is the distributed shell, it allows us to run a given shell c
$ yarn org.apache.hadoop.yarn.applications.distributedshell.Client -jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-${project.version}.jar -shell_command sleep -shell_args 10 -num_containers 10 -container_type OPPORTUNISTIC
```
By changing the value of `container_type` to `OPPORTUNISTIC` or `GUARANTEED` in the above command, we can specify whether the tasks run in opportunistic or guaranteed containers. The default type is `GUARANTEED`.
By changing the value of `container_type` to `OPPORTUNISTIC` or `GUARANTEED` in the above command, we can specify whether the tasks run in opportunistic or guaranteed containers. The default type is `GUARANTEED`. By adding the flag `-promote_opportunistic_after_start` to the above command, the application master will attempt to promote all opportunistic containers to guaranteed once they are started.
$H3 Opportunistic Containers in Web UI