diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java index 2bc0407f440..86fd8b545ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java @@ -29,6 +29,10 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; * * * @see QueueInfo @@ -41,7 +45,12 @@ public enum QueueState { * Stopped - Not accepting submissions of new applications. */ STOPPED, - + + /** + * Draining - Not accepting submissions of new applications, + * and waiting for applications to finish. + */ + DRAINING, /** * Running - normal operation. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index db72f7442e7..c1bb07ec603 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -432,6 +432,7 @@ message YarnClusterMetricsProto { enum QueueStateProto { Q_STOPPED = 1; Q_RUNNING = 2; + Q_DRAINING = 3; } message QueueStatisticsProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java new file mode 100644 index 00000000000..761817e455f --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; + +/** + * + * QueueStateManager which can be used by Scheduler to manage the queue state. + * + */ +// TODO: The class will be used by YARN-5734-OrgQueue for +// easy CapacityScheduler queue configuration management. 
+@SuppressWarnings("rawtypes") +@Private +@Unstable +public class QueueStateManager { + + private static final Log LOG = LogFactory.getLog(QueueStateManager.class); + + private SchedulerQueueManager queueManager; + + public synchronized void initialize(SchedulerQueueManager + newQueueManager) { + this.queueManager = newQueueManager; + } + + /** + * Stop the queue. + * @param queueName the queue name + * @throws YarnException if the queue does not exist + */ + @SuppressWarnings("unchecked") + public synchronized void stopQueue(String queueName) throws YarnException { + SchedulerQueue queue = queueManager.getQueue(queueName); + if (queue == null) { + throw new YarnException("The specified queue:" + queueName + + " does not exist!"); + } + queue.stopQueue(); + } + + /** + * Activate the queue. + * @param queueName the queue name + * @throws YarnException if the queue does not exist + * or the queue can not be activated. + */ + @SuppressWarnings("unchecked") + public synchronized void activateQueue(String queueName) + throws YarnException { + SchedulerQueue queue = queueManager.getQueue(queueName); + if (queue == null) { + throw new YarnException("The specified queue:" + queueName + + " does not exist!"); + } + queue.activeQueue(); + } + + /** + * Whether this queue can be deleted. 
+ * @param queueName the queue name + * @return true if the queue can be deleted + */ + @SuppressWarnings("unchecked") + public boolean canDelete(String queueName) { + SchedulerQueue queue = queueManager.getQueue(queueName); + if (queue == null) { + LOG.info("The specified queue:" + queueName + " does not exist!"); + return false; + } + if (queue.getState() == QueueState.STOPPED){ + return true; + } + LOG.info("Need to stop the specific queue:" + queueName + " first."); + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java new file mode 100644 index 00000000000..9a67e0188ae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.List; +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * + * Represents a queue in Scheduler. + * + */ +@SuppressWarnings("rawtypes") +@LimitedPrivate("yarn") +public interface SchedulerQueue extends Queue { + + /** + * Get list of child queues. + * @return a list of child queues + */ + List getChildQueues(); + + /** + * Get the parent queue. + * @return the parent queue + */ + T getParent(); + + /** + * Get current queue state. + * @return the queue state + */ + QueueState getState(); + + /** + * Update the queue state. + * @param state the queue state + */ + void updateQueueState(QueueState state); + + /** + * Stop the queue. + */ + void stopQueue(); + + /** + * Activate the queue. + * @throws YarnException if the queue can not be activated. + */ + void activeQueue() throws YarnException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java index 92b989a8b25..24797a6b7b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java @@ -29,9 +29,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSche * Context of the Queues in Scheduler. 
* */ +@SuppressWarnings("rawtypes") @Private @Unstable -public interface SchedulerQueueManager { /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 33723929e00..d1fa410d78d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueStatistics; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AccessRequest; @@ -77,7 +78,7 @@ public abstract class AbstractCSQueue implements CSQueue { final Resource minimumAllocation; volatile Resource maximumAllocation; - volatile QueueState state; + private volatile QueueState state = null; final CSQueueMetrics metrics; protected final PrivilegedEntity queueEntity; @@ -292,9 +293,15 @@ public abstract class AbstractCSQueue implements CSQueue { csContext.getConfiguration().getMaximumAllocationPerQueue( getQueuePath()); - authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf()); + // initialized the queue state based on previous state, configured state + // and its 
parent state. + QueueState previous = getState(); + QueueState configuredState = csContext.getConfiguration() + .getConfiguredState(getQueuePath()); + QueueState parentState = (parent == null) ? null : parent.getState(); + initializeQueueState(previous, configuredState, parentState); - initializeQueueState(); + authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf()); this.acls = csContext.getConfiguration().getAcls(getQueuePath()); @@ -334,26 +341,53 @@ public abstract class AbstractCSQueue implements CSQueue { } } - private void initializeQueueState() { - // inherit from parent if state not set, only do this when we are not root - if (parent != null) { - QueueState configuredState = csContext.getConfiguration() - .getConfiguredState(getQueuePath()); - QueueState parentState = parent.getState(); - if (configuredState == null) { - this.state = parentState; - } else if (configuredState == QueueState.RUNNING - && parentState == QueueState.STOPPED) { - throw new IllegalArgumentException( - "The parent queue:" + parent.getQueueName() + " state is STOPPED, " - + "child queue:" + queueName + " state cannot be RUNNING."); + private void initializeQueueState(QueueState previousState, + QueueState configuredState, QueueState parentState) { + // verify that we do not set any value for State other than RUNNING/STOPPED + if (configuredState != null && configuredState != QueueState.RUNNING + && configuredState != QueueState.STOPPED) { + throw new IllegalArgumentException("Invalid queue state configuration." + + " We can only use RUNNING or STOPPED."); + } + // If we did not set state in configuration, use Running as default state + QueueState defaultState = QueueState.RUNNING; + + if (previousState == null) { + // If current state of the queue is null, we would inherit the state + // from its parent. If this queue has no parent, such as root queue, + // we would use the configured state. 
+ if (parentState == null) { + updateQueueState((configuredState == null) ? defaultState + : configuredState); } else { - this.state = configuredState; + if (configuredState == null) { + updateQueueState((parentState == QueueState.DRAINING) ? + QueueState.STOPPED : parentState); + } else if (configuredState == QueueState.RUNNING + && parentState != QueueState.RUNNING) { + throw new IllegalArgumentException( + "The parent queue:" + parent.getQueueName() + + " state is STOPPED, child queue:" + queueName + + " state cannot be RUNNING."); + } else { + updateQueueState(configuredState); + } } } else { - // if this is the root queue, get the state from the configuration. - // if the state is not set, use RUNNING as default state. - this.state = csContext.getConfiguration().getState(getQueuePath()); + // when we get a refreshQueue request from AdminService, + if (previousState == QueueState.RUNNING) { + if (configuredState == QueueState.STOPPED) { + stopQueue(); + } + } else { + if (configuredState == QueueState.RUNNING) { + try { + activeQueue(); + } catch (YarnException ex) { + throw new IllegalArgumentException(ex.getMessage()); + } + } + } } } @@ -367,7 +401,7 @@ public abstract class AbstractCSQueue implements CSQueue { queueInfo.setAccessibleNodeLabels(accessibleLabels); queueInfo.setCapacity(queueCapacities.getCapacity()); queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity()); - queueInfo.setQueueState(state); + queueInfo.setQueueState(getState()); queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression); queueInfo.setCurrentCapacity(getUsedCapacity()); queueInfo.setQueueStatistics(getQueueStatistics()); @@ -846,4 +880,47 @@ public abstract class AbstractCSQueue implements CSQueue { String userName, String queue) throws AccessControlException { // Dummy implementation } + + @Override + public void updateQueueState(QueueState queueState) { + this.state = queueState; + } + + @Override + public void activeQueue() throws YarnException { + try { + 
this.writeLock.lock(); + if (getState() == QueueState.RUNNING) { + LOG.info("The specified queue:" + queueName + + " is already in the RUNNING state."); + } else if (getState() == QueueState.DRAINING) { + throw new YarnException( + "The queue:" + queueName + " is in the Stopping process. " + + "Please wait for the queue getting fully STOPPED."); + } else { + CSQueue parent = getParent(); + if (parent == null || parent.getState() == QueueState.RUNNING) { + updateQueueState(QueueState.RUNNING); + } else { + throw new YarnException("The parent Queue:" + parent.getQueueName() + + " is not running. Please activate the parent queue first"); + } + } + } finally { + this.writeLock.unlock(); + } + } + + protected void appFinished() { + try { + this.writeLock.lock(); + if (getState() == QueueState.DRAINING) { + if (getNumApplications() == 0) { + updateQueueState(QueueState.STOPPED); + } + } + } finally { + this.writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 550e2065346..e30ec39dbf6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -56,8 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleP */ @Stable @Private -public interface CSQueue -extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { +public interface CSQueue extends SchedulerQueue { /** * Get the parent Queue. * @return the parent queue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index be22c9ce3e7..cb614d2c367 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2475,4 +2475,9 @@ public class CapacityScheduler extends } return 0; } + + @Override + public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() { + return this.queueManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index c41a7bf5c6b..7d29619d759 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -83,4 +83,6 @@ public interface CapacitySchedulerContext { ResourceUsage getClusterResourceUsage(); ActivitiesManager getActivitiesManager(); + + CapacitySchedulerQueueManager getCapacitySchedulerQueueManager(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 7a6ce567f87..6a3c08a97a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.security.Permission; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager; @@ -86,6 +87,9 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< private CSQueue root; private final RMNodeLabelsManager labelManager; + private QueueStateManager + queueStateManager; + /** * Construct the service. * @param conf the configuration @@ -95,6 +99,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< RMNodeLabelsManager labelManager) { this.authorizer = YarnAuthorizationProvider.getInstance(conf); this.labelManager = labelManager; + this.queueStateManager = new QueueStateManager<>(); } @Override @@ -142,6 +147,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP); setQueueAcls(authorizer, queues); labelManager.reinitializeQueueLabels(getQueueToLabels()); + this.queueStateManager.initialize(this); LOG.info("Initialized root queue " + root); } @@ -170,6 +176,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< clusterResource)); labelManager.reinitializeQueueLabels(getQueueToLabels()); + this.queueStateManager.initialize(this); } /** @@ -358,4 +365,10 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< } return queueToLabels; } + + @Private + public QueueStateManager + getQueueStateManager() { + return this.queueStateManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index bedab429d82..5e4e4416474 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -282,7 +282,7 @@ public class LeafQueue extends AbstractCSQueue { + "maximumAllocationMemory ]" + "\n" + "maximumAllocation = " + maximumAllocation + " [= configuredMaxAllocation ]" + "\n" + "numContainers = " + numContainers - + " [= currentNumContainers ]" + "\n" + "state = " + state + + " [= currentNumContainers ]" + "\n" + "state = " + getState() + " [= configuredState ]" + "\n" + "acls = " + aclsString + " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder @@ -888,6 +888,9 @@ public class LeafQueue extends AbstractCSQueue { public void finishApplication(ApplicationId application, String user) { // Inform the activeUsersManager activeUsersManager.deactivateApplication(user, application); + + appFinished(); + // Inform the parent queue getParent().finishApplication(application, user); } @@ -2434,4 +2437,18 @@ public class LeafQueue extends AbstractCSQueue { return clusterResource; } } + + @Override + public void stopQueue() { + try { + writeLock.lock(); + if (getNumApplications() > 0) { + updateQueueState(QueueState.DRAINING); + } else { + updateQueueState(QueueState.STOPPED); + } + } finally { + writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 0ba4edef2a2..946fca3166d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -133,7 +133,7 @@ public class ParentQueue extends AbstractCSQueue { + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + ", absoluteMaxCapacity=" + this.queueCapacities - .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls=" + .getAbsoluteMaximumCapacity() + ", state=" + getState() + ", acls=" + aclsString + ", labels=" + labelStrBuilder.toString() + "\n" + ", reservationsContinueLooking=" + reservationsContinueLooking); } finally { @@ -369,7 +369,7 @@ public class ParentQueue extends AbstractCSQueue { "Cannot submit application " + "to non-leaf queue: " + queueName); } - if (state != QueueState.RUNNING) { + if (getState() != QueueState.RUNNING) { throw new AccessControlException("Queue " + getQueuePath() + " is STOPPED. 
Cannot accept submission of application: " + applicationId); @@ -411,7 +411,9 @@ public class ParentQueue extends AbstractCSQueue { public void finishApplication(ApplicationId application, String user) { removeApplication(application, user); - + + appFinished(); + // Inform the parent queue if (parent != null) { parent.finishApplication(application, user); @@ -1049,4 +1051,23 @@ public class ParentQueue extends AbstractCSQueue { parent.apply(cluster, request); } } + + @Override + public void stopQueue() { + try { + this.writeLock.lock(); + if (getNumApplications() > 0) { + updateQueueState(QueueState.DRAINING); + } else { + updateQueueState(QueueState.STOPPED); + } + if (getChildQueues() != null) { + for(CSQueue child : getChildQueues()) { + child.stopQueue(); + } + } + } finally { + this.writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index b419c4a3329..37ccdae83b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; 
@@ -73,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; @@ -953,6 +955,7 @@ public class TestLeafQueue { } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testComputeUserLimitAndSetHeadroom() throws IOException { LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B)); @@ -974,6 +977,14 @@ public class TestLeafQueue { Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); when(csContext.getNumClusterNodes()).thenReturn(numNodes); + CapacitySchedulerQueueManager mockCapacitySchedulerQueueManager + = mock(CapacitySchedulerQueueManager.class); + QueueStateManager mockQueueStateManager = mock(QueueStateManager.class); + when(mockCapacitySchedulerQueueManager.getQueueStateManager()).thenReturn( + mockQueueStateManager); + when(csContext.getCapacitySchedulerQueueManager()).thenReturn( + mockCapacitySchedulerQueueManager); + //our test plan contains three cases //1. single user dominate the queue, we test the headroom //2. 
two users, but user_0 is assigned 100% of the queue resource, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java index bd878b7b01f..9f2933efac5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java @@ -18,10 +18,23 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.IOException; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -32,11 +45,14 @@ public class TestQueueState { private static final String Q1 = "q1"; private static final 
String Q2 = "q2"; + private static final String Q3 = "q3"; private final static String Q1_PATH = CapacitySchedulerConfiguration.ROOT + "." + Q1; private final static String Q2_PATH = Q1_PATH + "." + Q2; + private final static String Q3_PATH = + Q1_PATH + "." + Q3; private CapacityScheduler cs; private YarnConfiguration conf; @@ -93,4 +109,92 @@ public class TestQueueState { + "child queue:q2 state cannot be RUNNING.")); } } + + @Test(timeout = 15000) + public void testQueueStateTransit() throws Exception { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1}); + csConf.setQueues(Q1_PATH, new String[] {Q2, Q3}); + + csConf.setCapacity(Q1_PATH, 100); + csConf.setCapacity(Q2_PATH, 50); + csConf.setCapacity(Q3_PATH, 50); + + conf = new YarnConfiguration(csConf); + cs = new CapacityScheduler(); + + RMContext rmContext = TestUtils.getMockRMContext(); + cs.setConf(conf); + cs.setRMContext(rmContext); + cs.init(conf); + + //by default, the state of ALL queues should be RUNNING + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState()); + + // submit an application to Q2 + ApplicationId appId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + String userName = "testUser"; + cs.getQueue(Q2).submitApplication(appId, userName, Q2); + FiCaSchedulerApp app = getMockApplication(appId, userName, + Resources.createResource(4, 0)); + cs.getQueue(Q2).submitApplicationAttempt(app, userName); + + // set Q2 state to stop and do reinitialize. 
+ csConf.setState(Q2_PATH, QueueState.STOPPED); + conf = new YarnConfiguration(csConf); + cs.reinitialize(conf, rmContext); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState()); + + // Set Q1 state to STOPPED and reinitialize. + csConf.setState(Q1_PATH, QueueState.STOPPED); + conf = new YarnConfiguration(csConf); + cs.reinitialize(conf, rmContext); + Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState()); + + // Activate Q3; reinitialize should fail + csConf.setState(Q3_PATH, QueueState.RUNNING); + conf = new YarnConfiguration(csConf); + try { + cs.reinitialize(conf, rmContext); + Assert.fail("Should throw an Exception."); + } catch (Exception ex) { + // Expected: reinitialize rejects this configuration + } + + // Finish the app running in Q2 + cs.getQueue(Q2).finishApplicationAttempt(app, Q2); + cs.getQueue(Q2).finishApplication(appId, userName); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState()); + + } + + private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user, + Resource amResource) { + FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + doReturn(applicationAttemptId.getApplicationId()). 
+ when(application).getApplicationId(); + doReturn(applicationAttemptId).when(application).getApplicationAttemptId(); + doReturn(user).when(application).getUser(); + doReturn(amResource).when(application).getAMResource(); + doReturn(Priority.newInstance(0)).when(application).getPriority(); + doReturn(CommonNodeLabelsManager.NO_LABEL).when(application) + .getAppAMNodePartitionName(); + doReturn(amResource).when(application).getAMResource( + CommonNodeLabelsManager.NO_LABEL); + when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))) + .thenCallRealMethod(); + return application; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java new file mode 100644 index 00000000000..7763dac79a8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test QueueStateManager. + * + */ +public class TestQueueStateManager { + private static final String Q1 = "q1"; + private static final String Q2 = "q2"; + private static final String Q3 = "q3"; + + private final static String Q1_PATH = + CapacitySchedulerConfiguration.ROOT + "." + Q1; + private final static String Q2_PATH = + Q1_PATH + "." + Q2; + private final static String Q3_PATH = + Q1_PATH + "." 
+ Q3; + private CapacityScheduler cs; + private YarnConfiguration conf; + + @Test + public void testQueueStateManager() throws AccessControlException, + YarnException { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1}); + csConf.setQueues(Q1_PATH, new String[] {Q2, Q3}); + + csConf.setCapacity(Q1_PATH, 100); + csConf.setCapacity(Q2_PATH, 50); + csConf.setCapacity(Q3_PATH, 50); + + conf = new YarnConfiguration(csConf); + cs = new CapacityScheduler(); + + RMContext rmContext = TestUtils.getMockRMContext(); + cs.setConf(conf); + cs.setRMContext(rmContext); + cs.init(conf); + + @SuppressWarnings("rawtypes") + QueueStateManager stateManager = cs.getCapacitySchedulerQueueManager() + .getQueueStateManager(); + + // by default, the state of all queues should be RUNNING + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState()); + + // Stop Q2, and verify that Q2 transitions to the STOPPED state + stateManager.stopQueue(Q2); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); + + // Stop Q1, and verify that Q1, as well as its child: Q3, + // transitions to the STOPPED state + stateManager.stopQueue(Q1); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState()); + + Assert.assertTrue(stateManager.canDelete(Q1)); + Assert.assertTrue(stateManager.canDelete(Q2)); + Assert.assertTrue(stateManager.canDelete(Q3)); + + // Activating Q2 would fail at this point; Q2 is still STOPPED. + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); + + // Now activate Q1 + stateManager.activateQueue(Q1); + // Q1 should be in RUNNING state. Its children: Q2 and Q3 + // should still be in STOPPED state. 
+ Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState()); + + // Now activate Q2 and Q3 + stateManager.activateQueue(Q2); + stateManager.activateQueue(Q3); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState()); + + Assert.assertFalse(stateManager.canDelete(Q1)); + Assert.assertFalse(stateManager.canDelete(Q2)); + Assert.assertFalse(stateManager.canDelete(Q3)); + + ApplicationId appId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + String userName = "testUser"; + cs.getQueue(Q2).submitApplication(appId, userName, Q2); + FiCaSchedulerApp app = getMockApplication(appId, userName, + Resources.createResource(4, 0)); + cs.getQueue(Q2).submitApplicationAttempt(app, userName); + stateManager.stopQueue(Q1); + + Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState()); + + cs.getQueue(Q2).finishApplicationAttempt(app, Q2); + cs.getQueue(Q2).finishApplication(appId, userName); + + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); + } + + private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user, + Resource amResource) { + FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + doReturn(applicationAttemptId.getApplicationId()). 
+ when(application).getApplicationId(); + doReturn(applicationAttemptId).when(application).getApplicationAttemptId(); + doReturn(user).when(application).getUser(); + doReturn(amResource).when(application).getAMResource(); + doReturn(Priority.newInstance(0)).when(application).getPriority(); + doReturn(CommonNodeLabelsManager.NO_LABEL).when(application) + .getAppAMNodePartitionName(); + doReturn(amResource).when(application).getAMResource( + CommonNodeLabelsManager.NO_LABEL); + when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))) + .thenCallRealMethod(); + return application; + } +}