Backport "YARN-2918. RM should not fail on startup if queue's configured labels do not exist in cluster-node-labels. Contributed by Wangda Tan" to 2.7.1

This commit is contained in:
Wangda Tan 2015-05-20 13:27:01 -07:00
parent 6c7840f5b5
commit 114d41aeb2
13 changed files with 312 additions and 83 deletions

View File

@ -103,6 +103,9 @@ Release 2.7.1 - UNRELEASED
YARN-3677. Fix findbugs warnings in yarn-server-resourcemanager. YARN-3677. Fix findbugs warnings in yarn-server-resourcemanager.
(Vinod Kumar Vavilapalli via ozawa) (Vinod Kumar Vavilapalli via ozawa)
YARN-2918. RM should not fail on startup if queue's configured labels do
not exist in cluster-node-labels. (Wangda Tan via jianhe)
Release 2.7.0 - 2015-04-20 Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -501,7 +501,7 @@ public class ApplicationMasterService extends AbstractService implements
try { try {
RMServerUtils.normalizeAndValidateRequests(ask, RMServerUtils.normalizeAndValidateRequests(ask,
rScheduler.getMaximumResourceCapability(), app.getQueue(), rScheduler.getMaximumResourceCapability(), app.getQueue(),
rScheduler); rScheduler, rmContext);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
LOG.warn("Invalid resource ask by application " + appAttemptId, e); LOG.warn("Invalid resource ask by application " + appAttemptId, e);
throw e; throw e;

View File

@ -384,7 +384,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
try { try {
SchedulerUtils.normalizeAndValidateRequest(amReq, SchedulerUtils.normalizeAndValidateRequest(amReq,
scheduler.getMaximumResourceCapability(), scheduler.getMaximumResourceCapability(),
submissionContext.getQueue(), scheduler, isRecovery); submissionContext.getQueue(), scheduler, isRecovery, rmContext);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
LOG.warn("RM app submission failed in validating AM resource request" LOG.warn("RM app submission failed in validating AM resource request"
+ " for application " + submissionContext.getApplicationId(), e); + " for application " + submissionContext.getApplicationId(), e);

View File

@ -91,11 +91,12 @@ public class RMServerUtils {
* requested memory/vcore is non-negative and not greater than max * requested memory/vcore is non-negative and not greater than max
*/ */
public static void normalizeAndValidateRequests(List<ResourceRequest> ask, public static void normalizeAndValidateRequests(List<ResourceRequest> ask,
Resource maximumResource, String queueName, YarnScheduler scheduler) Resource maximumResource, String queueName, YarnScheduler scheduler,
RMContext rmContext)
throws InvalidResourceRequestException { throws InvalidResourceRequestException {
for (ResourceRequest resReq : ask) { for (ResourceRequest resReq : ask) {
SchedulerUtils.normalizeAndvalidateRequest(resReq, maximumResource, SchedulerUtils.normalizeAndvalidateRequest(resReq, maximumResource,
queueName, scheduler); queueName, scheduler, rmContext);
} }
} }

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -211,7 +212,7 @@ public class SchedulerUtils {
public static void normalizeAndValidateRequest(ResourceRequest resReq, public static void normalizeAndValidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler, Resource maximumResource, String queueName, YarnScheduler scheduler,
boolean isRecovery) boolean isRecovery, RMContext rmContext)
throws InvalidResourceRequestException { throws InvalidResourceRequestException {
QueueInfo queueInfo = null; QueueInfo queueInfo = null;
@ -223,15 +224,16 @@ public class SchedulerUtils {
} }
SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo); SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo);
if (!isRecovery) { if (!isRecovery) {
validateResourceRequest(resReq, maximumResource, queueInfo); validateResourceRequest(resReq, maximumResource, queueInfo, rmContext);
} }
} }
public static void normalizeAndvalidateRequest(ResourceRequest resReq, public static void normalizeAndvalidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler) Resource maximumResource, String queueName, YarnScheduler scheduler,
RMContext rmContext)
throws InvalidResourceRequestException { throws InvalidResourceRequestException {
normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler, normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler,
false); false, rmContext);
} }
/** /**
@ -240,8 +242,8 @@ public class SchedulerUtils {
* *
* @throws InvalidResourceRequestException when there is invalid request * @throws InvalidResourceRequestException when there is invalid request
*/ */
public static void validateResourceRequest(ResourceRequest resReq, private static void validateResourceRequest(ResourceRequest resReq,
Resource maximumResource, QueueInfo queueInfo) Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
throws InvalidResourceRequestException { throws InvalidResourceRequestException {
if (resReq.getCapability().getMemory() < 0 || if (resReq.getCapability().getMemory() < 0 ||
resReq.getCapability().getMemory() > maximumResource.getMemory()) { resReq.getCapability().getMemory() > maximumResource.getMemory()) {
@ -284,7 +286,7 @@ public class SchedulerUtils {
if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) { if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) {
if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(), if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(),
labelExp)) { labelExp, rmContext)) {
throw new InvalidResourceRequestException("Invalid resource request" throw new InvalidResourceRequestException("Invalid resource request"
+ ", queue=" + ", queue="
+ queueInfo.getQueueName() + queueInfo.getQueueName()
@ -357,19 +359,36 @@ public class SchedulerUtils {
return true; return true;
} }
/**
* Check queue label expression, check if node label in queue's
* node-label-expression existed in clusterNodeLabels if rmContext != null
*/
public static boolean checkQueueLabelExpression(Set<String> queueLabels, public static boolean checkQueueLabelExpression(Set<String> queueLabels,
String labelExpression) { String labelExpression, RMContext rmContext) {
if (queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY)) {
return true;
}
// if label expression is empty, we can allocate container on any node // if label expression is empty, we can allocate container on any node
if (labelExpression == null) { if (labelExpression == null) {
return true; return true;
} }
for (String str : labelExpression.split("&&")) { for (String str : labelExpression.split("&&")) {
if (!str.trim().isEmpty() str = str.trim();
&& (queueLabels == null || !queueLabels.contains(str.trim()))) { if (!str.trim().isEmpty()) {
return false; // check queue label
if (queueLabels == null) {
return false;
} else {
if (!queueLabels.contains(str)
&& !queueLabels.contains(RMNodeLabelsManager.ANY)) {
return false;
}
}
// check node label manager contains this label
if (null != rmContext) {
RMNodeLabelsManager nlm = rmContext.getNodeLabelManager();
if (nlm != null && !nlm.containsNodeLabel(str)) {
return false;
}
}
} }
} }
return true; return true;

View File

@ -114,11 +114,9 @@ public abstract class AbstractCSQueue implements CSQueue {
protected void setupConfigurableCapacities() { protected void setupConfigurableCapacities() {
CSQueueUtils.loadUpdateAndCheckCapacities( CSQueueUtils.loadUpdateAndCheckCapacities(
getQueuePath(), getQueuePath(),
accessibleLabels,
csContext.getConfiguration(), csContext.getConfiguration(),
queueCapacities, queueCapacities,
parent == null ? null : parent.getQueueCapacities(), parent == null ? null : parent.getQueueCapacities());
csContext.getRMContext().getNodeLabelManager());
} }
@Override @Override
@ -244,8 +242,6 @@ public abstract class AbstractCSQueue implements CSQueue {
if (this.accessibleLabels == null && parent != null) { if (this.accessibleLabels == null && parent != null) {
this.accessibleLabels = parent.getAccessibleNodeLabels(); this.accessibleLabels = parent.getAccessibleNodeLabels();
} }
SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
this.accessibleLabels);
// inherit from parent if labels not set // inherit from parent if labels not set
if (this.defaultLabelExpression == null && parent != null if (this.defaultLabelExpression == null && parent != null

View File

@ -117,39 +117,23 @@ class CSQueueUtils {
* - Check if capacities/absolute-capacities legal * - Check if capacities/absolute-capacities legal
*/ */
public static void loadUpdateAndCheckCapacities(String queuePath, public static void loadUpdateAndCheckCapacities(String queuePath,
Set<String> accessibleLabels, CapacitySchedulerConfiguration csConf, CapacitySchedulerConfiguration csConf,
QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities, QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) {
RMNodeLabelsManager nlm) { loadCapacitiesByLabelsFromConf(queuePath,
loadCapacitiesByLabelsFromConf(queuePath, accessibleLabels, nlm,
queueCapacities, csConf); queueCapacities, csConf);
updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities); updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities);
capacitiesSanityCheck(queuePath, queueCapacities); capacitiesSanityCheck(queuePath, queueCapacities);
} }
// Considered NO_LABEL, ANY and null cases
private static Set<String> normalizeAccessibleNodeLabels(Set<String> labels,
RMNodeLabelsManager mgr) {
Set<String> accessibleLabels = new HashSet<String>();
if (labels != null) {
accessibleLabels.addAll(labels);
}
if (accessibleLabels.contains(CommonNodeLabelsManager.ANY)) {
accessibleLabels.addAll(mgr.getClusterNodeLabels());
}
accessibleLabels.add(CommonNodeLabelsManager.NO_LABEL);
return accessibleLabels;
}
private static void loadCapacitiesByLabelsFromConf(String queuePath, private static void loadCapacitiesByLabelsFromConf(String queuePath,
Set<String> labels, RMNodeLabelsManager mgr,
QueueCapacities queueCapacities, CapacitySchedulerConfiguration csConf) { QueueCapacities queueCapacities, CapacitySchedulerConfiguration csConf) {
queueCapacities.clearConfigurableFields(); queueCapacities.clearConfigurableFields();
labels = normalizeAccessibleNodeLabels(labels, mgr); Set<String> configuredNodelabels =
csConf.getConfiguredNodeLabels(queuePath);
for (String label : labels) { for (String label : configuredNodelabels) {
if (label.equals(CommonNodeLabelsManager.NO_LABEL)) { if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
queueCapacities.setCapacity(CommonNodeLabelsManager.NO_LABEL, queueCapacities.setCapacity(CommonNodeLabelsManager.NO_LABEL,
csConf.getNonLabeledQueueCapacity(queuePath) / 100); csConf.getNonLabeledQueueCapacity(queuePath) / 100);

View File

@ -23,8 +23,10 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.StringTokenizer; import java.util.StringTokenizer;
@ -859,4 +861,35 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
defaultVal); defaultVal);
return preemptionDisabled; return preemptionDisabled;
} }
/**
* Get configured node labels in a given queuePath
*/
public Set<String> getConfiguredNodeLabels(String queuePath) {
Set<String> configuredNodeLabels = new HashSet<String>();
Entry<String, String> e = null;
Iterator<Entry<String, String>> iter = iterator();
while (iter.hasNext()) {
e = iter.next();
String key = e.getKey();
if (key.startsWith(getQueuePrefix(queuePath) + ACCESSIBLE_NODE_LABELS
+ DOT)) {
// Find <label-name> in
// <queue-path>.accessible-node-labels.<label-name>.property
int labelStartIdx =
key.indexOf(ACCESSIBLE_NODE_LABELS)
+ ACCESSIBLE_NODE_LABELS.length() + 1;
int labelEndIndx = key.indexOf('.', labelStartIdx);
String labelName = key.substring(labelStartIdx, labelEndIndx);
configuredNodeLabels.add(labelName);
}
}
// always add NO_LABEL
configuredNodeLabels.add(RMNodeLabelsManager.NO_LABEL);
return configuredNodeLabels;
}
} }

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@ -174,8 +175,8 @@ public class LeafQueue extends AbstractCSQueue {
maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent =
conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath()); conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels, if (!SchedulerUtils.checkQueueLabelExpression(
this.defaultLabelExpression)) { this.accessibleLabels, this.defaultLabelExpression, null)) {
throw new IOException("Invalid default label expression of " throw new IOException("Invalid default label expression of "
+ " queue=" + " queue="
+ getQueueName() + getQueueName()

View File

@ -67,6 +67,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -604,6 +606,8 @@ public class TestAppManager{
when(scheduler.getMaximumResourceCapability()).thenReturn( when(scheduler.getMaximumResourceCapability()).thenReturn(
Resources.createResource( Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
ResourceCalculator rc = new DefaultResourceCalculator();
when(scheduler.getResourceCalculator()).thenReturn(rc);
return scheduler; return scheduler;
} }

View File

@ -146,6 +146,8 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -1289,6 +1291,8 @@ public class TestClientRMService {
Arrays.asList(getApplicationAttemptId(103))); Arrays.asList(getApplicationAttemptId(103)));
ApplicationAttemptId attemptId = getApplicationAttemptId(1); ApplicationAttemptId attemptId = getApplicationAttemptId(1);
when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null); when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
ResourceCalculator rc = new DefaultResourceCalculator();
when(yarnScheduler.getResourceCalculator()).thenReturn(rc);
return yarnScheduler; return yarnScheduler;
} }

View File

@ -64,8 +64,10 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -83,12 +85,15 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
public class TestSchedulerUtils { public class TestSchedulerUtils {
private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class); private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class);
private RMContext rmContext = getMockRMContext();
@Test (timeout = 30000) @Test (timeout = 30000)
public void testNormalizeRequest() { public void testNormalizeRequest() {
ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
@ -206,6 +211,8 @@ public class TestSchedulerUtils {
// set queue accessible node labesl to [x, y] // set queue accessible node labesl to [x, y]
queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.clear();
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y")); queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
rmContext.getNodeLabelManager().addToCluserNodeLabels(
ImmutableSet.of("x", "y"));
Resource resource = Resources.createResource( Resource resource = Resources.createResource(
0, 0,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
@ -213,22 +220,44 @@ public class TestSchedulerUtils {
mock(Priority.class), ResourceRequest.ANY, resource, 1); mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x"); resReq.setNodeLabelExpression("x");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler); scheduler, rmContext);
resReq.setNodeLabelExpression("y"); resReq.setNodeLabelExpression("y");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler); scheduler, rmContext);
resReq.setNodeLabelExpression(""); resReq.setNodeLabelExpression("");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler); scheduler, rmContext);
resReq.setNodeLabelExpression(" "); resReq.setNodeLabelExpression(" ");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler); scheduler, rmContext);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
e.printStackTrace(); e.printStackTrace();
fail("Should be valid when request labels is a subset of queue labels"); fail("Should be valid when request labels is a subset of queue labels");
} finally {
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
Arrays.asList("x", "y"));
}
// same as above, but cluster node labels don't contains label being
// requested. should fail
try {
// set queue accessible node labesl to [x, y]
queueAccessibleNodeLabels.clear();
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
Resource resource = Resources.createResource(
0,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler, rmContext);
fail("Should fail");
} catch (InvalidResourceRequestException e) {
} }
// queue has labels, failed cases (when ask a label not included by queue) // queue has labels, failed cases (when ask a label not included by queue)
@ -236,6 +265,9 @@ public class TestSchedulerUtils {
// set queue accessible node labesl to [x, y] // set queue accessible node labesl to [x, y]
queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.clear();
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y")); queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
rmContext.getNodeLabelManager().addToCluserNodeLabels(
ImmutableSet.of("x",
"y"));
Resource resource = Resources.createResource( Resource resource = Resources.createResource(
0, 0,
@ -244,9 +276,12 @@ public class TestSchedulerUtils {
mock(Priority.class), ResourceRequest.ANY, resource, 1); mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("z"); resReq.setNodeLabelExpression("z");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler); scheduler, rmContext);
fail("Should fail"); fail("Should fail");
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
} finally {
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
Arrays.asList("x", "y"));
} }
// we don't allow specify more than two node labels in a single expression // we don't allow specify more than two node labels in a single expression
@ -255,6 +290,8 @@ public class TestSchedulerUtils {
// set queue accessible node labesl to [x, y] // set queue accessible node labesl to [x, y]
queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.clear();
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y")); queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
rmContext.getNodeLabelManager().addToCluserNodeLabels(
ImmutableSet.of("x", "y"));
Resource resource = Resources.createResource( Resource resource = Resources.createResource(
0, 0,
@ -263,9 +300,12 @@ public class TestSchedulerUtils {
mock(Priority.class), ResourceRequest.ANY, resource, 1); mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x && y"); resReq.setNodeLabelExpression("x && y");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler); scheduler, rmContext);
fail("Should fail"); fail("Should fail");
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
} finally {
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
Arrays.asList("x", "y"));
} }
// queue doesn't have label, succeed (when request no label) // queue doesn't have label, succeed (when request no label)
@ -280,15 +320,15 @@ public class TestSchedulerUtils {
ResourceRequest resReq = BuilderUtils.newResourceRequest( ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1); mock(Priority.class), ResourceRequest.ANY, resource, 1);
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler); scheduler, rmContext);
resReq.setNodeLabelExpression(""); resReq.setNodeLabelExpression("");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler); scheduler, rmContext);
resReq.setNodeLabelExpression(" "); resReq.setNodeLabelExpression(" ");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler); scheduler, rmContext);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
e.printStackTrace(); e.printStackTrace();
fail("Should be valid when request labels is empty"); fail("Should be valid when request labels is empty");
@ -299,6 +339,9 @@ public class TestSchedulerUtils {
// set queue accessible node labels to empty // set queue accessible node labels to empty
queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.clear();
rmContext.getNodeLabelManager().addToCluserNodeLabels(
ImmutableSet.of("x"));
Resource resource = Resources.createResource( Resource resource = Resources.createResource(
0, 0,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
@ -306,9 +349,12 @@ public class TestSchedulerUtils {
mock(Priority.class), ResourceRequest.ANY, resource, 1); mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x"); resReq.setNodeLabelExpression("x");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler); scheduler, rmContext);
fail("Should fail"); fail("Should fail");
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
} finally {
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
Arrays.asList("x"));
} }
// queue is "*", always succeeded // queue is "*", always succeeded
@ -317,6 +363,39 @@ public class TestSchedulerUtils {
queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.clear();
queueAccessibleNodeLabels.add(RMNodeLabelsManager.ANY); queueAccessibleNodeLabels.add(RMNodeLabelsManager.ANY);
rmContext.getNodeLabelManager().addToCluserNodeLabels(
ImmutableSet.of("x", "y", "z"));
Resource resource = Resources.createResource(
0,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler, rmContext);
resReq.setNodeLabelExpression("y");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler, rmContext);
resReq.setNodeLabelExpression("z");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler, rmContext);
} catch (InvalidResourceRequestException e) {
e.printStackTrace();
fail("Should be valid when queue can access any labels");
} finally {
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
Arrays.asList("x", "y", "z"));
}
// same as above, but cluster node labels don't contains label, should fail
try {
// set queue accessible node labels to empty
queueAccessibleNodeLabels.clear();
queueAccessibleNodeLabels.add(RMNodeLabelsManager.ANY);
Resource resource = Resources.createResource( Resource resource = Resources.createResource(
0, 0,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
@ -324,18 +403,9 @@ public class TestSchedulerUtils {
mock(Priority.class), ResourceRequest.ANY, resource, 1); mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x"); resReq.setNodeLabelExpression("x");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler); scheduler, rmContext);
fail("Should fail");
resReq.setNodeLabelExpression("y");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression("z");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
e.printStackTrace();
fail("Should be valid when queue can access any labels");
} }
// we don't allow resource name other than ANY and specify label // we don't allow resource name other than ANY and specify label
@ -343,6 +413,8 @@ public class TestSchedulerUtils {
// set queue accessible node labesl to [x, y] // set queue accessible node labesl to [x, y]
queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.clear();
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y")); queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
rmContext.getNodeLabelManager().addToCluserNodeLabels(
ImmutableSet.of("x", "y"));
Resource resource = Resources.createResource( Resource resource = Resources.createResource(
0, 0,
@ -351,9 +423,12 @@ public class TestSchedulerUtils {
mock(Priority.class), "rack", resource, 1); mock(Priority.class), "rack", resource, 1);
resReq.setNodeLabelExpression("x"); resReq.setNodeLabelExpression("x");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler); scheduler, rmContext);
fail("Should fail"); fail("Should fail");
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
} finally {
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
Arrays.asList("x", "y"));
} }
// we don't allow resource name other than ANY and specify label even if // we don't allow resource name other than ANY and specify label even if
@ -363,6 +438,8 @@ public class TestSchedulerUtils {
queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.clear();
queueAccessibleNodeLabels.addAll(Arrays queueAccessibleNodeLabels.addAll(Arrays
.asList(CommonNodeLabelsManager.ANY)); .asList(CommonNodeLabelsManager.ANY));
rmContext.getNodeLabelManager().addToCluserNodeLabels(
ImmutableSet.of("x"));
Resource resource = Resources.createResource( Resource resource = Resources.createResource(
0, 0,
@ -371,9 +448,12 @@ public class TestSchedulerUtils {
mock(Priority.class), "rack", resource, 1); mock(Priority.class), "rack", resource, 1);
resReq.setNodeLabelExpression("x"); resReq.setNodeLabelExpression("x");
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler); scheduler, rmContext);
fail("Should fail"); fail("Should fail");
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
} finally {
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
Arrays.asList("x"));
} }
} }
@ -395,7 +475,7 @@ public class TestSchedulerUtils {
BuilderUtils.newResourceRequest(mock(Priority.class), BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1); ResourceRequest.ANY, resource, 1);
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler); mockScheduler, rmContext);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
fail("Zero memory should be accepted"); fail("Zero memory should be accepted");
} }
@ -409,7 +489,7 @@ public class TestSchedulerUtils {
BuilderUtils.newResourceRequest(mock(Priority.class), BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1); ResourceRequest.ANY, resource, 1);
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler); mockScheduler, rmContext);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
fail("Zero vcores should be accepted"); fail("Zero vcores should be accepted");
} }
@ -424,7 +504,7 @@ public class TestSchedulerUtils {
BuilderUtils.newResourceRequest(mock(Priority.class), BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1); ResourceRequest.ANY, resource, 1);
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler); mockScheduler, rmContext);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
fail("Max memory should be accepted"); fail("Max memory should be accepted");
} }
@ -439,7 +519,7 @@ public class TestSchedulerUtils {
BuilderUtils.newResourceRequest(mock(Priority.class), BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1); ResourceRequest.ANY, resource, 1);
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler); mockScheduler, rmContext);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
fail("Max vcores should not be accepted"); fail("Max vcores should not be accepted");
} }
@ -453,7 +533,7 @@ public class TestSchedulerUtils {
BuilderUtils.newResourceRequest(mock(Priority.class), BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1); ResourceRequest.ANY, resource, 1);
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler); mockScheduler, rmContext);
fail("Negative memory should not be accepted"); fail("Negative memory should not be accepted");
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
// expected // expected
@ -468,7 +548,7 @@ public class TestSchedulerUtils {
BuilderUtils.newResourceRequest(mock(Priority.class), BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1); ResourceRequest.ANY, resource, 1);
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler); mockScheduler, rmContext);
fail("Negative vcores should not be accepted"); fail("Negative vcores should not be accepted");
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
// expected // expected
@ -484,7 +564,7 @@ public class TestSchedulerUtils {
BuilderUtils.newResourceRequest(mock(Priority.class), BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1); ResourceRequest.ANY, resource, 1);
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler); mockScheduler, rmContext);
fail("More than max memory should not be accepted"); fail("More than max memory should not be accepted");
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
// expected // expected
@ -501,7 +581,7 @@ public class TestSchedulerUtils {
BuilderUtils.newResourceRequest(mock(Priority.class), BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1); ResourceRequest.ANY, resource, 1);
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler); mockScheduler, rmContext);
fail("More than max vcores should not be accepted"); fail("More than max vcores should not be accepted");
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
// expected // expected
@ -632,4 +712,12 @@ public class TestSchedulerUtils {
Assert.assertNull(applications.get(appId)); Assert.assertNull(applications.get(appId));
return app; return app;
} }
private static RMContext getMockRMContext() {
RMContext rmContext = mock(RMContext.class);
RMNodeLabelsManager nlm = new NullRMNodeLabelsManager();
nlm.init(new Configuration(false));
when(rmContext.getNodeLabelManager()).thenReturn(nlm);
return rmContext;
}
} }

View File

@ -551,7 +551,7 @@ public class TestQueueParsing {
ServiceOperations.stopQuietly(capacityScheduler); ServiceOperations.stopQuietly(capacityScheduler);
} }
@Test(expected = Exception.class) @Test
public void testQueueParsingWhenLabelsNotExistedInNodeLabelManager() public void testQueueParsingWhenLabelsNotExistedInNodeLabelManager()
throws IOException { throws IOException {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
@ -579,7 +579,7 @@ public class TestQueueParsing {
ServiceOperations.stopQuietly(nodeLabelsManager); ServiceOperations.stopQuietly(nodeLabelsManager);
} }
@Test(expected = Exception.class) @Test
public void testQueueParsingWhenLabelsInheritedNotExistedInNodeLabelManager() public void testQueueParsingWhenLabelsInheritedNotExistedInNodeLabelManager()
throws IOException { throws IOException {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
@ -607,7 +607,7 @@ public class TestQueueParsing {
ServiceOperations.stopQuietly(nodeLabelsManager); ServiceOperations.stopQuietly(nodeLabelsManager);
} }
@Test(expected = Exception.class) @Test
public void testSingleLevelQueueParsingWhenLabelsNotExistedInNodeLabelManager() public void testSingleLevelQueueParsingWhenLabelsNotExistedInNodeLabelManager()
throws IOException { throws IOException {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
@ -635,7 +635,7 @@ public class TestQueueParsing {
ServiceOperations.stopQuietly(nodeLabelsManager); ServiceOperations.stopQuietly(nodeLabelsManager);
} }
@Test(expected = Exception.class) @Test
public void testQueueParsingWhenLabelsNotExist() throws IOException { public void testQueueParsingWhenLabelsNotExist() throws IOException {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf = CapacitySchedulerConfiguration csConf =
@ -769,4 +769,100 @@ public class TestQueueParsing {
Assert.assertEquals(0.10 * 0.4, a2.getAbsoluteCapacity(), DELTA); Assert.assertEquals(0.10 * 0.4, a2.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.15, a2.getAbsoluteMaximumCapacity(), DELTA); Assert.assertEquals(0.15, a2.getAbsoluteMaximumCapacity(), DELTA);
} }
/**
* Test init a queue configuration, children's capacity for a given label
* doesn't equals to 100%. This expect IllegalArgumentException thrown.
*/
@Test(expected = IllegalArgumentException.class)
public void testQueueParsingFailWhenSumOfChildrenNonLabeledCapacityNot100Percent()
throws IOException {
nodeLabelManager.addToCluserNodeLabels(ImmutableSet
.of("red", "blue"));
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfiguration(csConf);
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c2", 5);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
ServiceOperations.stopQuietly(capacityScheduler);
}
/**
* Test init a queue configuration, children's capacity for a given label
* doesn't equals to 100%. This expect IllegalArgumentException thrown.
*/
@Test(expected = IllegalArgumentException.class)
public void testQueueParsingFailWhenSumOfChildrenLabeledCapacityNot100Percent()
throws IOException {
nodeLabelManager.addToCluserNodeLabels(ImmutableSet
.of("red", "blue"));
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationWithLabels(csConf);
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT + ".b.b3",
"red", 24);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
ServiceOperations.stopQuietly(capacityScheduler);
}
/**
* Test init a queue configuration, children's capacity for a given label
* doesn't equals to 100%. This expect IllegalArgumentException thrown.
*/
@Test(expected = IllegalArgumentException.class)
public void testQueueParsingWithSumOfChildLabelCapacityNot100PercentWithWildCard()
throws IOException {
nodeLabelManager.addToCluserNodeLabels(ImmutableSet
.of("red", "blue"));
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationWithLabels(csConf);
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT + ".b.b3",
"red", 24);
csConf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT,
ImmutableSet.of(RMNodeLabelsManager.ANY));
csConf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT + ".b",
ImmutableSet.of(RMNodeLabelsManager.ANY));
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
ServiceOperations.stopQuietly(capacityScheduler);
}
} }