YARN-5756. Add state-machine implementation for scheduler queues. (Xuan Gong via wangda)

This commit is contained in:
Wangda Tan 2016-12-27 21:18:24 -08:00
parent 0ddb8defad
commit 0840b4329b
15 changed files with 621 additions and 29 deletions

View File

@ -29,6 +29,10 @@
* <ul> * <ul>
* <li>{@link #RUNNING} - normal state.</li> * <li>{@link #RUNNING} - normal state.</li>
* <li>{@link #STOPPED} - not accepting new application submissions.</li> * <li>{@link #STOPPED} - not accepting new application submissions.</li>
* <li>
* {@link #DRAINING} - not accepting new application submissions
* and waiting for applications to finish.
* </li>
* </ul> * </ul>
* *
* @see QueueInfo * @see QueueInfo
@ -42,6 +46,11 @@ public enum QueueState {
*/ */
STOPPED, STOPPED,
/**
* Draining - Not accepting submissions of new applications,
* and waiting for applications to finish.
*/
DRAINING,
/** /**
* Running - normal operation. * Running - normal operation.
*/ */

View File

@ -427,6 +427,7 @@ message YarnClusterMetricsProto {
enum QueueStateProto { enum QueueStateProto {
Q_STOPPED = 1; Q_STOPPED = 1;
Q_RUNNING = 2; Q_RUNNING = 2;
Q_DRAINING = 3;
} }
message QueueStatisticsProto { message QueueStatisticsProto {

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
/**
*
* QueueStateManager which can be used by Scheduler to manage the queue state.
*
*/
// TODO: The class will be used by YARN-5734-OrgQueue for
// easy CapacityScheduler queue configuration management.
@SuppressWarnings("rawtypes")
@Private
@Unstable
public class QueueStateManager<T extends SchedulerQueue,
    E extends ReservationSchedulerConfiguration> {

  private static final Log LOG = LogFactory.getLog(QueueStateManager.class);

  // volatile: the field is written under this object's monitor in
  // initialize(), but canDelete() reads it without taking the monitor.
  // Marking it volatile guarantees that reader sees the latest manager
  // without adding any lock acquisition to canDelete().
  private volatile SchedulerQueueManager<T, E> queueManager;

  /**
   * Set the queue manager that is used to look queues up by name.
   * Calling this again replaces the previously supplied manager.
   * @param newQueueManager the queue manager to use from now on
   */
  public synchronized void initialize(SchedulerQueueManager<T, E>
      newQueueManager) {
    this.queueManager = newQueueManager;
  }

  /**
   * Stop the queue by delegating to {@link SchedulerQueue#stopQueue()}.
   * @param queueName the queue name
   * @throws YarnException if the queue does not exist
   */
  public synchronized void stopQueue(String queueName) throws YarnException {
    getExistingQueue(queueName).stopQueue();
  }

  /**
   * Activate the queue by delegating to {@link SchedulerQueue#activeQueue()}.
   * @param queueName the queue name
   * @throws YarnException if the queue does not exist
   * or the queue can not be activated.
   */
  public synchronized void activateQueue(String queueName)
      throws YarnException {
    getExistingQueue(queueName).activeQueue();
  }

  /**
   * Whether this queue can be deleted. A queue can be deleted only when it
   * exists and its current state is {@link QueueState#STOPPED}; the reason
   * for a negative answer is logged at INFO level.
   * @param queueName the queue name
   * @return true if the queue can be deleted
   */
  @SuppressWarnings("unchecked")
  public boolean canDelete(String queueName) {
    SchedulerQueue<T> queue = queueManager.getQueue(queueName);
    if (queue == null) {
      LOG.info("The specified queue:" + queueName + " does not exist!");
      return false;
    }
    if (queue.getState() == QueueState.STOPPED){
      return true;
    }
    LOG.info("Need to stop the specific queue:" + queueName + " first.");
    return false;
  }

  /**
   * Look up a queue by name, failing when the manager does not know it.
   * @param queueName the queue name
   * @return the queue registered under the given name, never null
   * @throws YarnException if the queue does not exist
   */
  @SuppressWarnings("unchecked")
  private SchedulerQueue<T> getExistingQueue(String queueName)
      throws YarnException {
    SchedulerQueue<T> queue = queueManager.getQueue(queueName);
    if (queue == null) {
      throw new YarnException("The specified queue:" + queueName
          + " does not exist!");
    }
    return queue;
  }
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
*
* Represents a queue in Scheduler.
*
*/
@SuppressWarnings("rawtypes")
@LimitedPrivate("yarn")
public interface SchedulerQueue<T extends SchedulerQueue> extends Queue {

  /**
   * Returns the queue this queue belongs to.
   * @return the parent queue
   */
  T getParent();

  /**
   * Returns the queues that have this queue as their parent.
   * @return a list of child queues
   */
  List<T> getChildQueues();

  /**
   * Returns the state the queue is currently in.
   * @return the queue state
   */
  QueueState getState();

  /**
   * Replaces the state of the queue with the given one.
   * @param state the queue state
   */
  void updateQueueState(QueueState state);

  /**
   * Activates the queue.
   * @throws YarnException if the queue can not be activated.
   */
  void activeQueue() throws YarnException;

  /**
   * Stops the queue.
   */
  void stopQueue();
}

View File

@ -29,9 +29,10 @@
* Context of the Queues in Scheduler. * Context of the Queues in Scheduler.
* *
*/ */
@SuppressWarnings("rawtypes")
@Private @Private
@Unstable @Unstable
public interface SchedulerQueueManager<T extends Queue, public interface SchedulerQueueManager<T extends SchedulerQueue,
E extends ReservationSchedulerConfiguration> { E extends ReservationSchedulerConfiguration> {
/** /**

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.api.records.QueueStatistics; import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessRequest; import org.apache.hadoop.yarn.security.AccessRequest;
@ -77,7 +78,7 @@ public abstract class AbstractCSQueue implements CSQueue {
final Resource minimumAllocation; final Resource minimumAllocation;
volatile Resource maximumAllocation; volatile Resource maximumAllocation;
volatile QueueState state; private volatile QueueState state = null;
final CSQueueMetrics metrics; final CSQueueMetrics metrics;
protected final PrivilegedEntity queueEntity; protected final PrivilegedEntity queueEntity;
@ -292,9 +293,15 @@ void setupQueueConfigs(Resource clusterResource)
csContext.getConfiguration().getMaximumAllocationPerQueue( csContext.getConfiguration().getMaximumAllocationPerQueue(
getQueuePath()); getQueuePath());
authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf()); // initialized the queue state based on previous state, configured state
// and its parent state.
QueueState previous = getState();
QueueState configuredState = csContext.getConfiguration()
.getConfiguredState(getQueuePath());
QueueState parentState = (parent == null) ? null : parent.getState();
initializeQueueState(previous, configuredState, parentState);
initializeQueueState(); authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
this.acls = csContext.getConfiguration().getAcls(getQueuePath()); this.acls = csContext.getConfiguration().getAcls(getQueuePath());
@ -334,26 +341,53 @@ void setupQueueConfigs(Resource clusterResource)
} }
} }
private void initializeQueueState() { private void initializeQueueState(QueueState previousState,
// inherit from parent if state not set, only do this when we are not root QueueState configuredState, QueueState parentState) {
if (parent != null) { // verify that we can not any value for State other than RUNNING/STOPPED
QueueState configuredState = csContext.getConfiguration() if (configuredState != null && configuredState != QueueState.RUNNING
.getConfiguredState(getQueuePath()); && configuredState != QueueState.STOPPED) {
QueueState parentState = parent.getState(); throw new IllegalArgumentException("Invalid queue state configuration."
if (configuredState == null) { + " We can only use RUNNING or STOPPED.");
this.state = parentState; }
} else if (configuredState == QueueState.RUNNING // If we did not set state in configuration, use Running as default state
&& parentState == QueueState.STOPPED) { QueueState defaultState = QueueState.RUNNING;
throw new IllegalArgumentException(
"The parent queue:" + parent.getQueueName() + " state is STOPPED, " if (previousState == null) {
+ "child queue:" + queueName + " state cannot be RUNNING."); // If current state of the queue is null, we would inherit the state
// from its parent. If this queue does not has parent, such as root queue,
// we would use the configured state.
if (parentState == null) {
updateQueueState((configuredState == null) ? defaultState
: configuredState);
} else { } else {
this.state = configuredState; if (configuredState == null) {
updateQueueState((parentState == QueueState.DRAINING) ?
QueueState.STOPPED : parentState);
} else if (configuredState == QueueState.RUNNING
&& parentState != QueueState.RUNNING) {
throw new IllegalArgumentException(
"The parent queue:" + parent.getQueueName()
+ " state is STOPPED, child queue:" + queueName
+ " state cannot be RUNNING.");
} else {
updateQueueState(configuredState);
}
} }
} else { } else {
// if this is the root queue, get the state from the configuration. // when we get a refreshQueue request from AdminService,
// if the state is not set, use RUNNING as default state. if (previousState == QueueState.RUNNING) {
this.state = csContext.getConfiguration().getState(getQueuePath()); if (configuredState == QueueState.STOPPED) {
stopQueue();
}
} else {
if (configuredState == QueueState.RUNNING) {
try {
activeQueue();
} catch (YarnException ex) {
throw new IllegalArgumentException(ex.getMessage());
}
}
}
} }
} }
@ -367,7 +401,7 @@ protected QueueInfo getQueueInfo() {
queueInfo.setAccessibleNodeLabels(accessibleLabels); queueInfo.setAccessibleNodeLabels(accessibleLabels);
queueInfo.setCapacity(queueCapacities.getCapacity()); queueInfo.setCapacity(queueCapacities.getCapacity());
queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity()); queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
queueInfo.setQueueState(state); queueInfo.setQueueState(getState());
queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression); queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression);
queueInfo.setCurrentCapacity(getUsedCapacity()); queueInfo.setCurrentCapacity(getUsedCapacity());
queueInfo.setQueueStatistics(getQueueStatistics()); queueInfo.setQueueStatistics(getQueueStatistics());
@ -846,4 +880,47 @@ public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException { String userName, String queue) throws AccessControlException {
// Dummy implementation // Dummy implementation
} }
/**
 * Stores the supplied state as the current state of this queue.
 * The value is assigned as-is; no transition check is performed here.
 * @param queueState the state to store
 */
@Override
public void updateQueueState(QueueState queueState) {
  state = queueState;
}
/**
 * Moves this queue to the RUNNING state.
 * <ul>
 * <li>Already RUNNING: only logs a message.</li>
 * <li>DRAINING: rejected, the queue has to reach STOPPED first.</li>
 * <li>Any other state: switched to RUNNING, provided the queue has no
 * parent or the parent is RUNNING.</li>
 * </ul>
 * @throws YarnException if the queue is DRAINING or its parent is not
 * RUNNING
 */
@Override
public void activeQueue() throws YarnException {
  // Take the lock before entering the try block: if lock() itself fails,
  // the finally clause must not call unlock() on a lock we do not hold.
  this.writeLock.lock();
  try {
    // Read the state once so both checks see the same value.
    QueueState current = getState();
    if (current == QueueState.RUNNING) {
      LOG.info("The specified queue:" + queueName
          + " is already in the RUNNING state.");
    } else if (current == QueueState.DRAINING) {
      throw new YarnException(
          "The queue:" + queueName + " is in the Stopping process. "
          + "Please wait for the queue getting fully STOPPED.");
    } else {
      CSQueue parentQueue = getParent();
      if (parentQueue == null
          || parentQueue.getState() == QueueState.RUNNING) {
        updateQueueState(QueueState.RUNNING);
      } else {
        throw new YarnException("The parent Queue:"
            + parentQueue.getQueueName()
            + " is not running. Please activate the parent queue first");
      }
    }
  } finally {
    this.writeLock.unlock();
  }
}
/**
 * Hook invoked when an application has finished in this queue.
 * A DRAINING queue that has no applications left is switched to STOPPED;
 * in every other situation the state is left untouched.
 */
protected void appFinished() {
  // Take the lock before entering the try block: if lock() itself fails,
  // the finally clause must not call unlock() on a lock we do not hold.
  this.writeLock.lock();
  try {
    if (getState() == QueueState.DRAINING && getNumApplications() == 0) {
      updateQueueState(QueueState.STOPPED);
    }
  } finally {
    this.writeLock.unlock();
  }
}
} }

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -56,8 +57,7 @@
*/ */
@Stable @Stable
@Private @Private
public interface CSQueue public interface CSQueue extends SchedulerQueue<CSQueue> {
extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
/** /**
* Get the parent <code>Queue</code>. * Get the parent <code>Queue</code>.
* @return the parent queue * @return the parent queue

View File

@ -2474,4 +2474,9 @@ public int getAsyncSchedulingPendingBacklogs() {
} }
return 0; return 0;
} }
/**
 * Exposes the queue manager held by this scheduler.
 * @return the queue manager of this scheduler
 */
@Override
public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() {
  return queueManager;
}
} }

View File

@ -83,4 +83,6 @@ public interface CapacitySchedulerContext {
ResourceUsage getClusterResourceUsage(); ResourceUsage getClusterResourceUsage();
ActivitiesManager getActivitiesManager(); ActivitiesManager getActivitiesManager();
CapacitySchedulerQueueManager getCapacitySchedulerQueueManager();
} }

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
@ -86,6 +87,9 @@ public CSQueue hook(CSQueue queue) {
private CSQueue root; private CSQueue root;
private final RMNodeLabelsManager labelManager; private final RMNodeLabelsManager labelManager;
private QueueStateManager<CSQueue, CapacitySchedulerConfiguration>
queueStateManager;
/** /**
* Construct the service. * Construct the service.
* @param conf the configuration * @param conf the configuration
@ -95,6 +99,7 @@ public CapacitySchedulerQueueManager(Configuration conf,
RMNodeLabelsManager labelManager) { RMNodeLabelsManager labelManager) {
this.authorizer = YarnAuthorizationProvider.getInstance(conf); this.authorizer = YarnAuthorizationProvider.getInstance(conf);
this.labelManager = labelManager; this.labelManager = labelManager;
this.queueStateManager = new QueueStateManager<>();
} }
@Override @Override
@ -142,6 +147,7 @@ public void initializeQueues(CapacitySchedulerConfiguration conf)
CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP); CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
setQueueAcls(authorizer, queues); setQueueAcls(authorizer, queues);
labelManager.reinitializeQueueLabels(getQueueToLabels()); labelManager.reinitializeQueueLabels(getQueueToLabels());
this.queueStateManager.initialize(this);
LOG.info("Initialized root queue " + root); LOG.info("Initialized root queue " + root);
} }
@ -170,6 +176,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
clusterResource)); clusterResource));
labelManager.reinitializeQueueLabels(getQueueToLabels()); labelManager.reinitializeQueueLabels(getQueueToLabels());
this.queueStateManager.initialize(this);
} }
/** /**
@ -358,4 +365,10 @@ private Map<String, Set<String>> getQueueToLabels() {
} }
return queueToLabels; return queueToLabels;
} }
/**
 * Exposes the state manager created by this queue manager.
 * @return the queue state manager
 */
@Private
public QueueStateManager<CSQueue, CapacitySchedulerConfiguration>
    getQueueStateManager() {
  return queueStateManager;
}
} }

View File

@ -274,7 +274,7 @@ protected void setupQueueConfigs(Resource clusterResource)
+ "maximumAllocationMemory ]" + "\n" + "maximumAllocation = " + "maximumAllocationMemory ]" + "\n" + "maximumAllocation = "
+ maximumAllocation + " [= configuredMaxAllocation ]" + "\n" + maximumAllocation + " [= configuredMaxAllocation ]" + "\n"
+ "numContainers = " + numContainers + "numContainers = " + numContainers
+ " [= currentNumContainers ]" + "\n" + "state = " + state + " [= currentNumContainers ]" + "\n" + "state = " + getState()
+ " [= configuredState ]" + "\n" + "acls = " + aclsString + " [= configuredState ]" + "\n" + "acls = " + aclsString
+ " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = " + " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = "
+ nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder + nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder
@ -880,6 +880,9 @@ private void addApplicationAttempt(FiCaSchedulerApp application,
public void finishApplication(ApplicationId application, String user) { public void finishApplication(ApplicationId application, String user) {
// Inform the activeUsersManager // Inform the activeUsersManager
activeUsersManager.deactivateApplication(user, application); activeUsersManager.deactivateApplication(user, application);
appFinished();
// Inform the parent queue // Inform the parent queue
getParent().finishApplication(application, user); getParent().finishApplication(application, user);
} }
@ -2428,4 +2431,18 @@ public Resource getClusterResource() {
return clusterResource; return clusterResource;
} }
} }
/**
 * Stops this queue. The queue becomes DRAINING while it still has
 * applications, and STOPPED when it has none.
 */
@Override
public void stopQueue() {
  // Take the lock before entering the try block: if lock() itself fails,
  // the finally clause must not call unlock() on a lock we do not hold.
  writeLock.lock();
  try {
    if (getNumApplications() > 0) {
      updateQueueState(QueueState.DRAINING);
    } else {
      updateQueueState(QueueState.STOPPED);
    }
  } finally {
    writeLock.unlock();
  }
}
} }

View File

@ -133,7 +133,7 @@ void setupQueueConfigs(Resource clusterResource)
+ ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
+ ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
+ ", absoluteMaxCapacity=" + this.queueCapacities + ", absoluteMaxCapacity=" + this.queueCapacities
.getAbsoluteMaximumCapacity() + ", state=" + state + ", acls=" .getAbsoluteMaximumCapacity() + ", state=" + getState() + ", acls="
+ aclsString + ", labels=" + labelStrBuilder.toString() + "\n" + aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
+ ", reservationsContinueLooking=" + reservationsContinueLooking); + ", reservationsContinueLooking=" + reservationsContinueLooking);
} finally { } finally {
@ -369,7 +369,7 @@ public void validateSubmitApplication(ApplicationId applicationId,
"Cannot submit application " + "to non-leaf queue: " + queueName); "Cannot submit application " + "to non-leaf queue: " + queueName);
} }
if (state != QueueState.RUNNING) { if (getState() != QueueState.RUNNING) {
throw new AccessControlException("Queue " + getQueuePath() throw new AccessControlException("Queue " + getQueuePath()
+ " is STOPPED. Cannot accept submission of application: " + " is STOPPED. Cannot accept submission of application: "
+ applicationId); + applicationId);
@ -412,6 +412,8 @@ public void finishApplication(ApplicationId application, String user) {
removeApplication(application, user); removeApplication(application, user);
appFinished();
// Inform the parent queue // Inform the parent queue
if (parent != null) { if (parent != null) {
parent.finishApplication(application, user); parent.finishApplication(application, user);
@ -1049,4 +1051,23 @@ public void apply(Resource cluster,
parent.apply(cluster, request); parent.apply(cluster, request);
} }
} }
/**
 * Stops this queue and then every child queue. This queue becomes
 * DRAINING while it still has applications, and STOPPED when it has none.
 * The children are stopped while this queue's write lock is held.
 */
@Override
public void stopQueue() {
  // Take the lock before entering the try block: if lock() itself fails,
  // the finally clause must not call unlock() on a lock we do not hold.
  this.writeLock.lock();
  try {
    if (getNumApplications() > 0) {
      updateQueueState(QueueState.DRAINING);
    } else {
      updateQueueState(QueueState.STOPPED);
    }
    if (getChildQueues() != null) {
      for (CSQueue child : getChildQueues()) {
        child.stopQueue();
      }
    }
  } finally {
    this.writeLock.unlock();
  }
}
} }

View File

@ -23,6 +23,7 @@
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -73,6 +74,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@ -953,6 +955,7 @@ public void testUserLimits() throws Exception {
} }
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test @Test
public void testComputeUserLimitAndSetHeadroom() throws IOException { public void testComputeUserLimitAndSetHeadroom() throws IOException {
LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B)); LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B));
@ -974,6 +977,14 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
when(csContext.getNumClusterNodes()).thenReturn(numNodes); when(csContext.getNumClusterNodes()).thenReturn(numNodes);
CapacitySchedulerQueueManager mockCapacitySchedulerQueueManager
= mock(CapacitySchedulerQueueManager.class);
QueueStateManager mockQueueStateManager = mock(QueueStateManager.class);
when(mockCapacitySchedulerQueueManager.getQueueStateManager()).thenReturn(
mockQueueStateManager);
when(csContext.getCapacitySchedulerQueueManager()).thenReturn(
mockCapacitySchedulerQueueManager);
//our test plan contains three cases //our test plan contains three cases
//1. single user dominate the queue, we test the headroom //1. single user dominate the queue, we test the headroom
//2. two users, but user_0 is assigned 100% of the queue resource, //2. two users, but user_0 is assigned 100% of the queue resource,

View File

@ -18,10 +18,23 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -32,11 +45,14 @@ public class TestQueueState {
private static final String Q1 = "q1"; private static final String Q1 = "q1";
private static final String Q2 = "q2"; private static final String Q2 = "q2";
private static final String Q3 = "q3";
private final static String Q1_PATH = private final static String Q1_PATH =
CapacitySchedulerConfiguration.ROOT + "." + Q1; CapacitySchedulerConfiguration.ROOT + "." + Q1;
private final static String Q2_PATH = private final static String Q2_PATH =
Q1_PATH + "." + Q2; Q1_PATH + "." + Q2;
private final static String Q3_PATH =
Q1_PATH + "." + Q3;
private CapacityScheduler cs; private CapacityScheduler cs;
private YarnConfiguration conf; private YarnConfiguration conf;
@ -93,4 +109,92 @@ public void testQueueState() throws IOException {
+ "child queue:q2 state cannot be RUNNING.")); + "child queue:q2 state cannot be RUNNING."));
} }
} }
/**
 * Walks the queues q1 (parent), q2 and q3 (children of q1) through the
 * state transitions triggered by reinitializing the scheduler with a
 * changed configuration while one application is present in q2.
 */
@Test(timeout = 15000)
public void testQueueStateTransit() throws Exception {
  CapacitySchedulerConfiguration schedulerConf =
      new CapacitySchedulerConfiguration();
  schedulerConf.setQueues(CapacitySchedulerConfiguration.ROOT,
      new String[] {Q1});
  schedulerConf.setQueues(Q1_PATH, new String[] {Q2, Q3});
  schedulerConf.setCapacity(Q1_PATH, 100);
  schedulerConf.setCapacity(Q2_PATH, 50);
  schedulerConf.setCapacity(Q3_PATH, 50);
  conf = new YarnConfiguration(schedulerConf);

  cs = new CapacityScheduler();
  RMContext context = TestUtils.getMockRMContext();
  cs.setConf(conf);
  cs.setRMContext(context);
  cs.init(conf);

  // Nothing is configured, so every queue is expected to be RUNNING.
  assertQueueStates(
      QueueState.RUNNING, QueueState.RUNNING, QueueState.RUNNING);

  // Put one application (with an attempt) into q2.
  ApplicationId applicationId = ApplicationId.newInstance(
      System.currentTimeMillis(), 1);
  String submitter = "testUser";
  cs.getQueue(Q2).submitApplication(applicationId, submitter, Q2);
  FiCaSchedulerApp attempt = getMockApplication(applicationId, submitter,
      Resources.createResource(4, 0));
  cs.getQueue(Q2).submitApplicationAttempt(attempt, submitter);

  // Configure q2 as STOPPED; it still has an application, so it drains.
  schedulerConf.setState(Q2_PATH, QueueState.STOPPED);
  conf = new YarnConfiguration(schedulerConf);
  cs.reinitialize(conf, context);
  assertQueueStates(
      QueueState.RUNNING, QueueState.DRAINING, QueueState.RUNNING);

  // Configure q1 as STOPPED; the empty q3 stops, q1 and q2 drain.
  schedulerConf.setState(Q1_PATH, QueueState.STOPPED);
  conf = new YarnConfiguration(schedulerConf);
  cs.reinitialize(conf, context);
  assertQueueStates(
      QueueState.DRAINING, QueueState.DRAINING, QueueState.STOPPED);

  // Activating q3 has to be rejected.
  schedulerConf.setState(Q3_PATH, QueueState.RUNNING);
  conf = new YarnConfiguration(schedulerConf);
  boolean rejected = false;
  try {
    cs.reinitialize(conf, context);
  } catch (Exception expected) {
    // The failure is the outcome this step is looking for.
    rejected = true;
  }
  Assert.assertTrue("Should throw an Exception.", rejected);

  // Finish the application in q2; all queues are then expected to stop.
  cs.getQueue(Q2).finishApplicationAttempt(attempt, Q2);
  cs.getQueue(Q2).finishApplication(applicationId, submitter);
  assertQueueStates(
      QueueState.STOPPED, QueueState.STOPPED, QueueState.STOPPED);
}

/**
 * Asserts the current states of q1, q2 and q3, looking each queue up in
 * the scheduler at the time of the call.
 */
private void assertQueueStates(QueueState expectedQ1,
    QueueState expectedQ2, QueueState expectedQ3) {
  Assert.assertEquals(expectedQ1, cs.getQueue(Q1).getState());
  Assert.assertEquals(expectedQ2, cs.getQueue(Q2).getState());
  Assert.assertEquals(expectedQ3, cs.getQueue(Q3).getState());
}
/**
 * Builds a mocked application attempt (attempt number 0) for the given
 * application id.
 * @param appId id of the application the attempt belongs to
 * @param user user reported by the mock
 * @param amResource AM resource reported by the mock, both for the
 * no-argument getter and for the no-label partition
 * @return the stubbed mock
 */
private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user,
    Resource amResource) {
  ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 0);
  FiCaSchedulerApp mockApp = mock(FiCaSchedulerApp.class);

  // Identity of the attempt.
  when(mockApp.getApplicationId()).thenReturn(attemptId.getApplicationId());
  when(mockApp.getApplicationAttemptId()).thenReturn(attemptId);
  when(mockApp.getUser()).thenReturn(user);

  // Resource, priority and partition answers.
  when(mockApp.getAMResource()).thenReturn(amResource);
  when(mockApp.getPriority()).thenReturn(Priority.newInstance(0));
  when(mockApp.getAppAMNodePartitionName())
      .thenReturn(CommonNodeLabelsManager.NO_LABEL);
  when(mockApp.getAMResource(CommonNodeLabelsManager.NO_LABEL))
      .thenReturn(amResource);

  // The ordering comparison runs the real implementation.
  when(mockApp.compareInputOrderTo(any(FiCaSchedulerApp.class)))
      .thenCallRealMethod();
  return mockApp;
}
} }

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
/**
 * Test QueueStateManager.
 *
 * Drives the queues of a {@link CapacityScheduler} through stop and
 * activate requests issued via its {@link QueueStateManager} and checks
 * the resulting RUNNING, DRAINING and STOPPED states.
 */
public class TestQueueStateManager {
  private static final String Q1 = "q1";
  private static final String Q2 = "q2";
  private static final String Q3 = "q3";

  // Queue layout used by the test: root -> q1 -> {q2, q3}.
  private final static String Q1_PATH =
      CapacitySchedulerConfiguration.ROOT + "." + Q1;
  private final static String Q2_PATH =
      Q1_PATH + "." + Q2;
  private final static String Q3_PATH =
      Q1_PATH + "." + Q3;

  private CapacityScheduler cs;
  private YarnConfiguration conf;

  /**
   * Walks Q1 and its children Q2 and Q3 through stop/activate requests,
   * first without applications and then with one application attempt
   * submitted to Q2, asserting the queue state after every step.
   *
   * @throws AccessControlException declared by the queue submission calls
   * @throws YarnException declared by the queue state manager calls
   */
  @Test
  public void testQueueStateManager() throws AccessControlException,
      YarnException {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1});
    csConf.setQueues(Q1_PATH, new String[] {Q2, Q3});

    csConf.setCapacity(Q1_PATH, 100);
    csConf.setCapacity(Q2_PATH, 50);
    csConf.setCapacity(Q3_PATH, 50);

    conf = new YarnConfiguration(csConf);
    cs = new CapacityScheduler();

    RMContext rmContext = TestUtils.getMockRMContext();
    cs.setConf(conf);
    cs.setRMContext(rmContext);
    cs.init(conf);

    @SuppressWarnings("rawtypes")
    QueueStateManager stateManager = cs.getCapacitySchedulerQueueManager()
        .getQueueStateManager();

    // By default, the state of all queues should be RUNNING.
    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());

    // Stop Q2, and verify that Q2 transitions to the STOPPED state.
    stateManager.stopQueue(Q2);
    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());

    // Stop Q1, and verify that Q1, as well as its child Q3,
    // transitions to the STOPPED state.
    stateManager.stopQueue(Q1);
    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());

    Assert.assertTrue(stateManager.canDelete(Q1));
    Assert.assertTrue(stateManager.canDelete(Q2));
    Assert.assertTrue(stateManager.canDelete(Q3));

    // Try to activate Q2 while its parent Q1 is still STOPPED. The request
    // must not take effect; Q2 has to remain STOPPED.
    try {
      stateManager.activateQueue(Q2);
    } catch (YarnException expected) {
      // Rejecting the request with an exception is acceptable here; the
      // assertion below verifies what matters, the unchanged queue state.
    }
    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());

    // Now activate Q1.
    stateManager.activateQueue(Q1);
    // Q1 should be in RUNNING state. Its children: Q2 and Q3
    // should still be in STOPPED state.
    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());

    // Now activate Q2 and Q3.
    stateManager.activateQueue(Q2);
    stateManager.activateQueue(Q3);
    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());

    // Running queues must not be reported as deletable.
    Assert.assertFalse(stateManager.canDelete(Q1));
    Assert.assertFalse(stateManager.canDelete(Q2));
    Assert.assertFalse(stateManager.canDelete(Q3));

    // Submit an application and one attempt to Q2.
    ApplicationId appId = ApplicationId.newInstance(
        System.currentTimeMillis(), 1);
    String userName = "testUser";
    cs.getQueue(Q2).submitApplication(appId, userName, Q2);
    FiCaSchedulerApp app = getMockApplication(appId, userName,
        Resources.createResource(4, 0));
    cs.getQueue(Q2).submitApplicationAttempt(app, userName);

    // Stopping Q1 now leaves Q1 and Q2 DRAINING because of the submitted
    // application, while Q3, which has no application, goes to STOPPED.
    stateManager.stopQueue(Q1);

    Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState());
    Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState());
    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());

    // Once the attempt and the application finish, both draining queues
    // are expected to reach STOPPED.
    cs.getQueue(Q2).finishApplicationAttempt(app, Q2);
    cs.getQueue(Q2).finishApplication(appId, userName);

    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
  }

  /**
   * Creates a Mockito mock of {@link FiCaSchedulerApp} stubbed with the
   * given identity and AM resource. The mock reports attempt id 0 of
   * {@code appId}, priority 0 and the NO_LABEL partition for its AM, and
   * delegates {@code compareInputOrderTo} to the real implementation.
   *
   * @param appId id of the application the mock stands in for
   * @param user user name returned by the mock
   * @param amResource resource returned for both getAMResource variants
   * @return the stubbed mock application
   */
  private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user,
      Resource amResource) {
    FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
    ApplicationAttemptId applicationAttemptId =
        ApplicationAttemptId.newInstance(appId, 0);
    doReturn(applicationAttemptId.getApplicationId()).
        when(application).getApplicationId();
    doReturn(applicationAttemptId).when(application).getApplicationAttemptId();
    doReturn(user).when(application).getUser();
    doReturn(amResource).when(application).getAMResource();
    doReturn(Priority.newInstance(0)).when(application).getPriority();
    doReturn(CommonNodeLabelsManager.NO_LABEL).when(application)
        .getAppAMNodePartitionName();
    doReturn(amResource).when(application).getAMResource(
        CommonNodeLabelsManager.NO_LABEL);
    when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
        .thenCallRealMethod();
    return application;
  }
}