YARN-8016. Refine PlacementRule interface and add a app-name queue mapping rule as an example. (Zian Chen via wangda)

Change-Id: I35caf1480e0f76f5f3a53528af09312e39414bbb
(cherry picked from commit a90471b3e6)
This commit is contained in:
Wangda Tan 2018-03-23 16:43:40 -07:00 committed by Jonathan Hung
parent 244385d7eb
commit 81d63d5ea1
13 changed files with 929 additions and 36 deletions

View File

@ -273,6 +273,8 @@ public class YarnConfiguration extends Configuration {
/** UserGroupMappingPlacementRule configuration string. */
public static final String USER_GROUP_PLACEMENT_RULE = "user-group";
public static final String APP_NAME_PLACEMENT_RULE = "app-name";
/** Enable Resource Manager webapp ui actions */
public static final String RM_WEBAPP_UI_ACTIONS_ENABLED =
RM_PREFIX + "webapp.ui-actions.enabled";

View File

@ -0,0 +1,197 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.extractQueuePath;
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.getPlacementContext;
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.isStaticQueueMapping;
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.validateAndGetAutoCreatedQueueMapping;
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.validateAndGetQueueMapping;
public class AppNameMappingPlacementRule extends PlacementRule {
private static final Log LOG = LogFactory
.getLog(AppNameMappingPlacementRule.class);
public static final String CURRENT_APP_MAPPING = "%application";
private static final String QUEUE_MAPPING_NAME = "app-name";
private boolean overrideWithQueueMappings = false;
private List<QueueMappingEntity> mappings = null;
public AppNameMappingPlacementRule() {
this(false, null);
}
public AppNameMappingPlacementRule(boolean overrideWithQueueMappings,
List<QueueMappingEntity> newMappings) {
this.overrideWithQueueMappings = overrideWithQueueMappings;
this.mappings = newMappings;
}
@Override
public boolean initialize(CapacitySchedulerContext schedulerContext)
throws IOException {
CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration();
boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
LOG.info(
"Initialized queue mappings, override: " + overrideWithQueueMappings);
List<QueueMappingEntity> queueMappings =
conf.getQueueMappingEntity(QUEUE_MAPPING_NAME);
// Get new user mappings
List<QueueMappingEntity> newMappings = new ArrayList<>();
CapacitySchedulerQueueManager queueManager =
schedulerContext.getCapacitySchedulerQueueManager();
// check if mappings refer to valid queues
for (QueueMappingEntity mapping : queueMappings) {
QueuePath queuePath = extractQueuePath(mapping.getQueue());
if (isStaticQueueMapping(mapping)) {
//Try getting queue by its leaf queue name
// without splitting into parent/leaf queues
CSQueue queue = queueManager.getQueue(mapping.getQueue());
if (ifQueueDoesNotExist(queue)) {
//Try getting the queue by extracting leaf and parent queue names
//Assuming its a potential auto created leaf queue
queue = queueManager.getQueue(queuePath.getLeafQueue());
if (ifQueueDoesNotExist(queue)) {
//if leaf queue does not exist,
// this could be a potential auto created leaf queue
//validate if parent queue is specified,
// then it should exist and
// be an instance of AutoCreateEnabledParentQueue
QueueMappingEntity newMapping =
validateAndGetAutoCreatedQueueMapping(queueManager, mapping,
queuePath);
if (newMapping == null) {
throw new IOException(
"mapping contains invalid or non-leaf queue " + mapping
.getQueue());
}
newMappings.add(newMapping);
} else{
QueueMappingEntity newMapping = validateAndGetQueueMapping(
queueManager, queue, mapping, queuePath);
newMappings.add(newMapping);
}
} else{
// if queue exists, validate
// if its an instance of leaf queue
// if its an instance of auto created leaf queue,
// then extract parent queue name and update queue mapping
QueueMappingEntity newMapping = validateAndGetQueueMapping(
queueManager, queue, mapping, queuePath);
newMappings.add(newMapping);
}
} else{
//If it is a dynamic queue mapping,
// we can safely assume leaf queue name does not have '.' in it
// validate
// if parent queue is specified, then
// parent queue exists and an instance of AutoCreateEnabledParentQueue
//
QueueMappingEntity newMapping = validateAndGetAutoCreatedQueueMapping(
queueManager, mapping, queuePath);
if (newMapping != null) {
newMappings.add(newMapping);
} else{
newMappings.add(mapping);
}
}
}
if (newMappings.size() > 0) {
this.mappings = newMappings;
this.overrideWithQueueMappings = overrideWithQueueMappings;
return true;
}
return false;
}
private static boolean ifQueueDoesNotExist(CSQueue queue) {
return queue == null;
}
private ApplicationPlacementContext getAppPlacementContext(String user,
ApplicationId applicationId) throws IOException {
for (QueueMappingEntity mapping : mappings) {
if (mapping.getSource().equals(CURRENT_APP_MAPPING)) {
if (mapping.getQueue().equals(CURRENT_APP_MAPPING)) {
return getPlacementContext(mapping, String.valueOf(applicationId));
} else {
return getPlacementContext(mapping);
}
}
if (mapping.getSource().equals(applicationId.toString())) {
return getPlacementContext(mapping);
}
}
return null;
}
@Override
public ApplicationPlacementContext getPlacementForApp(
ApplicationSubmissionContext asc, String user) throws YarnException {
String queueName = asc.getQueue();
ApplicationId applicationId = asc.getApplicationId();
if (mappings != null && mappings.size() > 0) {
try {
ApplicationPlacementContext mappedQueue = getAppPlacementContext(user,
applicationId);
if (mappedQueue != null) {
// We have a mapping, should we use it?
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
//queueName will be same as mapped queue name in case of recovery
|| queueName.equals(mappedQueue.getQueue())
|| overrideWithQueueMappings) {
LOG.info("Application " + applicationId
+ " mapping [" + queueName + "] to [" + mappedQueue
+ "] override " + overrideWithQueueMappings);
return mappedQueue;
}
}
} catch (IOException ioex) {
String message = "Failed to submit application " + applicationId +
" reason: " + ioex.getMessage();
throw new YarnException(message);
}
}
return null;
}
}

View File

@ -18,11 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import java.util.Map;
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
public abstract class PlacementRule {
@ -30,9 +30,8 @@ public abstract class PlacementRule {
return this.getClass().getName();
}
public void initialize(Map<String, String> parameters, RMContext rmContext)
throws YarnException {
}
public abstract boolean initialize(
CapacitySchedulerContext schedulerContext) throws IOException;
/**
* Get queue for a given application

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
public class QueueMappingEntity {
private String source;
private String queue;
private String parentQueue;
public final static String DELIMITER = ":";
public QueueMappingEntity(String source, String queue) {
this.source = source;
this.queue = queue;
this.parentQueue = null;
}
public QueueMappingEntity(String source, String queue, String parentQueue) {
this.source = source;
this.queue = queue;
this.parentQueue = parentQueue;
}
public String getQueue() {
return queue;
}
public String getParentQueue() {
return parentQueue;
}
public String getSource() {
return source;
}
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof QueueMappingEntity) {
QueueMappingEntity other = (QueueMappingEntity) obj;
return (other.source.equals(source) &&
other.queue.equals(queue));
} else {
return false;
}
}
public String toString() {
return source + DELIMITER + (parentQueue != null ?
parentQueue + "." + queue :
queue);
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
public class QueuePath {
private String parentQueue;
private String leafQueue;
public QueuePath(final String leafQueue) {
this.leafQueue = leafQueue;
}
public QueuePath(final String parentQueue, final String leafQueue) {
this.parentQueue = parentQueue;
this.leafQueue = leafQueue;
}
public String getParentQueue() {
return parentQueue;
}
public String getLeafQueue() {
return leafQueue;
}
public boolean hasParentQueue() {
return parentQueue != null;
}
@Override
public String toString() {
return parentQueue + DOT + leafQueue;
}
}

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
import java.io.IOException;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
/**
* Utility class for QueuePlacementRule.
*/
public final class QueuePlacementRuleUtils {
public static final String CURRENT_USER_MAPPING = "%user";
public static final String PRIMARY_GROUP_MAPPING = "%primary_group";
private QueuePlacementRuleUtils() {
}
private static void validateQueueMappingUnderParentQueue(CSQueue parentQueue,
String parentQueueName, String leafQueueName) throws IOException {
if (parentQueue == null) {
throw new IOException(
"mapping contains invalid or non-leaf queue [" + leafQueueName
+ "] and invalid parent queue [" + parentQueueName + "]");
} else if (!(parentQueue instanceof ManagedParentQueue)) {
throw new IOException("mapping contains leaf queue [" + leafQueueName
+ "] and invalid parent queue which "
+ "does not have auto creation of leaf queues enabled ["
+ parentQueueName + "]");
} else if (!parentQueue.getQueueName().equals(parentQueueName)) {
throw new IOException(
"mapping contains invalid or non-leaf queue [" + leafQueueName
+ "] and invalid parent queue "
+ "which does not match existing leaf queue's parent : ["
+ parentQueueName + "] does not match [ " + parentQueue
.getQueueName() + "]");
}
}
public static QueueMappingEntity validateAndGetAutoCreatedQueueMapping(
CapacitySchedulerQueueManager queueManager, QueueMappingEntity mapping,
QueuePath queuePath) throws IOException {
if (queuePath.hasParentQueue()) {
//if parent queue is specified,
// then it should exist and be an instance of ManagedParentQueue
validateQueueMappingUnderParentQueue(queueManager.getQueue(
queuePath.getParentQueue()), queuePath.getParentQueue(),
queuePath.getLeafQueue());
return new QueueMappingEntity(mapping.getSource(),
queuePath.getLeafQueue(), queuePath.getParentQueue());
}
return null;
}
public static QueueMappingEntity validateAndGetQueueMapping(
CapacitySchedulerQueueManager queueManager, CSQueue queue,
QueueMappingEntity mapping, QueuePath queuePath) throws IOException {
if (!(queue instanceof LeafQueue)) {
throw new IOException(
"mapping contains invalid or non-leaf queue : " + mapping.getQueue());
}
if (queue instanceof AutoCreatedLeafQueue && queue
.getParent() instanceof ManagedParentQueue) {
QueueMappingEntity newMapping = validateAndGetAutoCreatedQueueMapping(
queueManager, mapping, queuePath);
if (newMapping == null) {
throw new IOException(
"mapping contains invalid or non-leaf queue " + mapping.getQueue());
}
return newMapping;
}
return mapping;
}
public static boolean isStaticQueueMapping(QueueMappingEntity mapping) {
return !mapping.getQueue().contains(CURRENT_USER_MAPPING) && !mapping
.getQueue().contains(PRIMARY_GROUP_MAPPING);
}
public static QueuePath extractQueuePath(String queueName)
throws IOException {
int parentQueueNameEndIndex = queueName.lastIndexOf(DOT);
if (parentQueueNameEndIndex > -1) {
final String parentQueue = queueName.substring(0, parentQueueNameEndIndex)
.trim();
final String leafQueue = queueName.substring(parentQueueNameEndIndex + 1)
.trim();
return new QueuePath(parentQueue, leafQueue);
}
return new QueuePath(queueName);
}
public static ApplicationPlacementContext getPlacementContext(
QueueMappingEntity mapping) {
return getPlacementContext(mapping, mapping.getQueue());
}
public static ApplicationPlacementContext getPlacementContext(
QueueMappingEntity mapping, String leafQueueName) {
if (!org.apache.commons.lang.StringUtils.isEmpty(mapping.getParentQueue())) {
return new ApplicationPlacementContext(leafQueueName,
mapping.getParentQueue());
} else{
return new ApplicationPlacementContext(leafQueueName);
}
}
}

View File

@ -141,6 +141,10 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
}
}
public UserGroupMappingPlacementRule(){
this(false, null, null);
}
public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings,
List<QueueMapping> newMappings, Groups groups) {
this.mappings = newMappings;
@ -225,8 +229,9 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
}
@VisibleForTesting
public static UserGroupMappingPlacementRule get(
CapacitySchedulerContext schedulerContext) throws IOException {
@Override
public boolean initialize(CapacitySchedulerContext schedulerContext)
throws IOException {
CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration();
boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
LOG.info(
@ -301,11 +306,12 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
// initialize groups if mappings are present
if (newMappings.size() > 0) {
Groups groups = new Groups(conf);
return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
newMappings, groups);
this.mappings = newMappings;
this.groups = groups;
this.overrideWithQueueMappings = overrideWithQueueMappings;
return true;
}
return null;
return false;
}
private static QueueMapping validateAndGetQueueMapping(

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.AppNameMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
@ -666,45 +667,80 @@ public class CapacityScheduler extends
public PlacementRule getUserGroupMappingPlacementRule() throws IOException {
try {
readLock.lock();
return UserGroupMappingPlacementRule.get(this);
UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule();
ugRule.initialize(this);
return ugRule;
} finally {
readLock.unlock();
}
}
public PlacementRule getAppNameMappingPlacementRule() throws IOException {
try {
readLock.lock();
AppNameMappingPlacementRule anRule = new AppNameMappingPlacementRule();
anRule.initialize(this);
return anRule;
} finally {
readLock.unlock();
}
}
@VisibleForTesting
void updatePlacementRules() throws IOException {
public void updatePlacementRules() throws IOException {
// Initialize placement rules
Collection<String> placementRuleStrs = conf.getStringCollection(
YarnConfiguration.QUEUE_PLACEMENT_RULES);
List<PlacementRule> placementRules = new ArrayList<>();
if (placementRuleStrs.isEmpty()) {
PlacementRule ugRule = getUserGroupMappingPlacementRule();
if (null != ugRule) {
placementRules.add(ugRule);
Set<String> distingushRuleSet = new HashSet<>();
// fail the case if we get duplicate placementRule add in
for (String pls : placementRuleStrs) {
if (!distingushRuleSet.add(pls)) {
throw new IOException("Invalid PlacementRule inputs which "
+ "contains duplicate rule strings");
}
} else {
for (String placementRuleStr : placementRuleStrs) {
switch (placementRuleStr) {
case YarnConfiguration.USER_GROUP_PLACEMENT_RULE:
PlacementRule ugRule = getUserGroupMappingPlacementRule();
if (null != ugRule) {
placementRules.add(ugRule);
}
break;
default:
try {
PlacementRule rule = PlacementFactory.getPlacementRule(
placementRuleStr, conf);
if (null != rule) {
}
// add UserGroupMappingPlacementRule if absent
distingushRuleSet.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
placementRuleStrs = new ArrayList<>(distingushRuleSet);
for (String placementRuleStr : placementRuleStrs) {
switch (placementRuleStr) {
case YarnConfiguration.USER_GROUP_PLACEMENT_RULE:
PlacementRule ugRule = getUserGroupMappingPlacementRule();
if (null != ugRule) {
placementRules.add(ugRule);
}
break;
case YarnConfiguration.APP_NAME_PLACEMENT_RULE:
PlacementRule anRule = getAppNameMappingPlacementRule();
if (null != anRule) {
placementRules.add(anRule);
}
break;
default:
boolean isMappingNotEmpty;
try {
PlacementRule rule = PlacementFactory.getPlacementRule(
placementRuleStr, conf);
if (null != rule) {
try {
isMappingNotEmpty = rule.initialize(this);
} catch (IOException ie) {
throw new IOException(ie);
}
if (isMappingNotEmpty) {
placementRules.add(rule);
}
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
}
}
rmContext.getQueuePlacementManager().updateRules(placementRules);
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMappingEntity;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@ -1011,6 +1012,54 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, overrideWithQueueMappings);
}
public List<QueueMappingEntity> getQueueMappingEntity(
String queueMappingSuffix) {
String queueMappingName = buildQueueMappingRuleProperty(queueMappingSuffix);
List<QueueMappingEntity> mappings =
new ArrayList<QueueMappingEntity>();
Collection<String> mappingsString =
getTrimmedStringCollection(queueMappingName);
for (String mappingValue : mappingsString) {
String[] mapping =
getTrimmedStringCollection(mappingValue, ":")
.toArray(new String[] {});
if (mapping.length != 2 || mapping[1].length() == 0) {
throw new IllegalArgumentException(
"Illegal queue mapping " + mappingValue);
}
QueueMappingEntity m = new QueueMappingEntity(mapping[0], mapping[1]);
mappings.add(m);
}
return mappings;
}
private String buildQueueMappingRuleProperty (String queueMappingSuffix) {
StringBuilder queueMapping = new StringBuilder();
queueMapping.append(YarnConfiguration.QUEUE_PLACEMENT_RULES)
.append(".").append(queueMappingSuffix);
return queueMapping.toString();
}
@VisibleForTesting
public void setQueueMappingEntities(List<QueueMappingEntity> queueMappings,
String queueMappingSuffix) {
if (queueMappings == null) {
return;
}
List<String> queueMappingStrs = new ArrayList<>();
for (QueueMappingEntity mapping : queueMappings) {
queueMappingStrs.add(mapping.toString());
}
String mappingRuleProp = buildQueueMappingRuleProperty(queueMappingSuffix);
setStrings(mappingRuleProp, StringUtils.join(",", queueMappingStrs));
}
/**
* Returns a collection of strings, trimming leading and trailing whitespeace
* on each value

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
public class TestAppNameMappingPlacementRule {
private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis();
public static final String APPIDSTRPREFIX = "application";
private static final String APPLICATION_ID_PREFIX = APPIDSTRPREFIX + '_';
private static final String APPLICATION_ID_SUFFIX = '_' + "0001";
private static final String CLUSTER_APP_ID = APPLICATION_ID_PREFIX +
CLUSTER_TIMESTAMP + APPLICATION_ID_SUFFIX;
private YarnConfiguration conf = new YarnConfiguration();
@Before
public void setup() {
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
}
private void verifyQueueMapping(QueueMappingEntity queueMapping,
String inputAppId, String expectedQueue) throws YarnException {
verifyQueueMapping(queueMapping, inputAppId,
YarnConfiguration.DEFAULT_QUEUE_NAME, expectedQueue, false);
}
private void verifyQueueMapping(QueueMappingEntity queueMapping,
String inputAppId, String inputQueue, String expectedQueue,
boolean overwrite) throws YarnException {
AppNameMappingPlacementRule rule = new AppNameMappingPlacementRule(
overwrite, Arrays.asList(queueMapping));
ApplicationSubmissionContext asc = Records.newRecord(
ApplicationSubmissionContext.class);
asc.setQueue(inputQueue);
ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP,
Integer.parseInt(inputAppId));
asc.setApplicationId(appId);
ApplicationPlacementContext ctx = rule.getPlacementForApp(asc,
queueMapping.getSource());
Assert.assertEquals(expectedQueue,
ctx != null ? ctx.getQueue() : inputQueue);
}
@Test
public void testMapping() throws YarnException {
// simple base case for mapping user to queue
verifyQueueMapping(new QueueMappingEntity(CLUSTER_APP_ID,
"q1"), "1", "q1");
verifyQueueMapping(new QueueMappingEntity("%application", "q2"), "1", "q2");
verifyQueueMapping(new QueueMappingEntity("%application", "%application"),
"1", CLUSTER_APP_ID);
// specify overwritten, and see if user specified a queue, and it will be
// overridden
verifyQueueMapping(new QueueMappingEntity(CLUSTER_APP_ID,
"q1"), "1", "q2", "q1", true);
// if overwritten not specified, it should be which user specified
verifyQueueMapping(new QueueMappingEntity(CLUSTER_APP_ID,
"q1"), "1", "q2", "q2", false);
}
}

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.setupQueueConfiguration;
public class TestPlacementManager {
public static final String USER = "user_";
public static final String APP_ID1 = "1";
public static final String USER1 = USER + APP_ID1;
public static final String APP_ID2 = "2";
public static final String USER2 = USER + APP_ID2;
public static final String PARENT_QUEUE = "c";
private MockRM mockRM = null;
private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis();
private String getQueueMapping(String parentQueue, String leafQueue) {
return parentQueue + DOT + leafQueue;
}
@Test
public void testPlaceApplicationWithPlacementRuleChain() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
mockRM = new MockRM(conf);
CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
mockRM.start();
cs.start();
PlacementManager pm = cs.getRMContext()
.getQueuePlacementManager();
List<PlacementRule> queuePlacementRules = new ArrayList<>();
UserGroupMappingPlacementRule.QueueMapping userQueueMapping =
new UserGroupMappingPlacementRule.QueueMapping(
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
USER1,
getQueueMapping(PARENT_QUEUE, USER1));
UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule(
false, Arrays.asList(userQueueMapping), null);
queuePlacementRules.add(ugRule);
pm.updateRules(queuePlacementRules);
ApplicationSubmissionContext asc = Records.newRecord(
ApplicationSubmissionContext.class);
ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP,
Integer.parseInt(APP_ID1));
asc.setApplicationId(appId);
boolean caughtException = false;
try{
pm.placeApplication(asc, USER2);
} catch (Exception e) {
caughtException = true;
}
Assert.assertTrue(caughtException);
QueueMappingEntity queueMappingEntity = new QueueMappingEntity(APP_ID1,
USER1, PARENT_QUEUE);
AppNameMappingPlacementRule anRule = new AppNameMappingPlacementRule(false,
Arrays.asList(queueMappingEntity));
queuePlacementRules.add(anRule);
pm.updateRules(queuePlacementRules);
try{
pm.placeApplication(asc, USER2);
} catch (Exception e) {
caughtException = false;
}
Assert.assertFalse(caughtException);
}
}

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMappingEntity;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.getQueueMapping;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.setupQueueConfiguration;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertThat;
public class TestCapacitySchedulerQueueMappingFactory {
private static final String QUEUE_MAPPING_NAME = "app-name";
private static final String QUEUE_MAPPING_RULE_APP_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.placement.AppNameMappingPlacementRule";
private static final String QUEUE_MAPPING_RULE_USER_GROUP =
"org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule";
public static final String USER = "user_";
public static final String PARENT_QUEUE = "c";
private MockRM mockRM = null;
public static CapacitySchedulerConfiguration setupQueueMappingsForRules(
CapacitySchedulerConfiguration conf, String parentQueue,
boolean overrideWithQueueMappings, int[] sourceIds) {
List<String> queuePlacementRules = new ArrayList<>();
queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
queuePlacementRules.add(QUEUE_MAPPING_RULE_APP_NAME);
conf.setQueuePlacementRules(queuePlacementRules);
List<UserGroupMappingPlacementRule.QueueMapping> existingMappingsForUG =
conf.getQueueMappings();
//set queue mapping
List<UserGroupMappingPlacementRule.QueueMapping> queueMappingsForUG =
new ArrayList<>();
for (int i = 0; i < sourceIds.length; i++) {
//Set C as parent queue name for auto queue creation
UserGroupMappingPlacementRule.QueueMapping userQueueMapping =
new UserGroupMappingPlacementRule.QueueMapping(
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
USER + sourceIds[i],
getQueueMapping(parentQueue, USER + sourceIds[i]));
queueMappingsForUG.add(userQueueMapping);
}
existingMappingsForUG.addAll(queueMappingsForUG);
conf.setQueueMappings(existingMappingsForUG);
List<QueueMappingEntity> existingMappingsForAN =
conf.getQueueMappingEntity(QUEUE_MAPPING_NAME);
//set queue mapping
List<QueueMappingEntity> queueMappingsForAN =
new ArrayList<>();
for (int i = 0; i < sourceIds.length; i++) {
//Set C as parent queue name for auto queue creation
QueueMappingEntity queueMapping =
new QueueMappingEntity(USER + sourceIds[i],
getQueueMapping(parentQueue, USER + sourceIds[i]));
queueMappingsForAN.add(queueMapping);
}
existingMappingsForAN.addAll(queueMappingsForAN);
conf.setQueueMappingEntities(existingMappingsForAN, QUEUE_MAPPING_NAME);
//override with queue mappings
conf.setOverrideWithQueueMappings(overrideWithQueueMappings);
return conf;
}
@Test
public void testUpdatePlacementRulesFactory() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
// init queue mapping for UserGroupMappingRule and AppNameMappingRule
setupQueueMappingsForRules(conf, PARENT_QUEUE, true, new int[] {1, 2, 3});
mockRM = new MockRM(conf);
CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
cs.updatePlacementRules();
mockRM.start();
cs.start();
List<PlacementRule> rules = cs.getRMContext()
.getQueuePlacementManager().getPlacementRules();
List<String> placementRuleNames = new ArrayList<>();
for (PlacementRule pr : rules) {
placementRuleNames.add(pr.getName());
}
// verify both placement rules were added successfully
assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_USER_GROUP));
assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_APP_NAME));
}
}

View File

@ -60,7 +60,7 @@ The `CapacityScheduler` supports the following features:
* **Resource-based Scheduling** - Support for resource-intensive applications, where-in a application can optionally specify higher resource-requirements than the default, thereby accommodating applications with differing resource requirements. Currently, *memory* is the resource requirement supported.
* **Queue Mapping based on User or Group** - This feature allows users to map a job to a specific queue based on the user or group.
* **Queue Mapping Interface based on Default or User Defined Placement Rules** - This feature allows users to map a job to a specific queue based on some default placement rule. For instance based on user & group, or application name. User can also define their own placement rule.
* **Priority Scheduling** - This feature allows applications to be submitted and scheduled with different priorities. Higher integer value indicates higher priority for an application. Currently Application priority is supported only for FIFO ordering policy.
@ -155,13 +155,14 @@ Configuration
**Note:** An *ACL* is of the form *user1*,*user2* *space* *group1*,*group2*. The special value of * implies *anyone*. The special value of *space* implies *no one*. The default is * for the root queue if not specified.
* Queue Mapping based on User or Group
* Queue Mapping based on User or Group, Application Name or user defined placement rules
The `CapacityScheduler` supports the following parameters to configure the queue mapping based on user or group:
The `CapacityScheduler` supports the following parameters to configure the queue mapping based on user or group, user & group, or application name. User can also define their own placement rule:
| Property | Description |
|:---- |:---- |
| `yarn.scheduler.capacity.queue-mappings` | This configuration specifies the mapping of user or group to a specific queue. You can map a single user or a list of users to queues. Syntax: `[u or g]:[name]:[queue_name][,next_mapping]*`. Here, *u or g* indicates whether the mapping is for a user or group. The value is *u* for user and *g* for group. *name* indicates the user name or group name. To specify the user who has submitted the application, %user can be used. *queue_name* indicates the queue name for which the application has to be mapped. To specify queue name same as user name, *%user* can be used. To specify queue name same as the name of the primary group for which the user belongs to, *%primary_group* can be used.|
| `yarn.scheduler.queue-placement-rules.app-name` | This configuration specifies the mapping of application_id to a specific queue. You can map a single application or a list of applications to queues. Syntax: `[app_id]:[queue_name][,next_mapping]*`. Here, *app_id* indicates the application id you want to do the mapping. To specify the current application's id as the app_id, %application can be used. *queue_name* indicates the queue name for which the application has to be mapped. To specify queue name same as application id, *%application* can be used.|
| `yarn.scheduler.capacity.queue-mappings-override.enable` | This function is used to specify whether the user specified queues can be overridden. This is a Boolean value and the default value is *false*. |
Example:
@ -177,6 +178,16 @@ Example:
evaluated from left to right, and the first valid mapping will be used.
</description>
</property>
<property>
<name>yarn.scheduler.queue-placement-rules.app-name</name>
<value>appId1:queue1,%application:%application</value>
<description>
Here, <appId1> is mapped to <queue1>, maps applications to queues with
the same name as application respectively. The mappings will be
evaluated from left to right, and the first valid mapping will be used.
</description>
</property>
```
* Queue lifetime for applications