From d58fca0102c6246ced3fa1cd893f963e4a0fb1b9 Mon Sep 17 00:00:00 2001
From: Jian He
Date: Fri, 2 Dec 2016 16:17:31 -0800
Subject: [PATCH] YARN-5746. The state of the parentQueue and its childQueues should be synchronized. Contributed by Xuan Gong

---
Reviewer notes (this section sits after the "---" separator, so it is not
part of the commit message):

  * AbstractCSQueue: the queue state is now resolved by the new private
    initializeQueueState(). For a non-root queue (parent != null):
      - no state configured        -> the parent's state is used;
      - configured RUNNING while the parent is STOPPED
                                   -> IllegalArgumentException;
      - any other configured state -> the configured state is used.
    The root queue keeps reading getState(), which defaults to RUNNING.
  * CapacitySchedulerConfiguration: getConfiguredState() returns null when
    the state key is unset; getState() wraps it and keeps the RUNNING
    default; setState() is added and marked @Private/@VisibleForTesting.
  * TestQueueState: new test covering the default state, inheritance from
    a STOPPED parent, a STOPPED child under a RUNNING parent, and the
    rejected RUNNING-child-under-STOPPED-parent combination.

 .../scheduler/capacity/AbstractCSQueue.java   | 26 ++++-
 .../CapacitySchedulerConfiguration.java       | 22 ++++-
 .../scheduler/capacity/TestQueueState.java    | 96 +++++++++++++++++++
 3 files changed, 139 insertions(+), 5 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 3daabafd53f..dd2f0d9ee49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -291,7 +291,8 @@ public abstract class AbstractCSQueue implements CSQueue {
 
       authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
 
-      this.state = csContext.getConfiguration().getState(getQueuePath());
+      initializeQueueState();
+
       this.acls = csContext.getConfiguration().getAcls(getQueuePath());
 
       // Update metrics
@@ -330,6 +331,29 @@ public abstract class AbstractCSQueue implements CSQueue {
     }
   }
 
+  private void initializeQueueState() {
+    // inherit from parent if state not set, only do this when we are not root
+    if (parent != null) {
+      QueueState configuredState = csContext.getConfiguration()
+          .getConfiguredState(getQueuePath());
+      QueueState parentState = parent.getState();
+      if (configuredState == null) {
+        this.state = parentState;
+      } else if (configuredState == QueueState.RUNNING
+          && parentState == QueueState.STOPPED) {
+        throw new IllegalArgumentException(
+            "The parent queue:" + parent.getQueueName() + " state is STOPPED, "
+                + "child queue:" + queueName + " state cannot be RUNNING.");
+      } else {
+        this.state = configuredState;
+      }
+    } else {
+      // if this is the root queue, get the state from the configuration.
+      // if the state is not set, use RUNNING as default state.
+      this.state = csContext.getConfiguration().getState(getQueuePath());
+    }
+  }
+
   protected QueueInfo getQueueInfo() {
     // Deliberately doesn't use lock here, because this method will be invoked
     // from schedulerApplicationAttempt, to avoid deadlock, sacrifice
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index f8335a8d52d..bfaeba47753 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -448,12 +448,26 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     setFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR, userLimitFactor);
   }
 
-  public QueueState getState(String queue) {
+  public QueueState getConfiguredState(String queue) {
     String state = get(getQueuePrefix(queue) + STATE);
-    return (state != null) ?
-        QueueState.valueOf(StringUtils.toUpperCase(state)) : QueueState.RUNNING;
+    if (state == null) {
+      return null;
+    } else {
+      return QueueState.valueOf(StringUtils.toUpperCase(state));
+    }
   }
-  
+
+  public QueueState getState(String queue) {
+    QueueState state = getConfiguredState(queue);
+    return (state == null) ? QueueState.RUNNING : state;
+  }
+
+  @Private
+  @VisibleForTesting
+  public void setState(String queue, QueueState state) {
+    set(getQueuePrefix(queue) + STATE, state.name());
+  }
+
   public void setAccessibleNodeLabels(String queue, Set<String> labels) {
     if (labels == null) {
       return;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
new file mode 100644
index 00000000000..bd878b7b01f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test Queue States.
+ */
+public class TestQueueState {
+
+  private static final String Q1 = "q1";
+  private static final String Q2 = "q2";
+
+  private final static String Q1_PATH =
+      CapacitySchedulerConfiguration.ROOT + "." + Q1;
+  private final static String Q2_PATH =
+      Q1_PATH + "." + Q2;
+  private CapacityScheduler cs;
+  private YarnConfiguration conf;
+
+  @Test (timeout = 15000)
+  public void testQueueState() throws IOException {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1});
+    csConf.setQueues(Q1_PATH, new String[] {Q2});
+
+    csConf.setCapacity(Q1_PATH, 100);
+    csConf.setCapacity(Q2_PATH, 100);
+
+    conf = new YarnConfiguration(csConf);
+    cs = new CapacityScheduler();
+
+    RMContext rmContext = TestUtils.getMockRMContext();
+    cs.setConf(conf);
+    cs.setRMContext(rmContext);
+    cs.init(conf);
+
+    //by default, the state of both queues should be RUNNING
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
+
+    // Change the state of Q1 to STOPPED, and re-initiate the CS
+    csConf.setState(Q1_PATH, QueueState.STOPPED);
+    conf = new YarnConfiguration(csConf);
+    cs.reinitialize(conf, rmContext);
+    // The state of Q1 and its child: Q2 should be STOPPED
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+
+    // Change the state of Q1 to RUNNING, and change the state of Q2 to STOPPED
+    csConf.setState(Q1_PATH, QueueState.RUNNING);
+    csConf.setState(Q2_PATH, QueueState.STOPPED);
+    conf = new YarnConfiguration(csConf);
+    // reinitialize the CS, the operation should be successful
+    cs.reinitialize(conf, rmContext);
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+
+    // Change the state of Q1 to STOPPED, and change the state of Q2 to RUNNING
+    csConf.setState(Q1_PATH, QueueState.STOPPED);
+    csConf.setState(Q2_PATH, QueueState.RUNNING);
+    conf = new YarnConfiguration(csConf);
+    // reinitialize the CS, the operation should be failed.
+    try {
+      cs.reinitialize(conf, rmContext);
+      Assert.fail("Should throw an Exception.");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex.getCause().getMessage().contains(
+          "The parent queue:q1 state is STOPPED, "
+          + "child queue:q2 state cannot be RUNNING."));
+    }
+  }
+}