YARN-10571. Refactor dynamic queue handling logic. Contributed by Andras Gyori.
This commit is contained in:
parent
29105ffb63
commit
626be24c3e
|
@ -233,8 +233,6 @@ public class CapacityScheduler extends
|
||||||
private AppPriorityACLsManager appPriorityACLManager;
|
private AppPriorityACLsManager appPriorityACLManager;
|
||||||
private boolean multiNodePlacementEnabled;
|
private boolean multiNodePlacementEnabled;
|
||||||
|
|
||||||
private CapacitySchedulerAutoQueueHandler autoQueueHandler;
|
|
||||||
|
|
||||||
private boolean printedVerboseLoggingForAsyncScheduling;
|
private boolean printedVerboseLoggingForAsyncScheduling;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -343,9 +341,6 @@ public class CapacityScheduler extends
|
||||||
this.labelManager, this.appPriorityACLManager);
|
this.labelManager, this.appPriorityACLManager);
|
||||||
this.queueManager.setCapacitySchedulerContext(this);
|
this.queueManager.setCapacitySchedulerContext(this);
|
||||||
|
|
||||||
this.autoQueueHandler = new CapacitySchedulerAutoQueueHandler(
|
|
||||||
this.queueManager);
|
|
||||||
|
|
||||||
this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();
|
this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();
|
||||||
|
|
||||||
this.activitiesManager = new ActivitiesManager(rmContext);
|
this.activitiesManager = new ActivitiesManager(rmContext);
|
||||||
|
@ -970,7 +965,8 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
if (fallbackContext.hasParentQueue()) {
|
if (fallbackContext.hasParentQueue()) {
|
||||||
try {
|
try {
|
||||||
return autoCreateLeafQueue(fallbackContext);
|
writeLock.lock();
|
||||||
|
return queueManager.createQueue(fallbackContext);
|
||||||
} catch (YarnException | IOException e) {
|
} catch (YarnException | IOException e) {
|
||||||
// A null queue is expected if the placementContext is null. In order
|
// A null queue is expected if the placementContext is null. In order
|
||||||
// not to disrupt the control flow, if we fail to auto create a queue,
|
// not to disrupt the control flow, if we fail to auto create a queue,
|
||||||
|
@ -1007,6 +1003,8 @@ public class CapacityScheduler extends
|
||||||
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
|
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
|
||||||
message));
|
message));
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2557,30 +2555,7 @@ public class CapacityScheduler extends
|
||||||
throws SchedulerDynamicEditException {
|
throws SchedulerDynamicEditException {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
LOG.info("Removing queue: " + queueName);
|
queueManager.removeLegacyDynamicQueue(queueName);
|
||||||
CSQueue q = this.getQueue(queueName);
|
|
||||||
if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(
|
|
||||||
q.getClass()))) {
|
|
||||||
throw new SchedulerDynamicEditException(
|
|
||||||
"The queue that we are asked " + "to remove (" + queueName
|
|
||||||
+ ") is not a AutoCreatedLeafQueue or ReservationQueue");
|
|
||||||
}
|
|
||||||
AbstractAutoCreatedLeafQueue disposableLeafQueue =
|
|
||||||
(AbstractAutoCreatedLeafQueue) q;
|
|
||||||
// at this point we should have no more apps
|
|
||||||
if (disposableLeafQueue.getNumApplications() > 0) {
|
|
||||||
throw new SchedulerDynamicEditException(
|
|
||||||
"The queue " + queueName + " is not empty " + disposableLeafQueue
|
|
||||||
.getApplications().size() + " active apps "
|
|
||||||
+ disposableLeafQueue.getPendingApplications().size()
|
|
||||||
+ " pending apps");
|
|
||||||
}
|
|
||||||
|
|
||||||
((AbstractManagedParentQueue) disposableLeafQueue.getParent())
|
|
||||||
.removeChildQueue(q);
|
|
||||||
this.queueManager.removeQueue(queueName);
|
|
||||||
LOG.info(
|
|
||||||
"Removal of AutoCreatedLeafQueue " + queueName + " has succeeded");
|
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -2629,34 +2604,7 @@ public class CapacityScheduler extends
|
||||||
throws SchedulerDynamicEditException, IOException {
|
throws SchedulerDynamicEditException, IOException {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
if (queue == null) {
|
queueManager.addLegacyDynamicQueue(queue);
|
||||||
throw new SchedulerDynamicEditException(
|
|
||||||
"Queue specified is null. Should be an implementation of "
|
|
||||||
+ "AbstractAutoCreatedLeafQueue");
|
|
||||||
} else if (!(AbstractAutoCreatedLeafQueue.class
|
|
||||||
.isAssignableFrom(queue.getClass()))) {
|
|
||||||
throw new SchedulerDynamicEditException(
|
|
||||||
"Queue is not an implementation of "
|
|
||||||
+ "AbstractAutoCreatedLeafQueue : " + queue.getClass());
|
|
||||||
}
|
|
||||||
|
|
||||||
AbstractAutoCreatedLeafQueue newQueue =
|
|
||||||
(AbstractAutoCreatedLeafQueue) queue;
|
|
||||||
|
|
||||||
if (newQueue.getParent() == null || !(AbstractManagedParentQueue.class.
|
|
||||||
isAssignableFrom(newQueue.getParent().getClass()))) {
|
|
||||||
throw new SchedulerDynamicEditException(
|
|
||||||
"ParentQueue for " + newQueue + " is not properly set"
|
|
||||||
+ " (should be set and be a PlanQueue or ManagedParentQueue)");
|
|
||||||
}
|
|
||||||
|
|
||||||
AbstractManagedParentQueue parent =
|
|
||||||
(AbstractManagedParentQueue) newQueue.getParent();
|
|
||||||
String queuePath = newQueue.getQueuePath();
|
|
||||||
parent.addChildQueue(newQueue);
|
|
||||||
this.queueManager.addQueue(queuePath, newQueue);
|
|
||||||
|
|
||||||
LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded");
|
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -3490,41 +3438,4 @@ public class CapacityScheduler extends
|
||||||
public void setQueueManager(CapacitySchedulerQueueManager qm) {
|
public void setQueueManager(CapacitySchedulerQueueManager qm) {
|
||||||
this.queueManager = qm;
|
this.queueManager = qm;
|
||||||
}
|
}
|
||||||
|
|
||||||
private LeafQueue autoCreateLeafQueue(
|
|
||||||
ApplicationPlacementContext placementContext)
|
|
||||||
throws IOException, YarnException {
|
|
||||||
String leafQueueName = placementContext.getQueue();
|
|
||||||
String parentQueueName = placementContext.getParentQueue();
|
|
||||||
|
|
||||||
if (!StringUtils.isEmpty(parentQueueName)) {
|
|
||||||
CSQueue parentQueue = getQueue(parentQueueName);
|
|
||||||
|
|
||||||
if (parentQueue != null &&
|
|
||||||
conf.isAutoCreateChildQueueEnabled(parentQueue.getQueuePath())) {
|
|
||||||
// Case 1: Handle ManagedParentQueue
|
|
||||||
ManagedParentQueue autoCreateEnabledParentQueue =
|
|
||||||
(ManagedParentQueue) parentQueue;
|
|
||||||
AutoCreatedLeafQueue autoCreatedLeafQueue =
|
|
||||||
new AutoCreatedLeafQueue(
|
|
||||||
this, leafQueueName, autoCreateEnabledParentQueue);
|
|
||||||
|
|
||||||
addQueue(autoCreatedLeafQueue);
|
|
||||||
return autoCreatedLeafQueue;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
writeLock.lock();
|
|
||||||
return autoQueueHandler.autoCreateQueue(placementContext);
|
|
||||||
} finally {
|
|
||||||
writeLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
throw new SchedulerDynamicEditException(
|
|
||||||
"Could not auto-create leaf queue for " + leafQueueName
|
|
||||||
+ ". Queue mapping does not specify"
|
|
||||||
+ " which parent queue it needs to be created under.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,140 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
* <p>
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* <p>
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Manages the validation and the creation of a Capacity Scheduler
|
|
||||||
* queue at runtime.
|
|
||||||
*/
|
|
||||||
public class CapacitySchedulerAutoQueueHandler {
|
|
||||||
private final CapacitySchedulerQueueManager queueManager;
|
|
||||||
private static final int MAXIMUM_DEPTH_ALLOWED = 2;
|
|
||||||
|
|
||||||
public CapacitySchedulerAutoQueueHandler(
|
|
||||||
CapacitySchedulerQueueManager queueManager) {
|
|
||||||
this.queueManager = queueManager;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a LeafQueue and its upper hierarchy given a path. A parent is
|
|
||||||
* eligible for creation if either the placement context creation flags are
|
|
||||||
* set, or the auto queue creation is enabled for the first static parent in
|
|
||||||
* the hierarchy.
|
|
||||||
*
|
|
||||||
* @param queue the application placement information of the queue
|
|
||||||
* @return LeafQueue part of a given queue path
|
|
||||||
* @throws YarnException if the given path is not eligible to be auto created
|
|
||||||
*/
|
|
||||||
public LeafQueue autoCreateQueue(ApplicationPlacementContext queue)
|
|
||||||
throws YarnException {
|
|
||||||
ApplicationPlacementContext parentContext =
|
|
||||||
CSQueueUtils.extractQueuePath(queue.getParentQueue());
|
|
||||||
List<ApplicationPlacementContext> parentsToCreate = new ArrayList<>();
|
|
||||||
|
|
||||||
ApplicationPlacementContext queueCandidateContext = parentContext;
|
|
||||||
CSQueue firstExistingQueue = getQueue(
|
|
||||||
queueCandidateContext.getFullQueuePath());
|
|
||||||
|
|
||||||
while (firstExistingQueue == null) {
|
|
||||||
parentsToCreate.add(queueCandidateContext);
|
|
||||||
queueCandidateContext = CSQueueUtils.extractQueuePath(
|
|
||||||
queueCandidateContext.getParentQueue());
|
|
||||||
firstExistingQueue = getQueue(
|
|
||||||
queueCandidateContext.getFullQueuePath());
|
|
||||||
}
|
|
||||||
|
|
||||||
CSQueue firstExistingStaticQueue = firstExistingQueue;
|
|
||||||
// Include the LeafQueue in the distance
|
|
||||||
int firstStaticParentDistance = parentsToCreate.size() + 1;
|
|
||||||
|
|
||||||
while(isNonStaticParent(firstExistingStaticQueue)) {
|
|
||||||
queueCandidateContext = CSQueueUtils.extractQueuePath(
|
|
||||||
queueCandidateContext.getParentQueue());
|
|
||||||
firstExistingStaticQueue = getQueue(
|
|
||||||
queueCandidateContext.getFullQueuePath());
|
|
||||||
++firstStaticParentDistance;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reverse the collection to to represent the hierarchy to be created
|
|
||||||
// from highest to lowest level
|
|
||||||
Collections.reverse(parentsToCreate);
|
|
||||||
|
|
||||||
if (!(firstExistingQueue instanceof ParentQueue)) {
|
|
||||||
throw new SchedulerDynamicEditException(
|
|
||||||
"Could not auto create hierarchy of "
|
|
||||||
+ queue.getFullQueuePath() + ". Queue "
|
|
||||||
+ firstExistingQueue.getQueuePath() +
|
|
||||||
" is not a ParentQueue."
|
|
||||||
);
|
|
||||||
}
|
|
||||||
ParentQueue existingParentQueue = (ParentQueue) firstExistingQueue;
|
|
||||||
int depthLimit = extractDepthLimit(existingParentQueue);
|
|
||||||
|
|
||||||
if (depthLimit == 0) {
|
|
||||||
throw new SchedulerDynamicEditException("Auto creation of queue " +
|
|
||||||
queue.getFullQueuePath() + " is not enabled under parent "
|
|
||||||
+ existingParentQueue.getQueuePath());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (firstStaticParentDistance > depthLimit) {
|
|
||||||
throw new SchedulerDynamicEditException(
|
|
||||||
"Could not auto create queue " + queue.getFullQueuePath()
|
|
||||||
+ ". The distance of the LeafQueue from the first static " +
|
|
||||||
"ParentQueue is" + firstStaticParentDistance + ", which is " +
|
|
||||||
"above the limit.");
|
|
||||||
}
|
|
||||||
|
|
||||||
for (ApplicationPlacementContext current : parentsToCreate) {
|
|
||||||
existingParentQueue = existingParentQueue
|
|
||||||
.addDynamicParentQueue(current.getFullQueuePath());
|
|
||||||
queueManager.addQueue(existingParentQueue.getQueuePath(),
|
|
||||||
existingParentQueue);
|
|
||||||
}
|
|
||||||
|
|
||||||
LeafQueue leafQueue = existingParentQueue.addDynamicLeafQueue(
|
|
||||||
queue.getFullQueuePath());
|
|
||||||
queueManager.addQueue(leafQueue.getQueuePath(), leafQueue);
|
|
||||||
|
|
||||||
return leafQueue;
|
|
||||||
}
|
|
||||||
|
|
||||||
private int extractDepthLimit(ParentQueue parentQueue) {
|
|
||||||
if (parentQueue.isEligibleForAutoQueueCreation()) {
|
|
||||||
return MAXIMUM_DEPTH_ALLOWED;
|
|
||||||
} else {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private CSQueue getQueue(String queue) {
|
|
||||||
return queue != null ? queueManager.getQueue(queue) : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isNonStaticParent(CSQueue queue) {
|
|
||||||
return (!(queue instanceof AbstractCSQueue)
|
|
||||||
|| ((AbstractCSQueue) queue).isDynamicQueue());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -23,10 +23,13 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
@ -70,6 +73,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final int MAXIMUM_DYNAMIC_QUEUE_DEPTH = 2;
|
||||||
private static final QueueHook NOOP = new QueueHook();
|
private static final QueueHook NOOP = new QueueHook();
|
||||||
private CapacitySchedulerContext csContext;
|
private CapacitySchedulerContext csContext;
|
||||||
private final YarnAuthorizationProvider authorizer;
|
private final YarnAuthorizationProvider authorizer;
|
||||||
|
@ -437,6 +441,229 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
||||||
return this.queueStateManager;
|
return this.queueStateManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes an {@code AutoCreatedLeafQueue} from the manager collection and
|
||||||
|
* from its parent children collection.
|
||||||
|
*
|
||||||
|
* @param queueName queue to be removed
|
||||||
|
* @throws SchedulerDynamicEditException if queue is not eligible for deletion
|
||||||
|
*/
|
||||||
|
public void removeLegacyDynamicQueue(String queueName)
|
||||||
|
throws SchedulerDynamicEditException {
|
||||||
|
LOG.info("Removing queue: " + queueName);
|
||||||
|
CSQueue q = this.getQueue(queueName);
|
||||||
|
if (q == null || !(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(
|
||||||
|
q.getClass()))) {
|
||||||
|
throw new SchedulerDynamicEditException(
|
||||||
|
"The queue that we are asked " + "to remove (" + queueName
|
||||||
|
+ ") is not a AutoCreatedLeafQueue or ReservationQueue");
|
||||||
|
}
|
||||||
|
AbstractAutoCreatedLeafQueue disposableLeafQueue =
|
||||||
|
(AbstractAutoCreatedLeafQueue) q;
|
||||||
|
// at this point we should have no more apps
|
||||||
|
if (disposableLeafQueue.getNumApplications() > 0) {
|
||||||
|
throw new SchedulerDynamicEditException(
|
||||||
|
"The queue " + queueName + " is not empty " + disposableLeafQueue
|
||||||
|
.getApplications().size() + " active apps "
|
||||||
|
+ disposableLeafQueue.getPendingApplications().size()
|
||||||
|
+ " pending apps");
|
||||||
|
}
|
||||||
|
|
||||||
|
((AbstractManagedParentQueue) disposableLeafQueue.getParent())
|
||||||
|
.removeChildQueue(q);
|
||||||
|
removeQueue(queueName);
|
||||||
|
LOG.info(
|
||||||
|
"Removal of AutoCreatedLeafQueue " + queueName + " has succeeded");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds an {@code AutoCreatedLeafQueue} to the manager collection and extends
|
||||||
|
* the children collection of its parent.
|
||||||
|
*
|
||||||
|
* @param queue to be added
|
||||||
|
* @throws SchedulerDynamicEditException if queue is not eligible to be added
|
||||||
|
* @throws IOException if parent can not accept the queue
|
||||||
|
*/
|
||||||
|
public void addLegacyDynamicQueue(Queue queue)
|
||||||
|
throws SchedulerDynamicEditException, IOException {
|
||||||
|
if (queue == null) {
|
||||||
|
throw new SchedulerDynamicEditException(
|
||||||
|
"Queue specified is null. Should be an implementation of "
|
||||||
|
+ "AbstractAutoCreatedLeafQueue");
|
||||||
|
} else if (!(AbstractAutoCreatedLeafQueue.class
|
||||||
|
.isAssignableFrom(queue.getClass()))) {
|
||||||
|
throw new SchedulerDynamicEditException(
|
||||||
|
"Queue is not an implementation of "
|
||||||
|
+ "AbstractAutoCreatedLeafQueue : " + queue.getClass());
|
||||||
|
}
|
||||||
|
|
||||||
|
AbstractAutoCreatedLeafQueue newQueue =
|
||||||
|
(AbstractAutoCreatedLeafQueue) queue;
|
||||||
|
|
||||||
|
if (newQueue.getParent() == null || !(AbstractManagedParentQueue.class.
|
||||||
|
isAssignableFrom(newQueue.getParent().getClass()))) {
|
||||||
|
throw new SchedulerDynamicEditException(
|
||||||
|
"ParentQueue for " + newQueue + " is not properly set"
|
||||||
|
+ " (should be set and be a PlanQueue or ManagedParentQueue)");
|
||||||
|
}
|
||||||
|
|
||||||
|
AbstractManagedParentQueue parent =
|
||||||
|
(AbstractManagedParentQueue) newQueue.getParent();
|
||||||
|
String queuePath = newQueue.getQueuePath();
|
||||||
|
parent.addChildQueue(newQueue);
|
||||||
|
addQueue(queuePath, newQueue);
|
||||||
|
|
||||||
|
LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Auto creates a LeafQueue and its upper hierarchy given a path at runtime.
|
||||||
|
*
|
||||||
|
* @param queue the application placement information of the queue
|
||||||
|
* @return the auto created LeafQueue
|
||||||
|
* @throws YarnException if the given path is not eligible to be auto created
|
||||||
|
* @throws IOException if the given path can not be added to the parent
|
||||||
|
*/
|
||||||
|
public LeafQueue createQueue(ApplicationPlacementContext queue)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
String leafQueueName = queue.getQueue();
|
||||||
|
String parentQueueName = queue.getParentQueue();
|
||||||
|
|
||||||
|
if (!StringUtils.isEmpty(parentQueueName)) {
|
||||||
|
CSQueue parentQueue = getQueue(parentQueueName);
|
||||||
|
|
||||||
|
if (parentQueue != null && csContext.getConfiguration()
|
||||||
|
.isAutoCreateChildQueueEnabled(parentQueue.getQueuePath())) {
|
||||||
|
return createLegacyAutoQueue(queue);
|
||||||
|
} else {
|
||||||
|
return createAutoQueue(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new SchedulerDynamicEditException(
|
||||||
|
"Could not auto-create leaf queue for " + leafQueueName
|
||||||
|
+ ". Queue mapping does not specify"
|
||||||
|
+ " which parent queue it needs to be created under.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determines the missing parent paths of a potentially auto creatable queue.
|
||||||
|
* The missing parents are sorted in a way that the first item is the highest
|
||||||
|
* in the hierarchy.
|
||||||
|
* Example:
|
||||||
|
* root.a, root.a.b, root.a.b.c
|
||||||
|
*
|
||||||
|
* @param queue to be auto created
|
||||||
|
* @return missing parent paths
|
||||||
|
* @throws SchedulerDynamicEditException if the given queue is not eligible
|
||||||
|
* to be auto created
|
||||||
|
*/
|
||||||
|
public List<String> determineMissingParents(
|
||||||
|
ApplicationPlacementContext queue) throws SchedulerDynamicEditException {
|
||||||
|
if (!queue.hasParentQueue()) {
|
||||||
|
throw new SchedulerDynamicEditException("Can not auto create queue "
|
||||||
|
+ queue.getFullQueuePath() + " due to missing ParentQueue path.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start from the first parent
|
||||||
|
int firstStaticParentDistance = 1;
|
||||||
|
|
||||||
|
StringBuilder parentCandidate = new StringBuilder(queue.getParentQueue());
|
||||||
|
LinkedList<String> parentsToCreate = new LinkedList<>();
|
||||||
|
|
||||||
|
CSQueue firstExistingParent = getQueue(parentCandidate.toString());
|
||||||
|
CSQueue firstExistingStaticParent = firstExistingParent;
|
||||||
|
|
||||||
|
while (isNonStaticParent(firstExistingStaticParent)
|
||||||
|
&& parentCandidate.length() != 0) {
|
||||||
|
++firstStaticParentDistance;
|
||||||
|
|
||||||
|
if (firstStaticParentDistance > MAXIMUM_DYNAMIC_QUEUE_DEPTH) {
|
||||||
|
throw new SchedulerDynamicEditException(
|
||||||
|
"Could not auto create queue " + queue.getFullQueuePath()
|
||||||
|
+ ". The distance of the LeafQueue from the first static " +
|
||||||
|
"ParentQueue is " + firstStaticParentDistance + ", which is " +
|
||||||
|
"above the limit.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (firstExistingParent == null) {
|
||||||
|
parentsToCreate.addFirst(parentCandidate.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
int lastIndex = parentCandidate.lastIndexOf(".");
|
||||||
|
parentCandidate.setLength(Math.max(lastIndex, 0));
|
||||||
|
|
||||||
|
if (firstExistingParent == null) {
|
||||||
|
firstExistingParent = getQueue(parentCandidate.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
firstExistingStaticParent = getQueue(parentCandidate.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(firstExistingParent instanceof ParentQueue)) {
|
||||||
|
throw new SchedulerDynamicEditException(
|
||||||
|
"Could not auto create hierarchy of "
|
||||||
|
+ queue.getFullQueuePath() + ". Queue " + queue.getParentQueue() +
|
||||||
|
" is not a ParentQueue."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
ParentQueue existingParentQueue = (ParentQueue) firstExistingParent;
|
||||||
|
|
||||||
|
if (!existingParentQueue.isEligibleForAutoQueueCreation()) {
|
||||||
|
throw new SchedulerDynamicEditException("Auto creation of queue " +
|
||||||
|
queue.getFullQueuePath() + " is not enabled under parent "
|
||||||
|
+ existingParentQueue.getQueuePath());
|
||||||
|
}
|
||||||
|
|
||||||
|
return parentsToCreate;
|
||||||
|
}
|
||||||
|
|
||||||
|
private LeafQueue createAutoQueue(ApplicationPlacementContext queue)
|
||||||
|
throws SchedulerDynamicEditException {
|
||||||
|
List<String> parentsToCreate = determineMissingParents(queue);
|
||||||
|
// First existing parent is either the parent of the last missing parent
|
||||||
|
// or the parent of the given path
|
||||||
|
String existingParentName = queue.getParentQueue();
|
||||||
|
if (!parentsToCreate.isEmpty()) {
|
||||||
|
existingParentName = parentsToCreate.get(0).substring(
|
||||||
|
0, parentsToCreate.get(0).lastIndexOf("."));
|
||||||
|
}
|
||||||
|
|
||||||
|
ParentQueue existingParentQueue = (ParentQueue) getQueue(
|
||||||
|
existingParentName);
|
||||||
|
|
||||||
|
for (String current : parentsToCreate) {
|
||||||
|
existingParentQueue = existingParentQueue.addDynamicParentQueue(current);
|
||||||
|
addQueue(existingParentQueue.getQueuePath(), existingParentQueue);
|
||||||
|
}
|
||||||
|
|
||||||
|
LeafQueue leafQueue = existingParentQueue.addDynamicLeafQueue(
|
||||||
|
queue.getFullQueuePath());
|
||||||
|
addQueue(leafQueue.getQueuePath(), leafQueue);
|
||||||
|
|
||||||
|
return leafQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
private LeafQueue createLegacyAutoQueue(ApplicationPlacementContext queue)
|
||||||
|
throws IOException, SchedulerDynamicEditException {
|
||||||
|
CSQueue parentQueue = getQueue(queue.getParentQueue());
|
||||||
|
// Case 1: Handle ManagedParentQueue
|
||||||
|
ManagedParentQueue autoCreateEnabledParentQueue =
|
||||||
|
(ManagedParentQueue) parentQueue;
|
||||||
|
AutoCreatedLeafQueue autoCreatedLeafQueue =
|
||||||
|
new AutoCreatedLeafQueue(
|
||||||
|
csContext, queue.getQueue(), autoCreateEnabledParentQueue);
|
||||||
|
|
||||||
|
addLegacyDynamicQueue(autoCreatedLeafQueue);
|
||||||
|
return autoCreatedLeafQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isNonStaticParent(CSQueue queue) {
|
||||||
|
return (!(queue instanceof AbstractCSQueue)
|
||||||
|
|| ((AbstractCSQueue) queue).isDynamicQueue());
|
||||||
|
}
|
||||||
|
|
||||||
private boolean isDynamicQueue(CSQueue queue) {
|
private boolean isDynamicQueue(CSQueue queue) {
|
||||||
return (queue instanceof AbstractCSQueue) &&
|
return (queue instanceof AbstractCSQueue) &&
|
||||||
((AbstractCSQueue) queue).isDynamicQueue();
|
((AbstractCSQueue) queue).isDynamicQueue();
|
||||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
|
@ -49,6 +48,8 @@ import org.slf4j.LoggerFactory;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
public class TestCapacitySchedulerNewQueueAutoCreation
|
public class TestCapacitySchedulerNewQueueAutoCreation
|
||||||
extends TestCapacitySchedulerAutoCreatedQueueBase {
|
extends TestCapacitySchedulerAutoCreatedQueueBase {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
|
@ -59,7 +60,7 @@ public class TestCapacitySchedulerNewQueueAutoCreation
|
||||||
private MockRM mockRM = null;
|
private MockRM mockRM = null;
|
||||||
private CapacityScheduler cs;
|
private CapacityScheduler cs;
|
||||||
private CapacitySchedulerConfiguration csConf;
|
private CapacitySchedulerConfiguration csConf;
|
||||||
private CapacitySchedulerAutoQueueHandler autoQueueHandler;
|
private CapacitySchedulerQueueManager autoQueueHandler;
|
||||||
private AutoCreatedQueueDeletionPolicy policy = new
|
private AutoCreatedQueueDeletionPolicy policy = new
|
||||||
AutoCreatedQueueDeletionPolicy();
|
AutoCreatedQueueDeletionPolicy();
|
||||||
|
|
||||||
|
@ -114,8 +115,7 @@ public class TestCapacitySchedulerNewQueueAutoCreation
|
||||||
policy.init(cs.getConfiguration(), cs.getRMContext(), cs);
|
policy.init(cs.getConfiguration(), cs.getRMContext(), cs);
|
||||||
mockRM.start();
|
mockRM.start();
|
||||||
cs.start();
|
cs.start();
|
||||||
autoQueueHandler = new CapacitySchedulerAutoQueueHandler(
|
autoQueueHandler = cs.getCapacitySchedulerQueueManager();
|
||||||
cs.getCapacitySchedulerQueueManager());
|
|
||||||
mockRM.registerNode("h1:1234", MAX_MEMORY * GB); // label = x
|
mockRM.registerNode("h1:1234", MAX_MEMORY * GB); // label = x
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -608,6 +608,34 @@ public class TestCapacitySchedulerNewQueueAutoCreation
|
||||||
Assert.assertEquals(50, e1.getMaxApplications());
|
Assert.assertEquals(50, e1.getMaxApplications());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expected = SchedulerDynamicEditException.class)
|
||||||
|
public void testAutoCreateQueueWithAmbiguousNonFullPathParentName()
|
||||||
|
throws Exception {
|
||||||
|
startScheduler();
|
||||||
|
|
||||||
|
createQueue("root.a.a");
|
||||||
|
createQueue("a.a");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoCreateQueueIfFirstExistingParentQueueIsNotStatic()
|
||||||
|
throws Exception {
|
||||||
|
startScheduler();
|
||||||
|
|
||||||
|
// create a dynamic ParentQueue
|
||||||
|
createQueue("root.a.a-parent-auto.a1-leaf-auto");
|
||||||
|
Assert.assertNotNull(cs.getQueue("root.a.a-parent-auto"));
|
||||||
|
|
||||||
|
// create a new dynamic LeafQueue under the existing ParentQueue
|
||||||
|
createQueue("root.a.a-parent-auto.a2-leaf-auto");
|
||||||
|
|
||||||
|
CSQueue a2Leaf = cs.getQueue("a2-leaf-auto");
|
||||||
|
|
||||||
|
// Make sure a2-leaf-auto is under a-parent-auto
|
||||||
|
Assert.assertEquals("root.a.a-parent-auto",
|
||||||
|
a2Leaf.getParent().getQueuePath());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAutoCreateQueueIfAmbiguousQueueNames() throws Exception {
|
public void testAutoCreateQueueIfAmbiguousQueueNames() throws Exception {
|
||||||
startScheduler();
|
startScheduler();
|
||||||
|
@ -1109,8 +1137,9 @@ public class TestCapacitySchedulerNewQueueAutoCreation
|
||||||
"when its dynamic parent is removed", bAutoLeaf);
|
"when its dynamic parent is removed", bAutoLeaf);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected LeafQueue createQueue(String queuePath) throws YarnException {
|
protected LeafQueue createQueue(String queuePath) throws YarnException,
|
||||||
return autoQueueHandler.autoCreateQueue(
|
IOException {
|
||||||
|
return autoQueueHandler.createQueue(
|
||||||
CSQueueUtils.extractQueuePath(queuePath));
|
CSQueueUtils.extractQueuePath(queuePath));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,14 +38,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerAutoQueueHandler;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||||
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
|
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
|
||||||
|
@ -85,7 +83,7 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
|
||||||
private static final int GB = 1024;
|
private static final int GB = 1024;
|
||||||
protected static MockRM RM;
|
protected static MockRM RM;
|
||||||
|
|
||||||
private CapacitySchedulerAutoQueueHandler autoQueueHandler;
|
private CapacitySchedulerQueueManager autoQueueHandler;
|
||||||
private CapacitySchedulerConfiguration csConf;
|
private CapacitySchedulerConfiguration csConf;
|
||||||
|
|
||||||
private static class ExpectedQueueWithProperties {
|
private static class ExpectedQueueWithProperties {
|
||||||
|
@ -330,13 +328,13 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
|
||||||
|
|
||||||
private void initAutoQueueHandler() throws Exception {
|
private void initAutoQueueHandler() throws Exception {
|
||||||
CapacityScheduler cs = (CapacityScheduler) RM.getResourceScheduler();
|
CapacityScheduler cs = (CapacityScheduler) RM.getResourceScheduler();
|
||||||
autoQueueHandler = new CapacitySchedulerAutoQueueHandler(
|
autoQueueHandler = cs.getCapacitySchedulerQueueManager();
|
||||||
cs.getCapacitySchedulerQueueManager());
|
|
||||||
MockNM nm1 = RM.registerNode("h1:1234", 1200 * GB); // label = x
|
MockNM nm1 = RM.registerNode("h1:1234", 1200 * GB); // label = x
|
||||||
}
|
}
|
||||||
|
|
||||||
private LeafQueue createQueue(String queuePath) throws YarnException {
|
private LeafQueue createQueue(String queuePath) throws YarnException,
|
||||||
return autoQueueHandler.autoCreateQueue(
|
IOException {
|
||||||
|
return autoQueueHandler.createQueue(
|
||||||
CSQueueUtils.extractQueuePath(queuePath));
|
CSQueueUtils.extractQueuePath(queuePath));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue