YARN-8967. Change FairScheduler to use PlacementRule interface. Contributed by Wilfred Spiegelenburg.

This commit is contained in:
yufei 2019-03-25 22:47:24 -07:00
parent c99b107772
commit 5257f50abb
27 changed files with 1337 additions and 1215 deletions

View File

@ -68,6 +68,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Times;
@ -418,7 +420,8 @@ private RMAppImpl createAndPopulateNewRMApp(
// We only replace the queue when it's a new application
if (!isRecovery) {
replaceQueueFromPlacementContext(placementContext, submissionContext);
copyPlacementQueueToSubmissionContext(placementContext,
submissionContext);
// fail the submission if configured application timeout value is invalid
RMServerUtils.validateApplicationTimeouts(
@ -443,38 +446,60 @@ private RMAppImpl createAndPopulateNewRMApp(
submissionContext.setPriority(appPriority);
}
// Since FairScheduler queue mapping is done inside scheduler,
// if FairScheduler is used and the queue doesn't exist, we should not
// fail here because queue will be created inside FS. Ideally, FS queue
// mapping should be done outside scheduler too like CS.
// For now, exclude FS for the acl check.
if (!isRecovery && YarnConfiguration.isAclEnabled(conf)
&& scheduler instanceof CapacityScheduler) {
String queueName = submissionContext.getQueue();
String appName = submissionContext.getApplicationName();
CSQueue csqueue = ((CapacityScheduler) scheduler).getQueue(queueName);
if (!isRecovery && YarnConfiguration.isAclEnabled(conf)) {
if (scheduler instanceof CapacityScheduler) {
String queueName = submissionContext.getQueue();
String appName = submissionContext.getApplicationName();
CSQueue csqueue = ((CapacityScheduler) scheduler).getQueue(queueName);
if (csqueue == null && placementContext != null) {
//could be an auto created queue through queue mapping. Validate
// parent queue exists and has valid acls
String parentQueueName = placementContext.getParentQueue();
csqueue = ((CapacityScheduler) scheduler).getQueue(parentQueueName);
if (csqueue == null && placementContext != null) {
//could be an auto created queue through queue mapping. Validate
// parent queue exists and has valid acls
String parentQueueName = placementContext.getParentQueue();
csqueue = ((CapacityScheduler) scheduler).getQueue(parentQueueName);
}
if (csqueue != null
&& !authorizer.checkPermission(
new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
SchedulerUtils.toAccessType(QueueACL.SUBMIT_APPLICATIONS),
applicationId.toString(), appName, Server.getRemoteAddress(),
null))
&& !authorizer.checkPermission(
new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
SchedulerUtils.toAccessType(QueueACL.ADMINISTER_QUEUE),
applicationId.toString(), appName, Server.getRemoteAddress(),
null))) {
throw RPCUtil.getRemoteException(new AccessControlException(
"User " + user + " does not have permission to submit "
+ applicationId + " to queue "
+ submissionContext.getQueue()));
}
}
if (csqueue != null
&& !authorizer.checkPermission(
new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
SchedulerUtils.toAccessType(QueueACL.SUBMIT_APPLICATIONS),
applicationId.toString(), appName, Server.getRemoteAddress(),
null))
&& !authorizer.checkPermission(
new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
SchedulerUtils.toAccessType(QueueACL.ADMINISTER_QUEUE),
applicationId.toString(), appName, Server.getRemoteAddress(),
null))) {
throw RPCUtil.getRemoteException(new AccessControlException(
"User " + user + " does not have permission to submit "
+ applicationId + " to queue " + submissionContext.getQueue()));
if (scheduler instanceof FairScheduler) {
// if we have not placed the app just skip this, the submit will be
// rejected in the scheduler.
if (placementContext != null) {
// The queue might not be created yet. Walk up the tree to check the
// parent ACL. The queueName is assured root which always exists
String queueName = submissionContext.getQueue();
FSQueue queue = ((FairScheduler) scheduler).getQueueManager().
getQueue(queueName);
while (queue == null) {
int sepIndex = queueName.lastIndexOf(".");
queueName = queueName.substring(0, sepIndex);
queue = ((FairScheduler) scheduler).getQueueManager().
getQueue(queueName);
}
if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) &&
!queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
throw RPCUtil.getRemoteException(new AccessControlException(
"User " + user + " does not have permission to submit " +
applicationId + " to queue " +
submissionContext.getQueue() +
" denied by ACL for queue " + queueName));
}
}
}
}
@ -841,34 +866,38 @@ ApplicationPlacementContext placeApplication(
// Placement could also fail if the user doesn't exist in system
// skip if the user is not found during recovery.
if (isRecovery) {
LOG.warn("PlaceApplication failed,skipping on recovery of rm");
LOG.warn("Application placement failed for user " + user +
" and application " + context.getApplicationId() +
", skipping placement on recovery of rm", e);
return placementContext;
}
throw e;
}
}
if (placementContext == null && (context.getQueue() == null) || context
.getQueue().isEmpty()) {
// The submission context when created often has a queue set. In case of
// the FairScheduler a null placement context is still considered as a
// failure, even when a queue is provided on submit. This case handled in
// the scheduler.
if (placementContext == null && (context.getQueue() == null) ||
context.getQueue().isEmpty()) {
String msg = "Failed to place application " + context.getApplicationId()
+ " to queue and specified " + "queue is invalid : " + context
.getQueue();
+ " in a queue and submit context queue is null or empty";
LOG.error(msg);
throw new YarnException(msg);
}
return placementContext;
}
void replaceQueueFromPlacementContext(
private void copyPlacementQueueToSubmissionContext(
ApplicationPlacementContext placementContext,
ApplicationSubmissionContext context) {
// Set it to ApplicationSubmissionContext
//apply queue mapping only to new application submissions
// Set the queue from the placement in the ApplicationSubmissionContext
// Placement rule are only considered for new applications
if (placementContext != null && !StringUtils.equalsIgnoreCase(
context.getQueue(), placementContext.getQueue())) {
LOG.info("Placed application=" + context.getApplicationId() +
" to queue=" + placementContext.getQueue() + ", original queue="
+ context
.getQueue());
LOG.info("Placed application with ID " + context.getApplicationId() +
" in queue: " + placementContext.getQueue() +
", original submission queue was: " + context.getQueue());
context.setQueue(placementContext.getQueue());
}
}

View File

@ -58,10 +58,12 @@ QueueManager getQueueManager() {
}
/**
* Set a rule to generate the parent queue dynamically.
* Set a rule to generate the parent queue dynamically. The parent rule
* should only be called on rule creation when the policy is read from the
* configuration.
* @param parent A PlacementRule
*/
void setParentRule(PlacementRule parent) {
public void setParentRule(PlacementRule parent) {
this.parentRule = parent;
}
@ -69,7 +71,8 @@ void setParentRule(PlacementRule parent) {
* Get the rule that is set to generate the parent queue dynamically.
* @return The rule set or <code>null</code> if not set.
*/
PlacementRule getParentRule() {
@VisibleForTesting
public PlacementRule getParentRule() {
return parentRule;
}
@ -149,6 +152,14 @@ boolean configuredQueue(String queueName) {
return (queue != null && !queue.isDynamic());
}
/**
* Get the create flag as set during the config setup.
* @return The value of the {@link #createQueue} flag
*/
public boolean getCreateFlag() {
return createQueue;
}
/**
* Get the create flag from the xml configuration element.
* @param conf The FS configuration element for the queue
@ -159,7 +170,7 @@ boolean configuredQueue(String queueName) {
boolean getCreateFlag(Element conf) {
if (conf != null) {
String create = conf.getAttribute("create");
return Boolean.parseBoolean(create);
return create.isEmpty() || Boolean.parseBoolean(create);
}
return true;
}

View File

@ -28,7 +28,7 @@
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
/**
* Utility class for QueuePlacementRule.
* Utility class for Capacity Scheduler queue PlacementRules.
*/
public final class QueuePlacementRuleUtils {

View File

@ -23,7 +23,6 @@
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.Resource;
@ -94,10 +93,6 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
//Map for maximum container resource allocation per queues by queue name
private final Map<String, Resource> queueMaxContainerAllocationMap;
// Policy for mapping apps to queues
@VisibleForTesting
QueuePlacementPolicy placementPolicy;
//Configured queues in the alloc xml
@VisibleForTesting
Map<FSQueueType, Set<String>> configuredQueues;
@ -107,9 +102,16 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
private final Set<String> nonPreemptableQueues;
/**
* Create a fully initialised configuration for the scheduler.
* @param queueProperties The list of queues and their properties from the
* configuration.
* @param allocationFileParser The allocation file parser
* @param globalReservationQueueConfig The reservation queue config
* @throws AllocationConfigurationException
*/
public AllocationConfiguration(QueueProperties queueProperties,
AllocationFileParser allocationFileParser,
QueuePlacementPolicy newPlacementPolicy,
ReservationQueueConfiguration globalReservationQueueConfig)
throws AllocationConfigurationException {
this.minQueueResources = queueProperties.getMinQueueResources();
@ -138,14 +140,19 @@ public AllocationConfiguration(QueueProperties queueProperties,
this.resAcls = queueProperties.getReservationAcls();
this.reservableQueues = queueProperties.getReservableQueues();
this.globalReservationQueueConfig = globalReservationQueueConfig;
this.placementPolicy = newPlacementPolicy;
this.configuredQueues = queueProperties.getConfiguredQueues();
this.nonPreemptableQueues = queueProperties.getNonPreemptableQueues();
this.queueMaxContainerAllocationMap =
queueProperties.getMaxContainerAllocation();
}
public AllocationConfiguration(Configuration conf) {
/**
* Create a base scheduler configuration with just the defaults set.
* Should only be called to init a basic setup on scheduler init.
* @param scheduler The {@link FairScheduler} to create and initialise the
* placement policy.
*/
public AllocationConfiguration(FairScheduler scheduler) {
minQueueResources = new HashMap<>();
maxChildQueueResources = new HashMap<>();
maxQueueResources = new HashMap<>();
@ -169,8 +176,7 @@ public AllocationConfiguration(Configuration conf) {
for (FSQueueType queueType : FSQueueType.values()) {
configuredQueues.put(queueType, new HashSet<>());
}
placementPolicy =
QueuePlacementPolicy.fromConfiguration(conf, configuredQueues);
QueuePlacementPolicy.fromConfiguration(scheduler);
nonPreemptableQueues = new HashSet<>();
queueMaxContainerAllocationMap = new HashMap<>();
}
@ -309,10 +315,6 @@ public Map<FSQueueType, Set<String>> getConfiguredQueues() {
return configuredQueues;
}
public QueuePlacementPolicy getPlacementPolicy() {
return placementPolicy;
}
@Override
public boolean isReservable(String queue) {
return reservableQueues.contains(queue);

View File

@ -78,6 +78,7 @@ public class AllocationFileLoaderService extends AbstractService {
"(?i)(hdfs)|(file)|(s3a)|(viewfs)";
private final Clock clock;
private final FairScheduler scheduler;
// Last time we successfully reloaded queues
private volatile long lastSuccessfulReload;
@ -95,14 +96,15 @@ public class AllocationFileLoaderService extends AbstractService {
private Thread reloadThread;
private volatile boolean running = true;
public AllocationFileLoaderService() {
this(SystemClock.getInstance());
AllocationFileLoaderService(FairScheduler scheduler) {
this(SystemClock.getInstance(), scheduler);
}
private List<Permission> defaultPermissions;
public AllocationFileLoaderService(Clock clock) {
AllocationFileLoaderService(Clock clock, FairScheduler scheduler) {
super(AllocationFileLoaderService.class.getName());
this.scheduler = scheduler;
this.clock = clock;
}
@ -254,17 +256,15 @@ public synchronized void reloadAllocations()
new AllocationFileQueueParser(allocationFileParser.getQueueElements());
QueueProperties queueProperties = queueParser.parse();
// Load placement policy and pass it configured queues
Configuration conf = getConfig();
QueuePlacementPolicy newPlacementPolicy =
getQueuePlacementPolicy(allocationFileParser, queueProperties, conf);
// Load placement policy
getQueuePlacementPolicy(allocationFileParser);
setupRootQueueProperties(allocationFileParser, queueProperties);
ReservationQueueConfiguration globalReservationQueueConfig =
createReservationQueueConfig(allocationFileParser);
AllocationConfiguration info = new AllocationConfiguration(queueProperties,
allocationFileParser, newPlacementPolicy, globalReservationQueueConfig);
allocationFileParser, globalReservationQueueConfig);
lastSuccessfulReload = clock.getTime();
lastReloadAttemptFailed = false;
@ -272,17 +272,15 @@ public synchronized void reloadAllocations()
reloadListener.onReload(info);
}
private QueuePlacementPolicy getQueuePlacementPolicy(
AllocationFileParser allocationFileParser,
QueueProperties queueProperties, Configuration conf)
private void getQueuePlacementPolicy(
AllocationFileParser allocationFileParser)
throws AllocationConfigurationException {
if (allocationFileParser.getQueuePlacementPolicy().isPresent()) {
return QueuePlacementPolicy.fromXml(
QueuePlacementPolicy.fromXml(
allocationFileParser.getQueuePlacementPolicy().get(),
queueProperties.getConfiguredQueues(), conf);
scheduler);
} else {
return QueuePlacementPolicy.fromConfiguration(conf,
queueProperties.getConfiguredQueues());
QueuePlacementPolicy.fromConfiguration(scheduler);
}
}

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -214,7 +215,7 @@ public class FairScheduler extends
public FairScheduler() {
super(FairScheduler.class.getName());
context = new FSContext(this);
allocsLoader = new AllocationFileLoaderService();
allocsLoader = new AllocationFileLoaderService(this);
queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
}
@ -223,6 +224,10 @@ public FSContext getContext() {
return context;
}
public RMContext getRMContext() {
return rmContext;
}
public boolean isAtLeastReservationThreshold(
ResourceCalculator resourceCalculator, Resource resource) {
return Resources.greaterThanOrEqual(resourceCalculator,
@ -455,32 +460,52 @@ public int getContinuousSchedulingSleepMs() {
* configured limits, but the app will not be marked as runnable.
*/
protected void addApplication(ApplicationId applicationId,
String queueName, String user, boolean isAppRecovering) {
if (queueName == null || queueName.isEmpty()) {
String message =
"Reject application " + applicationId + " submitted by user " + user
+ " with an empty queue name.";
rejectApplicationWithMessage(applicationId, message);
return;
}
if (queueName.startsWith(".") || queueName.endsWith(".")) {
String message =
"Reject application " + applicationId + " submitted by user " + user
+ " with an illegal queue name " + queueName + ". "
+ "The queue name cannot start/end with period.";
String queueName, String user, boolean isAppRecovering,
ApplicationPlacementContext placementContext) {
// If the placement was rejected the placementContext will be null.
// We ignore placement rules on recovery.
if (!isAppRecovering && placementContext == null) {
String message = "Reject application " + applicationId +
" submitted by user " + user +
" application rejected by placement rules.";
rejectApplicationWithMessage(applicationId, message);
return;
}
writeLock.lock();
try {
RMApp rmApp = rmContext.getRMApps().get(applicationId);
FSLeafQueue queue = assignToQueue(rmApp, queueName, user, applicationId);
// Assign the app to the queue creating and prevent queue delete.
FSLeafQueue queue = queueMgr.getLeafQueue(queueName, true,
applicationId);
if (queue == null) {
rejectApplicationWithMessage(applicationId,
queueName + " is not a leaf queue");
return;
}
// Enforce ACLs: 2nd check, there could be a time laps between the app
// creation in the RMAppManager and getting here. That means we could
// have a configuration change (prevent race condition)
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
user);
if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) &&
!queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
String msg = "User " + user + " does not have permission to submit " +
applicationId + " to queue " + queueName;
rejectApplicationWithMessage(applicationId, msg);
queue.removeAssignedApp(applicationId);
return;
}
RMApp rmApp = rmContext.getRMApps().get(applicationId);
if (rmApp != null) {
rmApp.setQueue(queueName);
} else {
LOG.error("Couldn't find RM app for " + applicationId +
" to set queue name on");
}
if (rmApp != null && rmApp.getAMResourceRequests() != null) {
// Resources.fitsIn would always return false when queueMaxShare is 0
// for any resource, but only using Resources.fitsIn is not enough
@ -499,7 +524,7 @@ protected void addApplication(ApplicationId applicationId,
+ "it has zero amount of resource for a requested "
+ "resource! Invalid requested AM resources: %s, "
+ "maximum queue resources: %s",
applicationId, queue.getName(),
applicationId, queueName,
invalidAMResourceRequests, queue.getMaxShare());
rejectApplicationWithMessage(applicationId, msg);
queue.removeAssignedApp(applicationId);
@ -507,27 +532,13 @@ protected void addApplication(ApplicationId applicationId,
}
}
// Enforce ACLs
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
user);
if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue
.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
String msg = "User " + userUgi.getUserName()
+ " cannot submit applications to queue " + queue.getName()
+ "(requested queuename is " + queueName + ")";
rejectApplicationWithMessage(applicationId, msg);
queue.removeAssignedApp(applicationId);
return;
}
SchedulerApplication<FSAppAttempt> application =
new SchedulerApplication<FSAppAttempt>(queue, user);
new SchedulerApplication<>(queue, user);
applications.put(applicationId, application);
queue.getMetrics().submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queue.getName()
+ ", in queue: " + queueName
+ ", currently num of applications: " + applications.size());
if (isAppRecovering) {
LOG.debug("{} is recovering. Skip notifying APP_ACCEPTED",
@ -596,60 +607,6 @@ protected void addApplicationAttempt(
}
}
/**
* Helper method for the tests to assign the app to a queue.
*/
@VisibleForTesting
FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
return assignToQueue(rmApp, queueName, user, null);
}
/**
* Helper method that attempts to assign the app to a queue. The method is
* responsible to call the appropriate event-handler if the app is rejected.
*/
private FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user,
ApplicationId applicationId) {
FSLeafQueue queue = null;
String appRejectMsg = null;
try {
QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
queueName = placementPolicy.assignAppToQueue(queueName, user);
if (queueName == null) {
appRejectMsg = "Application rejected by queue placement policy";
} else {
queue = queueMgr.getLeafQueue(queueName, true, applicationId);
if (queue == null) {
appRejectMsg = queueName + " is not a leaf queue";
}
}
} catch (IllegalStateException se) {
appRejectMsg = "Unable to match app " + rmApp.getApplicationId() +
" to a queue placement policy, and no valid terminal queue " +
" placement rule is configured. Please contact an administrator " +
" to confirm that the fair scheduler configuration contains a " +
" valid terminal queue placement rule.";
} catch (InvalidQueueNameException qne) {
appRejectMsg = qne.getMessage();
} catch (IOException ioe) {
// IOException should only happen for a user without groups
appRejectMsg = "Error assigning app to a queue: " + ioe.getMessage();
}
if (appRejectMsg != null && rmApp != null) {
rejectApplicationWithMessage(rmApp.getApplicationId(), appRejectMsg);
return null;
}
if (rmApp != null) {
rmApp.setQueue(queue.getName());
} else {
LOG.error("Couldn't find RM app to set queue name on");
}
return queue;
}
private void removeApplication(ApplicationId applicationId,
RMAppState finalState) {
SchedulerApplication<FSAppAttempt> application = applications.remove(
@ -1265,7 +1222,8 @@ public void handle(SchedulerEvent event) {
if (queueName != null) {
addApplication(appAddedEvent.getApplicationId(),
queueName, appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering());
appAddedEvent.getIsAppRecovering(),
appAddedEvent.getPlacementContext());
}
break;
case APP_REMOVED:
@ -1442,12 +1400,8 @@ private void initScheduler(Configuration conf) throws IOException {
// This stores per-application scheduling information
this.applications = new ConcurrentHashMap<>();
allocConf = new AllocationConfiguration(conf);
try {
queueMgr.initialize(conf);
} catch (Exception e) {
throw new IOException("Failed to start FairScheduler", e);
}
allocConf = new AllocationConfiguration(this);
queueMgr.initialize();
if (continuousSchedulingEnabled) {
// Continuous scheduling is deprecated log it on startup

View File

@ -18,34 +18,28 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.xml.parsers.ParserConfigurationException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.xml.sax.SAXException;
import com.google.common.annotations.VisibleForTesting;
import java.util.Iterator;
import java.util.Set;
/**
* Maintains a list of queues as well as scheduling parameters for each queue,
@ -106,8 +100,7 @@ public FSParentQueue getRootQueue() {
return rootQueue;
}
public void initialize(Configuration conf) throws IOException,
SAXException, AllocationConfigurationException, ParserConfigurationException {
public void initialize() {
// Policies of root and default queue are set to
// SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been
// loaded yet.

View File

@ -23,85 +23,261 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PrimaryGroupPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.RejectPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.SecondaryGroupExistingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import static org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory.getPlacementRule;
/**
* The FairScheduler rules based policy for placing an application in a queue.
* It parses the configuration and updates the {@link
* org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager}
* with a list of {@link PlacementRule}s to execute in order.
*/
@Private
@Unstable
public class QueuePlacementPolicy {
private static final Map<String, Class<? extends QueuePlacementRule>> ruleClasses;
static {
Map<String, Class<? extends QueuePlacementRule>> map =
new HashMap<String, Class<? extends QueuePlacementRule>>();
map.put("user", QueuePlacementRule.User.class);
map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class);
map.put("secondaryGroupExistingQueue",
QueuePlacementRule.SecondaryGroupExistingQueue.class);
map.put("specified", QueuePlacementRule.Specified.class);
map.put("nestedUserQueue",
QueuePlacementRule.NestedUserQueue.class);
map.put("default", QueuePlacementRule.Default.class);
map.put("reject", QueuePlacementRule.Reject.class);
ruleClasses = Collections.unmodifiableMap(map);
final class QueuePlacementPolicy {
private static final Logger LOG =
LoggerFactory.getLogger(QueuePlacementPolicy.class);
// Simple private class to make the rule mapping simpler.
private static final class RuleMap {
private final Class<? extends PlacementRule> ruleClass;
private final String terminal;
private RuleMap(Class<? extends PlacementRule> clazz, String terminate) {
this.ruleClass = clazz;
this.terminal = terminate;
}
}
private final List<QueuePlacementRule> rules;
private final Map<FSQueueType, Set<String>> configuredQueues;
private final Groups groups;
public QueuePlacementPolicy(List<QueuePlacementRule> rules,
Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
// The list of known rules:
// key to the map is the name in the configuration.
// for each name the mapping contains the class name of the implementation
// and a flag (true, false or create) which describes the terminal state
// see the method getTerminal() for more comments.
private static final Map<String, RuleMap> RULES;
static {
Map<String, RuleMap> map = new HashMap<>();
map.put("user", new RuleMap(UserPlacementRule.class, "create"));
map.put("primaryGroup",
new RuleMap(PrimaryGroupPlacementRule.class, "create"));
map.put("secondaryGroupExistingQueue",
new RuleMap(SecondaryGroupExistingPlacementRule.class, "false"));
map.put("specified", new RuleMap(SpecifiedPlacementRule.class, "false"));
map.put("nestedUserQueue", new RuleMap(UserPlacementRule.class, "create"));
map.put("default", new RuleMap(DefaultPlacementRule.class, "create"));
map.put("reject", new RuleMap(RejectPlacementRule.class, "true"));
RULES = Collections.unmodifiableMap(map);
}
private QueuePlacementPolicy() {
}
/**
* Update the rules in the manager based on this placement policy.
* @param newRules The new list of rules to set in the manager.
* @param newTerminalState The list of terminal states for this set of rules.
* @param fs the reference to the scheduler needed in the rule on init.
* @throws AllocationConfigurationException for any errors
*/
private static void updateRuleSet(List<PlacementRule> newRules,
List<Boolean> newTerminalState,
FairScheduler fs)
throws AllocationConfigurationException {
for (int i = 0; i < rules.size()-1; i++) {
if (rules.get(i).isTerminal()) {
if (newRules.isEmpty()) {
LOG.debug("Empty rule set defined, ignoring update");
return;
}
LOG.debug("Placement rule order check");
for (int i = 0; i < newTerminalState.size()-1; i++) {
if (newTerminalState.get(i)) {
throw new AllocationConfigurationException("Rules after rule "
+ i + " in queue placement policy can never be reached");
+ (i+1) + " in queue placement policy can never be reached");
}
}
if (!rules.get(rules.size()-1).isTerminal()) {
if (!newTerminalState.get(newTerminalState.size()-1)) {
throw new AllocationConfigurationException(
"Could get past last queue placement rule without assigning");
}
this.rules = rules;
this.configuredQueues = configuredQueues;
groups = new Groups(conf);
// Set the scheduler in the rule to get queues etc
LOG.debug("Initialising new rule set");
try {
for (PlacementRule rule: newRules){
rule.initialize(fs);
}
} catch (IOException ioe) {
// We should never throw as we pass in a FS object, however we still
// should consider any exception here a config error.
throw new AllocationConfigurationException(
"Rule initialisation failed with exception", ioe);
}
// Update the placement manager with the new rule list.
// We only get here when all rules are OK.
fs.getRMContext().getQueuePlacementManager().updateRules(newRules);
LOG.debug("PlacementManager active with new rule set");
}
/**
* Builds a QueuePlacementPolicy from an xml element.
* Builds a QueuePlacementPolicy from a xml element.
* @param confElement the placement policy xml snippet from the
* {@link FairSchedulerConfiguration}
* @param fs the reference to the scheduler needed in the rule on init.
* @throws AllocationConfigurationException for any errors
*/
public static QueuePlacementPolicy fromXml(Element el,
Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
static void fromXml(Element confElement, FairScheduler fs)
throws AllocationConfigurationException {
List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
NodeList elements = el.getChildNodes();
LOG.debug("Reloading placement policy from allocation config");
if (confElement == null || !confElement.hasChildNodes()) {
throw new AllocationConfigurationException(
"Empty configuration for QueuePlacementPolicy is not allowed");
}
List<PlacementRule> newRules = new ArrayList<>();
List<Boolean> newTerminalState = new ArrayList<>();
NodeList elements = confElement.getChildNodes();
for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i);
if (node instanceof Element) {
QueuePlacementRule rule = createAndInitializeRule(node);
rules.add(rule);
if (node instanceof Element &&
node.getNodeName().equalsIgnoreCase("rule")) {
String name = ((Element) node).getAttribute("name");
LOG.debug("Creating new rule: {}", name);
PlacementRule rule = createRule((Element)node);
// The only child node that we currently know is a parent rule
PlacementRule parentRule = null;
String parentName = null;
Element child = getParentRuleElement(node);
if (child != null) {
parentName = child.getAttribute("name");
parentRule = getParentRule(child, fs);
}
// Need to make sure that the nestedUserQueue has a parent for
// backwards compatibility
if (name.equalsIgnoreCase("nestedUserQueue") && parentRule == null) {
throw new AllocationConfigurationException("Rule '" + name
+ "' must have a parent rule set");
}
newRules.add(rule);
if (parentRule == null) {
newTerminalState.add(
getTerminal(RULES.get(name).terminal, rule));
} else {
((FSPlacementRule)rule).setParentRule(parentRule);
newTerminalState.add(
getTerminal(RULES.get(name).terminal, rule) &&
getTerminal(RULES.get(parentName).terminal, parentRule));
}
}
}
return new QueuePlacementPolicy(rules, configuredQueues, conf);
updateRuleSet(newRules, newTerminalState, fs);
}
/**
* Create and initialize a rule given a xml node
* @param node
* @return QueuePlacementPolicy
* @throws AllocationConfigurationException
* Find the element that defines the parent rule.
* @param node the xml node to check for a parent rule
* @return {@link Element} that describes the parent rule or
* <code>null</code> if none is found
*/
public static QueuePlacementRule createAndInitializeRule(Node node)
private static Element getParentRuleElement(Node node)
throws AllocationConfigurationException {
Element parent = null;
// walk over the node list
if (node.hasChildNodes()) {
NodeList childList = node.getChildNodes();
for (int j = 0; j < childList.getLength(); j++) {
Node child = childList.item(j);
if (child instanceof Element &&
child.getNodeName().equalsIgnoreCase("rule")) {
if (parent != null) {
LOG.warn("Rule '{}' has multiple parent rules defined, only the " +
"last parent rule will be used",
((Element) node).getAttribute("name"));
}
parent = ((Element) child);
}
}
}
// sanity check the rule that is configured
if (parent != null) {
String parentName = parent.getAttribute("name");
if (parentName.equals("reject") ||
parentName.equals("nestedUserQueue")) {
throw new AllocationConfigurationException("Rule '"
+ parentName
+ "' is not allowed as a parent rule for any rule");
}
}
return parent;
}
/**
* Retrieve the configured parent rule from the xml config.
* @param parent the xml element that contains the name of the rule to add.
* @param fs the reference to the scheduler needed in the rule on init.
* @return {@link PlacementRule} to set as a parent
* @throws AllocationConfigurationException for any error
*/
private static PlacementRule getParentRule(Element parent,
FairScheduler fs)
throws AllocationConfigurationException {
LOG.debug("Creating new parent rule: {}", parent.getAttribute("name"));
PlacementRule parentRule = createRule(parent);
// Init the rule, we do not want to add it to the list of the
// placement manager
try {
parentRule.initialize(fs);
} catch (IOException ioe) {
// We should never throw as we pass in a FS object, however we
// still should consider any exception here a config error.
throw new AllocationConfigurationException(
"Parent Rule initialisation failed with exception", ioe);
}
return parentRule;
}
/**
* Returns the terminal status of the rule based on the definition and the
* create flag set in the rule.
* @param terminal The definition of the terminal flag
* @param rule The rule to check
* @return <code>true</code> if the rule is terminal <code>false</code> in
* all other cases.
*/
private static Boolean getTerminal(String terminal, PlacementRule rule) {
switch (terminal) {
case "true": // rule is always terminal
return true;
case "false": // rule is never terminal
return false;
default: // rule is terminal based on the create flag
return ((FSPlacementRule)rule).getCreateFlag();
}
}
/**
* Create a rule from a given a xml node.
* @param element the xml element to create the rule from
* @return PlacementRule
* @throws AllocationConfigurationException for any error
*/
@SuppressWarnings("unchecked")
private static PlacementRule createRule(Element element)
throws AllocationConfigurationException {
Element element = (Element) node;
String ruleName = element.getAttribute("name");
if ("".equals(ruleName)) {
@ -109,72 +285,54 @@ public static QueuePlacementRule createAndInitializeRule(Node node)
+ "rule element");
}
Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
if (clazz == null) {
Class<? extends PlacementRule> ruleClass = null;
if (RULES.containsKey(ruleName)) {
ruleClass = RULES.get(ruleName).ruleClass;
}
if (ruleClass == null) {
throw new AllocationConfigurationException("No rule class found for "
+ ruleName);
}
QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
rule.initializeFromXml(element);
return rule;
return getPlacementRule(ruleClass, element);
}
/**
* Build a simple queue placement policy from the allow-undeclared-pools and
* user-as-default-queue configuration options.
* Build a simple queue placement policy from the configuration options
* {@link FairSchedulerConfiguration#ALLOW_UNDECLARED_POOLS} and
* {@link FairSchedulerConfiguration#USER_AS_DEFAULT_QUEUE}.
* @param fs the reference to the scheduler needed in the rule on init.
*/
public static QueuePlacementPolicy fromConfiguration(Configuration conf,
Map<FSQueueType, Set<String>> configuredQueues) {
static void fromConfiguration(FairScheduler fs) {
LOG.debug("Creating base placement policy from config");
Configuration conf = fs.getConfig();
boolean create = conf.getBoolean(
FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);
boolean userAsDefaultQueue = conf.getBoolean(
FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE);
List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
rules.add(new QueuePlacementRule.Specified().initialize(create, null));
List<PlacementRule> newRules = new ArrayList<>();
List<Boolean> newTerminalState = new ArrayList<>();
Class<? extends PlacementRule> clazz =
RULES.get("specified").ruleClass;
newRules.add(getPlacementRule(clazz, create));
newTerminalState.add(false);
if (userAsDefaultQueue) {
rules.add(new QueuePlacementRule.User().initialize(create, null));
clazz = RULES.get("user").ruleClass;
newRules.add(getPlacementRule(clazz, create));
newTerminalState.add(create);
}
if (!userAsDefaultQueue || !create) {
rules.add(new QueuePlacementRule.Default().initialize(true, null));
clazz = RULES.get("default").ruleClass;
newRules.add(getPlacementRule(clazz, true));
newTerminalState.add(true);
}
try {
return new QueuePlacementPolicy(rules, configuredQueues, conf);
updateRuleSet(newRules, newTerminalState, fs);
} catch (AllocationConfigurationException ex) {
throw new RuntimeException("Should never hit exception when loading" +
"placement policy from conf", ex);
"placement policy from conf", ex);
}
}
/**
* Applies this rule to an app with the given requested queue and user/group
* information.
*
* @param requestedQueue
* The queue specified in the ApplicationSubmissionContext
* @param user
* The user submitting the app
* @return
* The name of the queue to assign the app to. Or null if the app should
* be rejected.
* @throws IOException
* If an exception is encountered while getting the user's groups
*/
public String assignAppToQueue(String requestedQueue, String user)
throws IOException {
for (QueuePlacementRule rule : rules) {
String queue = rule.assignAppToQueue(requestedQueue, user, groups,
configuredQueues);
if (queue == null || !queue.isEmpty()) {
return queue;
}
}
throw new IllegalStateException("Should have applied a rule before " +
"reaching here");
}
public List<QueuePlacementRule> getRules() {
return rules;
}
}

View File

@ -1,366 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import com.google.common.annotations.VisibleForTesting;
@Private
@Unstable
public abstract class QueuePlacementRule {
protected boolean create;
public static final Logger LOG =
LoggerFactory.getLogger(QueuePlacementRule.class.getName());
/**
* Initializes the rule with any arguments.
*
* @param args
* Additional attributes of the rule's xml element other than create.
*/
public QueuePlacementRule initialize(boolean create, Map<String, String> args) {
this.create = create;
return this;
}
/**
*
* @param requestedQueue
* The queue explicitly requested.
* @param user
* The user submitting the app.
* @param groups
* The groups of the user submitting the app.
* @param configuredQueues
* The queues specified in the scheduler configuration.
* @return
* The queue to place the app into. An empty string indicates that we should
* continue to the next rule, and null indicates that the app should be rejected.
*/
public String assignAppToQueue(String requestedQueue, String user,
Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
throws IOException {
String queue = getQueueForApp(requestedQueue, user, groups,
configuredQueues);
if (create || configuredQueues.get(FSQueueType.LEAF).contains(queue)
|| configuredQueues.get(FSQueueType.PARENT).contains(queue)) {
return queue;
} else {
return "";
}
}
public void initializeFromXml(Element el)
throws AllocationConfigurationException {
boolean create = true;
NamedNodeMap attributes = el.getAttributes();
Map<String, String> args = new HashMap<String, String>();
for (int i = 0; i < attributes.getLength(); i++) {
Node node = attributes.item(i);
String key = node.getNodeName();
String value = node.getNodeValue();
if (key.equals("create")) {
create = Boolean.parseBoolean(value);
} else {
args.put(key, value);
}
}
initialize(create, args);
}
/**
* Returns true if this rule never tells the policy to continue.
*/
public abstract boolean isTerminal();
/**
* Applies this rule to an app with the given requested queue and user/group
* information.
*
* @param requestedQueue
* The queue specified in the ApplicationSubmissionContext
* @param user
* The user submitting the app.
* @param groups
* The groups of the user submitting the app.
* @return
* The name of the queue to assign the app to, or null to empty string
* continue to the next rule.
*/
protected abstract String getQueueForApp(String requestedQueue, String user,
Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
throws IOException;
/**
* Places apps in queues by username of the submitter
*/
public static class User extends QueuePlacementRule {
@Override
protected String getQueueForApp(String requestedQueue, String user,
Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
return "root." + cleanName(user);
}
@Override
public boolean isTerminal() {
return create;
}
}
/**
* Places apps in queues by primary group of the submitter
*/
public static class PrimaryGroup extends QueuePlacementRule {
@Override
protected String getQueueForApp(String requestedQueue, String user,
Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
throws IOException {
final List<String> groupList = groups.getGroups(user);
if (groupList.isEmpty()) {
throw new IOException("No groups returned for user " + user);
}
return "root." + cleanName(groupList.get(0));
}
@Override
public boolean isTerminal() {
return create;
}
}
/**
* Places apps in queues by secondary group of the submitter
*
* Match will be made on first secondary group that exist in
* queues
*/
public static class SecondaryGroupExistingQueue extends QueuePlacementRule {
@Override
protected String getQueueForApp(String requestedQueue, String user,
Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
throws IOException {
List<String> groupNames = groups.getGroups(user);
for (int i = 1; i < groupNames.size(); i++) {
String group = cleanName(groupNames.get(i));
if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group)
|| configuredQueues.get(FSQueueType.PARENT).contains(
"root." + group)) {
return "root." + group;
}
}
return "";
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* Places apps in queues with name of the submitter under the queue
* returned by the nested rule.
*/
public static class NestedUserQueue extends QueuePlacementRule {
@VisibleForTesting
QueuePlacementRule nestedRule;
/**
* Parse xml and instantiate the nested rule
*/
@Override
public void initializeFromXml(Element el)
throws AllocationConfigurationException {
NodeList elements = el.getChildNodes();
for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i);
if (node instanceof Element) {
Element element = (Element) node;
if ("rule".equals(element.getTagName())) {
QueuePlacementRule rule = QueuePlacementPolicy
.createAndInitializeRule(node);
if (rule == null) {
throw new AllocationConfigurationException(
"Unable to create nested rule in nestedUserQueue rule");
}
this.nestedRule = rule;
break;
} else {
continue;
}
}
}
if (this.nestedRule == null) {
throw new AllocationConfigurationException(
"No nested rule specified in <nestedUserQueue> rule");
}
super.initializeFromXml(el);
}
@Override
protected String getQueueForApp(String requestedQueue, String user,
Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
throws IOException {
// Apply the nested rule
String queueName = nestedRule.assignAppToQueue(requestedQueue, user,
groups, configuredQueues);
if (queueName != null && queueName.length() != 0) {
if (!queueName.startsWith("root.")) {
queueName = "root." + queueName;
}
// Verify if the queue returned by the nested rule is an configured leaf queue,
// if yes then skip to next rule in the queue placement policy
if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) {
return "";
}
return queueName + "." + cleanName(user);
}
return queueName;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* Places apps in queues by requested queue of the submitter
*/
public static class Specified extends QueuePlacementRule {
@Override
protected String getQueueForApp(String requestedQueue, String user,
Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
return "";
} else {
if (!requestedQueue.startsWith("root.")) {
requestedQueue = "root." + requestedQueue;
}
return requestedQueue;
}
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* Places apps in the specified default queue. If no default queue is
* specified the app is placed in root.default queue.
*/
public static class Default extends QueuePlacementRule {
@VisibleForTesting
String defaultQueueName;
@Override
public QueuePlacementRule initialize(boolean create,
Map<String, String> args) {
if (defaultQueueName == null) {
defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
}
return super.initialize(create, args);
}
@Override
public void initializeFromXml(Element el)
throws AllocationConfigurationException {
defaultQueueName = el.getAttribute("queue");
if (defaultQueueName != null && !defaultQueueName.isEmpty()) {
if (!defaultQueueName.startsWith("root.")) {
defaultQueueName = "root." + defaultQueueName;
}
} else {
defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
}
super.initializeFromXml(el);
}
@Override
protected String getQueueForApp(String requestedQueue, String user,
Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
return defaultQueueName;
}
@Override
public boolean isTerminal() {
return true;
}
}
/**
* Rejects all apps
*/
public static class Reject extends QueuePlacementRule {
@Override
public String assignAppToQueue(String requestedQueue, String user,
Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
return null;
}
@Override
protected String getQueueForApp(String requestedQueue, String user,
Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
throw new UnsupportedOperationException();
}
@Override
public boolean isTerminal() {
return true;
}
}
/**
* Replace the periods in the username or groupname with "_dot_" and
* remove trailing and leading whitespace.
*/
protected String cleanName(String name) {
name = name.trim();
if (name.contains(".")) {
String converted = name.replaceAll("\\.", "_dot_");
LOG.warn("Name " + name + " is converted to " + converted
+ " when it is used as a queue name.");
return converted;
} else {
return name;
}
}
}

View File

@ -169,8 +169,7 @@ public static final class Builder {
private Set<String> nonPreemptableQueues = new HashSet<>();
// Remember all queue names so we can display them on web UI, etc.
// configuredQueues is segregated based on whether it is a leaf queue
// or a parent queue. This information is used for creating queues
// and also for making queue placement decisions(QueuePlacementRule.java).
// or a parent queue. This information is used for creating queues.
private Map<FSQueueType, Set<String>> configuredQueues = new HashMap<>();
Builder() {

View File

@ -18,32 +18,30 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static junit.framework.TestCase.assertTrue;
import static org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException.InvalidResourceType.GREATER_THEN_MAX_ALLOCATION;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Collections;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -51,39 +49,34 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Testing applications being retired from RM with fair scheduler.
*
* Testing RMAppManager application submission with fair scheduler.
*/
public class TestAppManagerWithFairScheduler extends AppManagerTestBase{
public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
private static final String TEST_FOLDER = "test-queues";
private static YarnConfiguration conf = new YarnConfiguration();
private PlacementManager placementMgr;
private TestRMAppManager rmAppManager;
private RMContext rmContext;
private static String allocFileName =
GenericTestUtils.getTestDir(TEST_FOLDER).getAbsolutePath();
@BeforeClass
public static void setup() throws IOException {
String allocFile =
GenericTestUtils.getTestDir(TEST_FOLDER).getAbsolutePath();
int queueMaxAllocation = 512;
PrintWriter out = new PrintWriter(new FileWriter(allocFile));
@Before
public void setup() throws IOException {
// Basic config with one queue (override in test if needed)
PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println(" <queue name=\"queueA\">");
out.println(" <maxContainerAllocation>" + queueMaxAllocation
+ " mb 1 vcores" + "</maxContainerAllocation>");
out.println(" </queue>");
out.println(" <queue name=\"queueB\">");
out.println(" <queue name=\"test\">");
out.println(" </queue>");
out.println("</allocations>");
out.close();
@ -91,87 +84,205 @@ public static void setup() throws IOException {
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFileName);
placementMgr = mock(PlacementManager.class);
MockRM mockRM = new MockRM(conf);
rmContext = mockRM.getRMContext();
rmContext.setQueuePlacementManager(placementMgr);
ApplicationMasterService masterService = new ApplicationMasterService(
rmContext, rmContext.getScheduler());
rmAppManager = new TestRMAppManager(rmContext,
new ClientToAMTokenSecretManagerInRM(), rmContext.getScheduler(),
masterService, new ApplicationACLsManager(conf), conf);
}
@AfterClass
public static void teardown(){
@After
public void teardown(){
File allocFile = GenericTestUtils.getTestDir(TEST_FOLDER);
allocFile.delete();
}
@Test
public void testQueueSubmitWithHighQueueContainerSize()
throws YarnException {
throws YarnException, IOException {
int maxAlloc =
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB;
// scheduler config with a limited queue
PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println(" <queue name=\"root\">");
out.println(" <queue name=\"limited\">");
out.println(" <maxContainerAllocation>" + maxAlloc + " mb 1 vcores");
out.println(" </maxContainerAllocation>");
out.println(" </queue>");
out.println(" <queue name=\"unlimited\">");
out.println(" </queue>");
out.println(" </queue>");
out.println("</allocations>");
out.close();
rmContext.getScheduler().reinitialize(conf, rmContext);
ApplicationId appId = MockApps.newAppID(1);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
ApplicationSubmissionContext asContext =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
asContext.setApplicationId(appId);
asContext.setResource(resource);
asContext.setPriority(Priority.newInstance(0));
asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
asContext.setQueue("queueA");
QueueInfo mockDefaultQueueInfo = mock(QueueInfo.class);
// Setup a PlacementManager returns a new queue
PlacementManager placementMgr = mock(PlacementManager.class);
doAnswer(new Answer<ApplicationPlacementContext>() {
@Override
public ApplicationPlacementContext answer(InvocationOnMock invocation)
throws Throwable {
return new ApplicationPlacementContext("queueA");
}
}).when(placementMgr).placeApplication(
any(ApplicationSubmissionContext.class), matches("test1"));
doAnswer(new Answer<ApplicationPlacementContext>() {
@Override
public ApplicationPlacementContext answer(InvocationOnMock invocation)
throws Throwable {
return new ApplicationPlacementContext("queueB");
}
}).when(placementMgr).placeApplication(
any(ApplicationSubmissionContext.class), matches("test2"));
MockRM newMockRM = new MockRM(conf);
RMContext newMockRMContext = newMockRM.getRMContext();
newMockRMContext.setQueuePlacementManager(placementMgr);
ApplicationMasterService masterService = new ApplicationMasterService(
newMockRMContext, newMockRMContext.getScheduler());
TestRMAppManager newAppMonitor = new TestRMAppManager(newMockRMContext,
new ClientToAMTokenSecretManagerInRM(), newMockRMContext.getScheduler(),
masterService, new ApplicationACLsManager(conf), conf);
// only user test has permission to submit to 'test' queue
Resource res = Resources.createResource(maxAlloc + 1);
ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
// Submit to limited queue
when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("limited"));
try {
newAppMonitor.submitApplication(asContext, "test1");
rmAppManager.submitApplication(asContext, "test");
Assert.fail("Test should fail on too high allocation!");
} catch (InvalidResourceRequestException e) {
Assert.assertEquals(GREATER_THEN_MAX_ALLOCATION,
e.getInvalidResourceType());
}
// Should not throw exception
newAppMonitor.submitApplication(asContext, "test2");
// submit same app but now place it in the unlimited queue
when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("root.unlimited"));
rmAppManager.submitApplication(asContext, "test");
}
private static ContainerLaunchContext mockContainerLaunchContext(
RecordFactory recordFactory) {
ContainerLaunchContext amContainer = recordFactory.newRecordInstance(
ContainerLaunchContext.class);
amContainer
.setApplicationACLs(new HashMap<ApplicationAccessType, String>());
return amContainer;
@Test
public void testQueueSubmitWithPermissionLimits()
throws YarnException, IOException {
conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");
PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println(" <queue name=\"root\">");
out.println(" <aclSubmitApps> </aclSubmitApps>");
out.println(" <aclAdministerApps> </aclAdministerApps>");
out.println(" <queue name=\"noaccess\">");
out.println(" </queue>");
out.println(" <queue name=\"submitonly\">");
out.println(" <aclSubmitApps>test </aclSubmitApps>");
out.println(" <aclAdministerApps> </aclAdministerApps>");
out.println(" </queue>");
out.println(" <queue name=\"adminonly\">");
out.println(" <aclSubmitApps> </aclSubmitApps>");
out.println(" <aclAdministerApps>test </aclAdministerApps>");
out.println(" </queue>");
out.println(" </queue>");
out.println("</allocations>");
out.close();
rmContext.getScheduler().reinitialize(conf, rmContext);
ApplicationId appId = MockApps.newAppID(1);
Resource res = Resources.createResource(1024, 1);
ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
// Submit to no access queue
when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("noaccess"));
try {
rmAppManager.submitApplication(asContext, "test");
Assert.fail("Test should have failed with access denied");
} catch (YarnException e) {
assertTrue("Access exception not found",
e.getCause() instanceof AccessControlException);
}
// Submit to submit access queue
when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("submitonly"));
rmAppManager.submitApplication(asContext, "test");
// Submit second app to admin access queue
appId = MockApps.newAppID(2);
asContext = createAppSubmitCtx(appId, res);
when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("adminonly"));
rmAppManager.submitApplication(asContext, "test");
}
@Test
public void testQueueSubmitWithRootPermission()
throws YarnException, IOException {
conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");
PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println(" <queue name=\"root\">");
out.println(" <queue name=\"noaccess\">");
out.println(" <aclSubmitApps> </aclSubmitApps>");
out.println(" <aclAdministerApps> </aclAdministerApps>");
out.println(" </queue>");
out.println(" </queue>");
out.println("</allocations>");
out.close();
rmContext.getScheduler().reinitialize(conf, rmContext);
ApplicationId appId = MockApps.newAppID(1);
Resource res = Resources.createResource(1024, 1);
ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
// Submit to noaccess queue should be allowed by root ACL
when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("noaccess"));
rmAppManager.submitApplication(asContext, "test");
}
@Test
public void testQueueSubmitWithAutoCreateQueue()
throws YarnException, IOException {
conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");
PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println(" <queue name=\"root\">");
out.println(" <aclSubmitApps> </aclSubmitApps>");
out.println(" <aclAdministerApps> </aclAdministerApps>");
out.println(" <queue name=\"noaccess\" type=\"parent\">");
out.println(" </queue>");
out.println(" <queue name=\"submitonly\" type=\"parent\">");
out.println(" <aclSubmitApps>test </aclSubmitApps>");
out.println(" </queue>");
out.println(" </queue>");
out.println("</allocations>");
out.close();
rmContext.getScheduler().reinitialize(conf, rmContext);
ApplicationId appId = MockApps.newAppID(1);
Resource res = Resources.createResource(1024, 1);
ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
// Submit to noaccess parent with non existent child queue
when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("root.noaccess.child"));
try {
rmAppManager.submitApplication(asContext, "test");
Assert.fail("Test should have failed with access denied");
} catch (YarnException e) {
assertTrue("Access exception not found",
e.getCause() instanceof AccessControlException);
}
// Submit to submitonly parent with non existent child queue
when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("root.submitonly.child"));
rmAppManager.submitApplication(asContext, "test");
}
private ApplicationSubmissionContext createAppSubmitCtx(ApplicationId appId,
Resource res) {
ApplicationSubmissionContext asContext =
Records.newRecord(ApplicationSubmissionContext.class);
asContext.setApplicationId(appId);
ResourceRequest resReg =
ResourceRequest.newInstance(Priority.newInstance(0),
ResourceRequest.ANY, res, 1);
asContext.setAMContainerResourceRequests(
Collections.singletonList(resReg));
asContext.setAMContainerSpec(mock(ContainerLaunchContext.class));
asContext.setQueue("default");
return asContext;
}
}

View File

@ -466,7 +466,7 @@ private void verifyInvalidQueueWithAcl() throws Exception {
if (conf.get(YarnConfiguration.RM_SCHEDULER)
.equals(FairScheduler.class.getName())) {
Assert.assertTrue(appReport.getDiagnostics()
.contains("Application rejected by queue placement policy"));
.contains("user owner application rejected by placement rules."));
} else {
Assert.assertTrue(appReport.getDiagnostics()
.contains("submitted by user owner to unknown queue: InvalidQueue"));

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -38,6 +39,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -106,8 +108,14 @@ protected void testPlanFollower(boolean isMove) throws PlanningException,
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId_0 =
ApplicationAttemptId.newInstance(appId, 0);
AppAddedSchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
AppAddedSchedulerEvent addAppEvent;
if (scheduler instanceof FairScheduler) {
addAppEvent = new AppAddedSchedulerEvent(appId, q.getQueueName(),
user_0, new ApplicationPlacementContext("dedicated"));
} else {
addAppEvent = new AppAddedSchedulerEvent(appId, q.getQueueName(),
user_0);
}
scheduler.handle(addAppEvent);
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);

View File

@ -81,6 +81,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -978,14 +979,17 @@ public static void waitSchedulerApplicationAttemptStopped(
}
public static SchedulerApplication<SchedulerApplicationAttempt>
verifyAppAddedAndRemovedFromScheduler(
Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
verifyAppAddedAndRemovedFromScheduler(
Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>>
applications,
EventHandler<SchedulerEvent> handler, String queueName) {
ApplicationPlacementContext apc =
new ApplicationPlacementContext(queueName);
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
AppAddedSchedulerEvent appAddedEvent =
new AppAddedSchedulerEvent(appId, queueName, "user");
new AppAddedSchedulerEvent(appId, queueName, "user", apc);
handler.handle(appAddedEvent);
SchedulerApplication<SchedulerApplicationAttempt> app =
applications.get(appId);

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -177,7 +178,11 @@ protected ApplicationAttemptId createSchedulingRequest(
Collection<ResourceRequest> requests, String queueId, String userId) {
ApplicationAttemptId id =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
// This fakes the placement which is not part of the scheduler anymore
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext(queueId);
scheduler.addApplication(id.getApplicationId(), queueId, userId, false,
placementCtx);
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
if (scheduler.getSchedulerApplications()
@ -212,10 +217,15 @@ protected ApplicationAttemptId createSchedulingRequest(String queueId,
String userId, List<ResourceRequest> ask) {
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
this.ATTEMPT_ID++);
scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
// This fakes the placement which is not part of the scheduler anymore
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext(queueId);
scheduler.addApplication(id.getApplicationId(), queueId, userId, false,
placementCtx);
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
if (scheduler.getSchedulerApplications().containsKey(
id.getApplicationId())) {
scheduler.addApplicationAttempt(id, false, false);
}
@ -298,8 +308,11 @@ private void addApplication(String queue, String user, ApplicationId appId) {
resourceManager.getRMContext().getRMApps().get(appId).handle(event);
event = new RMAppEvent(appId, RMAppEventType.APP_ACCEPTED);
resourceManager.getRMContext().getRMApps().get(appId).handle(event);
// This fakes the placement which is not part of the scheduler anymore
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext(queue);
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
appId, queue, user);
appId, queue, user, placementCtx);
scheduler.handle(appAddedEvent);
}

View File

@ -25,14 +25,23 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PrimaryGroupPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
@ -52,7 +61,12 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Test loading the allocation file for the FairScheduler.
*/
public class TestAllocationFileLoaderService {
private static final String A_CUSTOM_RESOURCE = "a-custom-resource";
@ -64,10 +78,28 @@ public class TestAllocationFileLoaderService {
"test-queues").getAbsolutePath();
private static final String TEST_FAIRSCHED_XML = "test-fair-scheduler.xml";
private FairScheduler scheduler;
private Configuration conf;
@Before
public void setup() {
SystemClock clock = SystemClock.getInstance();
PlacementManager placementManager = new PlacementManager();
FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
RMContext rmContext = mock(RMContext.class);
when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
scheduler = mock(FairScheduler.class);
conf = new YarnConfiguration();
when(scheduler.getClock()).thenReturn(clock);
when(scheduler.getConf()).thenReturn(fsConf);
when(scheduler.getConfig()).thenReturn(conf);
when(scheduler.getRMContext()).thenReturn(rmContext);
}
@Test
public void testGetAllocationFileFromFileSystem()
throws IOException, URISyntaxException {
Configuration conf = new YarnConfiguration();
File baseDir =
new File(TEST_DIR + Path.SEPARATOR + "getAllocHDFS").getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
@ -83,7 +115,8 @@ public void testGetAllocationFileFromFileSystem()
fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
Path allocationFile = allocLoader.getAllocationFile(conf);
assertEquals(fsAllocPath, allocationFile.toString());
assertTrue(fs.exists(allocationFile));
@ -94,9 +127,9 @@ public void testGetAllocationFileFromFileSystem()
@Test (expected = UnsupportedFileSystemException.class)
public void testDenyGetAllocationFileFromUnsupportedFileSystem()
throws UnsupportedFileSystemException {
Configuration conf = new YarnConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
allocLoader.getAllocationFile(conf);
}
@ -104,12 +137,11 @@ public void testDenyGetAllocationFileFromUnsupportedFileSystem()
@Test
public void testGetAllocationFileFromClasspath() {
try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
TEST_FAIRSCHED_XML);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService();
new AllocationFileLoaderService(scheduler);
Path allocationFile = allocLoader.getAllocationFile(conf);
assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
assertTrue(fs.exists(allocationFile));
@ -135,11 +167,10 @@ public void testReload() throws Exception {
ControlledClock clock = new ControlledClock();
clock.setTime(0);
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(
clock);
clock, scheduler);
allocLoader.reloadIntervalMs = 5;
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
@ -148,10 +179,10 @@ public void testReload() throws Exception {
AllocationConfiguration allocConf = confHolder.allocConf;
// Verify conf
QueuePlacementPolicy policy = allocConf.getPlacementPolicy();
List<QueuePlacementRule> rules = policy.getRules();
List<PlacementRule> rules = scheduler.getRMContext()
.getQueuePlacementManager().getPlacementRules();
assertEquals(1, rules.size());
assertEquals(QueuePlacementRule.Default.class, rules.get(0).getClass());
assertEquals(DefaultPlacementRule.class, rules.get(0).getClass());
assertEquals(1, allocConf.getQueueMaxApps("root.queueA"));
assertEquals(2, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.size());
@ -160,6 +191,7 @@ public void testReload() throws Exception {
assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.contains("root.queueB"));
// reset the conf so we can detect the reload
confHolder.allocConf = null;
// Modify file and advance the clock
@ -174,7 +206,6 @@ public void testReload() throws Exception {
out.println(" <rule name='nestedUserQueue' >");
out.println(" <rule name='primaryGroup' />");
out.println(" </rule>");
out.println(" <rule name='default' />");
out.println(" </queuePlacementPolicy>");
out.println("</allocations>");
out.close();
@ -189,15 +220,13 @@ public void testReload() throws Exception {
// Verify conf
allocConf = confHolder.allocConf;
policy = allocConf.getPlacementPolicy();
rules = policy.getRules();
assertEquals(3, rules.size());
assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass());
assertEquals(QueuePlacementRule.NestedUserQueue.class, rules.get(1)
.getClass());
assertEquals(QueuePlacementRule.PrimaryGroup.class,
((NestedUserQueue) (rules.get(1))).nestedRule.getClass());
assertEquals(QueuePlacementRule.Default.class, rules.get(2).getClass());
rules = scheduler.getRMContext().getQueuePlacementManager()
.getPlacementRules();
assertEquals(2, rules.size());
assertEquals(SpecifiedPlacementRule.class, rules.get(0).getClass());
assertEquals(UserPlacementRule.class, rules.get(1).getClass());
assertEquals(PrimaryGroupPlacementRule.class,
((FSPlacementRule)(rules.get(1))).getParentRule().getClass());
assertEquals(3, allocConf.getQueueMaxApps("root.queueB"));
assertEquals(1, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.size());
@ -207,11 +236,11 @@ public void testReload() throws Exception {
@Test
public void testAllocationFileParsing() throws Exception {
Configuration conf = new YarnConfiguration();
CustomResourceTypesConfigurationProvider.
initResourceTypes(A_CUSTOM_RESOURCE);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
AllocationFileWriter
.create()
@ -455,9 +484,9 @@ public void testAllocationFileParsing() throws Exception {
@Test
public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
@ -569,7 +598,6 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
@Test
public void testSimplePlacementPolicyFromConf() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false);
@ -580,19 +608,20 @@ public void testSimplePlacementPolicyFromConf() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration allocConf = confHolder.allocConf;
QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
List<QueuePlacementRule> rules = placementPolicy.getRules();
List<PlacementRule> rules = scheduler.getRMContext()
.getQueuePlacementManager().getPlacementRules();
assertEquals(2, rules.size());
assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass());
assertEquals(false, rules.get(0).create);
assertEquals(QueuePlacementRule.Default.class, rules.get(1).getClass());
assertEquals(SpecifiedPlacementRule.class, rules.get(0).getClass());
assertFalse("Create flag was not set to false",
((FSPlacementRule)rules.get(0)).getCreateFlag());
assertEquals(DefaultPlacementRule.class, rules.get(1).getClass());
}
/**
@ -601,7 +630,6 @@ public void testSimplePlacementPolicyFromConf() throws Exception {
*/
@Test (expected = AllocationConfigurationException.class)
public void testQueueAlongsideRoot() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@ -614,7 +642,8 @@ public void testQueueAlongsideRoot() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@ -627,7 +656,6 @@ public void testQueueAlongsideRoot() throws Exception {
*/
@Test (expected = AllocationConfigurationException.class)
public void testQueueNameContainingPeriods() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@ -638,7 +666,8 @@ public void testQueueNameContainingPeriods() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@ -651,7 +680,6 @@ public void testQueueNameContainingPeriods() throws Exception {
*/
@Test (expected = AllocationConfigurationException.class)
public void testQueueNameContainingOnlyWhitespace() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@ -662,7 +690,8 @@ public void testQueueNameContainingOnlyWhitespace() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@ -671,7 +700,6 @@ public void testQueueNameContainingOnlyWhitespace() throws Exception {
@Test
public void testParentTagWithReservation() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@ -684,7 +712,8 @@ public void testParentTagWithReservation() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@ -700,7 +729,6 @@ public void testParentTagWithReservation() throws Exception {
@Test
public void testParentWithReservation() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@ -715,7 +743,8 @@ public void testParentWithReservation() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@ -731,7 +760,6 @@ public void testParentWithReservation() throws Exception {
@Test
public void testParentTagWithChild() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@ -744,7 +772,8 @@ public void testParentTagWithChild() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@ -763,7 +792,6 @@ public void testParentTagWithChild() throws Exception {
*/
@Test (expected = AllocationConfigurationException.class)
public void testQueueNameContainingNBWhitespace() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new OutputStreamWriter(
@ -775,7 +803,8 @@ public void testQueueNameContainingNBWhitespace() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@ -787,17 +816,18 @@ public void testQueueNameContainingNBWhitespace() throws Exception {
*/
@Test (expected = AllocationConfigurationException.class)
public void testDefaultQueueSchedulingModeIsFIFO() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<defaultQueueSchedulingPolicy>fifo</defaultQueueSchedulingPolicy>");
out.println("<defaultQueueSchedulingPolicy>fifo" +
"</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@ -806,7 +836,6 @@ public void testDefaultQueueSchedulingModeIsFIFO() throws Exception {
@Test
public void testReservableQueue() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@ -823,7 +852,8 @@ public void testReservableQueue() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@ -845,11 +875,9 @@ public void testReservableQueue() throws Exception {
assertTrue(allocConf.getMoveOnExpiry(reservableQueueName));
assertEquals(ReservationSchedulerConfiguration.DEFAULT_RESERVATION_WINDOW,
allocConf.getReservationWindow(reservableQueueName));
assertEquals(100, allocConf.getInstantaneousMaxCapacity
(reservableQueueName),
0.0001);
assertEquals(
"DummyAgentName",
assertEquals(100,
allocConf.getInstantaneousMaxCapacity(reservableQueueName), 0.0001);
assertEquals("DummyAgentName",
allocConf.getReservationAgent(reservableQueueName));
assertEquals(100, allocConf.getAverageCapacity(reservableQueueName), 0.001);
assertFalse(allocConf.getShowReservationAsQueues(reservableQueueName));
@ -865,12 +893,11 @@ public void testReservableQueue() throws Exception {
/**
* Verify that you can't have dynamic user queue and reservable queue on
* the same queue
* the same queue.
*/
@Test (expected = AllocationConfigurationException.class)
public void testReservableCannotBeCombinedWithDynamicUserQueue()
throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@ -883,7 +910,8 @@ public void testReservableCannotBeCombinedWithDynamicUserQueue()
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@ -891,7 +919,7 @@ public void testReservableCannotBeCombinedWithDynamicUserQueue()
}
private class ReloadListener implements AllocationFileLoaderService.Listener {
public AllocationConfiguration allocConf;
private AllocationConfiguration allocConf;
@Override
public void onReload(AllocationConfiguration info) {

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -47,7 +48,7 @@
import org.junit.Before;
import org.junit.Test;
/*
/**
* This class is to test the fair scheduler functionality of
* deciding the number of runnable application under various conditions.
*/
@ -78,7 +79,7 @@ public void testUserAsDefaultQueue() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
createApplicationWithAMResource(appAttemptId, "default", "user1", null);
createApplicationWithAMResource(appAttemptId, "root.user1", "user1", null);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getNumRunnableApps());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
@ -110,9 +111,11 @@ public void testNotUserAsDefaultQueue() throws Exception {
@Test
public void testAppAdditionAndRemoval() throws Exception {
ApplicationAttemptId attemptId = createAppAttemptId(1, 1);
ApplicationPlacementContext apc =
new ApplicationPlacementContext("user1");
AppAddedSchedulerEvent appAddedEvent =
new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default",
"user1");
new AppAddedSchedulerEvent(attemptId.getApplicationId(), "user1",
"user1", apc);
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attemptAddedEvent =
new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false);
@ -149,7 +152,7 @@ public void testPreemptionVariablesForQueueCreatedRuntime() throws Exception {
// User1 submits one application
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
createApplicationWithAMResource(appAttemptId, "default", "user1", null);
createApplicationWithAMResource(appAttemptId, "user1", "user1", null);
// The user1 queue should inherit the configurations from the root queue
FSLeafQueue userQueue =
@ -184,12 +187,14 @@ public void testDontAllowUndeclaredPools() throws Exception {
FSLeafQueue jerryQueue = queueManager.getLeafQueue("jerry", false);
FSLeafQueue defaultQueue = queueManager.getLeafQueue("default", false);
// NOTE: placement is not inside the scheduler anymore need to fake it here.
// The scheduling request contains the fake placing
// Should get put into jerry
createSchedulingRequest(1024, "jerry", "someuser");
assertEquals(1, jerryQueue.getNumRunnableApps());
// Should get forced into default
createSchedulingRequest(1024, "newqueue", "someuser");
createSchedulingRequest(1024, "default", "someuser");
assertEquals(1, jerryQueue.getNumRunnableApps());
assertEquals(1, defaultQueue.getNumRunnableApps());
@ -200,7 +205,7 @@ public void testDontAllowUndeclaredPools() throws Exception {
assertEquals(2, defaultQueue.getNumRunnableApps());
// Should get put into jerry because of user-as-default-queue
createSchedulingRequest(1024, "default", "jerry");
createSchedulingRequest(1024, "jerry", "jerry");
assertEquals(2, jerryQueue.getNumRunnableApps());
assertEquals(2, defaultQueue.getNumRunnableApps());
}

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
@ -113,14 +114,18 @@ public void testBasic() throws InterruptedException {
1, Resources.createResource(4096, 4), 1, host);
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
NodeUpdateSchedulerEvent nodeUpdateEvent =
new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeUpdateEvent);
ApplicationAttemptId appAttemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(appAttemptId);
scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("queue11");
scheduler.addApplication(appAttemptId.getApplicationId(), "queue11",
"user11", false, placementCtx);
scheduler.addApplicationAttempt(appAttemptId, false, false);
List<ResourceRequest> ask = new ArrayList<>();
ask.add(createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true));
@ -148,7 +153,8 @@ public void testSortedNodes() throws Exception {
scheduler.handle(nodeEvent2);
// available resource
Assert.assertEquals(scheduler.getClusterResource().getMemorySize(), 16 * 1024);
Assert.assertEquals(scheduler.getClusterResource().getMemorySize(),
16 * 1024);
Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16);
// send application request
@ -156,14 +162,17 @@ public void testSortedNodes() throws Exception {
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(appAttemptId);
scheduler.addApplication(appAttemptId.getApplicationId(),
"queue11", "user11", false);
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("queue11");
scheduler.addApplication(appAttemptId.getApplicationId(), "queue11",
"user11", false, placementCtx);
scheduler.addApplicationAttempt(appAttemptId, false, false);
List<ResourceRequest> ask = new ArrayList<>();
ResourceRequest request =
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
ask.add(request);
scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
scheduler.allocate(appAttemptId, ask, null, new ArrayList<>(), null, null,
NULL_UPDATE_REQUESTS);
triggerSchedulingAttempt();
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
@ -174,10 +183,11 @@ public void testSortedNodes() throws Exception {
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
ask.clear();
ask.add(request);
scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
scheduler.allocate(appAttemptId, ask, null, new ArrayList<>(), null, null,
NULL_UPDATE_REQUESTS);
triggerSchedulingAttempt();
checkAppConsumption(app, Resources.createResource(2048,2));
checkAppConsumption(app, Resources.createResource(2048, 2));
// 2 containers should be assigned to 2 nodes
Set<NodeId> nodes = new HashSet<NodeId>();
@ -353,8 +363,10 @@ public void testFairSchedulerContinuousSchedulingInitTime() throws Exception {
id11 = createAppAttemptId(1, 1);
createMockRMApp(id11);
priority = Priority.newInstance(priorityValue);
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("root.queue1");
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
false);
false, placementCtx);
scheduler.addApplicationAttempt(id11, false, false);
fsAppAttempt = scheduler.getApplicationAttempt(id11);

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -298,8 +299,10 @@ public void testHeadroomWithBlackListedNodes() {
assertEquals(0, clusterUsage.getVirtualCores());
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
createMockRMApp(id11);
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("default");
scheduler.addApplication(id11.getApplicationId(),
"default", "user1", false);
"default", "user1", false, placementCtx);
scheduler.addApplicationAttempt(id11, false, false);
assertNotNull(scheduler.getSchedulerApplications().get(id11.
getApplicationId()));

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.Before;
@ -29,23 +31,27 @@
public class TestFSParentQueue {
private FairSchedulerConfiguration conf;
private QueueManager queueManager;
@Before
public void setUp() throws Exception {
conf = new FairSchedulerConfiguration();
public void setUp() {
FairSchedulerConfiguration conf = new FairSchedulerConfiguration();
RMContext rmContext = mock(RMContext.class);
SystemClock clock = SystemClock.getInstance();
PlacementManager placementManager = new PlacementManager();
FairScheduler scheduler = mock(FairScheduler.class);
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getRMContext()).thenReturn(rmContext);
when(scheduler.getConfig()).thenReturn(conf);
when(scheduler.getConf()).thenReturn(conf);
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock);
when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
AllocationConfiguration allocConf = new AllocationConfiguration(scheduler);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
queueManager = new QueueManager(scheduler);
FSQueueMetrics.forQueue("root", null, true, conf);
queueManager.initialize(conf);
queueManager.initialize();
}
@Test

View File

@ -58,6 +58,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -85,7 +88,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -108,10 +110,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -426,8 +425,11 @@ public void testNormalizationUsingQueueMaximumAllocation()
private void allocateAppAttempt(String queueName, int id, int memorySize) {
ApplicationAttemptId id11 = createAppAttemptId(id, id);
createMockRMApp(id11);
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext(queueName);
scheduler.addApplication(id11.getApplicationId(), queueName, "user1",
false);
false, placementCtx);
scheduler.addApplicationAttempt(id11, false, false);
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ResourceRequest request1 =
@ -1404,8 +1406,10 @@ public void testRackLocalAppReservationThreshold() throws Exception {
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(attemptId);
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("queue1");
scheduler.addApplication(attemptId.getApplicationId(), "queue1", "user1",
false);
false, placementCtx);
scheduler.addApplicationAttempt(attemptId, false, false);
List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false));
@ -1814,10 +1818,12 @@ public void testEmptyQueueName() throws Exception {
// only default queue
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
// submit app with empty queue
// Submit app with empty queue
// Submit fails before we reach the placement check.
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
AppAddedSchedulerEvent appAddedEvent =
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "", "user1");
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "",
"user1");
scheduler.handle(appAddedEvent);
// submission rejected
@ -1835,20 +1841,24 @@ public void testQueueuNameWithPeriods() throws Exception {
// only default queue
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
// submit app with queue name (.A)
// Submit app with queue name (.A)
// Submit fails before we reach the placement check.
ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
AppAddedSchedulerEvent appAddedEvent1 =
new AppAddedSchedulerEvent(appAttemptId1.getApplicationId(), ".A", "user1");
new AppAddedSchedulerEvent(appAttemptId1.getApplicationId(), ".A",
"user1");
scheduler.handle(appAddedEvent1);
// submission rejected
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
assertNull(scheduler.getSchedulerApp(appAttemptId1));
assertEquals(0, resourceManager.getRMContext().getRMApps().size());
// submit app with queue name (A.)
// Submit app with queue name (A.)
// Submit fails before we reach the placement check.
ApplicationAttemptId appAttemptId2 = createAppAttemptId(2, 1);
AppAddedSchedulerEvent appAddedEvent2 =
new AppAddedSchedulerEvent(appAttemptId2.getApplicationId(), "A.", "user1");
new AppAddedSchedulerEvent(appAttemptId2.getApplicationId(), "A.",
"user1");
scheduler.handle(appAddedEvent2);
// submission rejected
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
@ -1856,9 +1866,11 @@ public void testQueueuNameWithPeriods() throws Exception {
assertEquals(0, resourceManager.getRMContext().getRMApps().size());
// submit app with queue name (A.B)
// Submit does not fail we must have a placement context.
ApplicationAttemptId appAttemptId3 = createAppAttemptId(3, 1);
AppAddedSchedulerEvent appAddedEvent3 =
new AppAddedSchedulerEvent(appAttemptId3.getApplicationId(), "A.B", "user1");
new AppAddedSchedulerEvent(appAttemptId3.getApplicationId(), "A.B",
"user1", new ApplicationPlacementContext("A.B"));
scheduler.handle(appAddedEvent3);
// submission accepted
assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
@ -1866,123 +1878,6 @@ public void testQueueuNameWithPeriods() throws Exception {
assertEquals(0, resourceManager.getRMContext().getRMApps().size());
}
@Test
public void testAssignToQueue() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
FSLeafQueue queue1 = scheduler.assignToQueue(rmApp1, "default", "asterix");
FSLeafQueue queue2 = scheduler.assignToQueue(rmApp2, "notdefault", "obelix");
// assert FSLeafQueue's name is the correct name is the one set in the RMApp
assertEquals(rmApp1.getQueue(), queue1.getName());
assertEquals("root.asterix", rmApp1.getQueue());
assertEquals(rmApp2.getQueue(), queue2.getName());
assertEquals("root.notdefault", rmApp2.getQueue());
}
@Test
public void testAssignToBadDefaultQueue() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queuePlacementPolicy>");
out.println("<rule name=\"specified\" create=\"false\" />");
out.println("<rule name=\"default\" create=\"false\" />");
out.println("</queuePlacementPolicy>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
try {
FSLeafQueue queue1 = scheduler.assignToQueue(rmApp1, "default",
"asterix");
} catch (IllegalStateException ise) {
fail("Bad queue placement policy terminal rule should not throw " +
"exception ");
}
}
@Test
public void testAssignToNonLeafQueueReturnsNull() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
scheduler.getQueueManager().getLeafQueue("root.child1.granchild", true);
scheduler.getQueueManager().getLeafQueue("root.child2", true);
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
// Trying to assign to non leaf queue would return null
assertNull(scheduler.assignToQueue(rmApp1, "root.child1", "tintin"));
assertNotNull(scheduler.assignToQueue(rmApp2, "root.child2", "snowy"));
}
@Test
public void testQueuePlacementWithPolicy() throws Exception {
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId appId;
List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
rules.add(new QueuePlacementRule.Specified().initialize(true, null));
rules.add(new QueuePlacementRule.User().initialize(false, null));
rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null));
rules.add(new QueuePlacementRule.SecondaryGroupExistingQueue().initialize(false, null));
rules.add(new QueuePlacementRule.Default().initialize(true, null));
Set<String> queues = Sets.newHashSet("root.user1", "root.user3group",
"root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2");
Map<FSQueueType, Set<String>> configuredQueues = new HashMap<FSQueueType, Set<String>>();
configuredQueues.put(FSQueueType.LEAF, queues);
configuredQueues.put(FSQueueType.PARENT, new HashSet<String>());
scheduler.getAllocationConfiguration().placementPolicy =
new QueuePlacementPolicy(rules, configuredQueues, conf);
appId = createSchedulingRequest(1024, "somequeue", "user1");
assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user1");
assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user3");
assertEquals("root.user3group", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user4");
assertEquals("root.user4subgroup1", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user5");
assertEquals("root.user5subgroup2", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "otheruser");
assertEquals("root.default", scheduler.getSchedulerApp(appId).getQueueName());
// test without specified as first rule
rules = new ArrayList<QueuePlacementRule>();
rules.add(new QueuePlacementRule.User().initialize(false, null));
rules.add(new QueuePlacementRule.Specified().initialize(true, null));
rules.add(new QueuePlacementRule.Default().initialize(true, null));
scheduler.getAllocationConfiguration().placementPolicy =
new QueuePlacementPolicy(rules, configuredQueues, conf);
appId = createSchedulingRequest(1024, "somequeue", "user1");
assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "somequeue", "otheruser");
assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "otheruser");
assertEquals("root.default", scheduler.getSchedulerApp(appId).getQueueName());
}
@Test
public void testFairShareWithMinAlloc() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
@ -2027,38 +1922,6 @@ else if (p.getName().equals("root.queueB")) {
}
}
}
@Test
public void testNestedUserQueue() throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"user1group\" type=\"parent\">");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queuePlacementPolicy>");
out.println("<rule name=\"specified\" create=\"false\" />");
out.println("<rule name=\"nestedUserQueue\">");
out.println(" <rule name=\"primaryGroup\" create=\"false\" />");
out.println("</rule>");
out.println("<rule name=\"default\" />");
out.println("</queuePlacementPolicy>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
FSLeafQueue user1Leaf = scheduler.assignToQueue(rmApp1, "root.default",
"user1");
assertEquals("root.user1group.user1", user1Leaf.getName());
}
@Test
public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
@ -2228,7 +2091,7 @@ public void testSteadyFairShareWithQueueCreatedRuntime() throws Exception {
// Submit one application
ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
createApplicationWithAMResource(appAttemptId1, "default", "user1", null);
createApplicationWithAMResource(appAttemptId1, "user1", "user1", null);
assertEquals(3072, scheduler.getQueueManager()
.getLeafQueue("default", false).getSteadyFairShare().getMemorySize());
assertEquals(3072, scheduler.getQueueManager()
@ -2249,8 +2112,10 @@ public void testQueueDemandCalculation() throws Exception {
// First ask, queue1 requests 1 large (minReqSize * 2).
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
createMockRMApp(id11);
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("root.queue1");
scheduler.addApplication(id11.getApplicationId(),
"root.queue1", "user1", false);
"root.queue1", "user1", false, placementCtx);
scheduler.addApplicationAttempt(id11, false, false);
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ResourceRequest request1 = createResourceRequest(minReqSize * 2,
@ -2262,8 +2127,9 @@ public void testQueueDemandCalculation() throws Exception {
// Second ask, queue2 requests 1 large.
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
createMockRMApp(id21);
placementCtx = new ApplicationPlacementContext("root.queue2");
scheduler.addApplication(id21.getApplicationId(),
"root.queue2", "user1", false);
"root.queue2", "user1", false, placementCtx);
scheduler.addApplicationAttempt(id21, false, false);
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
ResourceRequest request2 = createResourceRequest(2 * minReqSize,
@ -2279,7 +2145,7 @@ public void testQueueDemandCalculation() throws Exception {
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
createMockRMApp(id22);
scheduler.addApplication(id22.getApplicationId(),
"root.queue2", "user1", false);
"root.queue2", "user1", false, placementCtx);
scheduler.addApplicationAttempt(id22, false, false);
List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
ResourceRequest request4 = createResourceRequest(minReqSize,
@ -2886,8 +2752,10 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(attemptId);
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("queue1");
scheduler.addApplication(attemptId.getApplicationId(), "queue1", "user1",
false);
false, placementCtx);
scheduler.addApplicationAttempt(attemptId, false, false);
// 1 request with 2 nodes on the same rack. another request with 1 node on
@ -2900,7 +2768,7 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
scheduler.allocate(attemptId, asks, null, new ArrayList<ContainerId>(), null,
scheduler.allocate(attemptId, asks, null, new ArrayList<>(), null,
null, NULL_UPDATE_REQUESTS);
// node 1 checks in
@ -3202,10 +3070,11 @@ public void testNotAllowSubmitApplication() throws Exception {
submissionContext.setApplicationId(applicationId);
submissionContext.setAMContainerSpec(clc);
RMApp application =
new RMAppImpl(applicationId, resourceManager.getRMContext(), conf, name, user,
queue, submissionContext, scheduler, masterService,
new RMAppImpl(applicationId, resourceManager.getRMContext(), conf,
name, user, queue, submissionContext, scheduler, masterService,
System.currentTimeMillis(), "YARN", null, null);
resourceManager.getRMContext().getRMApps().putIfAbsent(applicationId, application);
resourceManager.getRMContext().getRMApps().
putIfAbsent(applicationId, application);
application.handle(new RMAppEvent(applicationId, RMAppEventType.START));
final int MAX_TRIES=20;
@ -3222,16 +3091,22 @@ public void testNotAllowSubmitApplication() throws Exception {
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
scheduler.addApplication(attId.getApplicationId(), queue, user, false);
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext(queue);
scheduler.addApplication(attId.getApplicationId(), queue, user, false,
placementCtx);
numTries = 0;
while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
try {
Thread.sleep(100);
} catch (InterruptedException ex) {ex.printStackTrace();}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
numTries++;
}
assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
assertEquals(FinalApplicationStatus.FAILED,
application.getFinalApplicationStatus());
}
@Test
@ -3845,7 +3720,7 @@ public void testQueueMaxAMShare() throws Exception {
assertEquals("Queue queue1's fair share should be 0", 0, queue1
.getFairShare().getMemorySize());
createSchedulingRequest(1 * 1024, "root.default", "user1");
createSchedulingRequest(1 * 1024, "default", "user1");
scheduler.update();
scheduler.handle(updateEvent);
@ -4559,8 +4434,10 @@ public void testSchedulingOnRemovedNode() throws Exception {
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
createMockRMApp(id11);
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("root.queue1");
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
false);
false, placementCtx);
scheduler.addApplicationAttempt(id11, false, false);
List<ResourceRequest> ask1 = new ArrayList<>();
@ -4573,7 +4450,7 @@ public void testSchedulingOnRemovedNode() throws Exception {
String hostName = "127.0.0.1";
RMNode node1 = MockNodes.newNodeInfo(1,
Resources.createResource(8 * 1024, 8), 1, hostName);
Resources.createResource(8 * 1024, 8), 1, hostName);
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@ -4611,12 +4488,12 @@ public void testDefaultRuleInitializesProperlyWhenPolicyNotConfigured()
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
List<QueuePlacementRule> rules =
scheduler.allocConf.placementPolicy.getRules();
List<PlacementRule> rules = scheduler.getRMContext()
.getQueuePlacementManager().getPlacementRules();
for (QueuePlacementRule rule : rules) {
if (rule instanceof Default) {
Default defaultRule = (Default) rule;
for (PlacementRule rule : rules) {
if (rule instanceof DefaultPlacementRule) {
DefaultPlacementRule defaultRule = (DefaultPlacementRule) rule;
assertNotNull(defaultRule.defaultQueueName);
}
}
@ -4686,7 +4563,7 @@ public void testGetAppsInQueue() throws Exception {
ApplicationAttemptId appAttId2 =
createSchedulingRequest(1024, 1, "queue1.subqueue2", "user1");
ApplicationAttemptId appAttId3 =
createSchedulingRequest(1024, 1, "default", "user1");
createSchedulingRequest(1024, 1, "user1", "user1");
List<ApplicationAttemptId> apps =
scheduler.getAppsInQueue("queue1.subqueue1");
@ -4888,11 +4765,13 @@ public void testDoubleRemoval() throws Exception {
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId attemptId = createAppAttemptId(1, 1);
// The placement rule will add the app to the user based queue but the
// The placement rule should add it to the user based queue but the
// passed in queue must exist.
ApplicationPlacementContext apc =
new ApplicationPlacementContext(testUser);
AppAddedSchedulerEvent appAddedEvent =
new AppAddedSchedulerEvent(attemptId.getApplicationId(), testUser,
testUser);
testUser, apc);
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attemptAddedEvent =
new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false);
@ -4936,9 +4815,11 @@ public void testMoveAfterRemoval() throws Exception {
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId attemptId = createAppAttemptId(1, 1);
ApplicationPlacementContext apc =
new ApplicationPlacementContext(testUser);
AppAddedSchedulerEvent appAddedEvent =
new AppAddedSchedulerEvent(attemptId.getApplicationId(), testUser,
testUser);
testUser, apc);
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attemptAddedEvent =
new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false);
@ -4990,8 +4871,10 @@ public void testQueueNameWithTrailingSpace() throws Exception {
// submit app with queue name "A"
ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
ApplicationPlacementContext apc =
new ApplicationPlacementContext("A");
AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
appAttemptId1.getApplicationId(), "A", "user1");
appAttemptId1.getApplicationId(), "A", "user1", apc);
scheduler.handle(appAddedEvent1);
// submission accepted
assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
@ -5008,9 +4891,15 @@ public void testQueueNameWithTrailingSpace() throws Exception {
// submit app with queue name "A "
ApplicationAttemptId appAttemptId2 = createAppAttemptId(2, 1);
apc = new ApplicationPlacementContext("A ");
AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
appAttemptId2.getApplicationId(), "A ", "user1");
scheduler.handle(appAddedEvent2);
appAttemptId2.getApplicationId(), "A ", "user1", apc);
try {
scheduler.handle(appAddedEvent2);
Assert.fail("Submit should have failed with InvalidQueueNameException");
} catch (InvalidQueueNameException iqne) {
// expected ignore: rules should have filtered this out
}
// submission rejected
assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
assertNull(scheduler.getSchedulerApplications().get(appAttemptId2.
@ -5019,8 +4908,9 @@ public void testQueueNameWithTrailingSpace() throws Exception {
// submit app with queue name "B.C"
ApplicationAttemptId appAttemptId3 = createAppAttemptId(3, 1);
apc = new ApplicationPlacementContext("B.C");
AppAddedSchedulerEvent appAddedEvent3 = new AppAddedSchedulerEvent(
appAttemptId3.getApplicationId(), "B.C", "user1");
appAttemptId3.getApplicationId(), "B.C", "user1", apc);
scheduler.handle(appAddedEvent3);
// submission accepted
assertEquals(3, scheduler.getQueueManager().getLeafQueues().size());
@ -5037,9 +4927,14 @@ public void testQueueNameWithTrailingSpace() throws Exception {
// submit app with queue name "A\u00a0" (non-breaking space)
ApplicationAttemptId appAttemptId4 = createAppAttemptId(4, 1);
apc = new ApplicationPlacementContext("A\u00a0");
AppAddedSchedulerEvent appAddedEvent4 = new AppAddedSchedulerEvent(
appAttemptId4.getApplicationId(), "A\u00a0", "user1");
scheduler.handle(appAddedEvent4);
appAttemptId4.getApplicationId(), "A\u00a0", "user1", apc);
try {
scheduler.handle(appAddedEvent4);
} catch (InvalidQueueNameException iqne) {
// expected ignore: rules should have filtered this out
}
// submission rejected
assertEquals(3, scheduler.getQueueManager().getLeafQueues().size());
assertNull(scheduler.getSchedulerApplications().get(appAttemptId4.
@ -5075,17 +4970,17 @@ public void testUserAsDefaultQueueWithLeadingTrailingSpaceUserName()
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
createApplicationWithAMResource(appAttemptId, "default", " user1", null);
ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
createApplicationWithAMResource(attId1, "root.user1", " user1", null);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getNumRunnableApps());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
.getNumRunnableApps());
assertEquals("root.user1", resourceManager.getRMContext().getRMApps()
.get(appAttemptId.getApplicationId()).getQueue());
.get(attId1.getApplicationId()).getQueue());
ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
createApplicationWithAMResource(attId2, "default", "user1 ", null);
createApplicationWithAMResource(attId2, "root.user1", "user1 ", null);
assertEquals(2, scheduler.getQueueManager().getLeafQueue("user1", true)
.getNumRunnableApps());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
@ -5094,7 +4989,7 @@ public void testUserAsDefaultQueueWithLeadingTrailingSpaceUserName()
.get(attId2.getApplicationId()).getQueue());
ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
createApplicationWithAMResource(attId3, "default", "user1", null);
createApplicationWithAMResource(attId3, "root.user1", "user1", null);
assertEquals(3, scheduler.getQueueManager().getLeafQueue("user1", true)
.getNumRunnableApps());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
@ -5526,8 +5421,10 @@ public void testCompletedContainerOnRemovedNode() throws IOException {
// Create application attempt
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
createMockRMApp(appAttemptId);
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("root.queue1");
scheduler.addApplication(appAttemptId.getApplicationId(), "root.queue1",
"user1", false);
"user1", false, placementCtx);
scheduler.addApplicationAttempt(appAttemptId, false, false);
// Create container request that goes to a specific node.
@ -5612,7 +5509,7 @@ private void testAppRejectedToQueueWithZeroCapacityOfResource(String resource)
ResourceRequest amReqs = ResourceRequest.newBuilder()
.capability(Resource.newInstance(5 * GB, 3)).build();
createApplicationWithAMResource(appAttemptId1, "queueA", "user1",
createApplicationWithAMResource(appAttemptId1, "root.queueA", "user1",
Resource.newInstance(GB, 1), Lists.newArrayList(amReqs));
scheduler.update();

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.After;
import org.junit.Before;
@ -33,6 +35,8 @@
import static org.apache.hadoop.yarn.util.resource.ResourceUtils.MAXIMUM_ALLOCATION;
import static org.apache.hadoop.yarn.util.resource.ResourceUtils.UNITS;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestFairSchedulerWithMultiResourceTypes
extends FairSchedulerTestBase {
@ -44,6 +48,11 @@ public void setUp() throws IOException {
scheduler = new FairScheduler();
conf = createConfiguration();
initResourceTypes(conf);
// since this runs outside of the normal context we need to set one
RMContext rmContext = mock(RMContext.class);
PlacementManager placementManager = new PlacementManager();
when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
scheduler.setRMContext(rmContext);
}
@After

View File

@ -27,10 +27,10 @@
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.Before;
@ -46,26 +46,27 @@ public class TestMaxRunningAppsEnforcer {
private FairScheduler scheduler;
@Before
public void setup() throws Exception {
Configuration conf = new Configuration();
public void setup() {
FairSchedulerConfiguration conf = new FairSchedulerConfiguration();
PlacementManager placementManager = new PlacementManager();
rmContext = mock(RMContext.class);
when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
when(rmContext.getEpoch()).thenReturn(0L);
clock = new ControlledClock();
scheduler = mock(FairScheduler.class);
when(scheduler.getConf()).thenReturn(
new FairSchedulerConfiguration(conf));
when(scheduler.getConf()).thenReturn(conf);
when(scheduler.getConfig()).thenReturn(conf);
when(scheduler.getClock()).thenReturn(clock);
AllocationConfiguration allocConf = new AllocationConfiguration(
conf);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
when(scheduler.getRMContext()).thenReturn(rmContext);
AllocationConfiguration allocConf = new AllocationConfiguration(scheduler);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
queueManager = new QueueManager(scheduler);
queueManager.initialize(conf);
queueManager.initialize();
userMaxApps = allocConf.userMaxApps;
maxAppsEnforcer = new MaxRunningAppsEnforcer(scheduler);
appNum = 0;
rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(0L);
}
private FSAppAttempt addApp(FSLeafQueue queue, String user) {

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -37,35 +38,43 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
/**
* Test the {@link FairScheduler} queue manager correct queue hierarchies
* management (create, delete and type changes).
*/
public class TestQueueManager {
private FairSchedulerConfiguration conf;
private QueueManager queueManager;
private FairScheduler scheduler;
@Before
public void setUp() throws Exception {
conf = new FairSchedulerConfiguration();
scheduler = mock(FairScheduler.class);
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
@Before
public void setUp() {
PlacementManager placementManager = new PlacementManager();
FairSchedulerConfiguration conf = new FairSchedulerConfiguration();
RMContext rmContext = mock(RMContext.class);
when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
SystemClock clock = SystemClock.getInstance();
scheduler = mock(FairScheduler.class);
when(scheduler.getConf()).thenReturn(conf);
when(scheduler.getConfig()).thenReturn(conf);
when(scheduler.getRMContext()).thenReturn(rmContext);
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
when(scheduler.getClock()).thenReturn(clock);
AllocationConfiguration allocConf =
new AllocationConfiguration(scheduler);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
// Set up some queues to test default child max resource inheritance
allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test");
allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.test.childA");
allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test.childB");
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getConf()).thenReturn(conf);
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock);
queueManager = new QueueManager(scheduler);
FSQueueMetrics.forQueue("root", null, true, conf);
queueManager.initialize(conf);
queueManager.initialize();
queueManager.updateAllocationConfiguration(allocConf);
}
@ -118,7 +127,8 @@ public void testReloadTurnsLeafQueueIntoParent() {
*/
@Test
public void testReloadTurnsLeafToParentWithNoLeaf() {
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
AllocationConfiguration allocConf =
new AllocationConfiguration(scheduler);
// Create a leaf queue1
allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.queue1");
queueManager.updateAllocationConfiguration(allocConf);
@ -130,7 +140,7 @@ public void testReloadTurnsLeafToParentWithNoLeaf() {
FSLeafQueue q1 = queueManager.getLeafQueue("queue1", false);
ApplicationId appId = ApplicationId.newInstance(0, 0);
q1.addAssignedApp(appId);
allocConf = new AllocationConfiguration(conf);
allocConf = new AllocationConfiguration(scheduler);
allocConf.configuredQueues.get(FSQueueType.PARENT)
.add("root.queue1");
@ -169,7 +179,8 @@ public void testCheckQueueNodeName() {
private void updateConfiguredLeafQueues(QueueManager queueMgr,
String... confLeafQueues) {
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
AllocationConfiguration allocConf =
new AllocationConfiguration(scheduler);
allocConf.configuredQueues.get(FSQueueType.LEAF)
.addAll(Sets.newHashSet(confLeafQueues));
queueMgr.updateAllocationConfiguration(allocConf);

View File

@ -18,44 +18,82 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
/**
* Tests for the queue placement policy for the {@link FairScheduler}.
*/
public class TestQueuePlacementPolicy {
private final static Configuration conf = new Configuration();
private Map<FSQueueType, Set<String>> configuredQueues;
private final static FairSchedulerConfiguration CONF =
new FairSchedulerConfiguration();
// Base setup needed, policy is an intermediate object
private PlacementManager placementManager;
private FairScheduler scheduler;
private QueueManager queueManager;
// Locals used in each assignment
private ApplicationSubmissionContext asc;
private ApplicationPlacementContext context;
@BeforeClass
public static void setup() {
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
CONF.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
}
@Before
public void initTest() {
configuredQueues = new HashMap<FSQueueType, Set<String>>();
for (FSQueueType type : FSQueueType.values()) {
configuredQueues.put(type, new HashSet<String>());
}
SystemClock clock = SystemClock.getInstance();
RMContext rmContext = mock(RMContext.class);
placementManager = new PlacementManager();
scheduler = mock(FairScheduler.class);
when(scheduler.getClock()).thenReturn(clock);
when(scheduler.getRMContext()).thenReturn(rmContext);
when(scheduler.getConfig()).thenReturn(CONF);
when(scheduler.getConf()).thenReturn(CONF);
when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
AllocationConfiguration allocConf = new AllocationConfiguration(scheduler);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
queueManager = new QueueManager(scheduler);
queueManager.initialize();
when(scheduler.getQueueManager()).thenReturn(queueManager);
}
@After
public void cleanTest() {
placementManager = null;
queueManager = null;
scheduler = null;
}
@Test
@ -65,15 +103,18 @@ public void testSpecifiedUserPolicy() throws Exception {
sb.append(" <rule name='specified' />");
sb.append(" <rule name='user' />");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.specifiedq",
policy.assignAppToQueue("specifiedq", "someuser"));
assertEquals("root.someuser",
policy.assignAppToQueue("default", "someuser"));
assertEquals("root.otheruser",
policy.assignAppToQueue("default", "otheruser"));
createPolicy(sb.toString());
asc = newAppSubmissionContext("specifiedq");
context = placementManager.placeApplication(asc, "someuser");
assertEquals("root.specifiedq", context.getQueue());
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "someuser");
assertEquals("root.someuser", context.getQueue());
context = placementManager.placeApplication(asc, "otheruser");
assertEquals("root.otheruser", context.getQueue());
}
@Test
public void testNoCreate() throws Exception {
StringBuffer sb = new StringBuffer();
@ -82,15 +123,24 @@ public void testNoCreate() throws Exception {
sb.append(" <rule name='user' create=\"false\" />");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
configuredQueues.get(FSQueueType.LEAF).add("root.someuser");
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser"));
assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "otheruser"));
assertEquals("root.default", policy.assignAppToQueue("default", "otheruser"));
createPolicy(sb.toString());
createQueue(FSQueueType.LEAF, "root.someuser");
asc = newAppSubmissionContext("specifiedq");
context = placementManager.placeApplication(asc, "someuser");
assertEquals("root.specifiedq", context.getQueue());
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "someuser");
assertEquals("root.someuser", context.getQueue());
asc = newAppSubmissionContext("specifiedq");
context = placementManager.placeApplication(asc, "otheruser");
assertEquals("root.specifiedq", context.getQueue());
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "otheruser");
assertEquals("root.default", context.getQueue());
}
@Test
public void testSpecifiedThenReject() throws Exception {
StringBuffer sb = new StringBuffer();
@ -98,94 +148,173 @@ public void testSpecifiedThenReject() throws Exception {
sb.append(" <rule name='specified' />");
sb.append(" <rule name='reject' />");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.specifiedq",
policy.assignAppToQueue("specifiedq", "someuser"));
assertEquals(null, policy.assignAppToQueue("default", "someuser"));
createPolicy(sb.toString());
asc = newAppSubmissionContext("specifiedq");
context = placementManager.placeApplication(asc, "someuser");
assertEquals("root.specifiedq", context.getQueue());
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "someuser");
assertNull("Assignment should have been rejected and was not", context);
}
@Test (expected = AllocationConfigurationException.class)
public void testOmittedTerminalRule() throws Exception {
@Test
public void testOmittedTerminalRule() {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='user' create=\"false\" />");
sb.append("</queuePlacementPolicy>");
parse(sb.toString());
assertIfExceptionThrown(sb);
}
@Test (expected = AllocationConfigurationException.class)
public void testTerminalRuleInMiddle() throws Exception {
@Test
public void testTerminalRuleInMiddle() {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='default' />");
sb.append(" <rule name='user' />");
sb.append("</queuePlacementPolicy>");
parse(sb.toString());
assertIfExceptionThrown(sb);
}
@Test
public void testTerminals() throws Exception {
// Should make it through without an exception
public void testTerminals() {
// The default rule is no longer considered terminal when the create flag
// is false. The throw now happens when configuring not when assigning the
// application
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='secondaryGroupExistingQueue' create='true'/>");
sb.append(" <rule name='default' queue='otherdefault' create='false'/>");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
try {
policy.assignAppToQueue("root.otherdefault", "user1");
fail("Expect exception from having default rule with create=\'false\'");
} catch (IllegalStateException se) {
}
assertIfExceptionThrown(sb);
}
@Test
public void testDefaultRuleWithQueueAttribute() throws Exception {
// This test covers the use case where we would like default rule
// to point to a different queue by default rather than root.default
configuredQueues.get(FSQueueType.LEAF).add("root.someDefaultQueue");
createQueue(FSQueueType.LEAF, "root.someDefaultQueue");
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' create='false' />");
sb.append(" <rule name='default' queue='root.someDefaultQueue'/>");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.someDefaultQueue",
policy.assignAppToQueue("root.default", "user1"));
createPolicy(sb.toString());
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "user1");
assertEquals("root.someDefaultQueue", context.getQueue());
}
@Test
public void testNestedUserQueueParsingErrors() {
// No nested rule specified in hierarchical user queue
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='nestedUserQueue'/>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
assertIfExceptionThrown(sb);
// Specified nested rule is not a QueuePlacementRule
// Specified nested rule is not a FSPlacementRule
sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='unknownRule'/>");
sb.append(" <rule name='unknownRule'/>");
sb.append(" </rule>");
sb.append("</queuePlacementPolicy>");
assertIfExceptionThrown(sb);
// Parent rule is rule that cannot be one: reject or nestedUserQueue
sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='reject'/>");
sb.append(" </rule>");
sb.append("</queuePlacementPolicy>");
assertIfExceptionThrown(sb);
// If the parent rule does not have the create flag the nested rule is not
// terminal
sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='primaryGroup' create='false'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
assertIfExceptionThrown(sb);
}
@Test
public void testMultipleParentRules() throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='primaryGroup'/>");
sb.append(" <rule name='default'/>");
sb.append(" </rule>");
sb.append("</queuePlacementPolicy>");
createPolicy(sb.toString());
PlacementRule nested = placementManager.getPlacementRules().get(0);
if (nested instanceof UserPlacementRule) {
PlacementRule parent = ((FSPlacementRule)nested).getParentRule();
assertTrue("Nested rule should have been Default rule",
parent instanceof DefaultPlacementRule);
} else {
fail("Policy parsing failed: rule with multiple parents not set");
}
}
@Test
public void testBrokenRules() throws Exception {
// broken rule should fail configuring
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule />");
sb.append("</queuePlacementPolicy>");
assertIfExceptionThrown(sb);
// policy without rules ignoring policy
sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <notarule />");
sb.append("</queuePlacementPolicy>");
createPolicy(sb.toString());
// broken rule should fail configuring
sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='user'>");
sb.append(" <rule />");
sb.append(" </rule>");
sb.append("</queuePlacementPolicy>");
assertIfExceptionThrown(sb);
// parent rule not set to something known: no parent rule is required
// required case is only for nestedUserQueue tested earlier
sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='user'>");
sb.append(" <notarule />");
sb.append(" </rule>");
sb.append("</queuePlacementPolicy>");
createPolicy(sb.toString());
}
private void assertIfExceptionThrown(StringBuffer sb) {
Throwable th = null;
try {
parse(sb.toString());
createPolicy(sb.toString());
} catch (Exception e) {
th = e;
}
@ -193,6 +322,17 @@ private void assertIfExceptionThrown(StringBuffer sb) {
assertTrue(th instanceof AllocationConfigurationException);
}
private void assertIfExceptionThrown(String user) {
Throwable th = null;
try {
placementManager.placeApplication(asc, user);
} catch (Exception e) {
th = e;
}
assertTrue(th instanceof YarnException);
}
@Test
public void testNestedUserQueueParsing() throws Exception {
StringBuffer sb = new StringBuffer();
@ -201,17 +341,9 @@ public void testNestedUserQueueParsing() throws Exception {
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='primaryGroup'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
Throwable th = null;
try {
parse(sb.toString());
} catch (Exception e) {
th = e;
}
assertNull(th);
createPolicy(sb.toString());
}
@Test
@ -222,24 +354,26 @@ public void testNestedUserQueuePrimaryGroup() throws Exception {
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='primaryGroup'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
// User queue would be created under primary group queue
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.user1group.user1",
policy.assignAppToQueue("root.default", "user1"));
createPolicy(sb.toString());
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "user1");
assertEquals("root.user1group.user1", context.getQueue());
// Other rules above and below hierarchical user queue rule should work as
// usual
configuredQueues.get(FSQueueType.LEAF).add("root.specifiedq");
createQueue(FSQueueType.LEAF, "root.specifiedq");
// test if specified rule(above nestedUserQueue rule) works ok
assertEquals("root.specifiedq",
policy.assignAppToQueue("root.specifiedq", "user2"));
asc = newAppSubmissionContext("root.specifiedq");
context = placementManager.placeApplication(asc, "user2");
assertEquals("root.specifiedq", context.getQueue());
// test if default rule(below nestedUserQueue rule) works
configuredQueues.get(FSQueueType.LEAF).add("root.user3group");
assertEquals("root.default",
policy.assignAppToQueue("root.default", "user3"));
// Submit should fail if we cannot create the queue
createQueue(FSQueueType.LEAF, "root.user3group");
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "user3");
assertNull("Submission should have failed and did not", context);
}
@Test
@ -253,18 +387,18 @@ public void testNestedUserQueuePrimaryGroupNoCreate() throws Exception {
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
createPolicy(sb.toString());
// Should return root.default since primary group 'root.user1group' is not
// configured
assertEquals("root.default",
policy.assignAppToQueue("root.default", "user1"));
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "user1");
assertEquals("root.default", context.getQueue());
// Let's configure primary group and check if user queue is created
configuredQueues.get(FSQueueType.PARENT).add("root.user1group");
policy = parse(sb.toString());
assertEquals("root.user1group.user1",
policy.assignAppToQueue("root.default", "user1"));
createQueue(FSQueueType.PARENT, "root.user1group");
context = placementManager.placeApplication(asc, "user1");
assertEquals("root.user1group.user1", context.getQueue());
// Both Primary group and nestedUserQueue rule has create='false'
sb = new StringBuffer();
@ -277,16 +411,16 @@ public void testNestedUserQueuePrimaryGroupNoCreate() throws Exception {
// Should return root.default since primary group and user queue for user 2
// are not configured.
assertEquals("root.default",
policy.assignAppToQueue("root.default", "user2"));
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "user2");
assertEquals("root.default", context.getQueue());
// Now configure both primary group and the user queue for user2
configuredQueues.get(FSQueueType.PARENT).add("root.user2group");
configuredQueues.get(FSQueueType.LEAF).add("root.user2group.user2");
policy = parse(sb.toString());
createQueue(FSQueueType.LEAF, "root.user2group.user2");
assertEquals("root.user2group.user2",
policy.assignAppToQueue("root.default", "user2"));
// Try placing the same app again
context = placementManager.placeApplication(asc, "user2");
assertEquals("root.user2group.user2", context.getQueue());
}
@Test
@ -299,17 +433,18 @@ public void testNestedUserQueueSecondaryGroup() throws Exception {
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
createPolicy(sb.toString());
// Should return root.default since secondary groups are not configured
assertEquals("root.default",
policy.assignAppToQueue("root.default", "user1"));
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "user1");
assertEquals("root.default", context.getQueue());
// configure secondary group for user1
configuredQueues.get(FSQueueType.PARENT).add("root.user1subgroup1");
policy = parse(sb.toString());
createQueue(FSQueueType.PARENT, "root.user1subgroup1");
createPolicy(sb.toString());
// user queue created should be created under secondary group
assertEquals("root.user1subgroup1.user1",
policy.assignAppToQueue("root.default", "user1"));
context = placementManager.placeApplication(asc, "user1");
assertEquals("root.user1subgroup1.user1", context.getQueue());
}
@Test
@ -325,33 +460,66 @@ public void testNestedUserQueueSpecificRule() throws Exception {
sb.append("</queuePlacementPolicy>");
// Let's create couple of parent queues
configuredQueues.get(FSQueueType.PARENT).add("root.parent1");
configuredQueues.get(FSQueueType.PARENT).add("root.parent2");
createQueue(FSQueueType.PARENT, "root.parent1");
createQueue(FSQueueType.PARENT, "root.parent2");
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.parent1.user1",
policy.assignAppToQueue("root.parent1", "user1"));
assertEquals("root.parent2.user2",
policy.assignAppToQueue("root.parent2", "user2"));
createPolicy(sb.toString());
asc = newAppSubmissionContext("root.parent1");
context = placementManager.placeApplication(asc, "user1");
assertEquals("root.parent1.user1", context.getQueue());
asc = newAppSubmissionContext("root.parent2");
context = placementManager.placeApplication(asc, "user2");
assertEquals("root.parent2.user2", context.getQueue());
}
@Test
public void testNestedUserQueueDefaultRule() throws Exception {
// This test covers the use case where we would like user queues to be
// created under a default parent queue
configuredQueues.get(FSQueueType.PARENT).add("root.parentq");
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' create='false' />");
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='default' queue='root.parentq'/>");
sb.append(" <rule name='default' queue='root.parent'/>");
sb.append(" </rule>");
sb.append("</queuePlacementPolicy>");
createPolicy(sb.toString());
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "user1");
assertEquals("root.parent.user1", context.getQueue());
// Same as above but now with the create flag false for the parent
createQueue(FSQueueType.PARENT, "root.parent");
sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' create='false' />");
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='default' queue='root.parent' create='false'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.parentq.user1",
policy.assignAppToQueue("root.default", "user1"));
createPolicy(sb.toString());
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "user1");
assertEquals("root.parent.user1", context.getQueue());
// Parent queue returned is already a configured LEAF, should fail and the
// context is null.
createQueue(FSQueueType.LEAF, "root.parent");
sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' create='false' />");
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='default' queue='root.parent' />");
sb.append(" </rule>");
sb.append("</queuePlacementPolicy>");
createPolicy(sb.toString());
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "user1");
assertNull("Submission should have failed and did not", context);
}
@Test
@ -361,9 +529,10 @@ public void testUserContainsPeriod() throws Exception {
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='user' />");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.first_dot_last",
policy.assignAppToQueue("default", "first.last"));
createPolicy(sb.toString());
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "first.last");
assertEquals("root.first_dot_last", context.getQueue());
sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
@ -371,11 +540,14 @@ public void testUserContainsPeriod() throws Exception {
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='default'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
policy = parse(sb.toString());
assertEquals("root.default.first_dot_last",
policy.assignAppToQueue("root.default", "first.last"));
// specified create is false, bypass the rule
// default rule has create which requires a PARENT queue: remove the LEAF
queueManager.removeLeafQueue("root.default");
createPolicy(sb.toString());
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "first_dot_last");
assertEquals("root.default.first_dot_last", context.getQueue());
}
@Test
@ -386,22 +558,22 @@ public void testGroupContainsPeriod() throws Exception {
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='primaryGroup'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
CONF.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
PeriodGroupsMapping.class, GroupMappingServiceProvider.class);
// User queue would be created under primary group queue, and the period
// in the group name should be converted into _dot_
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.user1_dot_group.user1",
policy.assignAppToQueue("root.default", "user1"));
createPolicy(sb.toString());
asc = newAppSubmissionContext("default");
context = placementManager.placeApplication(asc, "user1");
assertEquals("root.user1_dot_group.user1", context.getQueue());
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
CONF.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
}
@Test(expected=IOException.class)
@Test
public void testEmptyGroupsPrimaryGroupRule() throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
@ -410,20 +582,67 @@ public void testEmptyGroupsPrimaryGroupRule() throws Exception {
sb.append("</queuePlacementPolicy>");
// Add a static mapping that returns empty groups for users
conf.setStrings(CommonConfigurationKeys
CONF.setStrings(CommonConfigurationKeys
.HADOOP_USER_GROUP_STATIC_OVERRIDES, "emptygroupuser=");
QueuePlacementPolicy policy = parse(sb.toString());
policy.assignAppToQueue(null, "emptygroupuser");
createPolicy(sb.toString());
asc = newAppSubmissionContext("root.fake");
assertIfExceptionThrown("emptygroupuser");
}
private QueuePlacementPolicy parse(String str) throws Exception {
@Test
public void testSpecifiedQueueWithSpaces() throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified'/>");
sb.append(" <rule name='default'/>");
sb.append("</queuePlacementPolicy>");
createPolicy(sb.toString());
asc = newAppSubmissionContext("A ");
assertIfExceptionThrown("user1");
asc = newAppSubmissionContext("A\u00a0");
assertIfExceptionThrown("user1");
}
private void createPolicy(String str)
throws AllocationConfigurationException {
// Read and parse the allocations file.
DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
.newInstance();
docBuilderFactory.setIgnoringComments(true);
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
Document doc = builder.parse(IOUtils.toInputStream(str));
Element root = doc.getDocumentElement();
return QueuePlacementPolicy.fromXml(root, configuredQueues, conf);
Element root = null;
try {
DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
.newInstance();
docBuilderFactory.setIgnoringComments(true);
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
Document doc = builder.parse(IOUtils.toInputStream(str,
StandardCharsets.UTF_8));
root = doc.getDocumentElement();
} catch (Exception ex) {
// Don't really want to test the xml parsing side,
// let it fail with a null config below.
}
QueuePlacementPolicy.fromXml(root, scheduler);
}
private ApplicationSubmissionContext newAppSubmissionContext(String queue) {
ApplicationId appId = ApplicationId.newInstance(1L, 1);
Priority prio = Priority.UNDEFINED;
Resource resource = Resource.newInstance(1, 1);
ContainerLaunchContext amContainer =
ContainerLaunchContext.newInstance(null, null, null, null, null, null);
return ApplicationSubmissionContext.newInstance(appId, "test", queue,
prio, amContainer, false, false, 1, resource, "testing");
}
private void createQueue(FSQueueType type, String name) {
// Create a queue as if it is in the config.
FSQueue queue = queueManager.createQueue(name, type);
assertNotNull("Queue not created", queue);
// walk up the list till we have a non dynamic queue
// root is always non dynamic
do {
queue.setDynamic(false);
queue = queue.parent;
} while (queue.isDynamic());
}
}

View File

@ -30,6 +30,8 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@ -40,6 +42,8 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestSchedulingPolicy {
private static final Logger LOG =
@ -53,6 +57,11 @@ public class TestSchedulingPolicy {
public void setUp() throws Exception {
scheduler = new FairScheduler();
conf = new FairSchedulerConfiguration();
// since this runs outside of the normal context we need to set one
RMContext rmContext = mock(RMContext.class);
PlacementManager placementManager = new PlacementManager();
when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
scheduler.setRMContext(rmContext);
}
public void testParseSchedulingPolicy()

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
@ -37,19 +39,25 @@
public class TestFairSchedulerQueueInfo {
@Test
public void testEmptyChildQueues() throws Exception {
FairSchedulerConfiguration conf = new FairSchedulerConfiguration();
public void testEmptyChildQueues() {
FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
RMContext rmContext = mock(RMContext.class);
PlacementManager placementManager = new PlacementManager();
SystemClock clock = SystemClock.getInstance();
FairScheduler scheduler = mock(FairScheduler.class);
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getConf()).thenReturn(conf);
when(scheduler.getClusterResource()).thenReturn(Resource.newInstance(1, 1));
when(scheduler.getConf()).thenReturn(fsConf);
when(scheduler.getConfig()).thenReturn(fsConf);
when(scheduler.getRMContext()).thenReturn(rmContext);
when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
when(scheduler.getClusterResource()).thenReturn(
Resource.newInstance(1, 1));
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock);
AllocationConfiguration allocConf = new AllocationConfiguration(scheduler);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
QueueManager queueManager = new QueueManager(scheduler);
queueManager.initialize(conf);
queueManager.initialize();
FSQueue testQueue = queueManager.getLeafQueue("test", true);
FairSchedulerQueueInfo queueInfo =