YARN-3635. Refactored current queue mapping implementation in CapacityScheduler to use a generic PlacementManager framework. Contributed by Wangda Tan

This commit is contained in:
Jian He 2015-09-15 15:39:20 +08:00
parent d777757d21
commit 5468baa80a
13 changed files with 584 additions and 279 deletions

View File

@ -455,6 +455,9 @@ Release 2.8.0 - UNRELEASED
YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend
container allocation logic. (Wangda Tan via jianhe)
YARN-3635. Refactored current queue mapping implementation in CapacityScheduler
to use a generic PlacementManager framework. (Wangda Tan via jianhe)
BUG FIXES
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@ -99,9 +100,10 @@ public class RMActiveServiceContext {
private long schedulerRecoveryWaitTime = 0;
private boolean printLog = true;
private boolean isSchedulerReady = false;
private PlacementManager queuePlacementManager = null;
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
}
@Private
@ -424,4 +426,16 @@ public void setSystemClock(Clock clock) {
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
return systemCredentials;
}
@Private
@Unstable
public PlacementManager getQueuePlacementManager() {
return queuePlacementManager;
}
@Private
@Unstable
public void setQueuePlacementManager(PlacementManager placementMgr) {
this.queuePlacementManager = placementMgr;
}
}

View File

@ -326,6 +326,15 @@ protected void recoverApplication(ApplicationStateData appState,
private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery) throws YarnException {
// Do queue mapping
if (!isRecovery) {
if (rmContext.getQueuePlacementManager() != null) {
// We only do queue mapping when it's a new application
rmContext.getQueuePlacementManager().placeApplication(
submissionContext, user);
}
}
ApplicationId applicationId = submissionContext.getApplicationId();
ResourceRequest amReq =
validateAndCreateResourceRequest(submissionContext, isRecovery);

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -124,4 +125,8 @@ void setRMApplicationHistoryWriter(
boolean isSchedulerReadyForAllocatingContainers();
Configuration getYarnConfiguration();
PlacementManager getQueuePlacementManager();
void setQueuePlacementManager(PlacementManager placementMgr);
}

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -76,7 +77,6 @@ public class RMContextImpl implements RMContext {
* individual fields.
*/
public RMContextImpl() {
}
@VisibleForTesting
@ -438,4 +438,14 @@ public Configuration getYarnConfiguration() {
public void setYarnConfiguration(Configuration yarnConfiguration) {
this.yarnConfiguration=yarnConfiguration;
}
@Override
public PlacementManager getQueuePlacementManager() {
return this.activeServiceContext.getQueuePlacementManager();
}
@Override
public void setQueuePlacementManager(PlacementManager placementMgr) {
this.activeServiceContext.setQueuePlacementManager(placementMgr);
}
}

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.annotations.VisibleForTesting;
public class PlacementManager {
private static final Log LOG = LogFactory.getLog(PlacementManager.class);
List<PlacementRule> rules;
ReadLock readLock;
WriteLock writeLock;
public PlacementManager() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
public void updateRules(List<PlacementRule> rules) {
try {
writeLock.lock();
this.rules = rules;
} finally {
writeLock.unlock();
}
}
public void placeApplication(ApplicationSubmissionContext asc, String user)
throws YarnException {
try {
readLock.lock();
if (null == rules || rules.isEmpty()) {
return;
}
String newQueueName = null;
for (PlacementRule rule : rules) {
newQueueName = rule.getQueueForApp(asc, user);
if (newQueueName != null) {
break;
}
}
// Failed to get where to place application
if (null == newQueueName && null == asc.getQueue()) {
String msg = "Failed to get where to place application="
+ asc.getApplicationId();
LOG.error(msg);
throw new YarnException(msg);
}
// Set it to ApplicationSubmissionContext
if (!StringUtils.equals(asc.getQueue(), newQueueName)) {
LOG.info("Placed application=" + asc.getApplicationId() + " to queue="
+ newQueueName + ", original queue=" + asc.getQueue());
asc.setQueue(newQueueName);
}
} finally {
readLock.unlock();
}
}
@VisibleForTesting
public List<PlacementRule> getPlacementRules() {
return rules;
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
public abstract class PlacementRule {
public String getName() {
return this.getClass().getName();
}
public void initialize(Map<String, String> parameters, RMContext rmContext)
throws YarnException {
}
/**
* Get queue for a given application
*
* @param asc application submission context
* @param user userName
*
* @throws YarnException
* if any error happens
*
* @return <p>
* non-null value means it is determined
* </p>
* <p>
* null value means it is undetermined, so next {@link PlacementRule}
* in the {@link PlacementManager} will take care
* </p>
*/
public abstract String getQueueForApp(ApplicationSubmissionContext asc,
String user) throws YarnException;
}

View File

@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
import com.google.common.annotations.VisibleForTesting;
public class UserGroupMappingPlacementRule extends PlacementRule {
private static final Log LOG = LogFactory
.getLog(UserGroupMappingPlacementRule.class);
public static final String CURRENT_USER_MAPPING = "%user";
public static final String PRIMARY_GROUP_MAPPING = "%primary_group";
private boolean overrideWithQueueMappings = false;
private List<QueueMapping> mappings = null;
private Groups groups;
@Private
public static class QueueMapping {
public enum MappingType {
USER("u"), GROUP("g");
private final String type;
private MappingType(String type) {
this.type = type;
}
public String toString() {
return type;
}
};
MappingType type;
String source;
String queue;
public QueueMapping(MappingType type, String source, String queue) {
this.type = type;
this.source = source;
this.queue = queue;
}
public String getQueue() {
return queue;
}
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof QueueMapping) {
QueueMapping other = (QueueMapping) obj;
return (other.type.equals(type) &&
other.source.equals(source) &&
other.queue.equals(queue));
} else {
return false;
}
}
}
public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings,
List<QueueMapping> newMappings, Groups groups) {
this.mappings = newMappings;
this.overrideWithQueueMappings = overrideWithQueueMappings;
this.groups = groups;
}
private String getMappedQueue(String user) throws IOException {
for (QueueMapping mapping : mappings) {
if (mapping.type == MappingType.USER) {
if (mapping.source.equals(CURRENT_USER_MAPPING)) {
if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
return user;
} else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
return groups.getGroups(user).get(0);
} else {
return mapping.queue;
}
}
if (user.equals(mapping.source)) {
return mapping.queue;
}
}
if (mapping.type == MappingType.GROUP) {
for (String userGroups : groups.getGroups(user)) {
if (userGroups.equals(mapping.source)) {
return mapping.queue;
}
}
}
}
return null;
}
@Override
public String getQueueForApp(ApplicationSubmissionContext asc, String user)
throws YarnException {
String queueName = asc.getQueue();
ApplicationId applicationId = asc.getApplicationId();
if (mappings != null && mappings.size() > 0) {
try {
String mappedQueue = getMappedQueue(user);
if (mappedQueue != null) {
// We have a mapping, should we use it?
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
|| overrideWithQueueMappings) {
LOG.info("Application " + applicationId + " user " + user
+ " mapping [" + queueName + "] to [" + mappedQueue
+ "] override " + overrideWithQueueMappings);
return mappedQueue;
}
}
} catch (IOException ioex) {
String message = "Failed to submit application " + applicationId +
" submitted by user " + user + " reason: " + ioex.getMessage();
throw new YarnException(message);
}
}
return queueName;
}
@VisibleForTesting
public List<QueueMapping> getQueueMappings() {
return mappings;
}
}

View File

@ -69,6 +69,9 @@
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
@ -98,8 +101,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -229,16 +230,6 @@ public Configuration getConf() {
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private boolean overrideWithQueueMappings = false;
private List<QueueMapping> mappings = null;
private Groups groups;
@VisibleForTesting
public synchronized String getMappedQueueForTest(String user)
throws IOException {
return getMappedQueue(user);
}
public CapacityScheduler() {
super(CapacityScheduler.class.getName());
}
@ -447,29 +438,52 @@ public CSQueue hook(CSQueue queue) {
}
private static final QueueHook noop = new QueueHook();
private void initializeQueueMappings() throws IOException {
overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
@VisibleForTesting
public synchronized UserGroupMappingPlacementRule
getUserGroupMappingPlacementRule() throws IOException {
boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
LOG.info("Initialized queue mappings, override: "
+ overrideWithQueueMappings);
// Get new user/group mappings
List<QueueMapping> newMappings = conf.getQueueMappings();
//check if mappings refer to valid queues
List<UserGroupMappingPlacementRule.QueueMapping> newMappings =
conf.getQueueMappings();
// check if mappings refer to valid queues
for (QueueMapping mapping : newMappings) {
if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
!mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
CSQueue queue = queues.get(mapping.queue);
String mappingQueue = mapping.getQueue();
if (!mappingQueue
.equals(UserGroupMappingPlacementRule.CURRENT_USER_MAPPING)
&& !mappingQueue
.equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
CSQueue queue = queues.get(mappingQueue);
if (queue == null || !(queue instanceof LeafQueue)) {
throw new IOException(
"mapping contains invalid or non-leaf queue " + mapping.queue);
throw new IOException("mapping contains invalid or non-leaf queue "
+ mappingQueue);
}
}
}
//apply the new mappings since they are valid
mappings = newMappings;
// initialize groups if mappings are present
if (mappings.size() > 0) {
groups = new Groups(conf);
if (newMappings.size() > 0) {
Groups groups = new Groups(conf);
return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
newMappings, groups);
}
return null;
}
private void updatePlacementRules() throws IOException {
List<PlacementRule> placementRules = new ArrayList<>();
// Initialize UserGroupMappingPlacementRule
// TODO, need make this defineable by configuration.
UserGroupMappingPlacementRule ugRule = getUserGroupMappingPlacementRule();
if (null != ugRule) {
placementRules.add(ugRule);
}
rmContext.getQueuePlacementManager().updateRules(placementRules);
}
@Lock(CapacityScheduler.class)
@ -481,7 +495,7 @@ private void initializeQueues(CapacitySchedulerConfiguration conf)
queues, queues, noop);
labelManager.reinitializeQueueLabels(getQueueToLabels());
LOG.info("Initialized root queue " + root);
initializeQueueMappings();
updatePlacementRules();
setQueueAcls(authorizer, queues);
}
@ -502,7 +516,7 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf)
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
initializeQueueMappings();
updatePlacementRules();
// Re-calculate headroom for active applications
root.updateClusterResource(clusterResource, new ResourceLimits(
@ -647,66 +661,8 @@ public CSQueue getQueue(String queueName) {
return queues.get(queueName);
}
private static final String CURRENT_USER_MAPPING = "%user";
private static final String PRIMARY_GROUP_MAPPING = "%primary_group";
private String getMappedQueue(String user) throws IOException {
for (QueueMapping mapping : mappings) {
if (mapping.type == MappingType.USER) {
if (mapping.source.equals(CURRENT_USER_MAPPING)) {
if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
return user;
}
else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
return groups.getGroups(user).get(0);
}
else {
return mapping.queue;
}
}
if (user.equals(mapping.source)) {
return mapping.queue;
}
}
if (mapping.type == MappingType.GROUP) {
for (String userGroups : groups.getGroups(user)) {
if (userGroups.equals(mapping.source)) {
return mapping.queue;
}
}
}
}
return null;
}
private synchronized void addApplication(ApplicationId applicationId,
String queueName, String user, boolean isAppRecovering, Priority priority) {
if (mappings != null && mappings.size() > 0) {
try {
String mappedQueue = getMappedQueue(user);
if (mappedQueue != null) {
// We have a mapping, should we use it?
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
|| overrideWithQueueMappings) {
LOG.info("Application " + applicationId + " user " + user
+ " mapping [" + queueName + "] to [" + mappedQueue
+ "] override " + overrideWithQueueMappings);
queueName = mappedQueue;
RMApp rmApp = rmContext.getRMApps().get(applicationId);
rmApp.setQueue(queueName);
}
}
} catch (IOException ioex) {
String message = "Failed to submit application " + applicationId +
" submitted by user " + user + " reason: " + ioex.getMessage();
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, message));
return;
}
}
// sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
@ -212,35 +213,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = 0;
@Private
public static class QueueMapping {
public enum MappingType {
USER("u"),
GROUP("g");
private final String type;
private MappingType(String type) {
this.type = type;
}
public String toString() {
return type;
}
};
MappingType type;
String source;
String queue;
public QueueMapping(MappingType type, String source, String queue) {
this.type = type;
this.source = source;
this.queue = queue;
}
}
@Private
public static final String AVERAGE_CAPACITY = "average-capacity";
@ -747,7 +719,7 @@ private static Collection<String> getTrimmedStringCollection(String str,
*/
public List<QueueMapping> getQueueMappings() {
List<QueueMapping> mappings =
new ArrayList<CapacitySchedulerConfiguration.QueueMapping>();
new ArrayList<QueueMapping>();
Collection<String> mappingsString =
getTrimmedStringCollection(QUEUE_MAPPING);
for (String mappingValue : mappingsString) {

View File

@ -19,16 +19,10 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ -40,8 +34,11 @@
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -57,11 +54,14 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@ -73,8 +73,11 @@
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -659,6 +662,39 @@ public void testEscapeApplicationSummary() {
Assert.assertTrue(msg.contains("applicationType=MAPREDUCE"));
}
@Test
public void testRMAppSubmitWithQueueChanged() throws Exception {
// Setup a PlacementManager returns a new queue
PlacementManager placementMgr = mock(PlacementManager.class);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ApplicationSubmissionContext ctx =
(ApplicationSubmissionContext) invocation.getArguments()[0];
ctx.setQueue("newQueue");
return null;
}
}).when(placementMgr).placeApplication(any(ApplicationSubmissionContext.class),
any(String.class));
rmContext.setQueuePlacementManager(placementMgr);
asContext.setQueue("oldQueue");
appMonitor.submitApplication(asContext, "test");
RMApp app = rmContext.getRMApps().get(appId);
Assert.assertNotNull("app is null", app);
Assert.assertEquals("newQueue", asContext.getQueue());
// wait for event to be processed
int timeoutSecs = 0;
while ((getAppEventType() == RMAppEventType.KILL) && timeoutSecs++ < 20) {
Thread.sleep(1000);
}
Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
getAppEventType());
}
private static ResourceScheduler mockResourceScheduler() {
ResourceScheduler scheduler = mock(ResourceScheduler.class);
when(scheduler.getMinimumResourceCapability()).thenReturn(

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import java.util.Arrays;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestUserGroupMappingPlacementRule {
YarnConfiguration conf = new YarnConfiguration();
@Before
public void setup() {
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
}
private void verifyQueueMapping(QueueMapping queueMapping, String inputUser,
String expectedQueue) throws YarnException {
verifyQueueMapping(queueMapping, inputUser,
YarnConfiguration.DEFAULT_QUEUE_NAME, expectedQueue, false);
}
private void verifyQueueMapping(QueueMapping queueMapping, String inputUser,
String inputQueue, String expectedQueue, boolean overwrite) throws YarnException {
Groups groups = new Groups(conf);
UserGroupMappingPlacementRule rule =
new UserGroupMappingPlacementRule(overwrite, Arrays.asList(queueMapping),
groups);
ApplicationSubmissionContext asc =
Records.newRecord(ApplicationSubmissionContext.class);
asc.setQueue(inputQueue);
String queue = rule.getQueueForApp(asc, inputUser);
Assert.assertEquals(expectedQueue, queue);
}
@Test
public void testMapping() throws YarnException {
// simple base case for mapping user to queue
verifyQueueMapping(new QueueMapping(MappingType.USER, "a", "q1"), "a", "q1");
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "agroup", "q1"),
"a", "q1");
verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", "q2"), "a",
"q2");
verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", "%user"),
"a", "a");
verifyQueueMapping(new QueueMapping(MappingType.USER, "%user",
"%primary_group"), "a", "agroup");
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "asubgroup1", "q1"),
"a", "q1");
// specify overwritten, and see if user specified a queue, and it will be
// overridden
verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"),
"user", "q2", "q1", true);
// if overwritten not specified, it should be which user specified
verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"),
"user", "q2", "q2", false);
}
}

View File

@ -18,22 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
import org.junit.After;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestQueueMappings {
@ -48,14 +42,22 @@ public class TestQueueMappings {
private final static String Q2_PATH =
CapacitySchedulerConfiguration.ROOT + "." + Q2;
private MockRM resourceManager;
private CapacityScheduler cs;
private YarnConfiguration conf;
@After
public void tearDown() throws Exception {
if (resourceManager != null) {
LOG.info("Stopping the resource manager");
resourceManager.stop();
}
@Before
public void setup() {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
conf = new YarnConfiguration(csConf);
cs = new CapacityScheduler();
RMContext rmContext = TestUtils.getMockRMContext();
cs.setConf(conf);
cs.setRMContext(rmContext);
cs.init(conf);
cs.start();
}
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
@ -68,25 +70,31 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
LOG.info("Setup top-level queues q1 and q2");
}
@Test
public void testQueueMappingSpecifyingNotExistedQueue() {
// if the mapping specifies a queue that does not exist, reinitialize will
// be failed
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
"u:user:non_existent_queue");
boolean fail = false;
try {
cs.reinitialize(conf, null);
} catch (IOException ioex) {
fail = true;
}
Assert.assertTrue("queue initialization failed for non-existent q", fail);
}
@Test
public void testQueueMappingTrimSpaces() throws IOException {
// space trimming
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1);
cs.reinitialize(conf, null);
checkQMapping(new QueueMapping(MappingType.USER, "a", Q1));
}
@Test (timeout = 60000)
public void testQueueMapping() throws Exception {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler cs = new CapacityScheduler();
RMContext rmContext = TestUtils.getMockRMContext();
cs.setConf(conf);
cs.setRMContext(rmContext);
cs.init(conf);
cs.start();
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
"true");
public void testQueueMappingParsingInvalidCases() throws Exception {
// configuration parsing tests - negative test cases
checkInvalidQMapping(conf, cs, "x:a:b", "invalid specifier");
checkInvalidQMapping(conf, cs, "u:a", "no queue specified");
@ -97,119 +105,6 @@ public void testQueueMapping() throws Exception {
checkInvalidQMapping(conf, cs, "u::", "empty source and queue");
checkInvalidQMapping(conf, cs, "u:", "missing source missing queue");
checkInvalidQMapping(conf, cs, "u:a:", "empty source missing q");
// simple base case for mapping user to queue
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:a:" + Q1);
cs.reinitialize(conf, null);
checkQMapping("a", Q1, cs);
// group mapping test
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:agroup:" + Q1);
cs.reinitialize(conf, null);
checkQMapping("a", Q1, cs);
// %user tests
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:" + Q2);
cs.reinitialize(conf, null);
checkQMapping("a", Q2, cs);
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:%user");
cs.reinitialize(conf, null);
checkQMapping("a", "a", cs);
// %primary_group tests
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
"u:%user:%primary_group");
cs.reinitialize(conf, null);
checkQMapping("a", "agroup", cs);
// non-primary group mapping
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
"g:asubgroup1:" + Q1);
cs.reinitialize(conf, null);
checkQMapping("a", Q1, cs);
// space trimming
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1);
cs.reinitialize(conf, null);
checkQMapping("a", Q1, cs);
csConf = new CapacitySchedulerConfiguration();
csConf.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getName());
setupQueueConfiguration(csConf);
conf = new YarnConfiguration(csConf);
resourceManager = new MockRM(csConf);
resourceManager.start();
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
"true");
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
resourceManager.getResourceScheduler().reinitialize(conf, null);
// ensure that if the user specifies a Q that is still overriden
checkAppQueue(resourceManager, "user", Q2, Q1);
// toggle admin override and retry
conf.setBoolean(
CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
false);
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
setupQueueConfiguration(csConf);
resourceManager.getResourceScheduler().reinitialize(conf, null);
checkAppQueue(resourceManager, "user", Q2, Q2);
// ensure that if a user does not specify a Q, the user mapping is used
checkAppQueue(resourceManager, "user", null, Q1);
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:usergroup:" + Q2);
setupQueueConfiguration(csConf);
resourceManager.getResourceScheduler().reinitialize(conf, null);
// ensure that if a user does not specify a Q, the group mapping is used
checkAppQueue(resourceManager, "user", null, Q2);
// if the mapping specifies a queue that does not exist, the job is rejected
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
"u:user:non_existent_queue");
setupQueueConfiguration(csConf);
boolean fail = false;
try {
resourceManager.getResourceScheduler().reinitialize(conf, null);
}
catch (IOException ioex) {
fail = true;
}
Assert.assertTrue("queue initialization failed for non-existent q", fail);
resourceManager.stop();
}
private void checkAppQueue(MockRM resourceManager, String user,
String submissionQueue, String expected)
throws Exception {
RMApp app = resourceManager.submitApp(200, "name", user,
new HashMap<ApplicationAccessType, String>(), false, submissionQueue, -1,
null, "MAPREDUCE", false);
RMAppState expectedState = expected.isEmpty() ? RMAppState.FAILED
: RMAppState.ACCEPTED;
resourceManager.waitForState(app.getApplicationId(), expectedState);
// get scheduler app
CapacityScheduler cs = (CapacityScheduler)
resourceManager.getResourceScheduler();
SchedulerApplication schedulerApp =
cs.getSchedulerApplications().get(app.getApplicationId());
String queue = "";
if (schedulerApp != null) {
queue = schedulerApp.getQueue().getQueueName();
}
Assert.assertTrue("expected " + expected + " actual " + queue,
expected.equals(queue));
Assert.assertEquals(expected, app.getQueue());
}
private void checkInvalidQMapping(YarnConfiguration conf,
@ -227,10 +122,12 @@ private void checkInvalidQMapping(YarnConfiguration conf,
fail);
}
private void checkQMapping(String user, String expected, CapacityScheduler cs)
private void checkQMapping(QueueMapping expected)
throws IOException {
String actual = cs.getMappedQueueForTest(user);
Assert.assertTrue("expected " + expected + " actual " + actual,
expected.equals(actual));
UserGroupMappingPlacementRule rule =
(UserGroupMappingPlacementRule) cs.getRMContext()
.getQueuePlacementManager().getPlacementRules().get(0);
QueueMapping queueMapping = rule.getQueueMappings().get(0);
Assert.assertEquals(queueMapping, expected);
}
}