YARN-7745. Allow DistributedShell to take a placement specification for containers it wants to launch. (Arun Suresh via wangda)

Change-Id: Ided146d662e944a8a4692e5d6885f23fd9bbcad5
This commit is contained in:
Wangda Tan 2018-01-18 14:22:45 -08:00 committed by Arun Suresh
parent 38af237969
commit e60f51299d
3 changed files with 263 additions and 12 deletions

View File

@ -42,6 +42,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
@ -87,8 +88,11 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.ExecutionType;
@ -99,6 +103,7 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
@ -274,6 +279,10 @@ public enum DSEntity {
@VisibleForTesting
protected AtomicInteger numRequestedContainers = new AtomicInteger();
protected AtomicInteger numIgnore = new AtomicInteger();
protected AtomicInteger totalRetries = new AtomicInteger(10);
// Shell command to be executed
private String shellCommand = "";
// Args to be passed to the shell command
@ -289,6 +298,9 @@ public enum DSEntity {
// File length needed for local resource
private long shellScriptPathLen = 0;
// Placement Specifications
private Map<String, PlacementSpec> placementSpecs = null;
// Container retry options
private ContainerRetryPolicy containerRetryPolicy =
ContainerRetryPolicy.NEVER_RETRY;
@ -334,6 +346,7 @@ public enum DSEntity {
private final String windows_command = "cmd /c";
private int yarnShellIdCounter = 1;
private final AtomicLong allocIdCounter = new AtomicLong(1);
@VisibleForTesting
protected final Set<ContainerId> launchedContainers =
@ -457,6 +470,7 @@ public boolean init(String[] args) throws ParseException, IOException {
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
opts.addOption("placement_spec", true, "Placement specification");
opts.addOption("debug", false, "Dump out debug information");
opts.addOption("help", false, "Print usage");
@ -487,6 +501,17 @@ public boolean init(String[] args) throws ParseException, IOException {
dumpOutDebugInfo();
}
if (cliParser.hasOption("placement_spec")) {
String placementSpec = cliParser.getOptionValue("placement_spec");
LOG.info("Placement Spec received [{}]", placementSpec);
parsePlacementSpecs(placementSpec);
LOG.info("Total num containers requested [{}]", numTotalContainers);
if (numTotalContainers == 0) {
throw new IllegalArgumentException(
"Cannot run distributed shell with no containers");
}
}
Map<String, String> envs = System.getenv();
if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
@ -609,8 +634,11 @@ public boolean init(String[] args) throws ParseException, IOException {
}
containerResourceProfile =
cliParser.getOptionValue("container_resource_profile", "");
numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
"num_containers", "1"));
if (this.placementSpecs == null) {
numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
"num_containers", "1"));
}
if (numTotalContainers == 0) {
throw new IllegalArgumentException(
"Cannot run distributed shell with no containers");
@ -642,6 +670,17 @@ public boolean init(String[] args) throws ParseException, IOException {
return true;
}
/**
 * Parses the given placement specification string and stores the result,
 * keyed by source tag, in {@code placementSpecs}. The total number of
 * containers to run is reset to the sum of the container counts of all
 * parsed specs.
 *
 * @param placementSpecifications placement specification string, in the
 *                                format accepted by
 *                                {@link PlacementSpec#parse(String)}.
 */
private void parsePlacementSpecs(String placementSpecifications) {
  final Map<String, PlacementSpec> parsed =
      PlacementSpec.parse(placementSpecifications);
  final Map<String, PlacementSpec> bySourceTag = new HashMap<>();
  int containerTotal = 0;
  for (PlacementSpec parsedSpec : parsed.values()) {
    containerTotal += parsedSpec.numContainers;
    bySourceTag.put(parsedSpec.sourceTag, parsedSpec);
  }
  // Publish both results once the whole specification has been walked.
  this.placementSpecs = bySourceTag;
  this.numTotalContainers = containerTotal;
}
/**
* Helper function to print usage
*
@ -719,9 +758,19 @@ public void run() throws YarnException, IOException, InterruptedException {
// Register self with ResourceManager
// This will start heartbeating to the RM
appMasterHostname = NetUtils.getHostname();
Map<Set<String>, PlacementConstraint> placementConstraintMap = null;
if (this.placementSpecs != null) {
placementConstraintMap = new HashMap<>();
for (PlacementSpec spec : this.placementSpecs.values()) {
if (spec.constraint != null) {
placementConstraintMap.put(
Collections.singleton(spec.sourceTag), spec.constraint);
}
}
}
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);
appMasterTrackingUrl, placementConstraintMap);
resourceProfiles = response.getResourceProfiles();
ResourceUtils.reinitializeResources(response.getResourceTypes());
// Dump out information about cluster capability as seen by the
@ -765,9 +814,20 @@ public void run() throws YarnException, IOException, InterruptedException {
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
for (int i = 0; i < numTotalContainersToRequest; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
if (this.placementSpecs == null) {
for (int i = 0; i < numTotalContainersToRequest; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
}
} else {
List<SchedulingRequest> schedReqs = new ArrayList<>();
for (PlacementSpec pSpec : this.placementSpecs.values()) {
for (int i = 0; i < pSpec.numContainers; i++) {
SchedulingRequest sr = setupSchedulingRequest(pSpec);
schedReqs.add(sr);
}
}
amRMClient.addSchedulingRequests(schedReqs);
}
numRequestedContainers.set(numTotalContainers);
}
@ -933,6 +993,12 @@ public void onContainersCompleted(List<ContainerStatus> completedContainers) {
numRequestedContainers.decrementAndGet();
// we do not need to release the container as it would be done
// by the RM
// Ignore these containers if placementspec is enabled
// for the time being.
if (placementSpecs != null) {
numIgnore.incrementAndGet();
}
}
} else {
// nothing to do
@ -962,14 +1028,18 @@ public void onContainersCompleted(List<ContainerStatus> completedContainers) {
int askCount = numTotalContainers - numRequestedContainers.get();
numRequestedContainers.addAndGet(askCount);
if (askCount > 0) {
for (int i = 0; i < askCount; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
// Don't bother re-asking if we are using placementSpecs
if (placementSpecs == null) {
if (askCount > 0) {
for (int i = 0; i < askCount; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
}
}
}
if (numCompletedContainers.get() == numTotalContainers) {
if (numCompletedContainers.get() + numIgnore.get() >=
numTotalContainers) {
done = true;
}
}
@ -1028,6 +1098,23 @@ public void onContainersUpdated(
}
}
/**
 * Callback invoked with scheduling requests that have been rejected.
 * Every rejected request is logged and consumes one unit of the shared
 * retry budget ({@code totalRetries}). While budget remains, the rejected
 * requests are re-submitted through {@code amRMClient}; once the budget
 * drops to zero or below, the application is flagged as done instead.
 *
 * @param rejReqs scheduling requests that were rejected.
 */
@Override
public void onRequestsRejected(List<RejectedSchedulingRequest> rejReqs) {
  List<SchedulingRequest> reqsToRetry = new ArrayList<>(rejReqs.size());
  for (RejectedSchedulingRequest rejReq : rejReqs) {
    LOG.info("Scheduling Request {} has been rejected. Reason {}",
        rejReq.getRequest(), rejReq.getReason());
    reqsToRetry.add(rejReq.getRequest());
  }
  // Decide on the value returned by addAndGet rather than re-reading the
  // counter, so the decrement and the exhaustion check are one atomic step.
  int retriesLeft = totalRetries.addAndGet(-reqsToRetry.size());
  if (retriesLeft <= 0) {
    LOG.info("Exiting, since retries are exhausted !!");
    done = true;
  } else {
    amRMClient.addSchedulingRequests(reqsToRetry);
  }
}
@Override
public void onShutdownRequest() {
done = true;
@ -1335,6 +1422,19 @@ private ContainerRequest setupContainerAskForRM() {
return request;
}
/**
 * Builds the scheduling request for a single container of the given
 * placement spec. Each request gets a fresh allocation id from
 * {@code allocIdCounter}, is tagged with the spec's source tag and carries
 * the spec's placement constraint.
 *
 * @param spec placement spec the container belongs to.
 * @return the newly created scheduling request.
 */
private SchedulingRequest setupSchedulingRequest(PlacementSpec spec) {
  final long nextAllocationId = allocIdCounter.incrementAndGet();
  final Priority priority = Priority.newInstance(requestPriority);
  final ExecutionTypeRequest executionType =
      ExecutionTypeRequest.newInstance();
  final Set<String> allocationTags = Collections.singleton(spec.sourceTag);
  final ResourceSizing sizing = ResourceSizing.newInstance(
      createProfileCapability().getProfileCapabilityOverride());

  // The constraint is passed as null here and attached via the setter below.
  SchedulingRequest schedulingRequest = SchedulingRequest.newInstance(
      nextAllocationId, priority, executionType, allocationTags, sizing,
      null);
  schedulingRequest.setPlacementConstraint(spec.constraint);
  LOG.info("Scheduling Request made: " + schedulingRequest.toString());
  return schedulingRequest;
}
/**
 * Tells whether the given path exists on the local file system.
 *
 * @param filePath path to check.
 * @return {@code true} if {@link File#exists()} reports the path as
 *         existing, {@code false} otherwise.
 */
private boolean fileExist(String filePath) {
  final File candidate = new File(filePath);
  return candidate.exists();
}

View File

@ -188,6 +188,8 @@ public class Client {
// Whether to auto promote opportunistic containers
private boolean autoPromoteContainers = false;
// Placement specification
private String placementSpec = "";
// log4j.properties file
// if available, add to local resources and set into classpath
private String log4jPropFile = "";
@ -366,6 +368,10 @@ public Client(Configuration conf) throws Exception {
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
opts.addOption("placement_spec", true,
"Placement specification. Please note, if this option is specified,"
+ " The \"num_containers\" option will be ignored. All requested"
+ " containers will be of type GUARANTEED" );
}
/**
@ -419,6 +425,11 @@ public boolean init(String[] args) throws ParseException {
keepContainers = true;
}
if (cliParser.hasOption("placement_spec")) {
placementSpec = cliParser.getOptionValue("placement_spec");
// Check if it is parsable
PlacementSpec.parse(this.placementSpec);
}
appName = cliParser.getOptionValue("appname", "DistributedShell");
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
amQueue = cliParser.getOptionValue("queue", "default");
@ -834,6 +845,9 @@ public boolean run() throws IOException, YarnException {
vargs.add("--container_resource_profile " + containerResourceProfile);
}
vargs.add("--num_containers " + String.valueOf(numContainers));
if (placementSpec != null && placementSpec.length() > 0) {
vargs.add("--placement_spec " + placementSpec);
}
if (null != nodeLabelExpression) {
appContext.setNodeLabelExpression(nodeLabelExpression);
}

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Scanner;
/**
 * Class encapsulating a SourceTag, number of containers and a Placement
 * Constraint.
 */
public class PlacementSpec {

  private static final Logger LOG =
      LoggerFactory.getLogger(PlacementSpec.class);
  private static final String SPEC_DELIM = ":";
  private static final String KV_SPLIT_DELIM = "=";
  private static final String SPEC_VAL_DELIM = ",";
  private static final String IN = "in";
  private static final String NOT_IN = "notin";
  private static final String CARDINALITY = "cardinality";

  public final String sourceTag;
  public final int numContainers;
  public final PlacementConstraint constraint;

  /**
   * Creates a placement spec.
   *
   * @param sourceTag tag identifying the containers of this spec.
   * @param numContainers number of containers requested for the tag.
   * @param constraint placement constraint for the containers, or
   *                   {@code null} if the spec has none.
   */
  public PlacementSpec(String sourceTag, int numContainers,
      PlacementConstraint constraint) {
    this.sourceTag = sourceTag;
    this.numContainers = numContainers;
    this.constraint = constraint;
  }

  // Placement specification should be of the form:
  // PlacementSpec => ""|KeyVal:PlacementSpec
  // KeyVal => SourceTag=Constraint
  // SourceTag => String
  // Constraint => NumContainers|
  //               NumContainers,"in",Scope,TargetTag|
  //               NumContainers,"notin",Scope,TargetTag|
  //               NumContainers,"cardinality",Scope,TargetTag,MinCard,MaxCard
  // NumContainers => int (number of containers)
  // Scope => "NODE"|"RACK"
  // TargetTag => String (Target Tag)
  // MinCard => int (min cardinality - needed if ConstraintType == cardinality)
  // MaxCard => int (max cardinality - needed if ConstraintType == cardinality)

  /**
   * Parser to convert a string representation of a placement spec to mapping
   * from source tag to Placement Constraint.
   *
   * @param specs Placement spec.
   * @return Mapping from source tag to placement constraint. Empty if
   *         {@code specs} contains no entries.
   * @throws IllegalArgumentException if an entry is not of the form
   *         {@code SourceTag=Constraint}, if a number cannot be parsed, if
   *         a required token is missing, or if the constraint type is
   *         unknown.
   */
  public static Map<String, PlacementSpec> parse(String specs) {
    LOG.info("Parsing Placement Specs: [{}]", specs);
    Map<String, PlacementSpec> pSpecs = new HashMap<>();
    try (Scanner s = new Scanner(specs).useDelimiter(SPEC_DELIM)) {
      while (s.hasNext()) {
        String sp = s.next();
        LOG.info("Parsing Spec: [{}]", sp);
        // Limit the split to 2 parts so that anything following a second
        // '=' is validated below instead of being silently dropped.
        String[] specSplit = sp.split(KV_SPLIT_DELIM, 2);
        if (specSplit.length != 2) {
          throw new IllegalArgumentException(
              "Could not parse spec [" + sp + "], expected the form"
                  + " SourceTag" + KV_SPLIT_DELIM + "Constraint");
        }
        PlacementSpec pSpec = parseSpec(specSplit[0], specSplit[1]);
        pSpecs.put(pSpec.sourceTag, pSpec);
      }
    }
    return pSpecs;
  }

  /**
   * Parses the constraint part (right hand side of '=') of a single entry.
   */
  private static PlacementSpec parseSpec(String sourceTag, String specVal) {
    try (Scanner ps = new Scanner(specVal).useDelimiter(SPEC_VAL_DELIM)) {
      int numContainers = nextInt(ps, "number of containers", specVal);
      if (numContainers < 0) {
        throw new IllegalArgumentException(
            "Number of containers cannot be negative in [" + specVal + "]");
      }
      if (!ps.hasNext()) {
        LOG.info("Creating Spec without constraint {}: num[{}]",
            sourceTag, numContainers);
        return new PlacementSpec(sourceTag, numContainers, null);
      }

      // Locale.ROOT keeps keyword matching independent of the JVM default
      // locale (e.g. "IN" would not lower-case to "in" under Turkish rules).
      String cType =
          nextToken(ps, "constraint type", specVal).toLowerCase(Locale.ROOT);
      String scopeToken =
          nextToken(ps, "scope", specVal).toLowerCase(Locale.ROOT);
      String targetTag = nextToken(ps, "target tag", specVal);
      // Any scope other than "rack" falls back to NODE.
      String scope = scopeToken.equals("rack") ? PlacementConstraints.RACK :
          PlacementConstraints.NODE;

      PlacementConstraint pc;
      if (cType.equals(IN)) {
        pc = PlacementConstraints.build(
            PlacementConstraints.targetIn(scope,
                PlacementConstraints.PlacementTargets.allocationTag(
                    targetTag)));
        LOG.info("Creating IN Constraint for source tag [{}], num[{}]: " +
                "scope[{}], target[{}]",
            sourceTag, numContainers, scope, targetTag);
      } else if (cType.equals(NOT_IN)) {
        pc = PlacementConstraints.build(
            PlacementConstraints.targetNotIn(scope,
                PlacementConstraints.PlacementTargets.allocationTag(
                    targetTag)));
        LOG.info("Creating NOT_IN Constraint for source tag [{}], num[{}]: " +
                "scope[{}], target[{}]",
            sourceTag, numContainers, scope, targetTag);
      } else if (cType.equals(CARDINALITY)) {
        int minCard = nextInt(ps, "min cardinality", specVal);
        int maxCard = nextInt(ps, "max cardinality", specVal);
        pc = PlacementConstraints.build(
            PlacementConstraints.targetCardinality(scope, minCard, maxCard,
                PlacementConstraints.PlacementTargets.allocationTag(
                    targetTag)));
        LOG.info("Creating CARDINALITY Constraint source tag [{}], num[{}]: " +
                "scope[{}], min[{}], max[{}], target[{}]",
            sourceTag, numContainers, scope, minCard, maxCard, targetTag);
      } else {
        throw new IllegalArgumentException(
            "Could not parse constraintType [" + cType + "]" +
                " in [" + specVal + "]");
      }
      return new PlacementSpec(sourceTag, numContainers, pc);
    }
  }

  /**
   * Reads the next token as an int, failing with a descriptive message.
   */
  private static int nextInt(Scanner tokens, String what, String specVal) {
    if (!tokens.hasNextInt()) {
      throw new IllegalArgumentException(
          "Could not parse " + what + " in [" + specVal + "]");
    }
    return tokens.nextInt();
  }

  /**
   * Reads the next token, failing with a descriptive message if absent.
   */
  private static String nextToken(Scanner tokens, String what,
      String specVal) {
    if (!tokens.hasNext()) {
      throw new IllegalArgumentException(
          "Missing " + what + " in [" + specVal + "]");
    }
    return tokens.next();
  }
}