YARN-2918. RM should not fail on startup if queue's configured labels do not exist in cluster-node-labels. Contributed by Wangda Tan
(cherry picked from commit f489a4ec96
)
This commit is contained in:
parent
a3abe8d7e4
commit
d817fbb34d
|
@ -288,6 +288,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-3584. Fixed attempt diagnostics format shown on the UI. (nijel via
|
||||
jianhe)
|
||||
|
||||
YARN-2918. RM should not fail on startup if queue's configured labels do
|
||||
not exist in cluster-node-labels. (Wangda Tan via jianhe)
|
||||
|
||||
Release 2.7.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -504,7 +504,7 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
try {
|
||||
RMServerUtils.normalizeAndValidateRequests(ask,
|
||||
rScheduler.getMaximumResourceCapability(), app.getQueue(),
|
||||
rScheduler);
|
||||
rScheduler, rmContext);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
LOG.warn("Invalid resource ask by application " + appAttemptId, e);
|
||||
throw e;
|
||||
|
|
|
@ -383,7 +383,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||
try {
|
||||
SchedulerUtils.normalizeAndValidateRequest(amReq,
|
||||
scheduler.getMaximumResourceCapability(),
|
||||
submissionContext.getQueue(), scheduler, isRecovery);
|
||||
submissionContext.getQueue(), scheduler, isRecovery, rmContext);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
LOG.warn("RM app submission failed in validating AM resource request"
|
||||
+ " for application " + submissionContext.getApplicationId(), e);
|
||||
|
|
|
@ -91,11 +91,12 @@ public class RMServerUtils {
|
|||
* requested memory/vcore is non-negative and not greater than max
|
||||
*/
|
||||
public static void normalizeAndValidateRequests(List<ResourceRequest> ask,
|
||||
Resource maximumResource, String queueName, YarnScheduler scheduler)
|
||||
Resource maximumResource, String queueName, YarnScheduler scheduler,
|
||||
RMContext rmContext)
|
||||
throws InvalidResourceRequestException {
|
||||
for (ResourceRequest resReq : ask) {
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maximumResource,
|
||||
queueName, scheduler);
|
||||
queueName, scheduler, rmContext);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
|||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.AccessType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
|
@ -210,7 +211,7 @@ public class SchedulerUtils {
|
|||
|
||||
public static void normalizeAndValidateRequest(ResourceRequest resReq,
|
||||
Resource maximumResource, String queueName, YarnScheduler scheduler,
|
||||
boolean isRecovery)
|
||||
boolean isRecovery, RMContext rmContext)
|
||||
throws InvalidResourceRequestException {
|
||||
|
||||
QueueInfo queueInfo = null;
|
||||
|
@ -222,15 +223,16 @@ public class SchedulerUtils {
|
|||
}
|
||||
SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo);
|
||||
if (!isRecovery) {
|
||||
validateResourceRequest(resReq, maximumResource, queueInfo);
|
||||
validateResourceRequest(resReq, maximumResource, queueInfo, rmContext);
|
||||
}
|
||||
}
|
||||
|
||||
public static void normalizeAndvalidateRequest(ResourceRequest resReq,
|
||||
Resource maximumResource, String queueName, YarnScheduler scheduler)
|
||||
Resource maximumResource, String queueName, YarnScheduler scheduler,
|
||||
RMContext rmContext)
|
||||
throws InvalidResourceRequestException {
|
||||
normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler,
|
||||
false);
|
||||
false, rmContext);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -239,8 +241,8 @@ public class SchedulerUtils {
|
|||
*
|
||||
* @throws InvalidResourceRequestException when there is invalid request
|
||||
*/
|
||||
public static void validateResourceRequest(ResourceRequest resReq,
|
||||
Resource maximumResource, QueueInfo queueInfo)
|
||||
private static void validateResourceRequest(ResourceRequest resReq,
|
||||
Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
|
||||
throws InvalidResourceRequestException {
|
||||
if (resReq.getCapability().getMemory() < 0 ||
|
||||
resReq.getCapability().getMemory() > maximumResource.getMemory()) {
|
||||
|
@ -282,7 +284,7 @@ public class SchedulerUtils {
|
|||
|
||||
if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) {
|
||||
if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(),
|
||||
labelExp)) {
|
||||
labelExp, rmContext)) {
|
||||
throw new InvalidResourceRequestException("Invalid resource request"
|
||||
+ ", queue="
|
||||
+ queueInfo.getQueueName()
|
||||
|
@ -296,39 +298,36 @@ public class SchedulerUtils {
|
|||
}
|
||||
}
|
||||
|
||||
public static void checkIfLabelInClusterNodeLabels(RMNodeLabelsManager mgr,
|
||||
Set<String> labels) throws IOException {
|
||||
if (mgr == null) {
|
||||
if (labels != null && !labels.isEmpty()) {
|
||||
throw new IOException("NodeLabelManager is null, please check");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (labels != null) {
|
||||
for (String label : labels) {
|
||||
if (!label.equals(RMNodeLabelsManager.ANY)
|
||||
&& !mgr.containsNodeLabel(label)) {
|
||||
throw new IOException("NodeLabelManager doesn't include label = "
|
||||
+ label + ", please check.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check queue label expression, check if node label in queue's
|
||||
* node-label-expression existed in clusterNodeLabels if rmContext != null
|
||||
*/
|
||||
public static boolean checkQueueLabelExpression(Set<String> queueLabels,
|
||||
String labelExpression) {
|
||||
if (queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY)) {
|
||||
return true;
|
||||
}
|
||||
String labelExpression, RMContext rmContext) {
|
||||
// if label expression is empty, we can allocate container on any node
|
||||
if (labelExpression == null) {
|
||||
return true;
|
||||
}
|
||||
for (String str : labelExpression.split("&&")) {
|
||||
if (!str.trim().isEmpty()
|
||||
&& (queueLabels == null || !queueLabels.contains(str.trim()))) {
|
||||
str = str.trim();
|
||||
if (!str.trim().isEmpty()) {
|
||||
// check queue label
|
||||
if (queueLabels == null) {
|
||||
return false;
|
||||
} else {
|
||||
if (!queueLabels.contains(str)
|
||||
&& !queueLabels.contains(RMNodeLabelsManager.ANY)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// check node label manager contains this label
|
||||
if (null != rmContext) {
|
||||
RMNodeLabelsManager nlm = rmContext.getNodeLabelManager();
|
||||
if (nlm != null && !nlm.containsNodeLabel(str)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -118,11 +118,9 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
protected void setupConfigurableCapacities() {
|
||||
CSQueueUtils.loadUpdateAndCheckCapacities(
|
||||
getQueuePath(),
|
||||
accessibleLabels,
|
||||
csContext.getConfiguration(),
|
||||
queueCapacities,
|
||||
parent == null ? null : parent.getQueueCapacities(),
|
||||
csContext.getRMContext().getNodeLabelManager());
|
||||
parent == null ? null : parent.getQueueCapacities());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -248,8 +246,6 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
if (this.accessibleLabels == null && parent != null) {
|
||||
this.accessibleLabels = parent.getAccessibleNodeLabels();
|
||||
}
|
||||
SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
|
||||
this.accessibleLabels);
|
||||
|
||||
// inherit from parent if labels not set
|
||||
if (this.defaultLabelExpression == null && parent != null
|
||||
|
|
|
@ -116,10 +116,9 @@ class CSQueueUtils {
|
|||
* - Check if capacities/absolute-capacities legal
|
||||
*/
|
||||
public static void loadUpdateAndCheckCapacities(String queuePath,
|
||||
Set<String> accessibleLabels, CapacitySchedulerConfiguration csConf,
|
||||
QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities,
|
||||
RMNodeLabelsManager nlm) {
|
||||
loadCapacitiesByLabelsFromConf(queuePath, accessibleLabels, nlm,
|
||||
CapacitySchedulerConfiguration csConf,
|
||||
QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) {
|
||||
loadCapacitiesByLabelsFromConf(queuePath,
|
||||
queueCapacities, csConf);
|
||||
|
||||
updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities);
|
||||
|
@ -127,28 +126,13 @@ class CSQueueUtils {
|
|||
capacitiesSanityCheck(queuePath, queueCapacities);
|
||||
}
|
||||
|
||||
// Considered NO_LABEL, ANY and null cases
|
||||
private static Set<String> normalizeAccessibleNodeLabels(Set<String> labels,
|
||||
RMNodeLabelsManager mgr) {
|
||||
Set<String> accessibleLabels = new HashSet<String>();
|
||||
if (labels != null) {
|
||||
accessibleLabels.addAll(labels);
|
||||
}
|
||||
if (accessibleLabels.contains(CommonNodeLabelsManager.ANY)) {
|
||||
accessibleLabels.addAll(mgr.getClusterNodeLabelNames());
|
||||
}
|
||||
accessibleLabels.add(CommonNodeLabelsManager.NO_LABEL);
|
||||
|
||||
return accessibleLabels;
|
||||
}
|
||||
|
||||
private static void loadCapacitiesByLabelsFromConf(String queuePath,
|
||||
Set<String> labels, RMNodeLabelsManager mgr,
|
||||
QueueCapacities queueCapacities, CapacitySchedulerConfiguration csConf) {
|
||||
queueCapacities.clearConfigurableFields();
|
||||
labels = normalizeAccessibleNodeLabels(labels, mgr);
|
||||
Set<String> configuredNodelabels =
|
||||
csConf.getConfiguredNodeLabels(queuePath);
|
||||
|
||||
for (String label : labels) {
|
||||
for (String label : configuredNodelabels) {
|
||||
if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
|
||||
queueCapacities.setCapacity(CommonNodeLabelsManager.NO_LABEL,
|
||||
csConf.getNonLabeledQueueCapacity(queuePath) / 100);
|
||||
|
|
|
@ -23,8 +23,10 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
|
@ -44,13 +46,14 @@ import org.apache.hadoop.yarn.security.AccessType;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
|
||||
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
|
||||
|
@ -909,4 +912,35 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
defaultVal);
|
||||
return preemptionDisabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get configured node labels in a given queuePath
|
||||
*/
|
||||
public Set<String> getConfiguredNodeLabels(String queuePath) {
|
||||
Set<String> configuredNodeLabels = new HashSet<String>();
|
||||
Entry<String, String> e = null;
|
||||
|
||||
Iterator<Entry<String, String>> iter = iterator();
|
||||
while (iter.hasNext()) {
|
||||
e = iter.next();
|
||||
String key = e.getKey();
|
||||
|
||||
if (key.startsWith(getQueuePrefix(queuePath) + ACCESSIBLE_NODE_LABELS
|
||||
+ DOT)) {
|
||||
// Find <label-name> in
|
||||
// <queue-path>.accessible-node-labels.<label-name>.property
|
||||
int labelStartIdx =
|
||||
key.indexOf(ACCESSIBLE_NODE_LABELS)
|
||||
+ ACCESSIBLE_NODE_LABELS.length() + 1;
|
||||
int labelEndIndx = key.indexOf('.', labelStartIdx);
|
||||
String labelName = key.substring(labelStartIdx, labelEndIndx);
|
||||
configuredNodeLabels.add(labelName);
|
||||
}
|
||||
}
|
||||
|
||||
// always add NO_LABEL
|
||||
configuredNodeLabels.add(RMNodeLabelsManager.NO_LABEL);
|
||||
|
||||
return configuredNodeLabels;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.security.AccessType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
@ -185,8 +186,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
maxAMResourcePerQueuePercent =
|
||||
conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
|
||||
|
||||
if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
|
||||
this.defaultLabelExpression)) {
|
||||
if (!SchedulerUtils.checkQueueLabelExpression(
|
||||
this.accessibleLabels, this.defaultLabelExpression, null)) {
|
||||
throw new IOException("Invalid default label expression of "
|
||||
+ " queue="
|
||||
+ getQueueName()
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -64,8 +65,10 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
|||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
|
@ -83,12 +86,15 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
public class TestSchedulerUtils {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class);
|
||||
|
||||
private RMContext rmContext = getMockRMContext();
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testNormalizeRequest() {
|
||||
ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
|
||||
|
@ -206,6 +212,9 @@ public class TestSchedulerUtils {
|
|||
// set queue accessible node labesl to [x, y]
|
||||
queueAccessibleNodeLabels.clear();
|
||||
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
|
||||
rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
||||
ImmutableSet.of(NodeLabel.newInstance("x"),
|
||||
NodeLabel.newInstance("y")));
|
||||
Resource resource = Resources.createResource(
|
||||
0,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
||||
|
@ -213,22 +222,44 @@ public class TestSchedulerUtils {
|
|||
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
||||
resReq.setNodeLabelExpression("x");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
scheduler, rmContext);
|
||||
|
||||
resReq.setNodeLabelExpression("y");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
scheduler, rmContext);
|
||||
|
||||
resReq.setNodeLabelExpression("");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
scheduler, rmContext);
|
||||
|
||||
resReq.setNodeLabelExpression(" ");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
scheduler, rmContext);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
e.printStackTrace();
|
||||
fail("Should be valid when request labels is a subset of queue labels");
|
||||
} finally {
|
||||
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
||||
Arrays.asList("x", "y"));
|
||||
}
|
||||
|
||||
// same as above, but cluster node labels don't contains label being
|
||||
// requested. should fail
|
||||
try {
|
||||
// set queue accessible node labesl to [x, y]
|
||||
queueAccessibleNodeLabels.clear();
|
||||
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
|
||||
Resource resource = Resources.createResource(
|
||||
0,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
||||
ResourceRequest resReq = BuilderUtils.newResourceRequest(
|
||||
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
||||
resReq.setNodeLabelExpression("x");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler, rmContext);
|
||||
|
||||
fail("Should fail");
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
}
|
||||
|
||||
// queue has labels, failed cases (when ask a label not included by queue)
|
||||
|
@ -236,6 +267,9 @@ public class TestSchedulerUtils {
|
|||
// set queue accessible node labesl to [x, y]
|
||||
queueAccessibleNodeLabels.clear();
|
||||
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
|
||||
rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
||||
ImmutableSet.of(NodeLabel.newInstance("x"),
|
||||
NodeLabel.newInstance("y")));
|
||||
|
||||
Resource resource = Resources.createResource(
|
||||
0,
|
||||
|
@ -244,9 +278,12 @@ public class TestSchedulerUtils {
|
|||
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
||||
resReq.setNodeLabelExpression("z");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
scheduler, rmContext);
|
||||
fail("Should fail");
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
} finally {
|
||||
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
||||
Arrays.asList("x", "y"));
|
||||
}
|
||||
|
||||
// we don't allow specify more than two node labels in a single expression
|
||||
|
@ -255,6 +292,9 @@ public class TestSchedulerUtils {
|
|||
// set queue accessible node labesl to [x, y]
|
||||
queueAccessibleNodeLabels.clear();
|
||||
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
|
||||
rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
||||
ImmutableSet.of(NodeLabel.newInstance("x"),
|
||||
NodeLabel.newInstance("y")));
|
||||
|
||||
Resource resource = Resources.createResource(
|
||||
0,
|
||||
|
@ -263,9 +303,12 @@ public class TestSchedulerUtils {
|
|||
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
||||
resReq.setNodeLabelExpression("x && y");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
scheduler, rmContext);
|
||||
fail("Should fail");
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
} finally {
|
||||
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
||||
Arrays.asList("x", "y"));
|
||||
}
|
||||
|
||||
// queue doesn't have label, succeed (when request no label)
|
||||
|
@ -280,15 +323,15 @@ public class TestSchedulerUtils {
|
|||
ResourceRequest resReq = BuilderUtils.newResourceRequest(
|
||||
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
scheduler, rmContext);
|
||||
|
||||
resReq.setNodeLabelExpression("");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
scheduler, rmContext);
|
||||
|
||||
resReq.setNodeLabelExpression(" ");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
scheduler, rmContext);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
e.printStackTrace();
|
||||
fail("Should be valid when request labels is empty");
|
||||
|
@ -299,6 +342,9 @@ public class TestSchedulerUtils {
|
|||
// set queue accessible node labels to empty
|
||||
queueAccessibleNodeLabels.clear();
|
||||
|
||||
rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
||||
ImmutableSet.of(NodeLabel.newInstance("x")));
|
||||
|
||||
Resource resource = Resources.createResource(
|
||||
0,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
||||
|
@ -306,9 +352,12 @@ public class TestSchedulerUtils {
|
|||
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
||||
resReq.setNodeLabelExpression("x");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
scheduler, rmContext);
|
||||
fail("Should fail");
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
} finally {
|
||||
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
||||
Arrays.asList("x"));
|
||||
}
|
||||
|
||||
// queue is "*", always succeeded
|
||||
|
@ -317,6 +366,40 @@ public class TestSchedulerUtils {
|
|||
queueAccessibleNodeLabels.clear();
|
||||
queueAccessibleNodeLabels.add(RMNodeLabelsManager.ANY);
|
||||
|
||||
rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
||||
ImmutableSet.of(NodeLabel.newInstance("x"),
|
||||
NodeLabel.newInstance("y"), NodeLabel.newInstance("z")));
|
||||
|
||||
Resource resource = Resources.createResource(
|
||||
0,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
||||
ResourceRequest resReq = BuilderUtils.newResourceRequest(
|
||||
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
||||
resReq.setNodeLabelExpression("x");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler, rmContext);
|
||||
|
||||
resReq.setNodeLabelExpression("y");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler, rmContext);
|
||||
|
||||
resReq.setNodeLabelExpression("z");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler, rmContext);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
e.printStackTrace();
|
||||
fail("Should be valid when queue can access any labels");
|
||||
} finally {
|
||||
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
||||
Arrays.asList("x", "y", "z"));
|
||||
}
|
||||
|
||||
// same as above, but cluster node labels don't contains label, should fail
|
||||
try {
|
||||
// set queue accessible node labels to empty
|
||||
queueAccessibleNodeLabels.clear();
|
||||
queueAccessibleNodeLabels.add(RMNodeLabelsManager.ANY);
|
||||
|
||||
Resource resource = Resources.createResource(
|
||||
0,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
||||
|
@ -324,18 +407,9 @@ public class TestSchedulerUtils {
|
|||
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
||||
resReq.setNodeLabelExpression("x");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
|
||||
resReq.setNodeLabelExpression("y");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
|
||||
resReq.setNodeLabelExpression("z");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
scheduler, rmContext);
|
||||
fail("Should fail");
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
e.printStackTrace();
|
||||
fail("Should be valid when queue can access any labels");
|
||||
}
|
||||
|
||||
// we don't allow resource name other than ANY and specify label
|
||||
|
@ -343,6 +417,9 @@ public class TestSchedulerUtils {
|
|||
// set queue accessible node labesl to [x, y]
|
||||
queueAccessibleNodeLabels.clear();
|
||||
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
|
||||
rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
||||
ImmutableSet.of(NodeLabel.newInstance("x"),
|
||||
NodeLabel.newInstance("y")));
|
||||
|
||||
Resource resource = Resources.createResource(
|
||||
0,
|
||||
|
@ -351,9 +428,12 @@ public class TestSchedulerUtils {
|
|||
mock(Priority.class), "rack", resource, 1);
|
||||
resReq.setNodeLabelExpression("x");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
scheduler, rmContext);
|
||||
fail("Should fail");
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
} finally {
|
||||
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
||||
Arrays.asList("x", "y"));
|
||||
}
|
||||
|
||||
// we don't allow resource name other than ANY and specify label even if
|
||||
|
@ -363,6 +443,8 @@ public class TestSchedulerUtils {
|
|||
queueAccessibleNodeLabels.clear();
|
||||
queueAccessibleNodeLabels.addAll(Arrays
|
||||
.asList(CommonNodeLabelsManager.ANY));
|
||||
rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
||||
ImmutableSet.of(NodeLabel.newInstance("x")));
|
||||
|
||||
Resource resource = Resources.createResource(
|
||||
0,
|
||||
|
@ -371,9 +453,12 @@ public class TestSchedulerUtils {
|
|||
mock(Priority.class), "rack", resource, 1);
|
||||
resReq.setNodeLabelExpression("x");
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
||||
scheduler);
|
||||
scheduler, rmContext);
|
||||
fail("Should fail");
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
} finally {
|
||||
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
||||
Arrays.asList("x"));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -395,7 +480,7 @@ public class TestSchedulerUtils {
|
|||
BuilderUtils.newResourceRequest(mock(Priority.class),
|
||||
ResourceRequest.ANY, resource, 1);
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
||||
mockScheduler);
|
||||
mockScheduler, rmContext);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
fail("Zero memory should be accepted");
|
||||
}
|
||||
|
@ -409,7 +494,7 @@ public class TestSchedulerUtils {
|
|||
BuilderUtils.newResourceRequest(mock(Priority.class),
|
||||
ResourceRequest.ANY, resource, 1);
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
||||
mockScheduler);
|
||||
mockScheduler, rmContext);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
fail("Zero vcores should be accepted");
|
||||
}
|
||||
|
@ -424,7 +509,7 @@ public class TestSchedulerUtils {
|
|||
BuilderUtils.newResourceRequest(mock(Priority.class),
|
||||
ResourceRequest.ANY, resource, 1);
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
||||
mockScheduler);
|
||||
mockScheduler, rmContext);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
fail("Max memory should be accepted");
|
||||
}
|
||||
|
@ -439,7 +524,7 @@ public class TestSchedulerUtils {
|
|||
BuilderUtils.newResourceRequest(mock(Priority.class),
|
||||
ResourceRequest.ANY, resource, 1);
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
||||
mockScheduler);
|
||||
mockScheduler, rmContext);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
fail("Max vcores should not be accepted");
|
||||
}
|
||||
|
@ -453,7 +538,7 @@ public class TestSchedulerUtils {
|
|||
BuilderUtils.newResourceRequest(mock(Priority.class),
|
||||
ResourceRequest.ANY, resource, 1);
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
||||
mockScheduler);
|
||||
mockScheduler, rmContext);
|
||||
fail("Negative memory should not be accepted");
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
// expected
|
||||
|
@ -468,7 +553,7 @@ public class TestSchedulerUtils {
|
|||
BuilderUtils.newResourceRequest(mock(Priority.class),
|
||||
ResourceRequest.ANY, resource, 1);
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
||||
mockScheduler);
|
||||
mockScheduler, rmContext);
|
||||
fail("Negative vcores should not be accepted");
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
// expected
|
||||
|
@ -484,7 +569,7 @@ public class TestSchedulerUtils {
|
|||
BuilderUtils.newResourceRequest(mock(Priority.class),
|
||||
ResourceRequest.ANY, resource, 1);
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
||||
mockScheduler);
|
||||
mockScheduler, rmContext);
|
||||
fail("More than max memory should not be accepted");
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
// expected
|
||||
|
@ -501,7 +586,7 @@ public class TestSchedulerUtils {
|
|||
BuilderUtils.newResourceRequest(mock(Priority.class),
|
||||
ResourceRequest.ANY, resource, 1);
|
||||
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
||||
mockScheduler);
|
||||
mockScheduler, rmContext);
|
||||
fail("More than max vcores should not be accepted");
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
// expected
|
||||
|
@ -632,4 +717,12 @@ public class TestSchedulerUtils {
|
|||
Assert.assertNull(applications.get(appId));
|
||||
return app;
|
||||
}
|
||||
|
||||
private static RMContext getMockRMContext() {
|
||||
RMContext rmContext = mock(RMContext.class);
|
||||
RMNodeLabelsManager nlm = new NullRMNodeLabelsManager();
|
||||
nlm.init(new Configuration(false));
|
||||
when(rmContext.getNodeLabelManager()).thenReturn(nlm);
|
||||
return rmContext;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -551,7 +551,7 @@ public class TestQueueParsing {
|
|||
ServiceOperations.stopQuietly(capacityScheduler);
|
||||
}
|
||||
|
||||
@Test(expected = Exception.class)
|
||||
@Test
|
||||
public void testQueueParsingWhenLabelsNotExistedInNodeLabelManager()
|
||||
throws IOException {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
|
@ -579,7 +579,7 @@ public class TestQueueParsing {
|
|||
ServiceOperations.stopQuietly(nodeLabelsManager);
|
||||
}
|
||||
|
||||
@Test(expected = Exception.class)
|
||||
@Test
|
||||
public void testQueueParsingWhenLabelsInheritedNotExistedInNodeLabelManager()
|
||||
throws IOException {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
|
@ -607,7 +607,7 @@ public class TestQueueParsing {
|
|||
ServiceOperations.stopQuietly(nodeLabelsManager);
|
||||
}
|
||||
|
||||
@Test(expected = Exception.class)
|
||||
@Test
|
||||
public void testSingleLevelQueueParsingWhenLabelsNotExistedInNodeLabelManager()
|
||||
throws IOException {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
|
@ -635,7 +635,7 @@ public class TestQueueParsing {
|
|||
ServiceOperations.stopQuietly(nodeLabelsManager);
|
||||
}
|
||||
|
||||
@Test(expected = Exception.class)
|
||||
@Test
|
||||
public void testQueueParsingWhenLabelsNotExist() throws IOException {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
|
@ -769,4 +769,100 @@ public class TestQueueParsing {
|
|||
Assert.assertEquals(0.10 * 0.4, a2.getAbsoluteCapacity(), DELTA);
|
||||
Assert.assertEquals(0.15, a2.getAbsoluteMaximumCapacity(), DELTA);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test init a queue configuration, children's capacity for a given label
|
||||
* doesn't equals to 100%. This expect IllegalArgumentException thrown.
|
||||
*/
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testQueueParsingFailWhenSumOfChildrenNonLabeledCapacityNot100Percent()
|
||||
throws IOException {
|
||||
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet
|
||||
.of("red", "blue"));
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration(conf);
|
||||
setupQueueConfiguration(csConf);
|
||||
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c2", 5);
|
||||
|
||||
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||
RMContextImpl rmContext =
|
||||
new RMContextImpl(null, null, null, null, null, null,
|
||||
new RMContainerTokenSecretManager(csConf),
|
||||
new NMTokenSecretManagerInRM(csConf),
|
||||
new ClientToAMTokenSecretManagerInRM(), null);
|
||||
rmContext.setNodeLabelManager(nodeLabelManager);
|
||||
capacityScheduler.setConf(csConf);
|
||||
capacityScheduler.setRMContext(rmContext);
|
||||
capacityScheduler.init(csConf);
|
||||
capacityScheduler.start();
|
||||
ServiceOperations.stopQuietly(capacityScheduler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test init a queue configuration, children's capacity for a given label
|
||||
* doesn't equals to 100%. This expect IllegalArgumentException thrown.
|
||||
*/
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testQueueParsingFailWhenSumOfChildrenLabeledCapacityNot100Percent()
|
||||
throws IOException {
|
||||
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet
|
||||
.of("red", "blue"));
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration(conf);
|
||||
setupQueueConfigurationWithLabels(csConf);
|
||||
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT + ".b.b3",
|
||||
"red", 24);
|
||||
|
||||
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||
RMContextImpl rmContext =
|
||||
new RMContextImpl(null, null, null, null, null, null,
|
||||
new RMContainerTokenSecretManager(csConf),
|
||||
new NMTokenSecretManagerInRM(csConf),
|
||||
new ClientToAMTokenSecretManagerInRM(), null);
|
||||
rmContext.setNodeLabelManager(nodeLabelManager);
|
||||
capacityScheduler.setConf(csConf);
|
||||
capacityScheduler.setRMContext(rmContext);
|
||||
capacityScheduler.init(csConf);
|
||||
capacityScheduler.start();
|
||||
ServiceOperations.stopQuietly(capacityScheduler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test init a queue configuration, children's capacity for a given label
|
||||
* doesn't equals to 100%. This expect IllegalArgumentException thrown.
|
||||
*/
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testQueueParsingWithSumOfChildLabelCapacityNot100PercentWithWildCard()
|
||||
throws IOException {
|
||||
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet
|
||||
.of("red", "blue"));
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration(conf);
|
||||
setupQueueConfigurationWithLabels(csConf);
|
||||
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT + ".b.b3",
|
||||
"red", 24);
|
||||
csConf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT,
|
||||
ImmutableSet.of(RMNodeLabelsManager.ANY));
|
||||
csConf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT + ".b",
|
||||
ImmutableSet.of(RMNodeLabelsManager.ANY));
|
||||
|
||||
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||
RMContextImpl rmContext =
|
||||
new RMContextImpl(null, null, null, null, null, null,
|
||||
new RMContainerTokenSecretManager(csConf),
|
||||
new NMTokenSecretManagerInRM(csConf),
|
||||
new ClientToAMTokenSecretManagerInRM(), null);
|
||||
rmContext.setNodeLabelManager(nodeLabelManager);
|
||||
capacityScheduler.setConf(csConf);
|
||||
capacityScheduler.setRMContext(rmContext);
|
||||
capacityScheduler.init(csConf);
|
||||
capacityScheduler.start();
|
||||
ServiceOperations.stopQuietly(capacityScheduler);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue