YARN-9191. Add cli option in DS to support enforceExecutionType in resource requests. Contributed by Abhishek Modi.
(cherry picked from commit f738b397ae
)
This commit is contained in:
parent
102db40870
commit
c6582cc04c
|
@ -264,6 +264,8 @@ public class ApplicationMaster {
|
||||||
private ExecutionType containerType = ExecutionType.GUARANTEED;
|
private ExecutionType containerType = ExecutionType.GUARANTEED;
|
||||||
// Whether to automatically promote opportunistic containers.
|
// Whether to automatically promote opportunistic containers.
|
||||||
private boolean autoPromoteContainers = false;
|
private boolean autoPromoteContainers = false;
|
||||||
|
// Whether to enforce execution type of the containers.
|
||||||
|
private boolean enforceExecType = false;
|
||||||
|
|
||||||
// Resource profile for the container
|
// Resource profile for the container
|
||||||
private String containerResourceProfile = "";
|
private String containerResourceProfile = "";
|
||||||
|
@ -455,6 +457,8 @@ public class ApplicationMaster {
|
||||||
opts.addOption("promote_opportunistic_after_start", false,
|
opts.addOption("promote_opportunistic_after_start", false,
|
||||||
"Flag to indicate whether to automatically promote opportunistic"
|
"Flag to indicate whether to automatically promote opportunistic"
|
||||||
+ " containers to guaranteed.");
|
+ " containers to guaranteed.");
|
||||||
|
opts.addOption("enforce_execution_type", false,
|
||||||
|
"Flag to indicate whether to enforce execution type of containers");
|
||||||
opts.addOption("container_memory", true,
|
opts.addOption("container_memory", true,
|
||||||
"Amount of memory in MB to be requested to run the shell command");
|
"Amount of memory in MB to be requested to run the shell command");
|
||||||
opts.addOption("container_vcores", true,
|
opts.addOption("container_vcores", true,
|
||||||
|
@ -640,6 +644,9 @@ public class ApplicationMaster {
|
||||||
if (cliParser.hasOption("promote_opportunistic_after_start")) {
|
if (cliParser.hasOption("promote_opportunistic_after_start")) {
|
||||||
autoPromoteContainers = true;
|
autoPromoteContainers = true;
|
||||||
}
|
}
|
||||||
|
if (cliParser.hasOption("enforce_execution_type")) {
|
||||||
|
enforceExecType = true;
|
||||||
|
}
|
||||||
containerMemory = Integer.parseInt(cliParser.getOptionValue(
|
containerMemory = Integer.parseInt(cliParser.getOptionValue(
|
||||||
"container_memory", "-1"));
|
"container_memory", "-1"));
|
||||||
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
|
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
|
||||||
|
@ -1479,7 +1486,7 @@ public class ApplicationMaster {
|
||||||
ContainerRequest request = new ContainerRequest(
|
ContainerRequest request = new ContainerRequest(
|
||||||
getTaskResourceCapability(),
|
getTaskResourceCapability(),
|
||||||
null, null, pri, 0, true, null,
|
null, null, pri, 0, true, null,
|
||||||
ExecutionTypeRequest.newInstance(containerType),
|
ExecutionTypeRequest.newInstance(containerType, enforceExecType),
|
||||||
containerResourceProfile);
|
containerResourceProfile);
|
||||||
LOG.info("Requested container ask: " + request.toString());
|
LOG.info("Requested container ask: " + request.toString());
|
||||||
return request;
|
return request;
|
||||||
|
|
|
@ -189,6 +189,8 @@ public class Client {
|
||||||
private ExecutionType containerType = ExecutionType.GUARANTEED;
|
private ExecutionType containerType = ExecutionType.GUARANTEED;
|
||||||
// Whether to auto promote opportunistic containers
|
// Whether to auto promote opportunistic containers
|
||||||
private boolean autoPromoteContainers = false;
|
private boolean autoPromoteContainers = false;
|
||||||
|
// Whether to enforce execution type of containers
|
||||||
|
private boolean enforceExecType = false;
|
||||||
|
|
||||||
// Placement specification
|
// Placement specification
|
||||||
private String placementSpec = "";
|
private String placementSpec = "";
|
||||||
|
@ -330,6 +332,8 @@ public class Client {
|
||||||
opts.addOption("promote_opportunistic_after_start", false,
|
opts.addOption("promote_opportunistic_after_start", false,
|
||||||
"Flag to indicate whether to automatically promote opportunistic"
|
"Flag to indicate whether to automatically promote opportunistic"
|
||||||
+ " containers to guaranteed.");
|
+ " containers to guaranteed.");
|
||||||
|
opts.addOption("enforce_execution_type", false,
|
||||||
|
"Flag to indicate whether to enforce execution type of containers");
|
||||||
opts.addOption("log_properties", true, "log4j.properties file");
|
opts.addOption("log_properties", true, "log4j.properties file");
|
||||||
opts.addOption("keep_containers_across_application_attempts", false,
|
opts.addOption("keep_containers_across_application_attempts", false,
|
||||||
"Flag to indicate whether to keep containers across application "
|
"Flag to indicate whether to keep containers across application "
|
||||||
|
@ -522,6 +526,9 @@ public class Client {
|
||||||
if (cliParser.hasOption("promote_opportunistic_after_start")) {
|
if (cliParser.hasOption("promote_opportunistic_after_start")) {
|
||||||
autoPromoteContainers = true;
|
autoPromoteContainers = true;
|
||||||
}
|
}
|
||||||
|
if (cliParser.hasOption("enforce_execution_type")) {
|
||||||
|
enforceExecType = true;
|
||||||
|
}
|
||||||
containerMemory =
|
containerMemory =
|
||||||
Integer.parseInt(cliParser.getOptionValue("container_memory", "-1"));
|
Integer.parseInt(cliParser.getOptionValue("container_memory", "-1"));
|
||||||
containerVirtualCores =
|
containerVirtualCores =
|
||||||
|
@ -874,6 +881,9 @@ public class Client {
|
||||||
if (autoPromoteContainers) {
|
if (autoPromoteContainers) {
|
||||||
vargs.add("--promote_opportunistic_after_start");
|
vargs.add("--promote_opportunistic_after_start");
|
||||||
}
|
}
|
||||||
|
if (enforceExecType) {
|
||||||
|
vargs.add("--enforce_execution_type");
|
||||||
|
}
|
||||||
if (containerMemory > 0) {
|
if (containerMemory > 0) {
|
||||||
vargs.add("--container_memory " + String.valueOf(containerMemory));
|
vargs.add("--container_memory " + String.valueOf(containerMemory));
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,7 @@ import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import org.apache.commons.cli.MissingArgumentException;
|
import org.apache.commons.cli.MissingArgumentException;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.net.ServerSocketUtil;
|
import org.apache.hadoop.net.ServerSocketUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.JarFinder;
|
import org.apache.hadoop.util.JarFinder;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
@ -173,6 +176,8 @@ public class TestDistributedShell {
|
||||||
true);
|
true);
|
||||||
conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
|
conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
|
||||||
true);
|
true);
|
||||||
|
conf.setBoolean(
|
||||||
|
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
|
||||||
|
|
||||||
// ATS version specific settings
|
// ATS version specific settings
|
||||||
if (timelineVersion == 1.0f) {
|
if (timelineVersion == 1.0f) {
|
||||||
|
@ -1468,6 +1473,97 @@ public class TestDistributedShell {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@TimelineVersion(2.0f)
|
||||||
|
public void testDSShellWithEnforceExecutionType() throws Exception {
|
||||||
|
Client client = new Client(new Configuration(yarnCluster.getConfig()));
|
||||||
|
try {
|
||||||
|
String[] args = {
|
||||||
|
"--jar",
|
||||||
|
APPMASTER_JAR,
|
||||||
|
"--num_containers",
|
||||||
|
"2",
|
||||||
|
"--master_memory",
|
||||||
|
"512",
|
||||||
|
"--master_vcores",
|
||||||
|
"2",
|
||||||
|
"--container_memory",
|
||||||
|
"128",
|
||||||
|
"--container_vcores",
|
||||||
|
"1",
|
||||||
|
"--shell_command",
|
||||||
|
"date",
|
||||||
|
"--container_type",
|
||||||
|
"OPPORTUNISTIC",
|
||||||
|
"--enforce_execution_type"
|
||||||
|
};
|
||||||
|
client.init(args);
|
||||||
|
final AtomicBoolean result = new AtomicBoolean(false);
|
||||||
|
Thread t = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
result.set(client.run());
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
t.start();
|
||||||
|
|
||||||
|
YarnClient yarnClient = YarnClient.createYarnClient();
|
||||||
|
yarnClient.init(new Configuration(yarnCluster.getConfig()));
|
||||||
|
yarnClient.start();
|
||||||
|
waitForContainersLaunch(yarnClient, 2);
|
||||||
|
List<ApplicationReport> apps = yarnClient.getApplications();
|
||||||
|
ApplicationReport appReport = apps.get(0);
|
||||||
|
ApplicationId appId = appReport.getApplicationId();
|
||||||
|
List<ApplicationAttemptReport> appAttempts =
|
||||||
|
yarnClient.getApplicationAttempts(appId);
|
||||||
|
ApplicationAttemptReport appAttemptReport = appAttempts.get(0);
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
appAttemptReport.getApplicationAttemptId();
|
||||||
|
List<ContainerReport> containers =
|
||||||
|
yarnClient.getContainers(appAttemptId);
|
||||||
|
// we should get two containers.
|
||||||
|
Assert.assertEquals(2, containers.size());
|
||||||
|
ContainerId amContainerId = appAttemptReport.getAMContainerId();
|
||||||
|
for (ContainerReport container : containers) {
|
||||||
|
if (!container.getContainerId().equals(amContainerId)) {
|
||||||
|
Assert.assertEquals(container.getExecutionType(),
|
||||||
|
ExecutionType.OPPORTUNISTIC);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.fail("Job execution with enforce execution type failed.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitForContainersLaunch(YarnClient client,
|
||||||
|
int nContainers) throws Exception {
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
public Boolean get() {
|
||||||
|
try {
|
||||||
|
List<ApplicationReport> apps = client.getApplications();
|
||||||
|
if (apps == null || apps.isEmpty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ApplicationId appId = apps.get(0).getApplicationId();
|
||||||
|
List<ApplicationAttemptReport> appAttempts =
|
||||||
|
client.getApplicationAttempts(appId);
|
||||||
|
if (appAttempts == null || appAttempts.isEmpty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ApplicationAttemptId attemptId =
|
||||||
|
appAttempts.get(0).getApplicationAttemptId();
|
||||||
|
List<ContainerReport> containers = client.getContainers(attemptId);
|
||||||
|
return (containers.size() == nContainers);
|
||||||
|
} catch (Exception e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, 10, 60000);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@TimelineVersion(2.0f)
|
@TimelineVersion(2.0f)
|
||||||
public void testDistributedShellWithResources() throws Exception {
|
public void testDistributedShellWithResources() throws Exception {
|
||||||
|
|
|
@ -84,6 +84,7 @@ $ yarn org.apache.hadoop.yarn.applications.distributedshell.Client -jar share/ha
|
||||||
```
|
```
|
||||||
|
|
||||||
By change the value of `container_type` to `OPPORTUNISTIC` or `GUARANTEED` in the above command, we can specify the tasks to be running in opportunistic or guaranteed containers. The default type is `GUARANTEED`. By adding flag `-promote_opportunistic_after_start` to the above command, application master will attempt to promote all opportunistic containers to guaranteed once they are started.
|
By change the value of `container_type` to `OPPORTUNISTIC` or `GUARANTEED` in the above command, we can specify the tasks to be running in opportunistic or guaranteed containers. The default type is `GUARANTEED`. By adding flag `-promote_opportunistic_after_start` to the above command, application master will attempt to promote all opportunistic containers to guaranteed once they are started.
|
||||||
|
By adding flag '-enforce_execution_type' to the above command, scheduler will honor execution type of the containers.
|
||||||
|
|
||||||
$H3 Opportunistic Containers in Web UI
|
$H3 Opportunistic Containers in Web UI
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue