YARN-7610. Extend Distributed Shell to support launching job with opportunistic containers. Contributed by Weiwei Yang.

(cherry picked from commit 40b0045ebe)
This commit is contained in:
Weiwei Yang 2017-12-06 17:52:41 +08:00
parent bf79246c5d
commit 8bfd30b921
4 changed files with 145 additions and 28 deletions

View File

@ -41,6 +41,7 @@ import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Arrays;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
@ -89,6 +90,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
@ -237,6 +240,9 @@ public class ApplicationMaster {
private int containerVirtualCores = 1;
// Priority of the request
private int requestPriority;
// Execution type of the containers.
// Default GUARANTEED.
private ExecutionType containerType = ExecutionType.GUARANTEED;
// Counter for completed containers ( complete denotes successful or failed )
private AtomicInteger numCompletedContainers = new AtomicInteger();
@ -404,6 +410,8 @@ public class ApplicationMaster {
"App Attempt ID. Not to be used unless for testing purposes");
opts.addOption("shell_env", true,
"Environment for shell script. Specified as env_key=env_val pairs");
opts.addOption("container_type", true,
"Container execution type, GUARANTEED or OPPORTUNISTIC");
opts.addOption("container_memory", true,
"Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true,
@ -548,6 +556,16 @@ public class ApplicationMaster {
domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
}
if (cliParser.hasOption("container_type")) {
String containerTypeStr = cliParser.getOptionValue("container_type");
if (Arrays.stream(ExecutionType.values()).noneMatch(
executionType -> executionType.toString()
.equals(containerTypeStr))) {
throw new IllegalArgumentException("Invalid container_type: "
+ containerTypeStr);
}
containerType = ExecutionType.valueOf(containerTypeStr);
}
containerMemory = Integer.parseInt(cliParser.getOptionValue(
"container_memory", "10"));
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
@ -1232,8 +1250,10 @@ public class ApplicationMaster {
Resource capability = Resource.newInstance(containerMemory,
containerVirtualCores);
ContainerRequest request = new ContainerRequest(capability, null, null,
pri);
ContainerRequest request =
new ContainerRequest(capability, null, null, pri, 0, true, null,
ExecutionTypeRequest.newInstance(containerType));
LOG.info("Requested container ask: " + request.toString());
return request;
}

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.Arrays;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@ -157,6 +159,8 @@ public class Client {
// No. of containers in which the shell script needs to be executed
private int numContainers = 1;
private String nodeLabelExpression = null;
// Container type, default GUARANTEED.
private ExecutionType containerType = ExecutionType.GUARANTEED;
// log4j.properties file
// if available, add to local resources and set into classpath
@ -267,6 +271,8 @@ public class Client {
opts.addOption("shell_env", true,
"Environment for shell script. Specified as env_key=env_val pairs");
opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
opts.addOption("container_type", true,
"Container execution type, GUARANTEED or OPPORTUNISTIC");
opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
@ -435,6 +441,17 @@ public class Client {
+ ", containerVirtualCores=" + containerVirtualCores
+ ", numContainer=" + numContainers);
}
if (cliParser.hasOption("container_type")) {
String containerTypeStr = cliParser.getOptionValue("container_type");
if (Arrays.stream(ExecutionType.values()).noneMatch(
executionType -> executionType.toString()
.equals(containerTypeStr))) {
throw new IllegalArgumentException("Invalid container_type: "
+ containerTypeStr);
}
containerType = ExecutionType.valueOf(containerTypeStr);
}
nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
@ -698,6 +715,9 @@ public class Client {
// Set params for Application Master
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
if (containerType != null) {
vargs.add("--container_type " + String.valueOf(containerType));
}
vargs.add("--num_containers " + String.valueOf(numContainers));
if (null != nodeLabelExpression) {
appContext.setNodeLabelExpression(nodeLabelExpression);

View File

@ -1178,6 +1178,33 @@ public class TestDistributedShell {
e.getMessage().contains("No shell command or shell script specified " +
"to be executed by application master"));
}
LOG.info("Initializing DS Client with invalid container_type argument");
try {
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"2",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1",
"--shell_command",
"date",
"--container_type",
"UNSUPPORTED_TYPE"
};
client.init(args);
Assert.fail("Exception is expected");
} catch (IllegalArgumentException e) {
Assert.assertTrue("The throw exception is not expected",
e.getMessage().contains("Invalid container_type: UNSUPPORTED_TYPE"));
}
}
@Test
@ -1348,4 +1375,33 @@ public class TestDistributedShell {
}
return numOfWords;
}
@Test
public void testDSShellWithOpportunisticContainers() throws Exception {
  // End-to-end check that a distributed-shell job launched with
  // "--container_type OPPORTUNISTIC" initializes and runs against the
  // mini YARN cluster without error.
  Client client = new Client(new Configuration(yarnCluster.getConfig()));
  String[] args = {
      "--jar",
      APPMASTER_JAR,
      "--num_containers",
      "2",
      "--master_memory",
      "512",
      "--master_vcores",
      "2",
      "--container_memory",
      "128",
      "--container_vcores",
      "1",
      "--shell_command",
      "date",
      "--container_type",
      "OPPORTUNISTIC"
  };
  client.init(args);
  // No try/catch wrapper: the method declares "throws Exception", so any
  // failure propagates to JUnit with its full stack trace instead of being
  // swallowed behind a generic Assert.fail(...) message.
  client.run();
}
}

View File

@ -12,30 +12,34 @@
limitations under the License. See accompanying LICENSE file.
-->
#set ( $H3 = '###' )
#set ( $H4 = '####' )
#set ( $H5 = '#####' )
Opportunistic Containers
========================
<!-- MACRO{toc|fromDepth=0|toDepth=3} -->
<a name="Purpose"></a>Purpose
-----------------------------
Purpose
-------
This document introduces the notion of **opportunistic** container execution, and discusses how opportunistic containers are allocated and executed.
<a name="Quick_Guide"></a>Quick Guide
--------------------------------------------------------------------
Quick Guide
-----------
We start by providing a brief overview of opportunistic containers, including how a user can enable this feature and run a sample job using such containers.
###<a name="Main_Goal"></a>Main Goal
$H3 Main Goal
Unlike existing YARN containers that are scheduled in a node only if there are unallocated resources, opportunistic containers can be dispatched to an NM, even if their execution at that node cannot start immediately. In such a case, opportunistic containers will be queued at that NM until resources become available.
The main goal of opportunistic container execution is to improve cluster resource utilization, and therefore increase task throughput. Resource utilization and task throughput improvements are more pronounced for workloads that include relatively short tasks (in the order of seconds).
###<a name="Enabling_Opportunistic_Containers"></a>Enabling Opportunistic Containers
$H3 Enabling Opportunistic Containers
To enable opportunistic container allocation, the following two properties have to be present in **conf/yarn-site.xml**:
@ -52,19 +56,36 @@ By default, allocation of opportunistic containers is performed centrally throug
|:-------- |:----- |:----- |
| `yarn.nodemanager.distributed-scheduling.enabled` | Enables distributed scheduling. | `false` |
In order to submit jobs to a cluster that has AMRMProxy turned on, one must create a separate set of configs for the client from which jobs will be submitted. In these, the **conf/yarn-site.xml** should have the following additional configurations:
###<a name="Running_a_Sample_Job"></a>Running a Sample Job
| Property | Value | Description |
|:-------- |:----- |:----- |
| `yarn.resourcemanger.scheduler.address` | `localhost:8049` | Redirects jobs to the Node Manager's AMRMProxy port.|
The following command can be used to run a sample pi map-reduce job, executing 40% of mappers using opportunistic containers (substitute `3.0.0-alpha2-SNAPSHOT` below with the version of Hadoop you are using):
$H3 Running a Sample Job
$H4 MapReduce PI
The following command can be used to run a sample pi map-reduce job, executing 40% of mappers using opportunistic containers:
```
$ hadoop jar hadoop-3.0.0-alpha2-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0-alpha2-SNAPSHOT.jar pi -Dmapreduce.job.num-opportunistic-maps-percent="40" 50 100
$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-${project.version}.jar pi -Dmapreduce.job.num-opportunistic-maps-percent="40" 50 100
```
By changing the value of `mapreduce.job.num-opportunistic-maps-percent` in the above command, we can specify the percentage of mappers that can be executed through opportunistic containers.
$H4 Distributed Shell
###<a name="Opportunistic_Containers_in_Web_UI"></a>Opportunistic Containers in Web UI
Another sample job is the distributed shell, which allows us to run a given shell command on a set of containers. The following command can be used to run the `sleep 10` command in 10 opportunistic containers:
```
$ yarn org.apache.hadoop.yarn.applications.distributedshell.Client -jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-${project.version}.jar -shell_command sleep -shell_args 10 -num_containers 10 -container_type OPPORTUNISTIC
```
By changing the value of `container_type` to `OPPORTUNISTIC` or `GUARANTEED` in the above command, we can specify the tasks to run in opportunistic or guaranteed containers. The default type is `GUARANTEED`.
$H3 Opportunistic Containers in Web UI
When opportunistic container allocation is enabled, the following new columns can be observed in the Nodes page of the Web UI (`rm-address:8088/cluster/nodes`):
@ -78,8 +99,8 @@ When clicking on a specific container running on a node, the execution type of t
In the rest of the document, we provide an in-depth description of opportunistic containers, including details about their allocation and execution.
Overview <a name="Overview"></a>
--------------------------------
Overview
--------
The existing schedulers in YARN (Fair and Capacity Scheduler) allocate containers to a node only if there are unallocated resources at that node at the moment of scheduling the containers. This **guaranteed** type of execution has the advantage that once the AM dispatches a container to a node, the container execution will start immediately, since it is guaranteed that there will be available resources. Moreover, unless fairness or capacity constraints are violated, containers are guaranteed to run to completion without being preempted.
@ -99,8 +120,8 @@ Note that in the current implementation, we are allocating containers based on a
Below, we describe in more detail the [container execution types](#Container_Execution_Types), as well as the [execution](#Execution_of_Opportunistic_Containers) (including the container queuing at the NMs) and [allocation](#Allocation_of_Opportunistic_Containers) of opportunistic containers. Then we discuss how to fine-tune opportunistic containers through some [advanced configuration parameters](#Advanced_Configuration). Finally, we discuss open items for [future work](#Items_for_Future_Work).
<a name="Container_Execution_Types"></a>Container Execution Types
-----------------------------------------------------------------
Container Execution Types
-------------------------
We introduce the following two types of containers:
@ -110,8 +131,8 @@ We introduce the following two types of containers:
When an AM submits its resource requests to the RM, it specifies the type for each container (default is guaranteed), determining the way the container will be [allocated](#Allocation_of_Opportunistic_Containers). Subsequently, when the container is launched by the AM at an NM, its type determines how it will be [executed](#Execution_of_Opportunistic_Containers) by the NM.
<a name="Execution_of_Opportunistic_Containers"></a>Execution of Opportunistic Containers
---------------------------------------------------------------------------
Execution of Opportunistic Containers
-------------------------------------
When a container arrives at an NM, its execution is determined by the available resources at the NM and the container type. Guaranteed containers start their execution immediately, and if needed, the NM will kill running opportunistic containers to ensure there are sufficient resources for the guaranteed ones to start. On the other hand, opportunistic containers can be queued at the NM, if there are no resources available to start their execution when they arrive at the NM. To enable this, we extended the NM by allowing queuing of containers at each node. The NM monitors the local resources, and when there are sufficient resources available, it starts the execution of the opportunistic container that is at the head of the queue.
@ -126,12 +147,12 @@ In particular, when a container arrives at an NM, localization is performed (i.e
In the [future work items](#Items_for_Future_Work) below, we discuss different ways of prioritizing task execution (queue reordering) and of killing opportunistic containers to make space for guaranteed ones.
<a name="Allocation_of_Opportunistic_Containers"></a>Allocation of Opportunistic Containers
-----------------------------------------------------------------------------
Allocation of Opportunistic Containers
--------------------------------------
As mentioned above, we provide both a centralized and a distributed way of allocating opportunistic containers, which we describe below.
###<a name="Centralized_Allocation"></a>Centralized Allocation
$H3 Centralized Allocation
We have introduced a new service at the RM, namely the `OpportunisticContainerAllocatorAMService`, which extends the `ApplicationMasterService`. When the centralized opportunistic allocation is enabled, the resource requests from the AMs are served at the RM side by the `OpportunisticContainerAllocatorAMService`, which splits them into two sets of resource requests:
@ -141,7 +162,7 @@ We have introduced a new service at the RM, namely the `OpportunisticContainerAl
The `OpportunisticContainerAllocator` maintains a list with the [least loaded nodes](#Determining_Nodes_for_Allocation) of the cluster at each moment, and assigns containers to them in a round-robin fashion. Note that in the current implementation, we purposely do not take into account node locality constraints. Since an opportunistic container (unlike the guaranteed ones) might wait at the queue of an NM before its execution starts, it is more important to allocate it at a node that is less loaded (i.e., where queuing delay will be smaller) rather than respect its locality constraints. Moreover, we do not take into account sharing (fairness/capacity) constraints for opportunistic containers at the moment. Support for both locality and sharing constraints can be added in the future if required.
###<a name="Distributed_Allocation"></a>Distributed Allocation
$H3 Distributed Allocation
In order to enable distributed scheduling of opportunistic containers, we have introduced a new service at each NM, called `AMRMProxyService`. The `AMRMProxyService` implements the `ApplicationMasterService` protocol, and acts as a proxy between the AMs running at that node and the RM. When the `AMRMProxyService` is enabled (through a parameter), we force all AMs running at a particular node to communicate with the `AMRMProxyService` of the same node, instead of going directly to the RM. Moreover, to ensure that the AMs will not talk directly with the RM, when a new AM gets initialized, we replace its `AMRMToken` with a token signed by the `AMRMProxyService`.
@ -155,7 +176,7 @@ When distributed opportunistic scheduling is enabled, each AM sends its resource
The above procedure is similar to the one performed by the `OpportunisticContainerAllocatorAMService` in the case of centralized opportunistic scheduling described above. The main difference is that in the distributed case, the splitting of requests into guaranteed and opportunistic happens locally at the node, and only the guaranteed requests are forwarded to the RM, while the opportunistic ones are handled without contacting the RM.
###<a name="Determining_Nodes_for_Allocation"></a>Determining Nodes for Allocation
$H3 Determining Nodes for Allocation
Each NM informs the RM periodically through the NM-RM heartbeats about the number of running guaranteed and opportunistic containers, as well as the number of queued opportunistic containers. The RM gathers this information from all nodes and determines the least loaded ones.
@ -164,13 +185,13 @@ In the case of centralized allocation of opportunistic containers, this informat
At the moment, we take into account only the number of queued opportunistic containers at each node in order to estimate the time an opportunistic container would have to wait if sent to that node and, thus, determine the least loaded nodes. If the AM provided us with information about the estimated task durations, we could take them into account in order to have better estimates of the queue waiting times.
###<a name="Rebalancing_Node_Load"></a>Rebalancing Node Load
$H3 Rebalancing Node Load
Occasionally poor placement choices for opportunistic containers may be made (due to stale queue length estimates), which can lead to load imbalance between nodes. The problem is more pronounced under high cluster load, and also in the case of distributed scheduling (multiple `DistributedSchedulers` may place containers at the same NM, since they do not coordinate with each other). To deal with this load imbalance between the NM queues, we perform load shedding to dynamically re-balance the load between NMs. In particular, while aggregating at the RM the queue time estimates published by each NM, we construct a distribution and find a targeted maximal value for the length of the NM queues (based on the mean and standard deviation of the distribution). Then the RM disseminates this value to the various NMs through the heartbeat responses. Subsequently, using this information, an NM on a node whose queue length is above the threshold discards opportunistic containers to meet this maximal value. This forces the associated individual AMs to reschedule those containers elsewhere.
<a name="Advanced_Configuration"></a>Advanced Configuration
--------------------------------------------------
Advanced Configuration
----------------------
The main properties for enabling opportunistic container allocation and choosing between centralized and distributed allocation were described in the [quick guide](#Quick_Guide) in the beginning of this document. Here we present more advanced configuration. Note that using default values for those parameters should be sufficient in most cases. All parameters below have to be defined in the **conf/yarn-site.xml** file.
@ -200,8 +221,8 @@ Finally, two more properties can further tune the `AMRMProxyService` in case dis
| `yarn.nodemanager.amrmproxy.client.thread-count` | The number of threads that are used at each NM for serving the interceptors register to the `AMRMProxyService` by different jobs. | `3` |
<a name="Items_for_Future_Work"></a>Items for Future Work
-----------------------------------------------
Items for Future Work
---------------------
Here we describe multiple ways in which we can extend/enhance the allocation and execution of opportunistic containers. We also provide the JIRAs that track each item.