YARN-9825. Changes for initializing placement rules with ResourceScheduler in branch-2. Contributed by Jonathan Hung.

This commit is contained in:
Varun Saxena 2019-09-13 17:03:47 +05:30
parent c412fab728
commit c9a46308a1
3 changed files with 78 additions and 37 deletions

View File

@ -18,20 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import java.util.Map;
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
public abstract class PlacementRule {
public String getName() {
return this.getClass().getName();
}
public void initialize(Map<String, String> parameters, RMContext rmContext)
throws YarnException {
}
public abstract boolean initialize(
ResourceScheduler scheduler) throws IOException;
/**
* Get queue for a given application

View File

@ -32,6 +32,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
public class UserGroupMappingPlacementRule extends PlacementRule {
private static final Log LOG = LogFactory
@ -95,6 +103,10 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
}
}
public UserGroupMappingPlacementRule(){
this(false, null, null);
}
public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings,
List<QueueMapping> newMappings, Groups groups) {
this.mappings = newMappings;
@ -156,7 +168,55 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
return queueName;
}
@VisibleForTesting
@Override
public boolean initialize(ResourceScheduler scheduler)
throws IOException {
if (!(scheduler instanceof CapacityScheduler)) {
throw new IOException(
"UserGroupMappingPlacementRule can be configured only for "
+ "CapacityScheduler");
}
CapacitySchedulerContext schedulerContext =
(CapacitySchedulerContext) scheduler;
CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration();
boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
LOG.info(
"Initialized queue mappings, override: " + overrideWithQueueMappings);
// Get new user/group mappings
List<QueueMapping> newMappings = conf.getQueueMappings();
CapacitySchedulerQueueManager queueManager =
schedulerContext.getCapacitySchedulerQueueManager();
// check if mappings refer to valid queues
for (QueueMapping mapping : newMappings) {
String mappingQueue = mapping.getQueue();
if (!mappingQueue.equals(
UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue
.equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
CSQueue queue = queueManager.getQueue(mappingQueue);
if (queue == null || !(queue instanceof LeafQueue)) {
throw new IOException(
"mapping contains invalid or non-leaf queue " + mappingQueue);
}
}
}
// initialize groups if mappings are present
if (newMappings.size() > 0) {
Groups groups = new Groups(conf);
this.mappings = newMappings;
this.groups = groups;
this.overrideWithQueueMappings = overrideWithQueueMappings;
return true;
}
return false;
}
@VisibleForTesting
public List<QueueMapping> getQueueMappings() {
return mappings;

View File

@ -569,38 +569,12 @@ public class CapacityScheduler extends
}
@VisibleForTesting
public UserGroupMappingPlacementRule
getUserGroupMappingPlacementRule() throws IOException {
public PlacementRule getUserGroupMappingPlacementRule() throws IOException {
try {
readLock.lock();
boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
LOG.info(
"Initialized queue mappings, override: " + overrideWithQueueMappings);
// Get new user/group mappings
List<QueueMapping> newMappings = conf.getQueueMappings();
// check if mappings refer to valid queues
for (QueueMapping mapping : newMappings) {
String mappingQueue = mapping.getQueue();
if (!mappingQueue.equals(
UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue
.equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
CSQueue queue = getQueue(mappingQueue);
if (queue == null || !(queue instanceof LeafQueue)) {
throw new IOException(
"mapping contains invalid or non-leaf queue " + mappingQueue);
}
}
}
// initialize groups if mappings are present
if (newMappings.size() > 0) {
Groups groups = new Groups(conf);
return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
newMappings, groups);
}
return null;
UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule();
ugRule.initialize(this);
return ugRule;
} finally {
readLock.unlock();
}
@ -626,11 +600,19 @@ public class CapacityScheduler extends
}
break;
default:
boolean isMappingNotEmpty;
try {
PlacementRule rule = PlacementFactory.getPlacementRule(
placementRuleStr, conf);
if (null != rule) {
placementRules.add(rule);
try {
isMappingNotEmpty = rule.initialize(this);
} catch (IOException ie) {
throw new IOException(ie);
}
if (isMappingNotEmpty) {
placementRules.add(rule);
}
}
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);