YARN-10506. Update queue creation logic to use weight mode and allow the flexible static/dynamic creation. (Contributed by Qi Zhu, Andras Gyori)
Change-Id: I118862fd5e11ee6888275e2bcf667fedfa56c5d7
This commit is contained in:
parent
6cd540e964
commit
3d46141583
|
@ -42,6 +42,10 @@ public class ApplicationPlacementContext {
|
|||
return queue;
|
||||
}
|
||||
|
||||
public void setQueue(String q) {
|
||||
queue = q;
|
||||
}
|
||||
|
||||
public String getParentQueue() {
|
||||
return parentQueue;
|
||||
}
|
||||
|
@ -49,4 +53,13 @@ public class ApplicationPlacementContext {
|
|||
public boolean hasParentQueue() {
|
||||
return parentQueue != null;
|
||||
}
|
||||
|
||||
public String getFullQueuePath() {
|
||||
if (parentQueue != null) {
|
||||
return parentQueue + "." + queue;
|
||||
} else {
|
||||
return queue;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -151,6 +151,14 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
private Map<String, Float> userWeights = new HashMap<String, Float>();
|
||||
private int maxParallelApps;
|
||||
|
||||
// is it a dynamic queue?
|
||||
private boolean dynamicQueue = false;
|
||||
|
||||
// When this queue has application submit to?
|
||||
// This property only applies to dynamic queue,
|
||||
// and will be used to check when the queue need to be removed.
|
||||
private long lastSubmittedTimestamp;
|
||||
|
||||
public AbstractCSQueue(CapacitySchedulerContext cs,
|
||||
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
||||
this(cs, cs.getConfiguration(), queueName, parent, old);
|
||||
|
@ -172,7 +180,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
this.metrics = old != null ?
|
||||
(CSQueueMetrics) old.getMetrics() :
|
||||
CSQueueMetrics.forQueue(getQueuePath(), parent,
|
||||
cs.getConfiguration().getEnableUserMetrics(), cs.getConf());
|
||||
cs.getConfiguration().getEnableUserMetrics(), configuration);
|
||||
|
||||
this.csContext = cs;
|
||||
this.minimumAllocation = csContext.getMinimumResourceCapability();
|
||||
|
@ -192,6 +200,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
writeLock = lock.writeLock();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected void setupConfigurableCapacities() {
|
||||
setupConfigurableCapacities(csContext.getConfiguration());
|
||||
}
|
||||
|
@ -345,11 +354,6 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
return defaultLabelExpression;
|
||||
}
|
||||
|
||||
void setupQueueConfigs(Resource clusterResource)
|
||||
throws IOException {
|
||||
setupQueueConfigs(clusterResource, csContext.getConfiguration());
|
||||
}
|
||||
|
||||
protected void setupQueueConfigs(Resource clusterResource,
|
||||
CapacitySchedulerConfiguration configuration) throws
|
||||
IOException {
|
||||
|
@ -405,7 +409,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
QueueState parentState = (parent == null) ? null : parent.getState();
|
||||
initializeQueueState(previous, configuredState, parentState);
|
||||
|
||||
authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
|
||||
authorizer = YarnAuthorizationProvider.getInstance(configuration);
|
||||
|
||||
this.acls = configuration.getAcls(getQueuePath());
|
||||
|
||||
|
@ -437,7 +441,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
}
|
||||
|
||||
this.reservationsContinueLooking =
|
||||
csContext.getConfiguration().getReservationContinueLook();
|
||||
configuration.getReservationContinueLook();
|
||||
|
||||
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this,
|
||||
configuration);
|
||||
|
@ -1609,4 +1613,38 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isDynamicQueue() {
|
||||
readLock.lock();
|
||||
|
||||
try {
|
||||
return dynamicQueue;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void setDynamicQueue(boolean dynamicQueue) {
|
||||
writeLock.lock();
|
||||
|
||||
try {
|
||||
this.dynamicQueue = dynamicQueue;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public long getLastSubmittedTimestamp() {
|
||||
return lastSubmittedTimestamp;
|
||||
}
|
||||
|
||||
// "Tab" the queue, so this queue won't be removed because of idle timeout.
|
||||
public void signalToSubmitToQueue() {
|
||||
writeLock.lock();
|
||||
try {
|
||||
this.lastSubmittedTimestamp = System.currentTimeMillis();
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
|
|||
writeLock.lock();
|
||||
try {
|
||||
// Set new configs
|
||||
setupQueueConfigs(clusterResource);
|
||||
setupQueueConfigs(clusterResource, csContext.getConfiguration());
|
||||
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.Set;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||
import org.apache.hadoop.yarn.server.utils.Lock;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
|
@ -312,4 +313,15 @@ public class CSQueueUtils {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static ApplicationPlacementContext extractQueuePath(String queuePath) {
|
||||
int parentQueueNameEndIndex = queuePath.lastIndexOf(".");
|
||||
if (parentQueueNameEndIndex > -1) {
|
||||
String parent = queuePath.substring(0, parentQueueNameEndIndex).trim();
|
||||
String leaf = queuePath.substring(parentQueueNameEndIndex + 1).trim();
|
||||
return new ApplicationPlacementContext(leaf, parent);
|
||||
} else{
|
||||
return new ApplicationPlacementContext(queuePath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -230,6 +230,8 @@ public class CapacityScheduler extends
|
|||
private AppPriorityACLsManager appPriorityACLManager;
|
||||
private boolean multiNodePlacementEnabled;
|
||||
|
||||
private CapacitySchedulerAutoQueueHandler autoQueueHandler;
|
||||
|
||||
private static boolean printedVerboseLoggingForAsyncScheduling = false;
|
||||
|
||||
/**
|
||||
|
@ -340,6 +342,9 @@ public class CapacityScheduler extends
|
|||
this.labelManager, this.appPriorityACLManager);
|
||||
this.queueManager.setCapacitySchedulerContext(this);
|
||||
|
||||
this.autoQueueHandler = new CapacitySchedulerAutoQueueHandler(
|
||||
this.queueManager, this.conf);
|
||||
|
||||
this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();
|
||||
|
||||
this.activitiesManager = new ActivitiesManager(rmContext);
|
||||
|
@ -3329,44 +3334,6 @@ public class CapacityScheduler extends
|
|||
return null;
|
||||
}
|
||||
|
||||
private LeafQueue autoCreateLeafQueue(
|
||||
ApplicationPlacementContext placementContext)
|
||||
throws IOException, YarnException {
|
||||
|
||||
AutoCreatedLeafQueue autoCreatedLeafQueue = null;
|
||||
|
||||
String leafQueueName = placementContext.getQueue();
|
||||
String parentQueueName = placementContext.getParentQueue();
|
||||
|
||||
if (!StringUtils.isEmpty(parentQueueName)) {
|
||||
CSQueue parentQueue = getQueue(parentQueueName);
|
||||
|
||||
if (parentQueue != null && conf.isAutoCreateChildQueueEnabled(
|
||||
parentQueue.getQueuePath())) {
|
||||
|
||||
ManagedParentQueue autoCreateEnabledParentQueue =
|
||||
(ManagedParentQueue) parentQueue;
|
||||
autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName,
|
||||
autoCreateEnabledParentQueue);
|
||||
|
||||
addQueue(autoCreatedLeafQueue);
|
||||
|
||||
} else{
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Could not auto-create leaf queue for " + leafQueueName
|
||||
+ ". Queue mapping specifies an invalid parent queue "
|
||||
+ "which does not exist "
|
||||
+ parentQueueName);
|
||||
}
|
||||
} else{
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Could not auto-create leaf queue for " + leafQueueName
|
||||
+ ". Queue mapping does not specify"
|
||||
+ " which parent queue it needs to be created under.");
|
||||
}
|
||||
return autoCreatedLeafQueue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetSchedulerMetrics() {
|
||||
CapacitySchedulerMetrics.destroy();
|
||||
|
@ -3403,4 +3370,43 @@ public class CapacityScheduler extends
|
|||
public void setQueueManager(CapacitySchedulerQueueManager qm) {
|
||||
this.queueManager = qm;
|
||||
}
|
||||
|
||||
private LeafQueue autoCreateLeafQueue(
|
||||
ApplicationPlacementContext placementContext)
|
||||
throws IOException, YarnException {
|
||||
String leafQueueName = placementContext.getQueue();
|
||||
String parentQueueName = placementContext.getParentQueue();
|
||||
|
||||
if (!StringUtils.isEmpty(parentQueueName)) {
|
||||
CSQueue parentQueue = getQueue(parentQueueName);
|
||||
|
||||
if (parentQueue == null) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Could not auto-create leaf queue for " + leafQueueName
|
||||
+ ". Queue mapping specifies an invalid parent queue "
|
||||
+ "which does not exist " + parentQueueName);
|
||||
}
|
||||
|
||||
if (parentQueue != null &&
|
||||
conf.isAutoCreateChildQueueEnabled(parentQueue.getQueuePath())) {
|
||||
// Case 1: Handle ManagedParentQueue
|
||||
AutoCreatedLeafQueue autoCreatedLeafQueue = null;
|
||||
ManagedParentQueue autoCreateEnabledParentQueue =
|
||||
(ManagedParentQueue) parentQueue;
|
||||
autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName,
|
||||
autoCreateEnabledParentQueue);
|
||||
|
||||
addQueue(autoCreatedLeafQueue);
|
||||
return autoCreatedLeafQueue;
|
||||
|
||||
} else {
|
||||
return autoQueueHandler.autoCreateQueue(placementContext);
|
||||
}
|
||||
}
|
||||
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Could not auto-create leaf queue for " + leafQueueName
|
||||
+ ". Queue mapping does not specify"
|
||||
+ " which parent queue it needs to be created under.");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Manages the validation and the creation of a Capacity Scheduler
|
||||
* queue at runtime.
|
||||
*/
|
||||
public class CapacitySchedulerAutoQueueHandler {
|
||||
private final CapacitySchedulerQueueManager queueManager;
|
||||
private final CapacitySchedulerConfiguration conf;
|
||||
private static final int MAXIMUM_DEPTH_ALLOWED = 2;
|
||||
|
||||
public CapacitySchedulerAutoQueueHandler(
|
||||
CapacitySchedulerQueueManager queueManager,
|
||||
CapacitySchedulerConfiguration conf) {
|
||||
this.queueManager = queueManager;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a LeafQueue and its upper hierarchy given a path. A parent is
|
||||
* eligible for creation if either the placement context creation flags are
|
||||
* set, or the auto queue creation is enabled for the first static parent in
|
||||
* the hierarchy.
|
||||
*
|
||||
* @param queue the application placement information of the queue
|
||||
* @return LeafQueue part of a given queue path
|
||||
* @throws YarnException if the given path is not eligible to be auto created
|
||||
*/
|
||||
public LeafQueue autoCreateQueue(ApplicationPlacementContext queue)
|
||||
throws YarnException {
|
||||
ApplicationPlacementContext parentContext =
|
||||
CSQueueUtils.extractQueuePath(queue.getParentQueue());
|
||||
List<ApplicationPlacementContext> parentsToCreate = new ArrayList<>();
|
||||
|
||||
ApplicationPlacementContext queueCandidateContext = parentContext;
|
||||
CSQueue existingQueueCandidate = getQueue(queueCandidateContext.getQueue());
|
||||
|
||||
while (existingQueueCandidate == null) {
|
||||
parentsToCreate.add(queueCandidateContext);
|
||||
queueCandidateContext = CSQueueUtils.extractQueuePath(
|
||||
queueCandidateContext.getParentQueue());
|
||||
existingQueueCandidate = getQueue(queueCandidateContext.getQueue());
|
||||
}
|
||||
|
||||
// Reverse the collection to to represent the hierarchy to be created
|
||||
// from highest to lowest level
|
||||
Collections.reverse(parentsToCreate);
|
||||
|
||||
if (!(existingQueueCandidate instanceof ParentQueue)) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Could not auto create hierarchy of "
|
||||
+ queue.getFullQueuePath() + ". Queue "
|
||||
+ existingQueueCandidate.getQueuePath() +
|
||||
" is not a ParentQueue."
|
||||
);
|
||||
}
|
||||
ParentQueue existingParentQueue = (ParentQueue) existingQueueCandidate;
|
||||
int depthLimit = extractDepthLimit(existingParentQueue);
|
||||
// The number of levels to be created including the LeafQueue
|
||||
// (which is last)
|
||||
int levelsToCreate = parentsToCreate.size() + 1;
|
||||
|
||||
if (depthLimit == 0) {
|
||||
throw new SchedulerDynamicEditException("Auto creation of queue " +
|
||||
queue.getFullQueuePath() + " is not enabled under parent "
|
||||
+ existingParentQueue.getQueuePath());
|
||||
}
|
||||
|
||||
if (levelsToCreate > depthLimit) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Could not auto create queue " + queue.getFullQueuePath()
|
||||
+ ". In order to create the desired queue hierarchy, " +
|
||||
levelsToCreate + " levels of queues would need " +
|
||||
"to be created, which is above the limit.");
|
||||
}
|
||||
|
||||
for (ApplicationPlacementContext current : parentsToCreate) {
|
||||
existingParentQueue = existingParentQueue
|
||||
.addDynamicParentQueue(current.getFullQueuePath());
|
||||
queueManager.addQueue(existingParentQueue.getQueuePath(),
|
||||
existingParentQueue);
|
||||
}
|
||||
|
||||
LeafQueue leafQueue = existingParentQueue.addDynamicLeafQueue(
|
||||
queue.getFullQueuePath());
|
||||
queueManager.addQueue(leafQueue.getQueuePath(), leafQueue);
|
||||
|
||||
return leafQueue;
|
||||
}
|
||||
|
||||
private int extractDepthLimit(ParentQueue parentQueue) {
|
||||
if (parentQueue.isEligibleForAutoQueueCreation()) {
|
||||
return MAXIMUM_DEPTH_ALLOWED;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
private CSQueue getQueue(String queue) {
|
||||
return queue != null ? queueManager.getQueue(queue) : null;
|
||||
}
|
||||
}
|
|
@ -106,6 +106,10 @@ public final class CapacitySchedulerConfigValidator {
|
|||
}
|
||||
}
|
||||
|
||||
private static boolean isDynamicQueue(CSQueue csQueue) {
|
||||
return ((AbstractCSQueue)csQueue).isDynamicQueue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure all existing queues are present. Queues cannot be deleted if its not
|
||||
* in Stopped state, Queue's cannot be moved from one hierarchy to other also.
|
||||
|
@ -144,11 +148,13 @@ public final class CapacitySchedulerConfigValidator {
|
|||
LOG.info("Deleting Queue " + queuePath + ", as it is not"
|
||||
+ " present in the modified capacity configuration xml");
|
||||
} else {
|
||||
if (!isDynamicQueue(oldQueue)) {
|
||||
throw new IOException(oldQueue.getQueuePath() + " cannot be"
|
||||
+ " deleted from the capacity scheduler configuration, as the"
|
||||
+ " queue is not yet in stopped state. Current State : "
|
||||
+ oldQueue.getState());
|
||||
}
|
||||
}
|
||||
} else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
|
||||
//Queue's cannot be moved from one hierarchy to other
|
||||
throw new IOException(
|
||||
|
|
|
@ -2008,6 +2008,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED =
|
||||
AUTO_CREATE_CHILD_QUEUE_PREFIX + "enabled";
|
||||
|
||||
@Private
|
||||
private static final String AUTO_QUEUE_CREATION_V2_PREFIX =
|
||||
"auto-queue-creation-v2";
|
||||
|
||||
@Private
|
||||
public static final String AUTO_QUEUE_CREATION_V2_ENABLED =
|
||||
AUTO_QUEUE_CREATION_V2_PREFIX + ".enabled";
|
||||
|
||||
@Private
|
||||
public static final boolean DEFAULT_AUTO_QUEUE_CREATION_ENABLED = false;
|
||||
|
||||
@Private
|
||||
public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX =
|
||||
"leaf-queue-template";
|
||||
|
@ -2044,6 +2055,20 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
autoCreationEnabled);
|
||||
}
|
||||
|
||||
public void setAutoQueueCreationV2Enabled(String queuePath,
|
||||
boolean autoQueueCreation) {
|
||||
setBoolean(
|
||||
getQueuePrefix(queuePath) + AUTO_QUEUE_CREATION_V2_ENABLED,
|
||||
autoQueueCreation);
|
||||
}
|
||||
|
||||
public boolean isAutoQueueCreationV2Enabled(String queuePath) {
|
||||
boolean isAutoQueueCreation = getBoolean(
|
||||
getQueuePrefix(queuePath) + AUTO_QUEUE_CREATION_V2_ENABLED,
|
||||
DEFAULT_AUTO_QUEUE_CREATION_ENABLED);
|
||||
return isAutoQueueCreation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the auto created leaf queue's template configuration prefix
|
||||
* Leaf queue's template capacities are configured at the parent queue
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -212,7 +214,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
* @param conf the CapacitySchedulerConfiguration
|
||||
* @param parent the parent queue
|
||||
* @param queueName the queue name
|
||||
* @param queues all the queues
|
||||
* @param newQueues all the queues
|
||||
* @param oldQueues the old queues
|
||||
* @param hook the queue hook
|
||||
* @return the CSQueue
|
||||
|
@ -222,18 +224,28 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
CapacitySchedulerContext csContext,
|
||||
CapacitySchedulerConfiguration conf,
|
||||
CSQueue parent, String queueName,
|
||||
CSQueueStore queues,
|
||||
CSQueueStore newQueues,
|
||||
CSQueueStore oldQueues,
|
||||
QueueHook hook) throws IOException {
|
||||
CSQueue queue;
|
||||
String fullQueueName = (parent == null) ?
|
||||
queueName :
|
||||
(parent.getQueuePath() + "." + queueName);
|
||||
String[] childQueueNames = conf.getQueues(fullQueueName);
|
||||
String[] staticChildQueueNames = conf.getQueues(fullQueueName);
|
||||
List<String> childQueueNames = staticChildQueueNames != null ?
|
||||
Arrays.asList(staticChildQueueNames) : Collections.emptyList();
|
||||
|
||||
boolean isReservableQueue = conf.isReservable(fullQueueName);
|
||||
boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(
|
||||
fullQueueName);
|
||||
if (childQueueNames == null || childQueueNames.length == 0) {
|
||||
boolean isDynamicParent = false;
|
||||
|
||||
CSQueue oldQueue = oldQueues.get(fullQueueName);
|
||||
if (oldQueue instanceof ParentQueue) {
|
||||
isDynamicParent = ((ParentQueue) oldQueue).isDynamicQueue();
|
||||
}
|
||||
|
||||
if (childQueueNames.size() == 0 && !isDynamicParent) {
|
||||
if (null == parent) {
|
||||
throw new IllegalStateException(
|
||||
"Queue configuration missing child queue names for " + queueName);
|
||||
|
@ -258,7 +270,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
}
|
||||
childQueues.add(resQueue);
|
||||
((PlanQueue) queue).setChildQueues(childQueues);
|
||||
queues.add(resQueue);
|
||||
newQueues.add(resQueue);
|
||||
|
||||
} else if (isAutoCreateEnabled) {
|
||||
queue = new ManagedParentQueue(csContext, queueName, parent,
|
||||
|
@ -291,14 +303,14 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
List<CSQueue> childQueues = new ArrayList<>();
|
||||
for (String childQueueName : childQueueNames) {
|
||||
CSQueue childQueue = parseQueue(csContext, conf, queue, childQueueName,
|
||||
queues, oldQueues, hook);
|
||||
newQueues, oldQueues, hook);
|
||||
childQueues.add(childQueue);
|
||||
}
|
||||
parentQueue.setChildQueues(childQueues);
|
||||
|
||||
}
|
||||
|
||||
queues.add(queue);
|
||||
newQueues.add(queue);
|
||||
|
||||
LOG.info("Initialized queue: " + fullQueueName);
|
||||
return queue;
|
||||
|
@ -321,7 +333,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
}
|
||||
|
||||
for (CSQueue queue : existingQueues.getQueues()) {
|
||||
if (newQueues.get(queue.getQueuePath()) == null && !(
|
||||
if (!((AbstractCSQueue) queue).isDynamicQueue() && newQueues.get(
|
||||
queue.getQueuePath()) == null && !(
|
||||
queue instanceof AutoCreatedLeafQueue && conf
|
||||
.isAutoCreateChildQueueEnabled(
|
||||
queue.getParent().getQueuePath()))) {
|
||||
|
|
|
@ -168,11 +168,6 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
}
|
||||
|
||||
protected void setupQueueConfigs(Resource clusterResource)
|
||||
throws IOException {
|
||||
setupQueueConfigs(clusterResource, csContext.getConfiguration());
|
||||
}
|
||||
|
||||
@SuppressWarnings("checkstyle:nowhitespaceafter")
|
||||
protected void setupQueueConfigs(Resource clusterResource,
|
||||
CapacitySchedulerConfiguration conf) throws
|
||||
|
@ -529,6 +524,13 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
writeLock.lock();
|
||||
try {
|
||||
// We skip reinitialize for dynamic queues, when this is called, and
|
||||
// new queue is different from this queue, we will make this queue to be
|
||||
// static queue.
|
||||
if (newlyParsedQueue != this) {
|
||||
this.setDynamicQueue(false);
|
||||
}
|
||||
|
||||
// Sanity check
|
||||
if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue
|
||||
.getQueuePath().equals(getQueuePath())) {
|
||||
|
@ -552,11 +554,6 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
}
|
||||
|
||||
setupQueueConfigs(clusterResource, configuration);
|
||||
|
||||
// queue metrics are updated, more resource may be available
|
||||
// activate the pending applications if possible
|
||||
activateApplications();
|
||||
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
|
|||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
@ -108,11 +109,18 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
|
||||
public ParentQueue(CapacitySchedulerContext cs,
|
||||
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
||||
this(cs, cs.getConfiguration(), queueName, parent, old);
|
||||
}
|
||||
|
||||
private ParentQueue(CapacitySchedulerContext cs,
|
||||
CapacitySchedulerConfiguration csConf, String queueName, CSQueue parent,
|
||||
CSQueue old)
|
||||
throws IOException {
|
||||
super(cs, queueName, parent, old);
|
||||
this.scheduler = cs;
|
||||
this.rootQueue = (parent == null);
|
||||
|
||||
float rawCapacity = cs.getConfiguration().getNonLabeledQueueCapacity(getQueuePath());
|
||||
float rawCapacity = csConf.getNonLabeledQueueCapacity(getQueuePath());
|
||||
|
||||
if (rootQueue &&
|
||||
(rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
|
||||
|
@ -125,7 +133,7 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
this.allowZeroCapacitySum =
|
||||
cs.getConfiguration().getAllowZeroCapacitySum(getQueuePath());
|
||||
|
||||
setupQueueConfigs(cs.getClusterResource());
|
||||
setupQueueConfigs(cs.getClusterResource(), csConf);
|
||||
|
||||
LOG.info("Initialized parent-queue " + queueName +
|
||||
" name=" + queueName +
|
||||
|
@ -139,11 +147,12 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
queueOrderingPolicy.getConfigName();
|
||||
}
|
||||
|
||||
protected void setupQueueConfigs(Resource clusterResource)
|
||||
protected void setupQueueConfigs(Resource clusterResource,
|
||||
CapacitySchedulerConfiguration csConf)
|
||||
throws IOException {
|
||||
writeLock.lock();
|
||||
try {
|
||||
super.setupQueueConfigs(clusterResource);
|
||||
super.setupQueueConfigs(clusterResource, csConf);
|
||||
StringBuilder aclsString = new StringBuilder();
|
||||
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
|
||||
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
|
||||
|
@ -158,7 +167,7 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
}
|
||||
|
||||
// Initialize queue ordering policy
|
||||
queueOrderingPolicy = csContext.getConfiguration().getQueueOrderingPolicy(
|
||||
queueOrderingPolicy = csConf.getQueueOrderingPolicy(
|
||||
getQueuePath(), parent == null ?
|
||||
null :
|
||||
((ParentQueue) parent).getQueueOrderingPolicyConfigName());
|
||||
|
@ -247,14 +256,11 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
+ "double check, details:" + diagMsg.toString());
|
||||
}
|
||||
|
||||
if (weightIsSet) {
|
||||
if (weightIsSet || queues.isEmpty()) {
|
||||
return QueueCapacityType.WEIGHT;
|
||||
} else if (absoluteMinResSet) {
|
||||
return QueueCapacityType.ABSOLUTE_RESOURCE;
|
||||
} else if (percentageIsSet) {
|
||||
return QueueCapacityType.PERCENT;
|
||||
} else {
|
||||
// When all values equals to 0, consider it is a percent mode.
|
||||
return QueueCapacityType.PERCENT;
|
||||
}
|
||||
}
|
||||
|
@ -465,11 +471,131 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
"numContainers=" + getNumContainers();
|
||||
}
|
||||
|
||||
private CapacitySchedulerConfiguration getConfForAutoCreatedQueue(
|
||||
String childQueuePath, boolean isLeaf) {
|
||||
// Copy existing config
|
||||
CapacitySchedulerConfiguration dupCSConfig =
|
||||
new CapacitySchedulerConfiguration(
|
||||
csContext.getConfiguration(), false);
|
||||
if (isLeaf) {
|
||||
// FIXME: Ideally we should disable user limit factor, see YARN-10531
|
||||
// dupCSConfig.setUserLimitFactor(childQueuePath, );
|
||||
|
||||
// Set Max AM percentage to a higher value
|
||||
dupCSConfig.setMaximumApplicationMasterResourcePerQueuePercent(
|
||||
childQueuePath, 0.5f);
|
||||
}
|
||||
|
||||
return dupCSConfig;
|
||||
}
|
||||
|
||||
private CSQueue createNewQueue(String childQueuePath, boolean isLeaf)
|
||||
throws SchedulerDynamicEditException {
|
||||
try {
|
||||
AbstractCSQueue childQueue;
|
||||
String queueShortName = childQueuePath.substring(
|
||||
childQueuePath.lastIndexOf(".") + 1);
|
||||
|
||||
if (isLeaf) {
|
||||
childQueue = new LeafQueue(csContext,
|
||||
getConfForAutoCreatedQueue(childQueuePath, isLeaf), queueShortName,
|
||||
this, null);
|
||||
} else{
|
||||
childQueue = new ParentQueue(csContext,
|
||||
getConfForAutoCreatedQueue(childQueuePath, isLeaf), queueShortName,
|
||||
this, null);
|
||||
}
|
||||
childQueue.setDynamicQueue(true);
|
||||
// It should be sufficient now, we don't need to set more, because weights
|
||||
// related setup will be handled in updateClusterResources
|
||||
|
||||
return childQueue;
|
||||
} catch (IOException e) {
|
||||
throw new SchedulerDynamicEditException(e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
public ParentQueue addDynamicParentQueue(String queuePath)
|
||||
throws SchedulerDynamicEditException {
|
||||
return (ParentQueue) addDynamicChildQueue(queuePath, false);
|
||||
}
|
||||
|
||||
public LeafQueue addDynamicLeafQueue(String queuePath)
|
||||
throws SchedulerDynamicEditException {
|
||||
return (LeafQueue) addDynamicChildQueue(queuePath, true);
|
||||
}
|
||||
|
||||
// New method to add child queue
|
||||
private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf)
|
||||
throws SchedulerDynamicEditException {
|
||||
writeLock.lock();
|
||||
try {
|
||||
// Check if queue exists, if queue exists, write a warning message (this
|
||||
// should not happen, since it will be handled before calling this method)
|
||||
// , but we will move on.
|
||||
CSQueue queue =
|
||||
csContext.getCapacitySchedulerQueueManager().getQueueByFullName(
|
||||
childQueuePath);
|
||||
if (queue != null) {
|
||||
LOG.warn(
|
||||
"This should not happen, trying to create queue=" + childQueuePath
|
||||
+ ", however the queue already exists");
|
||||
return queue;
|
||||
}
|
||||
|
||||
// First, check if we allow creation or not
|
||||
boolean weightsAreUsed = false;
|
||||
try {
|
||||
weightsAreUsed = getCapacityConfigurationTypeForQueues(childQueues)
|
||||
== QueueCapacityType.WEIGHT;
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Caught Exception during auto queue creation", e);
|
||||
}
|
||||
if (!weightsAreUsed) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Trying to create new queue=" + childQueuePath
|
||||
+ " but not all the queues under parent=" + this.getQueuePath()
|
||||
+ " are using weight-based capacity. Failed to created queue");
|
||||
}
|
||||
|
||||
CSQueue newQueue = createNewQueue(childQueuePath, isLeaf);
|
||||
this.childQueues.add(newQueue);
|
||||
|
||||
// Call updateClusterResource
|
||||
// , which will deal with all effectiveMin/MaxResource
|
||||
// Calculation
|
||||
this.updateClusterResource(csContext.getClusterResource(),
|
||||
new ResourceLimits(this.csContext.getClusterResource()));
|
||||
|
||||
return newQueue;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether this queue supports adding additional child queues
|
||||
* dynamically.
|
||||
* @return true, if queue is eligible to create additional queues dynamically,
|
||||
* false otherwise
|
||||
*/
|
||||
public boolean isEligibleForAutoQueueCreation() {
|
||||
return isDynamicQueue() || csContext.getConfiguration().
|
||||
isAutoQueueCreationV2Enabled(getQueuePath());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reinitialize(CSQueue newlyParsedQueue,
|
||||
Resource clusterResource) throws IOException {
|
||||
writeLock.lock();
|
||||
try {
|
||||
// We skip reinitialize for dynamic queues, when this is called, and
|
||||
// new queue is different from this queue, we will make this queue to be
|
||||
// static queue.
|
||||
if (newlyParsedQueue != this) {
|
||||
this.setDynamicQueue(false);
|
||||
}
|
||||
|
||||
// Sanity check
|
||||
if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue
|
||||
.getQueuePath().equals(getQueuePath())) {
|
||||
|
@ -481,7 +607,7 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
ParentQueue newlyParsedParentQueue = (ParentQueue) newlyParsedQueue;
|
||||
|
||||
// Set new configs
|
||||
setupQueueConfigs(clusterResource);
|
||||
setupQueueConfigs(clusterResource, csContext.getConfiguration());
|
||||
|
||||
// Re-configure existing child queues and add new ones
|
||||
// The CS has already checked to ensure all existing child queues are present!
|
||||
|
@ -537,6 +663,10 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
Map.Entry<String, CSQueue> e = itr.next();
|
||||
String queueName = e.getKey();
|
||||
if (!newChildQueues.containsKey(queueName)) {
|
||||
if (((AbstractCSQueue)e.getValue()).isDynamicQueue()) {
|
||||
// Don't remove dynamic queue if we cannot find it in the config.
|
||||
continue;
|
||||
}
|
||||
itr.remove();
|
||||
}
|
||||
}
|
||||
|
@ -1045,11 +1175,26 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
// below calculation for effective capacities
|
||||
updateAbsoluteCapacities();
|
||||
|
||||
// Normalize all dynamic queue queue's weight to 1 for all accessible node
|
||||
// labels, this is important because existing node labels could keep
|
||||
// changing when new node added, or node label mapping changed. We need
|
||||
// this to ensure auto created queue can access all labels.
|
||||
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
|
||||
for (CSQueue queue : childQueues) {
|
||||
// For dynamic queue, we will set weight to 1 every time, because it
|
||||
// is possible new labels added to the parent.
|
||||
if (((AbstractCSQueue) queue).isDynamicQueue()) {
|
||||
queue.getQueueCapacities().setWeight(nodeLabel, 1f);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Normalize weight of children
|
||||
if (getCapacityConfigurationTypeForQueues(childQueues)
|
||||
== QueueCapacityType.WEIGHT) {
|
||||
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
|
||||
float sumOfWeight = 0;
|
||||
|
||||
for (CSQueue queue : childQueues) {
|
||||
float weight = Math.max(0,
|
||||
queue.getQueueCapacities().getWeight(nodeLabel));
|
||||
|
|
|
@ -46,6 +46,7 @@ public class PlanQueue extends AbstractManagedParentQueue {
|
|||
public PlanQueue(CapacitySchedulerContext cs, String queueName,
|
||||
CSQueue parent, CSQueue old) throws IOException {
|
||||
super(cs, queueName, parent, old);
|
||||
updateAbsoluteCapacities();
|
||||
|
||||
this.schedulerContext = cs;
|
||||
// Set the reservation queue attributes for the Plan
|
||||
|
@ -100,7 +101,7 @@ public class PlanQueue extends AbstractManagedParentQueue {
|
|||
}
|
||||
|
||||
// Set new configs
|
||||
setupQueueConfigs(clusterResource);
|
||||
setupQueueConfigs(clusterResource, csContext.getConfiguration());
|
||||
|
||||
updateQuotas(newlyParsedParentQueue.userLimit,
|
||||
newlyParsedParentQueue.userLimitFactor,
|
||||
|
|
|
@ -82,7 +82,7 @@ public class QueueCapacities {
|
|||
.append("reserved_cap=" + capacitiesArr[7] + "%, ")
|
||||
.append("abs_reserved_cap=" + capacitiesArr[8] + "%, ")
|
||||
.append("weight=" + capacitiesArr[9] + "w, ")
|
||||
.append("normalized_weight=" + capacitiesArr[9] + "w}");
|
||||
.append("normalized_weight=" + capacitiesArr[10] + "w}");
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -907,7 +907,12 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
@Test
|
||||
public void testDynamicAutoQueueCreationWithTags()
|
||||
throws Exception {
|
||||
MockRM rm = null;
|
||||
// This test we will reinitialize mockRM, so stop the previous initialized
|
||||
// mockRM to avoid issues like MetricsSystem
|
||||
if (mockRM != null) {
|
||||
mockRM.stop();
|
||||
}
|
||||
mockRM = null;
|
||||
try {
|
||||
CapacitySchedulerConfiguration csConf
|
||||
= new CapacitySchedulerConfiguration();
|
||||
|
@ -929,35 +934,35 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
|
||||
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
|
||||
mgr.init(csConf);
|
||||
rm = new MockRM(csConf) {
|
||||
mockRM = new MockRM(csConf) {
|
||||
@Override
|
||||
public RMNodeLabelsManager createNodeLabelManager() {
|
||||
return mgr;
|
||||
}
|
||||
};
|
||||
rm.start();
|
||||
MockNM nm = rm.registerNode("127.0.0.1:1234", 16 * GB);
|
||||
mockRM.start();
|
||||
MockNM nm = mockRM.registerNode("127.0.0.1:1234", 16 * GB);
|
||||
|
||||
MockRMAppSubmissionData data =
|
||||
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
|
||||
MockRMAppSubmissionData.Builder.createWithMemory(GB, mockRM)
|
||||
.withAppName("apptodynamicqueue")
|
||||
.withUser("hadoop")
|
||||
.withAcls(null)
|
||||
.withUnmanagedAM(false)
|
||||
.withApplicationTags(Sets.newHashSet("userid=testuser"))
|
||||
.build();
|
||||
RMApp app = MockRMAppSubmitter.submit(rm, data);
|
||||
MockRM.launchAndRegisterAM(app, rm, nm);
|
||||
RMApp app = MockRMAppSubmitter.submit(mockRM, data);
|
||||
MockRM.launchAndRegisterAM(app, mockRM, nm);
|
||||
nm.nodeHeartbeat(true);
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
||||
CSQueue queue = cs.getQueue("root.a.testuser");
|
||||
assertNotNull("Leaf queue has not been auto-created", queue);
|
||||
assertEquals("Number of running applications", 1,
|
||||
queue.getNumApplications());
|
||||
} finally {
|
||||
if (rm != null) {
|
||||
rm.close();
|
||||
if (mockRM != null) {
|
||||
mockRM.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,436 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class TestCapacitySchedulerNewQueueAutoCreation
|
||||
extends TestCapacitySchedulerAutoCreatedQueueBase {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
org.apache.hadoop.yarn.server.resourcemanager
|
||||
.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.class);
|
||||
public static final int GB = 1024;
|
||||
private static final int MAX_MEMORY = 1200;
|
||||
private MockRM mockRM = null;
|
||||
private CapacityScheduler cs;
|
||||
private CapacitySchedulerConfiguration csConf;
|
||||
private CapacitySchedulerAutoQueueHandler autoQueueHandler;
|
||||
|
||||
/*
|
||||
Create the following structure:
|
||||
root
|
||||
/ \
|
||||
a b
|
||||
/
|
||||
a1
|
||||
*/
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
csConf = new CapacitySchedulerConfiguration();
|
||||
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
|
||||
// By default, set 3 queues, a/b, and a.a1
|
||||
csConf.setQueues("root", new String[]{"a", "b"});
|
||||
csConf.setNonLabeledQueueWeight("root", 1f);
|
||||
csConf.setNonLabeledQueueWeight("root.a", 1f);
|
||||
csConf.setNonLabeledQueueWeight("root.b", 1f);
|
||||
csConf.setQueues("root.a", new String[]{"a1"});
|
||||
csConf.setNonLabeledQueueWeight("root.a.a1", 1f);
|
||||
csConf.setAutoQueueCreationV2Enabled("root", true);
|
||||
csConf.setAutoQueueCreationV2Enabled("root.a", true);
|
||||
csConf.setAutoQueueCreationV2Enabled("root.e", true);
|
||||
}
|
||||
|
||||
private void startScheduler() throws Exception {
|
||||
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
|
||||
mgr.init(csConf);
|
||||
mockRM = new MockRM(csConf) {
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
return mgr;
|
||||
}
|
||||
};
|
||||
cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
||||
cs.updatePlacementRules();
|
||||
mockRM.start();
|
||||
cs.start();
|
||||
autoQueueHandler = new CapacitySchedulerAutoQueueHandler(
|
||||
cs.getCapacitySchedulerQueueManager(), csConf);
|
||||
mockRM.registerNode("h1:1234", MAX_MEMORY * GB); // label = x
|
||||
}
|
||||
|
||||
/*
|
||||
Create and validate the following structure:
|
||||
|
||||
root
|
||||
┌─────┬────────┬─────┴─────┬─────────┐
|
||||
a b c-auto e-auto d-auto
|
||||
| |
|
||||
a1 e1-auto
|
||||
*/
|
||||
private void createBasicQueueStructureAndValidate() throws Exception {
|
||||
// queue's weights are 1
|
||||
// root
|
||||
// - a (w=1)
|
||||
// - b (w=1)
|
||||
// - c-auto (w=1)
|
||||
// - d-auto (w=1)
|
||||
// - e-auto (w=1)
|
||||
// - e1-auto (w=1)
|
||||
MockNM nm1 = mockRM.registerNode("h1:1234", 1200 * GB); // label = x
|
||||
|
||||
createQueue("root.c-auto");
|
||||
|
||||
// Check if queue c-auto got created
|
||||
CSQueue c = cs.getQueue("root.c-auto");
|
||||
Assert.assertEquals(1 / 3f, c.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6);
|
||||
Assert.assertEquals(400 * GB,
|
||||
c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
|
||||
|
||||
// Now add another queue-d, in the same hierarchy
|
||||
createQueue("root.d-auto");
|
||||
|
||||
// Because queue-d has the same weight of other sibling queue, its abs cap
|
||||
// become 1/4
|
||||
CSQueue d = cs.getQueue("root.d-auto");
|
||||
Assert.assertEquals(1 / 4f, d.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(1f, d.getQueueCapacities().getWeight(), 1e-6);
|
||||
Assert.assertEquals(300 * GB,
|
||||
d.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
|
||||
|
||||
// Now we check queue c again, it should also become 1/4 capacity
|
||||
Assert.assertEquals(1 / 4f, c.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6);
|
||||
Assert.assertEquals(300 * GB,
|
||||
c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
|
||||
|
||||
// Now we add a two-level queue, create leaf only
|
||||
// Now add another queue a2-auto, under root.a
|
||||
createQueue("root.a.a2-auto");
|
||||
|
||||
// root.a has 1/4 abs resource, a2/a1 has the same weight, so a2 has 1/8 abs
|
||||
// capacity
|
||||
CSQueue a2 = cs.getQueue("root.a.a2-auto");
|
||||
Assert.assertEquals(1 / 8f, a2.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(1f, a2.getQueueCapacities().getWeight(), 1e-6);
|
||||
Assert.assertEquals(150 * GB,
|
||||
a2.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
|
||||
|
||||
// try, create leaf + parent, will success
|
||||
createQueue("root.e-auto.e1-auto");
|
||||
|
||||
// Now check capacity of e and e1 (under root we have 5 queues, so e1 get
|
||||
// 1/5 capacity
|
||||
CSQueue e = cs.getQueue("root.e-auto");
|
||||
Assert.assertEquals(1 / 5f, e.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(1f, e.getQueueCapacities().getWeight(), 1e-6);
|
||||
Assert.assertEquals(240 * GB,
|
||||
e.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
|
||||
|
||||
// Under e, there's only one queue, so e1/e have same capacity
|
||||
CSQueue e1 = cs.getQueue("root.e-auto.e1-auto");
|
||||
Assert.assertEquals(1 / 5f, e1.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(1f, e1.getQueueCapacities().getWeight(), 1e-6);
|
||||
Assert.assertEquals(240 * GB,
|
||||
e1.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
|
||||
}
|
||||
|
||||
/*
|
||||
Create and validate the structure:
|
||||
root
|
||||
┌─────┬────────┬─────┴───────┐
|
||||
a b c-auto d-auto
|
||||
|
|
||||
a1
|
||||
*/
|
||||
@Test
|
||||
public void testAutoCreateQueueWithSiblingsUnderRoot() throws Exception {
|
||||
startScheduler();
|
||||
|
||||
createQueue("root.c-auto");
|
||||
|
||||
// Check if queue c-auto got created
|
||||
CSQueue c = cs.getQueue("root.c-auto");
|
||||
Assert.assertEquals(1 / 3f, c.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6);
|
||||
Assert.assertEquals(400 * GB,
|
||||
c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
|
||||
|
||||
// Now add another queue-d, in the same hierarchy
|
||||
createQueue("root.d-auto");
|
||||
|
||||
// Because queue-d has the same weight of other sibling queue, its abs cap
|
||||
// become 1/4
|
||||
CSQueue d = cs.getQueue("root.d-auto");
|
||||
Assert.assertEquals(1 / 4f, d.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(1f, d.getQueueCapacities().getWeight(), 1e-6);
|
||||
Assert.assertEquals(300 * GB,
|
||||
d.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
|
||||
|
||||
// Now we check queue c again, it should also become 1/4 capacity
|
||||
Assert.assertEquals(1 / 4f, c.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6);
|
||||
Assert.assertEquals(300 * GB,
|
||||
c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
|
||||
}
|
||||
|
||||
/*
|
||||
Create and validate the structure:
|
||||
root
|
||||
┌─────┴─────┐
|
||||
b a
|
||||
/ \
|
||||
a1 a2-auto
|
||||
*/
|
||||
@Test
|
||||
public void testAutoCreateQueueStaticParentOneLevel() throws Exception {
|
||||
startScheduler();
|
||||
// Now we add a two-level queue, create leaf only
|
||||
// Now add another queue a2-auto, under root.a
|
||||
createQueue("root.a.a2-auto");
|
||||
|
||||
// root.a has 1/2 abs resource, a2/a1 has the same weight, so a2 has 1/4 abs
|
||||
// capacity
|
||||
CSQueue a2 = cs.getQueue("root.a.a2-auto");
|
||||
Assert.assertEquals(1 / 4f, a2.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(1f, a2.getQueueCapacities().getWeight(), 1e-6);
|
||||
Assert.assertEquals(MAX_MEMORY * (1 / 4f) * GB,
|
||||
a2.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize(),
|
||||
1e-6);
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
Create and validate the structure:
|
||||
root
|
||||
┌─────┴─────┐
|
||||
b a
|
||||
| \
|
||||
a1 a2-auto
|
||||
| \
|
||||
a3-auto a4-auto
|
||||
*/
|
||||
@Test
|
||||
public void testAutoCreateQueueAutoParentTwoLevelsWithSiblings()
|
||||
throws Exception {
|
||||
startScheduler();
|
||||
csConf.setAutoQueueCreationV2Enabled("root.a.a2-auto", true);
|
||||
|
||||
// root.a has 1/2 abs resource -> a1 and a2-auto same weight 1/4
|
||||
// -> a3-auto is alone with weight 1/4
|
||||
createQueue("root.a.a2-auto.a3-auto");
|
||||
CSQueue a3 = cs.getQueue("root.a.a2-auto.a3-auto");
|
||||
Assert.assertEquals(1 / 4f, a3.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(1f, a3.getQueueCapacities().getWeight(), 1e-6);
|
||||
Assert.assertEquals(MAX_MEMORY * (1 / 4f) * GB,
|
||||
a3.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize(),
|
||||
1e-6);
|
||||
|
||||
// root.a has 1/2 abs resource -> a1 and a2-auto same weight 1/4
|
||||
// -> a3-auto and a4-auto same weight 1/8
|
||||
createQueue("root.a.a2-auto.a4-auto");
|
||||
CSQueue a4 = cs.getQueue("root.a.a2-auto.a4-auto");
|
||||
Assert.assertEquals(1 / 8f, a3.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(1f, a3.getQueueCapacities().getWeight(), 1e-6);
|
||||
Assert.assertEquals(MAX_MEMORY * (1 / 8f) * GB,
|
||||
a4.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize(),
|
||||
1e-6);
|
||||
}
|
||||
|
||||
@Test(expected = SchedulerDynamicEditException.class)
|
||||
public void testAutoCreateQueueShouldFailWhenNonParentQueue()
|
||||
throws Exception {
|
||||
startScheduler();
|
||||
createQueue("root.a.a1.a2-auto");
|
||||
}
|
||||
|
||||
@Test(expected = SchedulerDynamicEditException.class)
|
||||
public void testAutoCreateQueueWhenSiblingsNotInWeightMode()
|
||||
throws Exception {
|
||||
startScheduler();
|
||||
csConf.setCapacity("root.a", 50f);
|
||||
csConf.setCapacity("root.b", 50f);
|
||||
csConf.setCapacity("root.a.a1", 100f);
|
||||
cs.reinitialize(csConf, mockRM.getRMContext());
|
||||
createQueue("root.a.a2-auto");
|
||||
}
|
||||
|
||||
@Test(expected = SchedulerDynamicEditException.class)
|
||||
public void testAutoCreateQueueShouldFailIfDepthIsAboveLimit()
|
||||
throws Exception {
|
||||
startScheduler();
|
||||
createQueue("root.a.a3-auto.a4-auto.a5-auto");
|
||||
}
|
||||
|
||||
@Test(expected = SchedulerDynamicEditException.class)
|
||||
public void testAutoCreateQueueShouldFailIfNotEnabledForParent()
|
||||
throws Exception {
|
||||
startScheduler();
|
||||
csConf.setAutoQueueCreationV2Enabled("root", false);
|
||||
cs.reinitialize(csConf, mockRM.getRMContext());
|
||||
createQueue("root.c-auto");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoCreateQueueRefresh() throws Exception {
|
||||
startScheduler();
|
||||
|
||||
createBasicQueueStructureAndValidate();
|
||||
|
||||
// Refresh the queue to make sure all queues are still exist.
|
||||
// (Basically, dynamic queues should not disappear after refresh).
|
||||
cs.reinitialize(csConf, mockRM.getRMContext());
|
||||
|
||||
// Double confirm, after refresh, we should still see root queue has 5
|
||||
// children.
|
||||
Assert.assertEquals(5, cs.getQueue("root").getChildQueues().size());
|
||||
Assert.assertNotNull(cs.getQueue("root.c-auto"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertDynamicToStaticQueue() throws Exception {
|
||||
startScheduler();
|
||||
|
||||
createBasicQueueStructureAndValidate();
|
||||
|
||||
// Now, update root.a's weight to 6
|
||||
csConf.setNonLabeledQueueWeight("root.a", 6f);
|
||||
cs.reinitialize(csConf, mockRM.getRMContext());
|
||||
|
||||
// Double confirm, after refresh, we should still see root queue has 5
|
||||
// children.
|
||||
Assert.assertEquals(5, cs.getQueue("root").getChildQueues().size());
|
||||
|
||||
// Get queue a
|
||||
CSQueue a = cs.getQueue("root.a");
|
||||
|
||||
// a's abs resource should be 6/10, (since a.weight=6, all other 4 peers
|
||||
// have weight=1).
|
||||
Assert.assertEquals(6 / 10f, a.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(720 * GB,
|
||||
a.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
|
||||
Assert.assertEquals(6f, a.getQueueCapacities().getWeight(), 1e-6);
|
||||
|
||||
// Set queue c-auto's weight to 6, and mark c-auto to be static queue
|
||||
csConf.setQueues("root", new String[]{"a", "b", "c-auto"});
|
||||
csConf.setNonLabeledQueueWeight("root.c-auto", 6f);
|
||||
cs.reinitialize(csConf, mockRM.getRMContext());
|
||||
|
||||
// Get queue c
|
||||
CSQueue c = cs.getQueue("root.c-auto");
|
||||
|
||||
// c's abs resource should be 6/15, (since a/c.weight=6, all other 3 peers
|
||||
// have weight=1).
|
||||
Assert.assertEquals(6 / 15f, c.getAbsoluteCapacity(), 1e-6);
|
||||
Assert.assertEquals(480 * GB,
|
||||
c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
|
||||
Assert.assertEquals(6f, c.getQueueCapacities().getWeight(), 1e-6);
|
||||
|
||||
// First, create e2-auto queue
|
||||
createQueue("root.e-auto.e2-auto");
|
||||
|
||||
// Do change 2nd level queue from dynamic to static
|
||||
csConf.setQueues("root", new String[]{"a", "b", "c-auto", "e-auto"});
|
||||
csConf.setNonLabeledQueueWeight("root.e-auto", 6f);
|
||||
csConf.setQueues("root.e-auto", new String[]{"e1-auto"});
|
||||
csConf.setNonLabeledQueueWeight("root.e-auto.e1-auto", 6f);
|
||||
cs.reinitialize(csConf, mockRM.getRMContext());
|
||||
|
||||
// Get queue e1
|
||||
CSQueue e1 = cs.getQueue("root.e-auto.e1-auto");
|
||||
|
||||
// e's abs resource should be 6/20 * (6/7),
|
||||
// (since a/c/e.weight=6, all other 2 peers
|
||||
// have weight=1, and e1's weight is 6, e2's weight is 1).
|
||||
float e1NormalizedWeight = (6 / 20f) * (6 / 7f);
|
||||
Assert.assertEquals(e1NormalizedWeight, e1.getAbsoluteCapacity(), 1e-6);
|
||||
assertQueueMinResource(e1, MAX_MEMORY * e1NormalizedWeight);
|
||||
Assert.assertEquals(6f, e1.getQueueCapacities().getWeight(), 1e-6);
|
||||
}
|
||||
|
||||
/*
|
||||
Create the structure and convert d-auto to static and leave d1-auto as dynamic
|
||||
root
|
||||
┌─────┬─────────────┴──────┐
|
||||
a b d-auto
|
||||
| |
|
||||
a1 d1-auto
|
||||
*/
|
||||
@Test
|
||||
public void testConvertDynamicParentToStaticParent() throws Exception {
|
||||
startScheduler();
|
||||
createQueue("root.d-auto.d1-auto");
|
||||
csConf.setQueues("root", new String[]{"a", "b", "d-auto"});
|
||||
csConf.setNonLabeledQueueWeight("root.a", 6f);
|
||||
csConf.setNonLabeledQueueWeight("root.d-auto", 1f);
|
||||
cs.reinitialize(csConf, mockRM.getRMContext());
|
||||
|
||||
CSQueue d = cs.getQueue("root.d-auto");
|
||||
|
||||
Assert.assertEquals(1 / 8f, d.getAbsoluteCapacity(), 1e-6);
|
||||
assertQueueMinResource(d, MAX_MEMORY * (1 / 8f));
|
||||
Assert.assertEquals(1f, d.getQueueCapacities().getWeight(), 1e-6);
|
||||
|
||||
CSQueue d1 = cs.getQueue("root.d-auto.d1-auto");
|
||||
Assert.assertEquals(1 / 8f, d1.getAbsoluteCapacity(), 1e-6);
|
||||
assertQueueMinResource(d1, MAX_MEMORY * (1 / 8f));
|
||||
Assert.assertEquals(1f, d1.getQueueCapacities().getWeight(), 1e-6);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoQueueCreationOnAppSubmission() throws Exception {
|
||||
startScheduler();
|
||||
createBasicQueueStructureAndValidate();
|
||||
|
||||
submitApp(cs, USER0, USER0, "root.e-auto");
|
||||
|
||||
AbstractCSQueue e = (AbstractCSQueue) cs.getQueue("root.e-auto");
|
||||
Assert.assertNotNull(e);
|
||||
Assert.assertTrue(e.isDynamicQueue());
|
||||
|
||||
AbstractCSQueue user0 = (AbstractCSQueue) cs.getQueue(
|
||||
"root.e-auto." + USER0);
|
||||
Assert.assertNotNull(user0);
|
||||
Assert.assertTrue(user0.isDynamicQueue());
|
||||
}
|
||||
|
||||
private LeafQueue createQueue(String queuePath) throws YarnException {
|
||||
return autoQueueHandler.autoCreateQueue(
|
||||
CSQueueUtils.extractQueuePath(queuePath));
|
||||
}
|
||||
|
||||
private void assertQueueMinResource(CSQueue queue, float expected) {
|
||||
Assert.assertEquals(Math.round(expected * GB),
|
||||
queue.getQueueResourceQuotas().getEffectiveMinResource()
|
||||
.getMemorySize(), 1e-6);
|
||||
}
|
||||
}
|
|
@ -3291,7 +3291,11 @@ public class TestLeafQueue {
|
|||
newQueues, queues,
|
||||
TestUtils.spyHook);
|
||||
queues = newQueues;
|
||||
// This will not update active apps
|
||||
root.reinitialize(newRoot, csContext.getClusterResource());
|
||||
// Cause this to update active apps
|
||||
root.updateClusterResource(csContext.getClusterResource(),
|
||||
new ResourceLimits(csContext.getClusterResource()));
|
||||
|
||||
// after reinitialization
|
||||
assertEquals(3, e.getNumActiveApplications());
|
||||
|
|
Loading…
Reference in New Issue