YARN-10947. Simplify AbstractCSQueue#initializeQueueState. Contributed by Andras Gyori

This commit is contained in:
Szilard Nemeth 2022-03-03 16:44:12 +01:00
parent 1c27c69f44
commit 379baa5eb6
3 changed files with 101 additions and 67 deletions

View File

@ -349,7 +349,7 @@ protected void setupQueueConfigs(Resource clusterResource) throws
// Initialize the queue state based on previous state, configured state
// and its parent state
initializeQueueState();
QueueStateHelper.setQueueState(this);
authorizer = YarnAuthorizationProvider.getInstance(configuration);
@ -553,60 +553,6 @@ public QueueCapacityVector getConfiguredCapacityVector(
return configuredCapacityVectors.get(label);
}
private void initializeQueueState() {
QueueState previousState = getState();
QueueState configuredState = queueContext.getConfiguration()
.getConfiguredState(getQueuePath());
QueueState parentState = (parent == null) ? null : parent.getState();
// verify that we can not any value for State other than RUNNING/STOPPED
if (configuredState != null && configuredState != QueueState.RUNNING
&& configuredState != QueueState.STOPPED) {
throw new IllegalArgumentException("Invalid queue state configuration."
+ " We can only use RUNNING or STOPPED.");
}
// If we did not set state in configuration, use Running as default state
QueueState defaultState = QueueState.RUNNING;
if (previousState == null) {
// If current state of the queue is null, we would inherit the state
// from its parent. If this queue does not has parent, such as root queue,
// we would use the configured state.
if (parentState == null) {
updateQueueState((configuredState == null) ? defaultState
: configuredState);
} else {
if (configuredState == null) {
updateQueueState((parentState == QueueState.DRAINING) ?
QueueState.STOPPED : parentState);
} else if (configuredState == QueueState.RUNNING
&& parentState != QueueState.RUNNING) {
throw new IllegalArgumentException(
"The parent queue:" + parent.getQueuePath()
+ " cannot be STOPPED as the child queue:" + getQueuePath()
+ " is in RUNNING state.");
} else {
updateQueueState(configuredState);
}
}
} else {
// when we get a refreshQueue request from AdminService,
if (previousState == QueueState.RUNNING) {
if (configuredState == QueueState.STOPPED) {
stopQueue();
}
} else {
if (configuredState == QueueState.RUNNING) {
try {
activateQueue();
} catch (YarnException ex) {
throw new IllegalArgumentException(ex.getMessage());
}
}
}
}
}
protected QueueInfo getQueueInfo() {
// Deliberately doesn't use lock here, because this method will be invoked
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice

View File

@ -19,11 +19,12 @@
import java.util.Set;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -302,15 +303,4 @@ public static void updateAbsoluteCapacitiesByNodeLabels(QueueCapacities queueCap
}
}
}
public static ApplicationPlacementContext extractQueuePath(String queuePath) {
int parentQueueNameEndIndex = queuePath.lastIndexOf(".");
if (parentQueueNameEndIndex > -1) {
String parent = queuePath.substring(0, parentQueueNameEndIndex).trim();
String leaf = queuePath.substring(parentQueueNameEndIndex + 1).trim();
return new ApplicationPlacementContext(leaf, parent);
} else{
return new ApplicationPlacementContext(queuePath);
}
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import java.util.Set;
/**
* Collects all logic that are handling queue state transitions.
*/
public final class QueueStateHelper {
private static final Set<QueueState> VALID_STATE_CONFIGURATIONS = ImmutableSet.of(
QueueState.RUNNING, QueueState.STOPPED);
private static final QueueState DEFAULT_STATE = QueueState.RUNNING;
private QueueStateHelper() {}
/**
* Sets the current state of the queue based on its previous state, its parent's state and its
* configured state.
* @param queue the queue whose state is set
*/
public static void setQueueState(AbstractCSQueue queue) {
QueueState previousState = queue.getState();
QueueState configuredState = queue.getQueueContext().getConfiguration().getConfiguredState(
queue.getQueuePath());
QueueState parentState = (queue.getParent() == null) ? null : queue.getParent().getState();
// verify that we can not any value for State other than RUNNING/STOPPED
if (configuredState != null && !VALID_STATE_CONFIGURATIONS.contains(configuredState)) {
throw new IllegalArgumentException("Invalid queue state configuration."
+ " We can only use RUNNING or STOPPED.");
}
if (previousState == null) {
initializeState(queue, configuredState, parentState);
} else {
reinitializeState(queue, previousState, configuredState);
}
}
private static void reinitializeState(
AbstractCSQueue queue, QueueState previousState, QueueState configuredState) {
// when we get a refreshQueue request from AdminService,
if (previousState == QueueState.RUNNING) {
if (configuredState == QueueState.STOPPED) {
queue.stopQueue();
}
} else {
if (configuredState == QueueState.RUNNING) {
try {
queue.activateQueue();
} catch (YarnException ex) {
throw new IllegalArgumentException(ex.getMessage());
}
}
}
}
private static void initializeState(
AbstractCSQueue queue, QueueState configuredState, QueueState parentState) {
QueueState currentState = configuredState == null ? DEFAULT_STATE : configuredState;
if (parentState != null) {
if (configuredState == QueueState.RUNNING && parentState != QueueState.RUNNING) {
throw new IllegalArgumentException(
"The parent queue:" + queue.getParent().getQueuePath()
+ " cannot be STOPPED as the child queue:" + queue.getQueuePath()
+ " is in RUNNING state.");
}
if (configuredState == null) {
currentState = parentState == QueueState.DRAINING ? QueueState.STOPPED : parentState;
}
}
queue.updateQueueState(currentState);
}
}