YARN-10374. Create Actions for CS mapping rules. Contributed by Gergely Pollak.

This commit is contained in:
Peter Bacsko 2020-09-01 17:38:49 +02:00
parent 73a0d149e2
commit c7dab2b23e
11 changed files with 1438 additions and 1 deletions

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
/**
* This is a helper class which represents a queue path, and has easy access
* methods to get the path's parent or leaf part, or as a whole.
*/
public class MappingQueuePath {
/**
* The parent part of the queue path.
*/
private String parent;
/**
* The leaf part of the parent path.
*/
private String leaf;
/**
* Constructor to create mapping queue path from parent path and leaf name.
* @param parent Parent path of the queue
* @param leaf Name of the leaf queue
*/
public MappingQueuePath(String parent, String leaf) {
this.parent = parent;
this.leaf = leaf;
}
/**
* Constructor creates a MappingQueuePath object using the queue's full path.
* @param fullPath Full path of the queue
*/
public MappingQueuePath(String fullPath) {
setFromFullPath(fullPath);
}
/**
* This method is responsible for splitting up a full queue path into parent
* path and leaf name.
* @param fullPath Full path of the queue to be processed
*/
private void setFromFullPath(String fullPath) {
parent = null;
leaf = fullPath;
int lastDotIdx = fullPath.lastIndexOf(DOT);
if (lastDotIdx > -1) {
parent = fullPath.substring(0, lastDotIdx).trim();
leaf = fullPath.substring(lastDotIdx + 1).trim();
}
}
/**
* Getter for the parent part of the path.
* @return Parent path of the queue, null if there is no parent.
*/
public String getParent() {
return parent;
}
/**
* Getter for the leaf part of the path.
* @return The leaf queue name
*/
public String getLeafName() {
return leaf;
}
/**
* Getter for the full path of the queue.
* @return Full path of the queue
*/
public String getFullPath() {
return hasParent() ? (parent + DOT + leaf) : leaf;
}
/**
* Convenience getter to check if the queue has a parent path defined.
* @return True if there is a parent path provided
*/
public boolean hasParent() {
return parent != null;
}
@Override
public String toString() {
return getFullPath();
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* This interface represents the action part of a MappingRule, action are
* responsible to decide what should happen with the actual application
* submission.
*/
public interface MappingRuleAction {
/**
* Returns the fallback action to be taken if the main action (result returned
* by the execute method) fails.
* e.g. Target queue does not exist, or reference is ambiguous
* @return The fallback action to be taken if the main action fails
*/
MappingRuleResult getFallback();
/**
* This method is the main logic of the action, it shall determine based on
* the mapping context, what should be the action's result.
* @param variables The variable context, which contains all the variables
* @return The result of the action
*/
MappingRuleResult execute(VariableContext variables);
/**
* Sets the fallback method to reject, if the action cannot be executed the
* application will get rejected.
* @return MappingRuleAction The same object for method chaining.
*/
MappingRuleAction setFallbackReject();
/**
* Sets the fallback method to skip, if the action cannot be executed
* We move onto the next rule, ignoring this one.
* @return MappingRuleAction The same object for method chaining.
*/
MappingRuleAction setFallbackSkip();
/**
* Sets the fallback method to place to default, if the action cannot be
* executed. The application will be placed into the default queue, if the
* default queue does not exist the application will get rejected
* @return MappingRuleAction The same object for method chaining.
*/
MappingRuleAction setFallbackDefaultPlacement();
/**
* This method is responsible for config validation, the context contains all
* information required for validation, method should throw an exception on
* detectable setup errors.
* @param ctx Validation context with all the necessary objects and helper
* methods required during validation
* @throws YarnException is thrown on validation error
*/
void validate(MappingRuleValidationContext ctx) throws YarnException;
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
/**
* This class implements the fallback logic for MappingRuleActions, this can
* be extended to implement the actual logic of the actions, this should be
* a base class for most actions.
*/
public abstract class MappingRuleActionBase implements MappingRuleAction {
/**
* The default fallback method is reject, so if the action fails
* We will reject the application. However this behaviour can be overridden
* on a per rule basis
*/
private MappingRuleResult fallback = MappingRuleResult.createRejectResult();
/**
* Returns the fallback action to be taken if the main action (result returned
* by the execute method) fails.
* e.g. Target queue does not exist, or reference is ambiguous
* @return The fallback action to be taken if the main action fails
*/
public MappingRuleResult getFallback() {
return fallback;
}
/**
* Sets the fallback method to reject, if the action cannot be executed the
* application will get rejected.
* @return MappingRuleAction The same object for method chaining.
*/
public MappingRuleAction setFallbackReject() {
fallback = MappingRuleResult.createRejectResult();
return this;
}
/**
* Sets the fallback method to skip, if the action cannot be executed
* We move onto the next rule, ignoring this one.
* @return MappingRuleAction The same object for method chaining.
*/
public MappingRuleAction setFallbackSkip() {
fallback = MappingRuleResult.createSkipResult();
return this;
}
/**
* Sets the fallback method to place to default, if the action cannot be
* executed the application will be placed into the default queue, if the
* default queue does not exist the application will get rejected.
* @return MappingRuleAction The same object for method chaining.
*/
public MappingRuleAction setFallbackDefaultPlacement() {
fallback = MappingRuleResult.createDefaultPlacementResult();
return this;
}
/**
* This method is the main logic of the action, it shall determine based on
* the mapping context, what should be the action's result.
* @param variables The variable context, which contains all the variables
* @return The result of the action
*/
public abstract MappingRuleResult execute(VariableContext variables);
}

View File

@ -0,0 +1,236 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* This class contains all the actions and some helper methods to generate them.
*/
public final class MappingRuleActions {
public static final String DEFAULT_QUEUE_VARIABLE = "%default";
/**
* Utility class, hiding constructor.
*/
private MappingRuleActions() {}
/**
* PlaceToQueueAction represents a placement action, contains the pattern of
* the queue name or path in which the path variables will be substituted
* with the variable context's respective values.
*/
public static class PlaceToQueueAction extends MappingRuleActionBase {
/**
* We store the queue pattern in this variable, it may contain substitutable
* variables.
*/
private String queuePattern;
/**
* Constructor.
* @param queuePattern The queue pattern in which the application will be
* placed if this action is fired. The pattern may
* contain variables. eg. root.%primary_group.%user
*/
PlaceToQueueAction(String queuePattern) {
this.queuePattern = queuePattern == null ? "" : queuePattern;
}
/**
* This method is the main logic of the action, it will replace all the
* variables in the queuePattern with their respective values, then returns
* a placementResult with the final queue name.
* @param variables The variable context, which contains all the variables
* @return The result of the action
*/
@Override
public MappingRuleResult execute(VariableContext variables) {
String substituted = variables.replacePathVariables(queuePattern);
return MappingRuleResult.createPlacementResult(substituted);
}
/**
* This method is responsible for config validation, we use the validation
* context's helper method to validate if our path is valid. From the
* point of the action all paths are valid, that is why we need to use
* an external component which is aware of the queue structure and know
* when a queue placement is valid in that context. This way this calass can
* stay independent of the capacity scheduler's internal queue placement
* logic, yet it is able to obey it's rules.
* @param ctx Validation context with all the necessary objects and helper
* methods required during validation
* @throws YarnException is thrown on validation error
*/
@Override
public void validate(MappingRuleValidationContext ctx)
throws YarnException {
ctx.validateQueuePath(this.queuePattern);
}
@Override
public String toString() {
return "PlaceToQueueAction{" +
"queueName='" + queuePattern + '\'' +
'}';
}
}
/**
* RejectAction represents the action when the application is rejected, this
* simply will throw an error on the user's side letting it know the
* submission was rejected.
*/
public static class RejectAction extends MappingRuleActionBase {
/**
* Reject action will unconditionally return a reject result.
* @param variables The variable context, which contains all the variables
* @return Always a REJECT MappingRuleResut
*/
@Override
public MappingRuleResult execute(VariableContext variables) {
return MappingRuleResult.createRejectResult();
}
/**
* Reject action is always valid, so it is just an empty implementation
* of the defined interface method.
* @param ctx Validation context with all the necessary objects and helper
* methods required during validation
* @throws YarnException is thrown on validation error
*/
@Override
public void validate(MappingRuleValidationContext ctx) throws
YarnException {}
@Override
public String toString() {
return "RejectAction";
}
}
/**
* VariableUpdateAction represents the action which alters one of the
* mutable variables in the variable context, but doesn't do anything with
* the application. This can be used to change the default queue or define
* custom variables to be used later.
*/
public static class VariableUpdateAction extends MappingRuleActionBase {
/**
* Name of the variable to be updated (in it's full form) eg. %custom
*/
private final String variableName;
/**
* The variable's new value pattern, this may contain additional variables
* which will be evaluated on execution.
*/
private final String variableValue;
/**
* Constructor.
* @param variableName Name of the variable to be updated in the variable
* context
* @param variableValue
*/
VariableUpdateAction(String variableName, String variableValue) {
this.variableName = variableName;
this.variableValue = variableValue;
}
/**
* This execute is a bit special, compared to other actions, since it does
* not affect the placement of the application, but changes the variable
* context. So it always returns a skip result in order to ensure the
* rule evalutaion continues after the variable update.
* The exectute method will do the update to the variable context the
* variable name stored in variableName will be updated with the value
* stored in variableValue, but all variables in the variableValue will
* gets resolved first, so this way dynamic updates are possible.
* @param variables The variable context, which contains all the variables
* @return Always a skip result.
*/
@Override
public MappingRuleResult execute(VariableContext variables) {
variables.put(variableName, variables.replaceVariables(variableValue));
return MappingRuleResult.createSkipResult();
}
/**
* During the validation process we add the variable set by this action
* to the known variables, to make sure the context is aware that we might
* introduce a new custom variable. All rules after this may use this
* variable. If the variable cannot be added (eg. it is already added as
* immutable), an exception will be thrown, and the validation will fail.
* @param ctx Validation context with all the necessary objects and helper
* methods required during validation
* @throws YarnException If the variable cannot be added to the context
*/
@Override
public void validate(MappingRuleValidationContext ctx)
throws YarnException {
ctx.addVariable(this.variableName);
}
@Override
public String toString() {
return "VariableUpdateAction{" +
"variableName='" + variableName + '\'' +
", variableValue='" + variableValue + '\'' +
'}';
}
}
/**
* Convenience method to create an action which changes the default queue.
* @param queue The new value of the default queue
* @return VariableUpdateAction which will change the default queue on execute
*/
public static MappingRuleAction createUpdateDefaultAction(String queue) {
return new VariableUpdateAction(DEFAULT_QUEUE_VARIABLE, queue);
}
/**
* Convenience method to create an action which places the application to a
* queue.
* @param queue The name of the queue the application should be placed to
* @return PlaceToQueueAction which will place the application to the
* specified queue on execute
*/
public static MappingRuleAction createPlaceToQueueAction(String queue) {
return new PlaceToQueueAction(queue);
}
/**
* Convenience method to create an action which places the application to the
* DEFAULT queue.
* @return PlaceToQueueAction which will place the application to the
* DEFAULT queue on execute
*/
public static MappingRuleAction createPlaceToDefaultAction() {
return createPlaceToQueueAction(DEFAULT_QUEUE_VARIABLE);
}
/**
* Convenience method to create an action rejects the application.
* @return RejectAction which will reject the application on execute
*/
public static MappingRuleAction createRejectAction() {
return new RejectAction();
}
}

View File

@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
/**
* This class represents the outcome of an action.
*/
public final class MappingRuleResult {
/**
* The name of the queue we should place our application into.
* Only valid if result == PLACE.
*/
private final String queue;
/**
* The normalized name of the queue, since CS allows users to reference queues
* by only their leaf name, we need to normalize those queues to have full
* reference.
*/
private String normalizedQueue;
/**
* The result of the action.
*/
private MappingRuleResultType result;
/**
* To the reject result has no variable field, so we don't have to create
* a new instance all the time.
* This is THE instance which will be used to represent REJECT
*/
private static final MappingRuleResult RESULT_REJECT
= new MappingRuleResult(null, MappingRuleResultType.REJECT);
/**
* To the skip result has no variable field, so we don't have to create
* a new instance all the time.
* This is THE instance which will be used to represent SKIP
*/
private static final MappingRuleResult RESULT_SKIP
= new MappingRuleResult(null, MappingRuleResultType.SKIP);
/**
* To the default placement result has no variable field, so we don't have to
* create a new instance all the time.
* This is THE instance which will be used to represent default placement
*/
private static final MappingRuleResult RESULT_DEFAULT_PLACEMENT
= new MappingRuleResult(null, MappingRuleResultType.PLACE_TO_DEFAULT);
/**
* Constructor is private to force the user to use the predefined generator
* methods to create new instances in order to avoid inconsistent states.
* @param queue Name of the queue in which the application is supposed to be
* placed, only valid if result == PLACE
* otherwise it should be null
* @param result The type of the result
*/
private MappingRuleResult(String queue, MappingRuleResultType result) {
this.queue = queue;
this.normalizedQueue = queue;
this.result = result;
}
/**
* This method returns the result queue. Currently only makes sense when
* result == PLACE.
* @return the queue this result is about
*/
public String getQueue() {
return queue;
}
/**
* External interface for setting the normalized version of the queue. This
* class cannot normalize on it's own, but provides a way to store the
* normalized name of the target queue.
* @param normalizedQueueName The normalized name of the queue
*/
public void updateNormalizedQueue(String normalizedQueueName) {
this.normalizedQueue = normalizedQueueName;
}
/**
* This method returns the normalized name of the result queue.
* Currently only makes sense when result == PLACE
* Normalized value must be set externally, this class cannot normalize
* it just provides a way to store the normalized name of a queue
* @return the queue name this result is about
*/
public String getNormalizedQueue() {
return normalizedQueue;
}
/**
* Returns the type of the result.
* @return the type of the result.
*/
public MappingRuleResultType getResult() {
return result;
}
/**
* Generator method for place results.
* @param queue The name of the queue in which we shall place the application
* @return The generated MappingRuleResult
*/
public static MappingRuleResult createPlacementResult(String queue) {
return new MappingRuleResult(queue, MappingRuleResultType.PLACE);
}
/**
* Generator method for reject results.
* @return The generated MappingRuleResult
*/
public static MappingRuleResult createRejectResult() {
return RESULT_REJECT;
}
/**
* Generator method for skip results.
* @return The generated MappingRuleResult
*/
public static MappingRuleResult createSkipResult() {
return RESULT_SKIP;
}
/**
* Generator method for default placement results. It is a specialized
* placement result which will only use the "%default" as a queue name.
* @return The generated MappingRuleResult
*/
public static MappingRuleResult createDefaultPlacementResult() {
return RESULT_DEFAULT_PLACEMENT;
}
/**
* Returns the string representation of the object.
* @return the string representation of the object
*/
@Override
public String toString() {
if (result == MappingRuleResultType.PLACE) {
return result.name() + ": '" + normalizedQueue + "' ('" + queue + "')";
} else {
return result.name();
}
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
public enum MappingRuleResultType {
/**
* Represents a result where we simply ignore the current rule
* and move onto the next one.
*/
SKIP,
/**
* Represents a result where the application gets rejected.
*/
REJECT,
/**
* Represents a result where the application gets placed into a queue.
*/
PLACE,
/**
* Special placement, which means the application is to be placed to the
* queue marked by %default variable.
*/
PLACE_TO_DEFAULT
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import org.apache.hadoop.yarn.exceptions.YarnException;
import java.util.Set;
/**
* This interface represents a context which contains all methods and data
* required by the mapping rules to validate the initial configuration. The
* reason this is moved to a separate interface is to minimize the dependencies
* of the MappingRules, MappingRuleMatchers and MappingRule actions. This
* interface should contain all validation related data and functions, this way
* schedulers or engines can be changed without changing the MappingRules.
*/
interface MappingRuleValidationContext {
/**
* This method should determine if the provided queue path can result in
* a possible placement. It should fail if the provided path cannot be placed
* into any of the known queues regardless of the variable context.
* @param queuePath The path to check
* @return true if the validation was successful
* @throws YarnException if the provided queue path is invalid
*/
boolean validateQueuePath(String queuePath) throws YarnException;
/**
* Method to determine if the provided queue path contains any dynamic parts
* A part is dynamic if a known variable is referenced in it.
* @param queuePath The path to check
* @return true if no dynamic parts were found
*/
boolean isPathStatic(String queuePath);
/**
* This method will add a known variable to the validation context, known
* variables can be used to determine if a path is static or dynamic.
* @param variable Name of the variable
* @throws YarnException If the variable to be added has already added as an
* immutable one, an exception is thrown
*/
void addVariable(String variable) throws YarnException;
/**
* This method will add a known immutable variable to the validation context,
* known variables can be used to determine if a path is static or dynamic.
* @param variable Name of the immutable variable
* @throws YarnException If the variable to be added has already added as a
* regular, mutable variable an exception is thrown
*/
void addImmutableVariable(String variable) throws YarnException;
/**
* This method will return all the known variables.
* @return Set of the known variables
*/
Set<String> getVariables();
}

View File

@ -0,0 +1,252 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
import java.util.Set;
public class MappingRuleValidationContextImpl
implements MappingRuleValidationContext {
/**
* We store all known variables in this set.
*/
private Set<String> knownVariables = Sets.newHashSet();
/**
* This set is to determine which variables are immutable.
*/
private Set<String> immutableVariables = Sets.newHashSet();
/**
* For queue path validations we need an instance of the queue manager
* to look up queues and their parents.
*/
private final CapacitySchedulerQueueManager queueManager;
MappingRuleValidationContextImpl(CapacitySchedulerQueueManager qm) {
queueManager = qm;
}
/**
* This method will determine if a static queue path is valid.
* @param path The static path of the queue
* @return true of the path is valid
* @throws YarnException if the path is invalid
*/
private boolean validateStaticQueuePath(MappingQueuePath path)
throws YarnException {
//Try getting queue by its full path name, if it exists it is a static
//leaf queue indeed, without any auto creation magic
CSQueue queue = queueManager.getQueue(path.getFullPath());
if (queue == null) {
//We might not be able to find the queue, because the reference was
// ambiguous this should only happen if the queue was referenced by
// leaf name only
if (queueManager.isAmbiguous(path.getFullPath())) {
throw new YarnException(
"Target queue is an ambiguous leaf queue '" +
path.getFullPath() + "'");
}
//if leaf queue does not exist,
//we need to check if the parent exists and is a managed parent
if (!path.hasParent()) {
throw new YarnException(
"Target queue does not exist and has no parent defined '" +
path.getFullPath() + "'");
}
CSQueue parentQueue = queueManager.getQueue(path.getParent());
if (parentQueue == null) {
if (queueManager.isAmbiguous(path.getParent())) {
throw new YarnException("Target queue path '" + path +
"' contains an ambiguous parent queue '" +
path.getParent() + "' reference");
} else {
throw new YarnException("Target queue path '" + path + "' " +
"contains an invalid parent queue '" + path.getParent() + "'.");
}
}
if (!(parentQueue instanceof ManagedParentQueue)) {
//If the parent path was referenced by short name, and it is not
// managed, we look up if there is a queue under it with the leaf
// queue's name
String normalizedParentPath = parentQueue.getQueuePath() + "."
+ path.getLeafName();
CSQueue normalizedQueue = queueManager.getQueue(normalizedParentPath);
if (normalizedQueue instanceof LeafQueue) {
return true;
}
if (normalizedQueue == null) {
throw new YarnException(
"Target queue '" + path.getFullPath() + "' does not exist" +
" and has a non-managed parent queue defined.");
} else {
throw new YarnException("Target queue '" + path + "' references" +
"a non-leaf queue, target queues must always be " +
"leaf queues.");
}
}
} else {
// if queue exists, validate if its an instance of leaf queue
if (!(queue instanceof LeafQueue)) {
throw new YarnException("Target queue '" + path + "' references" +
"a non-leaf queue, target queues must always be " +
"leaf queues.");
}
}
return true;
}
/**
* This method will determine if a dynamic queue path (a path which contains
* variables) is valid.
* @param path The dynamic path of the queue
* @return true of the path is valid
* @throws YarnException if the path is invalid
*/
private boolean validateDynamicQueuePath(MappingQueuePath path)
throws YarnException{
//if the queue is dynamic and we don't have a parent path, we cannot do
//any validation, since the dynamic part can be substituted to anything
//and that is the only part
if (!path.hasParent()) {
return true;
}
String parent = path.getParent();
//if the parent path has dynamic parts, we cannot do any more validations
if (!isPathStatic(parent)) {
return true;
}
//We check if the parent queue exists
CSQueue parentQueue = queueManager.getQueue(parent);
if (parentQueue == null) {
throw new YarnException("Target queue path '" + path + "' contains an " +
"invalid parent queue");
}
if (!(parentQueue instanceof ManagedParentQueue)) {
for (CSQueue queue : parentQueue.getChildQueues()) {
if (queue instanceof LeafQueue) {
//if a non managed parent queue has at least one leaf queue, this
//mapping can be valid, we cannot do any more checks
return true;
}
}
//There is no way we can place anything into the queue referenced by the
// rule, because we cannot auto create, and we don't have any leaf queues
//Actually this branch is not accessibe with the current queue hierarchy,
//there should be no parents without any leaf queues. This condition says
//for sanity checks
throw new YarnException("Target queue path '" + path + "' has" +
"a non-managed parent queue which has no LeafQueues either.");
}
return true;
}
/**
* This method should determine if the provided queue path can result in
* a possible placement. It should fail if the provided path cannot be placed
* into any of the known queues regardless of the variable context.
* @param queuePath The path to check
* @return true if the validation was successful
* @throws YarnException if the provided queue path is invalid
*/
public boolean validateQueuePath(String queuePath) throws YarnException {
MappingQueuePath path = new MappingQueuePath(queuePath);
if (isPathStatic(queuePath)) {
return validateStaticQueuePath(path);
} else {
return validateDynamicQueuePath(path);
}
}
/**
* Method to determine if the provided queue path contains any dynamic parts
* A part is dynamic if a known variable is referenced in it.
* @param queuePath The path to check
* @return true if no dynamic parts were found
*/
public boolean isPathStatic(String queuePath) {
String[] parts = queuePath.split("\\.");
for (int i = 0; i < parts.length; i++) {
if (knownVariables.contains(parts[i])) {
return false;
}
}
return true;
}
/**
* This method will add a known variable to the validation context, known
* variables can be used to determine if a path is static or dynamic.
* @param variable Name of the variable
* @throws YarnException If the variable to be added has already added as an
* immutable one, an exception is thrown
*/
public void addVariable(String variable) throws YarnException {
if (immutableVariables.contains(variable)) {
throw new YarnException("Variable '" + variable + "' is immutable " +
"cannot add to the modified variable list.");
}
knownVariables.add(variable);
}
/**
* This method will add a known immutable variable to the validation context,
* known variables can be used to determine if a path is static or dynamic.
* @param variable Name of the immutable variable
* @throws YarnException If the variable to be added has already added as a
* regular, mutable variable an exception is thrown
*/
public void addImmutableVariable(String variable) throws YarnException {
if (knownVariables.contains(variable) &&
!immutableVariables.contains(variable)) {
throw new YarnException("Variable '" + variable + "' already " +
"added as a mutable variable cannot set it to immutable.");
}
knownVariables.add(variable);
immutableVariables.add(variable);
}
/**
* This method will return all the known variables.
* @return Set of the known variables
*/
public Set<String> getVariables() {
return ImmutableSet.copyOf(knownVariables);
}
}

View File

@ -23,6 +23,7 @@
import org.apache.commons.compress.utils.Lists;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -38,6 +39,7 @@ class MockQueueHierarchyBuilder {
private Set<String> ambiguous = Sets.newHashSet();
private Map<String, String> shortNameMapping = Maps.newHashMap();
private CapacitySchedulerQueueManager queueManager;
private Map<String, List<CSQueue>> childrenMap = Maps.newHashMap();
public static MockQueueHierarchyBuilder create() {
return new MockQueueHierarchyBuilder();
@ -164,12 +166,20 @@ private AbstractCSQueue addLeafQueueAsChildOf(ParentQueue parent,
private void setQueueFields(ParentQueue parent, AbstractCSQueue newQueue,
String queueName) {
String fullPathOfQueue = parent.getQueuePath() + QUEUE_SEP + queueName;
String fullPathOfParent = parent.getQueuePath();
String fullPathOfQueue = fullPathOfParent + QUEUE_SEP + queueName;
addQueueToQueueManager(queueName, newQueue, fullPathOfQueue);
if (childrenMap.get(fullPathOfParent) == null) {
childrenMap.put(fullPathOfParent, new ArrayList<>());
}
childrenMap.get(fullPathOfParent).add(newQueue);
when(parent.getChildQueues()).thenReturn(childrenMap.get(fullPathOfParent));
when(newQueue.getParent()).thenReturn(parent);
when(newQueue.getQueuePath()).thenReturn(fullPathOfQueue);
when(newQueue.getQueueName()).thenReturn(queueName);
}
private void addQueueToQueueManager(String queueName, AbstractCSQueue queue,

View File

@ -0,0 +1,173 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestMappingRuleActions {
void assertRejectResult(MappingRuleResult result) {
assertSame(MappingRuleResultType.REJECT, result.getResult());
}
void assertSkipResult(MappingRuleResult result) {
assertSame(MappingRuleResultType.SKIP, result.getResult());
}
void assertPlaceDefaultResult(MappingRuleResult result) {
assertSame(MappingRuleResultType.PLACE_TO_DEFAULT, result.getResult());
}
void assertPlaceResult(MappingRuleResult result, String queue) {
assertSame(MappingRuleResultType.PLACE, result.getResult());
assertEquals(queue, result.getQueue());
}
@Test
public void testRejectAction() {
VariableContext variables = new VariableContext();
MappingRuleAction reject = new MappingRuleActions.RejectAction();
MappingRuleAction rejectHelper = MappingRuleActions.createRejectAction();
assertRejectResult(reject.execute(variables));
assertRejectResult(rejectHelper.execute(variables));
}
@Test
public void testActionFallbacks() {
MappingRuleActionBase action =
new MappingRuleActions.PlaceToQueueAction("a");
action.setFallbackDefaultPlacement();
assertPlaceDefaultResult(action.getFallback());
action.setFallbackReject();
assertRejectResult(action.getFallback());
action.setFallbackSkip();
assertSkipResult(action.getFallback());
}
@Test
public void testVariableUpdateAction() {
VariableContext variables = new VariableContext();
variables.put("%default", "root.default");
variables.put("%immutable", "immutable");
variables.put("%empty", "");
variables.put("%null", null);
variables.put("%sub", "xxx");
variables.setImmutables("%immutable");
MappingRuleAction updateDefaultManual =
new MappingRuleActions.VariableUpdateAction("%default", "root.%sub");
MappingRuleAction updateDefaultHelper =
MappingRuleActions.createUpdateDefaultAction("root.%sub%sub");
MappingRuleAction updateImmutable =
new MappingRuleActions.VariableUpdateAction("%immutable", "changed");
MappingRuleAction updateEmpty =
new MappingRuleActions.VariableUpdateAction("%empty", "something");
MappingRuleAction updateNull =
new MappingRuleActions.VariableUpdateAction("%null", "non-null");
MappingRuleResult result;
result = updateDefaultManual.execute(variables);
assertSkipResult(result);
assertEquals("root.xxx", variables.get("%default"));
result = updateDefaultHelper.execute(variables);
assertSkipResult(result);
assertEquals("root.xxxxxx", variables.get("%default"));
result = updateEmpty.execute(variables);
assertSkipResult(result);
assertEquals("something", variables.get("%empty"));
result = updateNull.execute(variables);
assertSkipResult(result);
assertEquals("non-null", variables.get("%null"));
try {
updateImmutable.execute(variables);
fail("Should've failed with exception");
} catch (Exception e){
assertTrue(e instanceof IllegalStateException);
}
}
@Test
public void testPlaceToQueueAction() {
VariableContext variables = new VariableContext();
variables.put("%default", "root.default");
variables.put("%immutable", "immutable");
variables.put("%empty", "");
variables.put("%null", null);
variables.put("%sub", "xxx");
variables.setImmutables("%immutable");
MappingRuleAction placeToStatic =
new MappingRuleActions.PlaceToQueueAction("root.static.queue");
MappingRuleAction placeToDynamic =
new MappingRuleActions.PlaceToQueueAction("root.%sub.%immutable");
MappingRuleAction placeToDynamicDoubleSub =
MappingRuleActions.createPlaceToQueueAction("root.%sub%sub.%immutable");
MappingRuleAction placeToNull =
MappingRuleActions.createPlaceToQueueAction(null);
MappingRuleAction placeToEmpty =
MappingRuleActions.createPlaceToQueueAction("");
MappingRuleAction placeToNulRef =
new MappingRuleActions.PlaceToQueueAction("%null");
MappingRuleAction placeToEmptyRef =
new MappingRuleActions.PlaceToQueueAction("%empty");
MappingRuleAction placeToDefaultRef =
new MappingRuleActions.PlaceToQueueAction("%default");
assertPlaceResult(placeToStatic.execute(variables), "root.static.queue");
assertPlaceResult(placeToDynamic.execute(variables), "root.xxx.immutable");
assertPlaceResult(placeToDynamicDoubleSub.execute(variables),
"root.%sub%sub.immutable");
assertPlaceResult(placeToNull.execute(variables), "");
assertPlaceResult(placeToEmpty.execute(variables), "");
assertPlaceResult(placeToNulRef.execute(variables), "");
assertPlaceResult(placeToEmptyRef.execute(variables), "");
assertPlaceResult(placeToDefaultRef.execute(variables), "root.default");
}
@Test
public void testToStrings() {
MappingRuleAction place = new MappingRuleActions.PlaceToQueueAction(
"queue");
MappingRuleAction varUpdate = new MappingRuleActions.VariableUpdateAction(
"%var", "value");
MappingRuleAction reject = new MappingRuleActions.RejectAction();
assertEquals("PlaceToQueueAction{queueName='queue'}", place.toString());
assertEquals("VariableUpdateAction{variableName='%var'" +
", variableValue='value'}", varUpdate.toString());
assertEquals("RejectAction", reject.toString());
}
}

View File

@ -0,0 +1,216 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestMappingRuleValidationContextImpl {
@Test
public void testContextVariables() {
//Setting up queue manager and emulated queue hierarchy
CapacitySchedulerQueueManager qm =
mock(CapacitySchedulerQueueManager.class);
MockQueueHierarchyBuilder.create()
.withQueueManager(qm)
.withQueue("root.unmanaged")
.build();
when(qm.getQueue(isNull())).thenReturn(null);
MappingRuleValidationContextImpl ctx =
new MappingRuleValidationContextImpl(qm);
//in the beginning there were no variables
assertEquals(0, ctx.getVariables().size());
//hence all quese were considered static
assertTrue(ctx.isPathStatic("root.%user"));
try {
//but then suddenly there was a variable
ctx.addVariable("%user");
ctx.addVariable("%user");
assertEquals(1, ctx.getVariables().size());
//and suddenly previously static queues became dynamic
assertFalse(ctx.isPathStatic("root.%user"));
//as time passed, more and more variables joined the void
ctx.addVariable("%primary_group");
ctx.addVariable("%default");
} catch (YarnException e) {
fail("We don't expect the add variable to fail: " + e.getMessage());
}
assertEquals(3, ctx.getVariables().size());
//making more and more dynamic queues possible
assertFalse(ctx.isPathStatic("root.%primary_group.something"));
assertFalse(ctx.isPathStatic("root.something.%default"));
//but the majority of the queues remained static
assertTrue(ctx.isPathStatic("root.static"));
assertTrue(ctx.isPathStatic("root.static.%nothing"));
assertTrue(ctx.isPathStatic("root"));
assertTrue(ctx.getVariables().contains("%user"));
assertTrue(ctx.getVariables().contains("%primary_group"));
assertTrue(ctx.getVariables().contains("%default"));
assertFalse(ctx.getVariables().contains("%nothing"));
}
void assertValidPath(MappingRuleValidationContext ctx, String path) {
try {
ctx.validateQueuePath(path);
} catch (YarnException e) {
fail("Path '" + path + "' should be VALID");
}
}
void assertInvalidPath(MappingRuleValidationContext ctx, String path) {
try {
ctx.validateQueuePath(path);
fail("Path '" + path + "' should be INVALID");
} catch (YarnException e) {
//Exception is expected
}
}
@Test
public void testDynamicQueueValidation() {
//Setting up queue manager and emulated queue hierarchy
CapacitySchedulerQueueManager qm =
mock(CapacitySchedulerQueueManager.class);
MockQueueHierarchyBuilder.create()
.withQueueManager(qm)
.withQueue("root.unmanaged")
.withManagedParentQueue("root.managed")
.withQueue("root.unmanagedwithchild.child")
.withQueue("root.leaf")
.build();
when(qm.getQueue(isNull())).thenReturn(null);
MappingRuleValidationContextImpl ctx =
new MappingRuleValidationContextImpl(qm);
try {
ctx.addVariable("%dynamic");
ctx.addVariable("%user");
} catch (YarnException e) {
fail("We don't expect the add variable to fail: " + e.getMessage());
}
assertValidPath(ctx, "%dynamic");
assertValidPath(ctx, "root.%dynamic");
assertValidPath(ctx, "%user.%dynamic");
assertValidPath(ctx, "root.managed.%dynamic");
assertValidPath(ctx, "managed.%dynamic");
assertInvalidPath(ctx, "root.invalid.%dynamic");
assertInvalidPath(ctx, "root.umanaged.%dynamic");
assertValidPath(ctx, "root.unmanagedwithchild.%user");
assertValidPath(ctx, "unmanagedwithchild.%user");
}
@Test
public void testStaticQueueValidation() {
//Setting up queue manager and emulated queue hierarchy
CapacitySchedulerQueueManager qm =
mock(CapacitySchedulerQueueManager.class);
MockQueueHierarchyBuilder.create()
.withQueueManager(qm)
.withQueue("root.unmanaged")
.withManagedParentQueue("root.managed")
.withQueue("root.deep.queue.path")
.withQueue("root.ambi.ambileaf")
.withQueue("root.deep.ambi.ambileaf")
.build();
when(qm.getQueue(isNull())).thenReturn(null);
MappingRuleValidationContextImpl ctx =
new MappingRuleValidationContextImpl(qm);
assertValidPath(ctx, "root.unmanaged");
assertValidPath(ctx, "unmanaged");
assertInvalidPath(ctx, "root");
assertInvalidPath(ctx, "managed");
assertInvalidPath(ctx, "root.managed");
assertInvalidPath(ctx, "fail");
assertInvalidPath(ctx, "ambi");
assertInvalidPath(ctx, "ambileaf");
assertInvalidPath(ctx, "ambi.ambileaf");
assertValidPath(ctx, "root.ambi.ambileaf");
assertValidPath(ctx, "root.managed.a");
assertInvalidPath(ctx, "root.deep");
assertInvalidPath(ctx, "deep");
assertInvalidPath(ctx, "deep.queue");
assertInvalidPath(ctx, "root.deep.queue");
assertInvalidPath(ctx, "deep.queue.path");
assertValidPath(ctx, "queue.path");
assertInvalidPath(ctx, "queue.invalidPath");
assertValidPath(ctx, "path");
assertValidPath(ctx, "root.deep.queue.path");
}
@Test
public void testImmutableVariablesInContext() {
CapacitySchedulerQueueManager qm =
mock(CapacitySchedulerQueueManager.class);
MappingRuleValidationContextImpl ctx =
new MappingRuleValidationContextImpl(qm);
try {
ctx.addVariable("mutable");
ctx.addVariable("mutable");
} catch (YarnException e) {
fail("We should be able to add a mutable variable multiple times: "
+ e.getMessage());
}
try {
ctx.addImmutableVariable("mutable");
fail("We should receive an exception if an already added mutable" +
" variable is being marked as immutable");
} catch (YarnException e) {
//An exception is expected
}
try {
ctx.addImmutableVariable("immutable");
ctx.addImmutableVariable("immutable");
} catch (YarnException e) {
fail("We should be able to add a immutable variable multiple times: "
+ e.getMessage());
}
try {
ctx.addVariable("immutable");
fail("We should receive an exception if we try to add a variable as " +
"mutable when it was previously added as immutable");
} catch (YarnException e) {
//An exception is expected
}
}
}