YARN-9298. Implement FS placement rules using PlacementRule interface. Contributed by Wilfred Spiegelenburg.
This commit is contained in:
parent
4b7313e640
commit
0aefe2846f
|
@ -0,0 +1,113 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.w3c.dom.Element;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.assureRoot;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.isValidQueueName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Places apps in the specified default queue. If no default queue is
|
||||||
|
* specified the app is placed in root.default queue.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class DefaultPlacementRule extends FSPlacementRule {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(DefaultPlacementRule.class);
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public String defaultQueueName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the rule config from the xml config.
|
||||||
|
* @param conf An xml element from the {@link FairScheduler#conf}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void setConfig(Element conf) {
|
||||||
|
// Get the flag from the config (defaults to true if not set)
|
||||||
|
createQueue = getCreateFlag(conf);
|
||||||
|
// No config can be set when no policy is defined and we use defaults
|
||||||
|
if (conf != null) {
|
||||||
|
defaultQueueName = conf.getAttribute("queue");
|
||||||
|
// A queue read from the config could be illegal check it: fall back to
|
||||||
|
// the config default if it is the case
|
||||||
|
// However we cannot clean the name as a nested name is allowed.
|
||||||
|
if (!isValidQueueName(defaultQueueName)) {
|
||||||
|
LOG.error("Default rule configured with an illegal queue name: '{}'",
|
||||||
|
defaultQueueName);
|
||||||
|
defaultQueueName = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// The queue name does not have to be set and we really use "default"
|
||||||
|
if (defaultQueueName == null || defaultQueueName.isEmpty()) {
|
||||||
|
defaultQueueName = assureRoot(YarnConfiguration.DEFAULT_QUEUE_NAME);
|
||||||
|
} else {
|
||||||
|
defaultQueueName = assureRoot(defaultQueueName);
|
||||||
|
}
|
||||||
|
LOG.debug("Default rule instantiated with queue name: {}, " +
|
||||||
|
"and create flag: {}", defaultQueueName, createQueue);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the rule config just setting the create flag.
|
||||||
|
* @param create flag to allow queue creation for this rule
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void setConfig(Boolean create) {
|
||||||
|
createQueue = create;
|
||||||
|
// No config so fall back to the real default.
|
||||||
|
defaultQueueName = assureRoot(YarnConfiguration.DEFAULT_QUEUE_NAME);
|
||||||
|
LOG.debug("Default rule instantiated with default queue name: {}, " +
|
||||||
|
"and create flag: {}", defaultQueueName, createQueue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean initialize(ResourceScheduler scheduler) throws IOException {
|
||||||
|
super.initialize(scheduler);
|
||||||
|
if (getParentRule() != null) {
|
||||||
|
throw new IOException(
|
||||||
|
"Parent rule must not be configured for Default rule.");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ApplicationPlacementContext getPlacementForApp(
|
||||||
|
ApplicationSubmissionContext asc, String user) {
|
||||||
|
|
||||||
|
// If we can create the queue in the rule or the queue exists return it
|
||||||
|
if (createQueue || configuredQueue(defaultQueueName)) {
|
||||||
|
return new ApplicationPlacementContext(defaultQueueName);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,166 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.w3c.dom.Element;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract base for all {@link FairScheduler} Placement Rules.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public abstract class FSPlacementRule extends PlacementRule {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(FSPlacementRule.class);
|
||||||
|
|
||||||
|
// Flag to show if the rule can create a queue
|
||||||
|
@VisibleForTesting
|
||||||
|
protected boolean createQueue = true;
|
||||||
|
private QueueManager queueManager;
|
||||||
|
private PlacementRule parentRule;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the {@link QueueManager} loaded from the scheduler when the rule is
|
||||||
|
* initialised. All rules are initialised before the can be called to place
|
||||||
|
* an application.
|
||||||
|
* @return The queue manager from the scheduler, this can never be
|
||||||
|
* <code>null</code> for an initialised rule.
|
||||||
|
*/
|
||||||
|
QueueManager getQueueManager() {
|
||||||
|
return queueManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set a rule to generate the parent queue dynamically.
|
||||||
|
* @param parent A PlacementRule
|
||||||
|
*/
|
||||||
|
void setParentRule(PlacementRule parent) {
|
||||||
|
this.parentRule = parent;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the rule that is set to generate the parent queue dynamically.
|
||||||
|
* @return The rule set or <code>null</code> if not set.
|
||||||
|
*/
|
||||||
|
PlacementRule getParentRule() {
|
||||||
|
return parentRule;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the config based on the type of object passed in.
|
||||||
|
* @param initArg the config to be set
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void setConfig(Object initArg) {
|
||||||
|
if (null == initArg) {
|
||||||
|
LOG.debug("Null object passed in: no config set");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (initArg instanceof Element) {
|
||||||
|
LOG.debug("Setting config from XML");
|
||||||
|
setConfig((Element) initArg);
|
||||||
|
} else if (initArg instanceof Boolean) {
|
||||||
|
LOG.debug("Setting config from Boolean");
|
||||||
|
setConfig((Boolean) initArg);
|
||||||
|
} else {
|
||||||
|
LOG.info("Unknown object type passed in as config for rule {}: {}",
|
||||||
|
getName(), initArg.getClass());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the rule config from the xml config.
|
||||||
|
* @param conf An xml element from the {@link FairScheduler#conf}
|
||||||
|
*/
|
||||||
|
protected void setConfig(Element conf) {
|
||||||
|
// Get the flag from the config (defaults to true if not set)
|
||||||
|
createQueue = getCreateFlag(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the rule config just setting the create flag.
|
||||||
|
* @param create flag to allow queue creation for this rule
|
||||||
|
*/
|
||||||
|
protected void setConfig(Boolean create) {
|
||||||
|
createQueue = create;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Standard initialisation for {@link FairScheduler} rules, shared by all
|
||||||
|
* rules. Each rule that extends this abstract and overrides this method must
|
||||||
|
* call <code>super.initialize()</code> to run this basic initialisation.
|
||||||
|
* @param scheduler the scheduler using the rule
|
||||||
|
* @return <code>true</code> in all cases
|
||||||
|
* @throws IOException for any errors
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean initialize(ResourceScheduler scheduler) throws IOException {
|
||||||
|
if (!(scheduler instanceof FairScheduler)) {
|
||||||
|
throw new IOException(getName() +
|
||||||
|
" rule can only be configured for the FairScheduler");
|
||||||
|
}
|
||||||
|
if (getParentRule() != null &&
|
||||||
|
getParentRule().getName().equals(getName())) {
|
||||||
|
throw new IOException("Parent rule may not be the same type as the " +
|
||||||
|
"child rule: " + getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
FairScheduler fs = (FairScheduler) scheduler;
|
||||||
|
queueManager = fs.getQueueManager();
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the queue exists and is part of the configuration i.e. not
|
||||||
|
* a {@link FSQueue#isDynamic()} queue.
|
||||||
|
* @param queueName name of the queue to check
|
||||||
|
* @return <code>true</code> if the queue exists and is a "configured" queue
|
||||||
|
*/
|
||||||
|
boolean configuredQueue(String queueName) {
|
||||||
|
FSQueue queue = queueManager.getQueue(queueName);
|
||||||
|
return (queue != null && !queue.isDynamic());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the create flag from the xml configuration element.
|
||||||
|
* @param conf The FS configuration element for the queue
|
||||||
|
* @return <code>false</code> only if the flag is set in the configuration to
|
||||||
|
* a text that is not case ignored "true", <code>true</code> in all other
|
||||||
|
* cases
|
||||||
|
*/
|
||||||
|
boolean getCreateFlag(Element conf) {
|
||||||
|
if (conf != null) {
|
||||||
|
String create = conf.getAttribute("create");
|
||||||
|
return Boolean.parseBoolean(create);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,100 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerUtilities;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility methods used by Fair scheduler placement rules.
|
||||||
|
* {@link
|
||||||
|
* org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler}
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public final class FairQueuePlacementUtils {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(FairQueuePlacementUtils.class);
|
||||||
|
|
||||||
|
// Constants for name clean up and hierarchy checks
|
||||||
|
protected static final String DOT = ".";
|
||||||
|
protected static final String DOT_REPLACEMENT = "_dot_";
|
||||||
|
protected static final String ROOT_QUEUE = "root";
|
||||||
|
|
||||||
|
private FairQueuePlacementUtils() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Replace the periods in the username or group name with "_dot_" and
|
||||||
|
* remove trailing and leading whitespace.
|
||||||
|
*
|
||||||
|
* @param name The name to clean
|
||||||
|
* @return The name with {@link #DOT} replaced with {@link #DOT_REPLACEMENT}
|
||||||
|
*/
|
||||||
|
protected static String cleanName(String name) {
|
||||||
|
name = FairSchedulerUtilities.trimQueueName(name);
|
||||||
|
if (name.contains(DOT)) {
|
||||||
|
String converted = name.replaceAll("\\.", DOT_REPLACEMENT);
|
||||||
|
LOG.warn("Name {} is converted to {} when it is used as a queue name.",
|
||||||
|
name, converted);
|
||||||
|
return converted;
|
||||||
|
} else {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assure root prefix for a queue name.
|
||||||
|
*
|
||||||
|
* @param queueName The queue name to check for the root prefix
|
||||||
|
* @return The root prefixed queue name
|
||||||
|
*/
|
||||||
|
protected static String assureRoot(String queueName) {
|
||||||
|
if (queueName != null && !queueName.isEmpty()) {
|
||||||
|
if (!queueName.startsWith(ROOT_QUEUE + DOT) &&
|
||||||
|
!queueName.equals(ROOT_QUEUE)) {
|
||||||
|
queueName = ROOT_QUEUE + DOT + queueName;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.warn("AssureRoot: queueName is empty or null.");
|
||||||
|
}
|
||||||
|
return queueName;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validate the queue name: it may not start or end with a {@link #DOT}.
|
||||||
|
*
|
||||||
|
* @param queueName The queue name to validate
|
||||||
|
* @return <code>false</code> if the queue name starts or ends with a
|
||||||
|
* {@link #DOT}, <code>true</code>
|
||||||
|
*/
|
||||||
|
protected static boolean isValidQueueName(String queueName) {
|
||||||
|
if (queueName != null) {
|
||||||
|
if (queueName.equals(FairSchedulerUtilities.trimQueueName(queueName)) &&
|
||||||
|
!queueName.startsWith(DOT) &&
|
||||||
|
!queueName.endsWith(DOT)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,12 +20,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factory class for creating instances of {@link PlacementRule}.
|
* Factory class for creating instances of {@link PlacementRule}.
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
public final class PlacementFactory {
|
public final class PlacementFactory {
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
|
@ -35,6 +39,14 @@ public final class PlacementFactory {
|
||||||
// Unused.
|
// Unused.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link PlacementRule} based on the rule class from the
|
||||||
|
* configuration. This is used to instantiate rules by the scheduler which
|
||||||
|
* does not resolve the class before this call.
|
||||||
|
* @param ruleStr The name of the class to instantiate
|
||||||
|
* @param conf The configuration object to set for the rule
|
||||||
|
* @return Created class instance
|
||||||
|
*/
|
||||||
public static PlacementRule getPlacementRule(String ruleStr,
|
public static PlacementRule getPlacementRule(String ruleStr,
|
||||||
Configuration conf)
|
Configuration conf)
|
||||||
throws ClassNotFoundException {
|
throws ClassNotFoundException {
|
||||||
|
@ -43,4 +55,20 @@ public final class PlacementFactory {
|
||||||
LOG.info("Using PlacementRule implementation - " + ruleClass);
|
LOG.info("Using PlacementRule implementation - " + ruleClass);
|
||||||
return ReflectionUtils.newInstance(ruleClass, conf);
|
return ReflectionUtils.newInstance(ruleClass, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link PlacementRule} based on the rule class from the
|
||||||
|
* configuration. This is used to instantiate rules by the scheduler which
|
||||||
|
* resolve the class before this call.
|
||||||
|
* @param ruleClass The specific class reference to instantiate
|
||||||
|
* @param initArg The config to set
|
||||||
|
* @return Created class instance
|
||||||
|
*/
|
||||||
|
public static PlacementRule getPlacementRule(
|
||||||
|
Class<? extends PlacementRule> ruleClass, Object initArg) {
|
||||||
|
LOG.info("Creating PlacementRule implementation: " + ruleClass);
|
||||||
|
PlacementRule rule = ReflectionUtils.newInstance(ruleClass, null);
|
||||||
|
rule.setConfig(initArg);
|
||||||
|
return rule;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -20,35 +20,62 @@ package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract base for all Placement Rules.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
public abstract class PlacementRule {
|
public abstract class PlacementRule {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the config based on the passed in argument. This construct is used to
|
||||||
|
* not pollute this abstract class with implementation specific references.
|
||||||
|
*/
|
||||||
|
public void setConfig(Object initArg) {
|
||||||
|
// Default is a noop
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the name of the rule.
|
||||||
|
* @return The name of the rule, the fully qualified class name.
|
||||||
|
*/
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return this.getClass().getName();
|
return this.getClass().getName();
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract boolean initialize(
|
/**
|
||||||
ResourceScheduler scheduler) throws IOException;
|
* Initialize the rule with the scheduler.
|
||||||
|
* @param scheduler the scheduler using the rule
|
||||||
|
* @return <code>true</code> or <code>false</code> The outcome of the
|
||||||
|
* initialisation, rule dependent response which might not be persisted in
|
||||||
|
* the rule.
|
||||||
|
* @throws IOException for any errors
|
||||||
|
*/
|
||||||
|
public abstract boolean initialize(ResourceScheduler scheduler)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get queue for a given application
|
* Return the scheduler queue name the application should be placed in
|
||||||
|
* wrapped in an {@link ApplicationPlacementContext} object.
|
||||||
|
*
|
||||||
|
* A non <code>null</code> return value places the application in a queue,
|
||||||
|
* a <code>null</code> value means the queue is not yet determined. The
|
||||||
|
* next {@link PlacementRule} in the list maintained in the
|
||||||
|
* {@link PlacementManager} will be executed.
|
||||||
|
*
|
||||||
|
* @param asc The context of the application created on submission
|
||||||
|
* @param user The name of the user submitting the application
|
||||||
*
|
*
|
||||||
* @param asc application submission context
|
* @throws YarnException for any error while executing the rule
|
||||||
* @param user userName
|
|
||||||
*
|
*
|
||||||
* @throws YarnException
|
* @return The queue name wrapped in {@link ApplicationPlacementContext} or
|
||||||
* if any error happens
|
* <code>null</code> if no queue was resolved
|
||||||
*
|
|
||||||
* @return <p>
|
|
||||||
* non-null value means it is determined
|
|
||||||
* </p>
|
|
||||||
* <p>
|
|
||||||
* null value means it is undetermined, so next {@link PlacementRule}
|
|
||||||
* in the {@link PlacementManager} will take care
|
|
||||||
* </p>
|
|
||||||
*/
|
*/
|
||||||
public abstract ApplicationPlacementContext getPlacementForApp(
|
public abstract ApplicationPlacementContext getPlacementForApp(
|
||||||
ApplicationSubmissionContext asc, String user) throws YarnException;
|
ApplicationSubmissionContext asc, String user) throws YarnException;
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.security.Groups;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.DOT;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.assureRoot;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.cleanName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Places apps in queues by the primary group of the submitter.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class PrimaryGroupPlacementRule extends FSPlacementRule {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(PrimaryGroupPlacementRule.class);
|
||||||
|
|
||||||
|
private Groups groupProvider;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean initialize(ResourceScheduler scheduler) throws IOException {
|
||||||
|
super.initialize(scheduler);
|
||||||
|
groupProvider = new Groups(((FairScheduler)scheduler).getConfig());
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ApplicationPlacementContext getPlacementForApp(
|
||||||
|
ApplicationSubmissionContext asc, String user) throws YarnException {
|
||||||
|
|
||||||
|
// All users should have at least one group the primary group. If no groups
|
||||||
|
// are returned then there is a real issue.
|
||||||
|
final List<String> groupList;
|
||||||
|
try {
|
||||||
|
groupList = groupProvider.getGroups(user);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
throw new YarnException("Group resolution failed", ioe);
|
||||||
|
}
|
||||||
|
if (groupList.isEmpty()) {
|
||||||
|
LOG.error("Group placement rule failed: No groups returned for user {}",
|
||||||
|
user);
|
||||||
|
throw new YarnException("No groups returned for user " + user);
|
||||||
|
}
|
||||||
|
|
||||||
|
String cleanGroup = cleanName(groupList.get(0));
|
||||||
|
String queueName;
|
||||||
|
PlacementRule parentRule = getParentRule();
|
||||||
|
|
||||||
|
if (getParentRule() != null) {
|
||||||
|
LOG.debug("PrimaryGroup rule: parent rule found: {}",
|
||||||
|
parentRule.getName());
|
||||||
|
ApplicationPlacementContext parent =
|
||||||
|
parentRule.getPlacementForApp(asc, user);
|
||||||
|
if (parent == null || getQueueManager().
|
||||||
|
getQueue(parent.getQueue()) instanceof FSLeafQueue) {
|
||||||
|
LOG.debug("PrimaryGroup rule: parent rule failed");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
LOG.debug("PrimaryGroup rule: parent rule result: {}",
|
||||||
|
parent.getQueue());
|
||||||
|
queueName = parent.getQueue() + DOT + cleanGroup;
|
||||||
|
} else {
|
||||||
|
queueName = assureRoot(cleanGroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we can create the queue in the rule or the queue exists return it
|
||||||
|
if (createQueue || configuredQueue(queueName)) {
|
||||||
|
return new ApplicationPlacementContext(queueName);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,65 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Rejects all placements.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class RejectPlacementRule extends FSPlacementRule {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(RejectPlacementRule.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Reject rule does not use any configuration. Override and ignore all
|
||||||
|
* configuration.
|
||||||
|
* @param initArg the config to be set
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void setConfig(Object initArg) {
|
||||||
|
// This rule ignores all config, just log and return
|
||||||
|
LOG.debug("RejectPlacementRule instantiated");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean initialize(ResourceScheduler scheduler) throws IOException {
|
||||||
|
super.initialize(scheduler);
|
||||||
|
if (getParentRule() != null) {
|
||||||
|
throw new IOException(
|
||||||
|
"Parent rule should not be configured for Reject rule.");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ApplicationPlacementContext getPlacementForApp(
|
||||||
|
ApplicationSubmissionContext asc, String user) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,102 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.security.Groups;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.DOT;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.assureRoot;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.cleanName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Places apps in queues by the secondary group of the submitter, if the
|
||||||
|
* submitter is a member of more than one group.
|
||||||
|
* The first "matching" queue based on the group list is returned. The match
|
||||||
|
* takes into account the parent rule and create flag,
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class SecondaryGroupExistingPlacementRule extends FSPlacementRule {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(SecondaryGroupExistingPlacementRule.class);
|
||||||
|
|
||||||
|
private Groups groupProvider;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean initialize(ResourceScheduler scheduler) throws IOException {
|
||||||
|
super.initialize(scheduler);
|
||||||
|
groupProvider = new Groups(((FairScheduler)scheduler).getConfig());
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ApplicationPlacementContext getPlacementForApp(
|
||||||
|
ApplicationSubmissionContext asc, String user) throws YarnException {
|
||||||
|
|
||||||
|
// All users should have at least one group the primary group. If no groups
|
||||||
|
// are returned then there is a real issue.
|
||||||
|
final List<String> groupList;
|
||||||
|
try {
|
||||||
|
groupList = groupProvider.getGroups(user);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
throw new YarnException("Group resolution failed", ioe);
|
||||||
|
}
|
||||||
|
|
||||||
|
String parentQueue = null;
|
||||||
|
PlacementRule parentRule = getParentRule();
|
||||||
|
|
||||||
|
if (parentRule != null) {
|
||||||
|
LOG.debug("SecondaryGroupExisting rule: parent rule found: {}",
|
||||||
|
parentRule.getName());
|
||||||
|
ApplicationPlacementContext parent =
|
||||||
|
parentRule.getPlacementForApp(asc, user);
|
||||||
|
if (parent == null || getQueueManager().
|
||||||
|
getQueue(parent.getQueue()) instanceof FSLeafQueue) {
|
||||||
|
LOG.debug("SecondaryGroupExisting rule: parent rule failed");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
parentQueue = parent.getQueue();
|
||||||
|
LOG.debug("SecondaryGroupExisting rule: parent rule result: {}",
|
||||||
|
parentQueue);
|
||||||
|
}
|
||||||
|
// now check the groups inside the parent
|
||||||
|
for (int i = 1; i < groupList.size(); i++) {
|
||||||
|
String group = cleanName(groupList.get(i));
|
||||||
|
String queueName =
|
||||||
|
parentQueue == null ? assureRoot(group) : parentQueue + DOT + group;
|
||||||
|
if (configuredQueue(queueName)) {
|
||||||
|
return new ApplicationPlacementContext(queueName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,77 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.assureRoot;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.isValidQueueName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Places apps in queues by requested queue of the submitter.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class SpecifiedPlacementRule extends FSPlacementRule {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(SpecifiedPlacementRule.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean initialize(ResourceScheduler scheduler) throws IOException {
|
||||||
|
super.initialize(scheduler);
|
||||||
|
if (getParentRule() != null) {
|
||||||
|
throw new IOException(
|
||||||
|
"Parent rule should not be configured for Specified rule.");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ApplicationPlacementContext getPlacementForApp(
|
||||||
|
ApplicationSubmissionContext asc, String user) throws YarnException {
|
||||||
|
|
||||||
|
// Sanity check the provided queue
|
||||||
|
String queueName = asc.getQueue();
|
||||||
|
if (!isValidQueueName(queueName)) {
|
||||||
|
LOG.error("Specified queue name not valid: '{}'", queueName);
|
||||||
|
throw new YarnException("Application submitted by user " + user +
|
||||||
|
"with illegal queue name '" + queueName + "'.");
|
||||||
|
}
|
||||||
|
// On submission the requested queue will be set to "default" if no queue
|
||||||
|
// is specified: just check the next rule in that case
|
||||||
|
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
queueName = assureRoot(queueName);
|
||||||
|
// If we can create the queue in the rule or the queue exists return it
|
||||||
|
if (createQueue || configuredQueue(queueName)) {
|
||||||
|
return new ApplicationPlacementContext(queueName);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,70 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.DOT;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.assureRoot;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.cleanName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Places apps in queues by username of the submitter.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class UserPlacementRule extends FSPlacementRule {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(UserPlacementRule.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ApplicationPlacementContext getPlacementForApp(
|
||||||
|
ApplicationSubmissionContext asc, String user) throws YarnException {
|
||||||
|
String queueName;
|
||||||
|
|
||||||
|
String cleanUser = cleanName(user);
|
||||||
|
PlacementRule parentRule = getParentRule();
|
||||||
|
if (parentRule != null) {
|
||||||
|
LOG.debug("User rule: parent rule found: {}", parentRule.getName());
|
||||||
|
ApplicationPlacementContext parent =
|
||||||
|
parentRule.getPlacementForApp(asc, user);
|
||||||
|
if (parent == null || getQueueManager().
|
||||||
|
getQueue(parent.getQueue()) instanceof FSLeafQueue) {
|
||||||
|
LOG.debug("User rule: parent rule failed");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
LOG.debug("User rule: parent rule result: {}", parent.getQueue());
|
||||||
|
queueName = parent.getQueue() + DOT + cleanUser;
|
||||||
|
} else {
|
||||||
|
queueName = assureRoot(cleanUser);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we can create the queue in the rule or the queue exists return it
|
||||||
|
if (createQueue || configuredQueue(queueName)) {
|
||||||
|
return new ApplicationPlacementContext(queueName);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,137 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.DOT;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.DOT_REPLACEMENT;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.ROOT_QUEUE;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.assureRoot;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.cleanName;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.isValidQueueName;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests of the utility methods from {@link FairQueuePlacementUtils}.
|
||||||
|
*/
|
||||||
|
public class TestFairQueuePlacementUtils {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test name trimming and dot replacement in names.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCleanName() {
|
||||||
|
// permutations of dot placements
|
||||||
|
final String clean = "clean";
|
||||||
|
final String dotted = "not.clean";
|
||||||
|
final String multiDot = "more.un.clean";
|
||||||
|
final String seqDot = "not..clean";
|
||||||
|
final String unTrimmed = " .invalid. "; // not really a valid queue
|
||||||
|
|
||||||
|
String cleaned = cleanName(clean);
|
||||||
|
assertEquals("Name was changed and it should not", clean, cleaned);
|
||||||
|
cleaned = cleanName(dotted);
|
||||||
|
assertFalse("Cleaned name contains dots and it should not",
|
||||||
|
cleaned.contains(DOT));
|
||||||
|
cleaned = cleanName(multiDot);
|
||||||
|
assertFalse("Cleaned name contains dots and it should not",
|
||||||
|
cleaned.contains(DOT));
|
||||||
|
assertNotEquals("Multi dot failed: wrong replacements found",
|
||||||
|
cleaned.indexOf(DOT_REPLACEMENT),
|
||||||
|
cleaned.lastIndexOf(DOT_REPLACEMENT));
|
||||||
|
cleaned = cleanName(seqDot);
|
||||||
|
assertFalse("Cleaned name contains dots and it should not",
|
||||||
|
cleaned.contains(DOT));
|
||||||
|
assertNotEquals("Sequential dot failed: wrong replacements found",
|
||||||
|
cleaned.indexOf(DOT_REPLACEMENT),
|
||||||
|
cleaned.lastIndexOf(DOT_REPLACEMENT));
|
||||||
|
cleaned = cleanName(unTrimmed);
|
||||||
|
assertTrue("Trimming start failed: space not removed or dot not replaced",
|
||||||
|
cleaned.startsWith(DOT_REPLACEMENT));
|
||||||
|
assertTrue("Trimming end failed: space not removed or dot not replaced",
|
||||||
|
cleaned.endsWith(DOT_REPLACEMENT));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAssureRoot() {
|
||||||
|
// permutations of rooted queue names
|
||||||
|
final String queueName = "base";
|
||||||
|
final String rootOnly = "root";
|
||||||
|
final String rootNoDot = "rootbase";
|
||||||
|
final String alreadyRoot = "root.base";
|
||||||
|
|
||||||
|
String rooted = assureRoot(queueName);
|
||||||
|
assertTrue("Queue should have root prefix (base)",
|
||||||
|
rooted.startsWith(ROOT_QUEUE + DOT));
|
||||||
|
rooted = assureRoot(rootOnly);
|
||||||
|
assertEquals("'root' queue should not have root prefix (root)",
|
||||||
|
rootOnly, rooted);
|
||||||
|
rooted = assureRoot(rootNoDot);
|
||||||
|
assertTrue("Queue should have root prefix (rootbase)",
|
||||||
|
rooted.startsWith(ROOT_QUEUE + DOT));
|
||||||
|
assertEquals("'root' queue base was replaced and not prefixed", 5,
|
||||||
|
rooted.lastIndexOf(ROOT_QUEUE));
|
||||||
|
rooted = assureRoot(alreadyRoot);
|
||||||
|
assertEquals("Root prefixed queue changed and it should not (root.base)",
|
||||||
|
rooted, alreadyRoot);
|
||||||
|
assertNull("Null queue did not return null queue",
|
||||||
|
assureRoot(null));
|
||||||
|
assertEquals("Empty queue did not return empty name", "",
|
||||||
|
assureRoot(""));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIsValidQueueName() {
|
||||||
|
// permutations of valid/invalid names
|
||||||
|
final String valid = "valid";
|
||||||
|
final String validRooted = "root.valid";
|
||||||
|
final String rootOnly = "root";
|
||||||
|
final String startDot = ".invalid";
|
||||||
|
final String endDot = "invalid.";
|
||||||
|
final String startSpace = " invalid";
|
||||||
|
final String endSpace = "invalid ";
|
||||||
|
final String unicodeSpace = "\u00A0invalid";
|
||||||
|
|
||||||
|
assertFalse("'null' queue was not marked as invalid",
|
||||||
|
isValidQueueName(null));
|
||||||
|
assertTrue("empty queue was not tagged valid", isValidQueueName(""));
|
||||||
|
assertTrue("Simple queue name was not tagged valid (valid)",
|
||||||
|
isValidQueueName(valid));
|
||||||
|
assertTrue("Root only queue was not tagged valid (root)",
|
||||||
|
isValidQueueName(rootOnly));
|
||||||
|
assertTrue("Root prefixed queue was not tagged valid (root.valid)",
|
||||||
|
isValidQueueName(validRooted));
|
||||||
|
assertFalse("Queue starting with dot was not tagged invalid (.invalid)",
|
||||||
|
isValidQueueName(startDot));
|
||||||
|
assertFalse("Queue ending with dot was not tagged invalid (invalid.)",
|
||||||
|
isValidQueueName(endDot));
|
||||||
|
assertFalse("Queue starting with space was not tagged invalid ( invalid)",
|
||||||
|
isValidQueueName(startSpace));
|
||||||
|
assertFalse("Queue ending with space was not tagged invalid (invalid )",
|
||||||
|
isValidQueueName(endSpace));
|
||||||
|
// just one for sanity check extensive tests are in the scheduler utils
|
||||||
|
assertFalse("Queue with unicode space was not tagged as invalid (unicode)",
|
||||||
|
isValidQueueName(unicodeSpace));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,75 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for the {@link PlacementFactory}.
|
||||||
|
*/
|
||||||
|
public class TestPlacementFactory {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check that non existing class throws exception.
|
||||||
|
*
|
||||||
|
* @throws ClassNotFoundException
|
||||||
|
*/
|
||||||
|
@Test(expected = ClassNotFoundException.class)
|
||||||
|
public void testGetNonExistRuleText() throws ClassNotFoundException {
|
||||||
|
final String nonExist = "my.placement.Rule";
|
||||||
|
PlacementFactory.getPlacementRule(nonExist, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check existing class using the class name.
|
||||||
|
* Relies on the {@link DefaultPlacementRule} of the FS.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testGetExistRuleText() {
|
||||||
|
final String exists = DefaultPlacementRule.class.getCanonicalName();
|
||||||
|
PlacementRule rule = null;
|
||||||
|
try {
|
||||||
|
rule = PlacementFactory.getPlacementRule(exists, null);
|
||||||
|
} catch (ClassNotFoundException cnfe) {
|
||||||
|
fail("Class should have been found");
|
||||||
|
}
|
||||||
|
assertNotNull("Rule object is null", rule);
|
||||||
|
assertEquals("Names not equal", rule.getName(), exists);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Existing class using the class reference.
|
||||||
|
* Relies on the {@link DefaultPlacementRule} of the FS.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testGetRuleClass() {
|
||||||
|
PlacementRule rule = PlacementFactory.getPlacementRule(
|
||||||
|
DefaultPlacementRule.class, null);
|
||||||
|
assertNotNull("Rule object is null", rule);
|
||||||
|
// Should take anything as the second object: ignores unknown types in the
|
||||||
|
// default implementation.
|
||||||
|
rule = PlacementFactory.getPlacementRule(
|
||||||
|
DefaultPlacementRule.class, "");
|
||||||
|
assertNotNull("Rule object is null", rule);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,212 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.w3c.dom.Document;
|
||||||
|
import org.w3c.dom.Element;
|
||||||
|
|
||||||
|
import javax.xml.parsers.DocumentBuilder;
|
||||||
|
import javax.xml.parsers.DocumentBuilderFactory;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory.getPlacementRule;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple tests for FS specific parts of the PlacementRule.
|
||||||
|
*/
|
||||||
|
public class TestPlacementRuleFS {
|
||||||
|
|
||||||
|
// List of rules that are configurable (reject rule is not!)
|
||||||
|
private static final List<Class <? extends PlacementRule>> CONFIG_RULES =
|
||||||
|
new ArrayList<Class <? extends PlacementRule>>() {
|
||||||
|
{
|
||||||
|
add(DefaultPlacementRule.class);
|
||||||
|
add(PrimaryGroupPlacementRule.class);
|
||||||
|
add(SecondaryGroupExistingPlacementRule.class);
|
||||||
|
add(SpecifiedPlacementRule.class);
|
||||||
|
add(UserPlacementRule.class);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// List of rules that are not configurable
|
||||||
|
private static final List<Class <? extends PlacementRule>> NO_CONFIG_RULES =
|
||||||
|
new ArrayList<Class <? extends PlacementRule>>() {
|
||||||
|
{
|
||||||
|
add(RejectPlacementRule.class);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
private final static FairSchedulerConfiguration CONF =
|
||||||
|
new FairSchedulerConfiguration();
|
||||||
|
private FairScheduler scheduler;
|
||||||
|
private QueueManager queueManager;
|
||||||
|
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void initTest() {
|
||||||
|
scheduler = mock(FairScheduler.class);
|
||||||
|
// needed for all rules that rely on group info
|
||||||
|
when(scheduler.getConfig()).thenReturn(CONF);
|
||||||
|
// needed by all rules
|
||||||
|
queueManager = new QueueManager(scheduler);
|
||||||
|
when(scheduler.getQueueManager()).thenReturn(queueManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanTest() {
|
||||||
|
queueManager = null;
|
||||||
|
scheduler = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check the create and setting the config on the rule.
|
||||||
|
* This walks over all known rules and check the behaviour:
|
||||||
|
* - no config (null object)
|
||||||
|
* - unknown object type
|
||||||
|
* - boolean object
|
||||||
|
* - xml config ({@link Element})
|
||||||
|
* - calling initialize on the rule
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRuleSetups() {
|
||||||
|
// test for config(s) and init
|
||||||
|
for (Class <? extends PlacementRule> ruleClass: CONFIG_RULES) {
|
||||||
|
ruleCreateNoConfig(ruleClass);
|
||||||
|
ruleCreateWrongObject(ruleClass);
|
||||||
|
ruleCreateBoolean(ruleClass);
|
||||||
|
ruleCreateElement(ruleClass);
|
||||||
|
ruleInit(ruleClass);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check the init of rules that do not use a config.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRuleInitOnly() {
|
||||||
|
// test for init
|
||||||
|
for (Class <? extends PlacementRule> ruleClass: NO_CONFIG_RULES) {
|
||||||
|
ruleInit(ruleClass);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ruleCreateNoConfig(Class <? extends PlacementRule> ruleClass) {
|
||||||
|
PlacementRule rule = getPlacementRule(ruleClass, null);
|
||||||
|
String name = ruleClass.getName();
|
||||||
|
assertNotNull("Rule object should not be null for " + name, rule);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ruleCreateWrongObject(
|
||||||
|
Class <? extends PlacementRule> ruleClass) {
|
||||||
|
PlacementRule rule = getPlacementRule(ruleClass, "a string object");
|
||||||
|
String name = ruleClass.getName();
|
||||||
|
assertNotNull("Rule object should not be null for " + name, rule);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ruleCreateBoolean(Class <? extends PlacementRule> ruleClass) {
|
||||||
|
PlacementRule rule = getPlacementRule(ruleClass, true);
|
||||||
|
String name = ruleClass.getName();
|
||||||
|
assertNotNull("Rule object should not be null for " + name, rule);
|
||||||
|
assertTrue("Create flag was not set to true on " + name,
|
||||||
|
getCreateFlag(rule));
|
||||||
|
rule = getPlacementRule(ruleClass, false);
|
||||||
|
assertNotNull("Rule object should not be null for " + name, rule);
|
||||||
|
assertFalse("Create flag was not set to false on " + name,
|
||||||
|
getCreateFlag(rule));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ruleCreateElement(Class <? extends PlacementRule> ruleClass) {
|
||||||
|
String str = "<rule name='not used' create=\"true\" />";
|
||||||
|
Element conf = createConf(str);
|
||||||
|
PlacementRule rule = getPlacementRule(ruleClass, conf);
|
||||||
|
String name = ruleClass.getName();
|
||||||
|
assertNotNull("Rule object should not be null for " + name, rule);
|
||||||
|
assertTrue("Create flag was not set to true on " + name,
|
||||||
|
getCreateFlag(rule));
|
||||||
|
str = "<rule name='not used' create=\"false\" />";
|
||||||
|
conf = createConf(str);
|
||||||
|
rule = getPlacementRule(ruleClass, conf);
|
||||||
|
assertNotNull("Rule object should not be null for " + name, rule);
|
||||||
|
assertFalse("Create flag was not set to false on " + name,
|
||||||
|
getCreateFlag(rule));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ruleInit(Class <? extends PlacementRule> ruleClass) {
|
||||||
|
PlacementRule rule = getPlacementRule(ruleClass, null);
|
||||||
|
String name = ruleClass.getName();
|
||||||
|
assertNotNull("Rule object should not be null for " + name, rule);
|
||||||
|
try {
|
||||||
|
rule.initialize(scheduler);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
fail("Unexpected exception on initialize of rule " + name);
|
||||||
|
}
|
||||||
|
// now set the parent rule: use the same rule as a child.
|
||||||
|
// always throws: either because parentRule is not allowed or because it
|
||||||
|
// is the same class as the child rule.
|
||||||
|
((FSPlacementRule)rule).setParentRule(rule);
|
||||||
|
boolean exceptionThrown = false;
|
||||||
|
try {
|
||||||
|
rule.initialize(scheduler);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
exceptionThrown = true;
|
||||||
|
}
|
||||||
|
assertTrue("Initialize with parent rule should have thrown exception " +
|
||||||
|
name, exceptionThrown);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Element createConf(String str) {
|
||||||
|
// Create a simple rule element to use in the rule create
|
||||||
|
DocumentBuilderFactory docBuilderFactory =
|
||||||
|
DocumentBuilderFactory.newInstance();
|
||||||
|
docBuilderFactory.setIgnoringComments(true);
|
||||||
|
Document doc = null;
|
||||||
|
try {
|
||||||
|
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
|
||||||
|
doc = builder.parse(IOUtils.toInputStream(str,
|
||||||
|
Charset.defaultCharset()));
|
||||||
|
} catch (Exception ex) {
|
||||||
|
fail("Element creation failed, failing test");
|
||||||
|
}
|
||||||
|
return doc.getDocumentElement();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean getCreateFlag(PlacementRule rule) {
|
||||||
|
if (rule instanceof FSPlacementRule) {
|
||||||
|
return ((FSPlacementRule)rule).createQueue;
|
||||||
|
}
|
||||||
|
fail("Rule is not a FSPlacementRule");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue