YARN-5756. Add state-machine implementation for scheduler queues. (Xuan Gong via wangda)

(cherry picked from commit 0840b4329b)
This commit is contained in:
Wangda Tan 2016-12-27 21:18:24 -08:00
parent 5d46d5d4c6
commit 42f571728c
15 changed files with 621 additions and 29 deletions

View File

@ -29,6 +29,10 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
* <ul>
* <li>{@link #RUNNING} - normal state.</li>
* <li>{@link #STOPPED} - not accepting new application submissions.</li>
* <li>
* {@link #DRAINING} - not accepting new application submissions
* and waiting for applications finish.
* </li>
* </ul>
*
* @see QueueInfo
@ -42,6 +46,11 @@ public enum QueueState {
*/
STOPPED,
/**
* Draining - Not accepting submissions of new applications,
* and waiting for applications finish.
*/
DRAINING,
/**
* Running - normal operation.
*/

View File

@ -432,6 +432,7 @@ message YarnClusterMetricsProto {
enum QueueStateProto {
Q_STOPPED = 1;
Q_RUNNING = 2;
Q_DRAINING = 3;
}
message QueueStatisticsProto {

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
/**
*
* QueueStateManager which can be used by Scheduler to manage the queue state.
*
*/
// TODO: The class will be used by YARN-5734-OrgQueue for
// easy CapacityScheduler queue configuration management.
@SuppressWarnings("rawtypes")
@Private
@Unstable
public class QueueStateManager<T extends SchedulerQueue,
E extends ReservationSchedulerConfiguration> {
private static final Log LOG = LogFactory.getLog(QueueStateManager.class);
private SchedulerQueueManager<T, E> queueManager;
public synchronized void initialize(SchedulerQueueManager<T, E>
newQueueManager) {
this.queueManager = newQueueManager;
}
/**
* Stop the queue.
* @param queueName the queue name
* @throws YarnException if the queue does not exist
*/
@SuppressWarnings("unchecked")
public synchronized void stopQueue(String queueName) throws YarnException {
SchedulerQueue<T> queue = queueManager.getQueue(queueName);
if (queue == null) {
throw new YarnException("The specified queue:" + queueName
+ " does not exist!");
}
queue.stopQueue();
}
/**
* Active the queue.
* @param queueName the queue name
* @throws YarnException if the queue does not exist
* or the queue can not be activated.
*/
@SuppressWarnings("unchecked")
public synchronized void activateQueue(String queueName)
throws YarnException {
SchedulerQueue<T> queue = queueManager.getQueue(queueName);
if (queue == null) {
throw new YarnException("The specified queue:" + queueName
+ " does not exist!");
}
queue.activeQueue();
}
/**
* Whether this queue can be deleted.
* @param queueName the queue name
* @return true if the queue can be deleted
*/
@SuppressWarnings("unchecked")
public boolean canDelete(String queueName) {
SchedulerQueue<T> queue = queueManager.getQueue(queueName);
if (queue == null) {
LOG.info("The specified queue:" + queueName + " does not exist!");
return false;
}
if (queue.getState() == QueueState.STOPPED){
return true;
}
LOG.info("Need to stop the specific queue:" + queueName + " first.");
return false;
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
*
* Represents a queue in Scheduler.
*
*/
@SuppressWarnings("rawtypes")
@LimitedPrivate("yarn")
public interface SchedulerQueue<T extends SchedulerQueue> extends Queue {
/**
* Get list of child queues.
* @return a list of child queues
*/
List<T> getChildQueues();
/**
* Get the parent queue.
* @return the parent queue
*/
T getParent();
/**
* Get current queue state.
* @return the queue state
*/
QueueState getState();
/**
* Update the queue state.
* @param state the queue state
*/
void updateQueueState(QueueState state);
/**
* Stop the queue.
*/
void stopQueue();
/**
* Active the queue.
* @throws YarnException if the queue can not be activated.
*/
void activeQueue() throws YarnException;
}

View File

@ -29,9 +29,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSche
* Context of the Queues in Scheduler.
*
*/
@SuppressWarnings("rawtypes")
@Private
@Unstable
public interface SchedulerQueueManager<T extends Queue,
public interface SchedulerQueueManager<T extends SchedulerQueue,
E extends ReservationSchedulerConfiguration> {
/**

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessRequest;
@ -77,7 +78,7 @@ public abstract class AbstractCSQueue implements CSQueue {
final Resource minimumAllocation;
volatile Resource maximumAllocation;
volatile QueueState state;
private volatile QueueState state = null;
final CSQueueMetrics metrics;
protected final PrivilegedEntity queueEntity;
@ -292,9 +293,15 @@ public abstract class AbstractCSQueue implements CSQueue {
csContext.getConfiguration().getMaximumAllocationPerQueue(
getQueuePath());
authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
// initialized the queue state based on previous state, configured state
// and its parent state.
QueueState previous = getState();
QueueState configuredState = csContext.getConfiguration()
.getConfiguredState(getQueuePath());
QueueState parentState = (parent == null) ? null : parent.getState();
initializeQueueState(previous, configuredState, parentState);
initializeQueueState();
authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
this.acls = csContext.getConfiguration().getAcls(getQueuePath());
@ -334,26 +341,53 @@ public abstract class AbstractCSQueue implements CSQueue {
}
}
private void initializeQueueState() {
// inherit from parent if state not set, only do this when we are not root
if (parent != null) {
QueueState configuredState = csContext.getConfiguration()
.getConfiguredState(getQueuePath());
QueueState parentState = parent.getState();
if (configuredState == null) {
this.state = parentState;
} else if (configuredState == QueueState.RUNNING
&& parentState == QueueState.STOPPED) {
throw new IllegalArgumentException(
"The parent queue:" + parent.getQueueName() + " state is STOPPED, "
+ "child queue:" + queueName + " state cannot be RUNNING.");
private void initializeQueueState(QueueState previousState,
QueueState configuredState, QueueState parentState) {
// verify that we can not any value for State other than RUNNING/STOPPED
if (configuredState != null && configuredState != QueueState.RUNNING
&& configuredState != QueueState.STOPPED) {
throw new IllegalArgumentException("Invalid queue state configuration."
+ " We can only use RUNNING or STOPPED.");
}
// If we did not set state in configuration, use Running as default state
QueueState defaultState = QueueState.RUNNING;
if (previousState == null) {
// If current state of the queue is null, we would inherit the state
// from its parent. If this queue does not has parent, such as root queue,
// we would use the configured state.
if (parentState == null) {
updateQueueState((configuredState == null) ? defaultState
: configuredState);
} else {
this.state = configuredState;
if (configuredState == null) {
updateQueueState((parentState == QueueState.DRAINING) ?
QueueState.STOPPED : parentState);
} else if (configuredState == QueueState.RUNNING
&& parentState != QueueState.RUNNING) {
throw new IllegalArgumentException(
"The parent queue:" + parent.getQueueName()
+ " state is STOPPED, child queue:" + queueName
+ " state cannot be RUNNING.");
} else {
updateQueueState(configuredState);
}
}
} else {
// if this is the root queue, get the state from the configuration.
// if the state is not set, use RUNNING as default state.
this.state = csContext.getConfiguration().getState(getQueuePath());
// when we get a refreshQueue request from AdminService,
if (previousState == QueueState.RUNNING) {
if (configuredState == QueueState.STOPPED) {
stopQueue();
}
} else {
if (configuredState == QueueState.RUNNING) {
try {
activeQueue();
} catch (YarnException ex) {
throw new IllegalArgumentException(ex.getMessage());
}
}
}
}
}
@ -367,7 +401,7 @@ public abstract class AbstractCSQueue implements CSQueue {
queueInfo.setAccessibleNodeLabels(accessibleLabels);
queueInfo.setCapacity(queueCapacities.getCapacity());
queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
queueInfo.setQueueState(state);
queueInfo.setQueueState(getState());
queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression);
queueInfo.setCurrentCapacity(getUsedCapacity());
queueInfo.setQueueStatistics(getQueueStatistics());
@ -846,4 +880,47 @@ public abstract class AbstractCSQueue implements CSQueue {
String userName, String queue) throws AccessControlException {
// Dummy implementation
}
@Override
public void updateQueueState(QueueState queueState) {
this.state = queueState;
}
@Override
public void activeQueue() throws YarnException {
try {
this.writeLock.lock();
if (getState() == QueueState.RUNNING) {
LOG.info("The specified queue:" + queueName
+ " is already in the RUNNING state.");
} else if (getState() == QueueState.DRAINING) {
throw new YarnException(
"The queue:" + queueName + " is in the Stopping process. "
+ "Please wait for the queue getting fully STOPPED.");
} else {
CSQueue parent = getParent();
if (parent == null || parent.getState() == QueueState.RUNNING) {
updateQueueState(QueueState.RUNNING);
} else {
throw new YarnException("The parent Queue:" + parent.getQueueName()
+ " is not running. Please activate the parent queue first");
}
}
} finally {
this.writeLock.unlock();
}
}
protected void appFinished() {
try {
this.writeLock.lock();
if (getState() == QueueState.DRAINING) {
if (getNumApplications() == 0) {
updateQueueState(QueueState.STOPPED);
}
}
} finally {
this.writeLock.unlock();
}
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -56,8 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleP
*/
@Stable
@Private
public interface CSQueue
extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
public interface CSQueue extends SchedulerQueue<CSQueue> {
/**
* Get the parent <code>Queue</code>.
* @return the parent queue

View File

@ -2475,4 +2475,9 @@ public class CapacityScheduler extends
}
return 0;
}
@Override
public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() {
return this.queueManager;
}
}

View File

@ -83,4 +83,6 @@ public interface CapacitySchedulerContext {
ResourceUsage getClusterResourceUsage();
ActivitiesManager getActivitiesManager();
CapacitySchedulerQueueManager getCapacitySchedulerQueueManager();
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
@ -86,6 +87,9 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
private CSQueue root;
private final RMNodeLabelsManager labelManager;
private QueueStateManager<CSQueue, CapacitySchedulerConfiguration>
queueStateManager;
/**
* Construct the service.
* @param conf the configuration
@ -95,6 +99,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
RMNodeLabelsManager labelManager) {
this.authorizer = YarnAuthorizationProvider.getInstance(conf);
this.labelManager = labelManager;
this.queueStateManager = new QueueStateManager<>();
}
@Override
@ -142,6 +147,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
setQueueAcls(authorizer, queues);
labelManager.reinitializeQueueLabels(getQueueToLabels());
this.queueStateManager.initialize(this);
LOG.info("Initialized root queue " + root);
}
@ -170,6 +176,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
clusterResource));
labelManager.reinitializeQueueLabels(getQueueToLabels());
this.queueStateManager.initialize(this);
}
/**
@ -358,4 +365,10 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
}
return queueToLabels;
}
@Private
public QueueStateManager<CSQueue, CapacitySchedulerConfiguration>
getQueueStateManager() {
return this.queueStateManager;
}
}

View File

@ -282,7 +282,7 @@ public class LeafQueue extends AbstractCSQueue {
+ "maximumAllocationMemory ]" + "\n" + "maximumAllocation = "
+ maximumAllocation + " [= configuredMaxAllocation ]" + "\n"
+ "numContainers = " + numContainers
+ " [= currentNumContainers ]" + "\n" + "state = " + state
+ " [= currentNumContainers ]" + "\n" + "state = " + getState()
+ " [= configuredState ]" + "\n" + "acls = " + aclsString
+ " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = "
+ nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder
@ -888,6 +888,9 @@ public class LeafQueue extends AbstractCSQueue {
public void finishApplication(ApplicationId application, String user) {
// Inform the activeUsersManager
activeUsersManager.deactivateApplication(user, application);
appFinished();
// Inform the parent queue
getParent().finishApplication(application, user);
}
@ -2434,4 +2437,18 @@ public class LeafQueue extends AbstractCSQueue {
return clusterResource;
}
}
@Override
public void stopQueue() {
try {
writeLock.lock();
if (getNumApplications() > 0) {
updateQueueState(QueueState.DRAINING);
} else {
updateQueueState(QueueState.STOPPED);
}
} finally {
writeLock.unlock();
}
}
}

View File

@ -133,7 +133,7 @@ public class ParentQueue extends AbstractCSQueue {
+ ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
+ ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
+ ", absoluteMaxCapacity=" + this.queueCapacities
.getAbsoluteMaximumCapacity() + ", state=" + state + ", acls="
.getAbsoluteMaximumCapacity() + ", state=" + getState() + ", acls="
+ aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
+ ", reservationsContinueLooking=" + reservationsContinueLooking);
} finally {
@ -369,7 +369,7 @@ public class ParentQueue extends AbstractCSQueue {
"Cannot submit application " + "to non-leaf queue: " + queueName);
}
if (state != QueueState.RUNNING) {
if (getState() != QueueState.RUNNING) {
throw new AccessControlException("Queue " + getQueuePath()
+ " is STOPPED. Cannot accept submission of application: "
+ applicationId);
@ -412,6 +412,8 @@ public class ParentQueue extends AbstractCSQueue {
removeApplication(application, user);
appFinished();
// Inform the parent queue
if (parent != null) {
parent.finishApplication(application, user);
@ -1049,4 +1051,23 @@ public class ParentQueue extends AbstractCSQueue {
parent.apply(cluster, request);
}
}
@Override
public void stopQueue() {
try {
this.writeLock.lock();
if (getNumApplications() > 0) {
updateQueueState(QueueState.DRAINING);
} else {
updateQueueState(QueueState.STOPPED);
}
if (getChildQueues() != null) {
for(CSQueue child : getChildQueues()) {
child.stopQueue();
}
}
} finally {
this.writeLock.unlock();
}
}
}

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@ -73,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
@ -953,6 +955,7 @@ public class TestLeafQueue {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testComputeUserLimitAndSetHeadroom() throws IOException {
LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B));
@ -974,6 +977,14 @@ public class TestLeafQueue {
Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
CapacitySchedulerQueueManager mockCapacitySchedulerQueueManager
= mock(CapacitySchedulerQueueManager.class);
QueueStateManager mockQueueStateManager = mock(QueueStateManager.class);
when(mockCapacitySchedulerQueueManager.getQueueStateManager()).thenReturn(
mockQueueStateManager);
when(csContext.getCapacitySchedulerQueueManager()).thenReturn(
mockCapacitySchedulerQueueManager);
//our test plan contains three cases
//1. single user dominate the queue, we test the headroom
//2. two users, but user_0 is assigned 100% of the queue resource,

View File

@ -18,10 +18,23 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
@ -32,11 +45,14 @@ public class TestQueueState {
private static final String Q1 = "q1";
private static final String Q2 = "q2";
private static final String Q3 = "q3";
private final static String Q1_PATH =
CapacitySchedulerConfiguration.ROOT + "." + Q1;
private final static String Q2_PATH =
Q1_PATH + "." + Q2;
private final static String Q3_PATH =
Q1_PATH + "." + Q3;
private CapacityScheduler cs;
private YarnConfiguration conf;
@ -93,4 +109,92 @@ public class TestQueueState {
+ "child queue:q2 state cannot be RUNNING."));
}
}
@Test(timeout = 15000)
public void testQueueStateTransit() throws Exception {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1});
csConf.setQueues(Q1_PATH, new String[] {Q2, Q3});
csConf.setCapacity(Q1_PATH, 100);
csConf.setCapacity(Q2_PATH, 50);
csConf.setCapacity(Q3_PATH, 50);
conf = new YarnConfiguration(csConf);
cs = new CapacityScheduler();
RMContext rmContext = TestUtils.getMockRMContext();
cs.setConf(conf);
cs.setRMContext(rmContext);
cs.init(conf);
//by default, the state of ALL queues should be RUNNING
Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
// submit an application to Q2
ApplicationId appId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
String userName = "testUser";
cs.getQueue(Q2).submitApplication(appId, userName, Q2);
FiCaSchedulerApp app = getMockApplication(appId, userName,
Resources.createResource(4, 0));
cs.getQueue(Q2).submitApplicationAttempt(app, userName);
// set Q2 state to stop and do reinitialize.
csConf.setState(Q2_PATH, QueueState.STOPPED);
conf = new YarnConfiguration(csConf);
cs.reinitialize(conf, rmContext);
Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState());
Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
// set Q1 state to stop and do reinitialize.
csConf.setState(Q1_PATH, QueueState.STOPPED);
conf = new YarnConfiguration(csConf);
cs.reinitialize(conf, rmContext);
Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState());
Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState());
Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
// Active Q3, should fail
csConf.setState(Q3_PATH, QueueState.RUNNING);
conf = new YarnConfiguration(csConf);
try {
cs.reinitialize(conf, rmContext);
Assert.fail("Should throw an Exception.");
} catch (Exception ex) {
// Do Nothing
}
// stop the app running in q2
cs.getQueue(Q2).finishApplicationAttempt(app, Q2);
cs.getQueue(Q2).finishApplication(appId, userName);
Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
}
private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user,
Resource amResource) {
FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
ApplicationAttemptId applicationAttemptId =
ApplicationAttemptId.newInstance(appId, 0);
doReturn(applicationAttemptId.getApplicationId()).
when(application).getApplicationId();
doReturn(applicationAttemptId).when(application).getApplicationAttemptId();
doReturn(user).when(application).getUser();
doReturn(amResource).when(application).getAMResource();
doReturn(Priority.newInstance(0)).when(application).getPriority();
doReturn(CommonNodeLabelsManager.NO_LABEL).when(application)
.getAppAMNodePartitionName();
doReturn(amResource).when(application).getAMResource(
CommonNodeLabelsManager.NO_LABEL);
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
.thenCallRealMethod();
return application;
}
}

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
/**
* Test QueueStateManager.
*
*/
public class TestQueueStateManager {
private static final String Q1 = "q1";
private static final String Q2 = "q2";
private static final String Q3 = "q3";
private final static String Q1_PATH =
CapacitySchedulerConfiguration.ROOT + "." + Q1;
private final static String Q2_PATH =
Q1_PATH + "." + Q2;
private final static String Q3_PATH =
Q1_PATH + "." + Q3;
private CapacityScheduler cs;
private YarnConfiguration conf;
@Test
public void testQueueStateManager() throws AccessControlException,
YarnException {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1});
csConf.setQueues(Q1_PATH, new String[] {Q2, Q3});
csConf.setCapacity(Q1_PATH, 100);
csConf.setCapacity(Q2_PATH, 50);
csConf.setCapacity(Q3_PATH, 50);
conf = new YarnConfiguration(csConf);
cs = new CapacityScheduler();
RMContext rmContext = TestUtils.getMockRMContext();
cs.setConf(conf);
cs.setRMContext(rmContext);
cs.init(conf);
@SuppressWarnings("rawtypes")
QueueStateManager stateManager = cs.getCapacitySchedulerQueueManager()
.getQueueStateManager();
//by default, the state of both queues should be RUNNING
Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
// Stop Q2, and verify that Q2 transmits to STOPPED STATE
stateManager.stopQueue(Q2);
Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
// Stop Q1, and verify that Q1, as well as its child: Q3,
// transmits to STOPPED STATE
stateManager.stopQueue(Q1);
Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
Assert.assertTrue(stateManager.canDelete(Q1));
Assert.assertTrue(stateManager.canDelete(Q2));
Assert.assertTrue(stateManager.canDelete(Q3));
// Active Q2, it will fail.
Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
// Now active Q1
stateManager.activateQueue(Q1);
// Q1 should be in RUNNING state. Its children: Q2 and Q3
// should still be in STOPPED state.
Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
// Now active Q2 and Q3
stateManager.activateQueue(Q2);
stateManager.activateQueue(Q3);
Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
Assert.assertFalse(stateManager.canDelete(Q1));
Assert.assertFalse(stateManager.canDelete(Q2));
Assert.assertFalse(stateManager.canDelete(Q3));
ApplicationId appId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
String userName = "testUser";
cs.getQueue(Q2).submitApplication(appId, userName, Q2);
FiCaSchedulerApp app = getMockApplication(appId, userName,
Resources.createResource(4, 0));
cs.getQueue(Q2).submitApplicationAttempt(app, userName);
stateManager.stopQueue(Q1);
Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState());
Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState());
Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
cs.getQueue(Q2).finishApplicationAttempt(app, Q2);
cs.getQueue(Q2).finishApplication(appId, userName);
Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
}
private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user,
Resource amResource) {
FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
ApplicationAttemptId applicationAttemptId =
ApplicationAttemptId.newInstance(appId, 0);
doReturn(applicationAttemptId.getApplicationId()).
when(application).getApplicationId();
doReturn(applicationAttemptId).when(application).getApplicationAttemptId();
doReturn(user).when(application).getUser();
doReturn(amResource).when(application).getAMResource();
doReturn(Priority.newInstance(0)).when(application).getPriority();
doReturn(CommonNodeLabelsManager.NO_LABEL).when(application)
.getAppAMNodePartitionName();
doReturn(amResource).when(application).getAMResource(
CommonNodeLabelsManager.NO_LABEL);
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
.thenCallRealMethod();
return application;
}
}