YARN-3635. Refactored current queue mapping implementation in CapacityScheduler to use a generic PlacementManager framework. Contributed by Wangda Tan
(cherry picked from commit 5468baa80a
)
This commit is contained in:
parent
df9bb7449c
commit
eacc18677a
|
@ -403,6 +403,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend
|
YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend
|
||||||
container allocation logic. (Wangda Tan via jianhe)
|
container allocation logic. (Wangda Tan via jianhe)
|
||||||
|
|
||||||
|
YARN-3635. Refactored current queue mapping implementation in CapacityScheduler
|
||||||
|
to use a generic PlacementManager framework. (Wangda Tan via jianhe)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena
|
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
|
||||||
|
@ -99,9 +100,10 @@ public class RMActiveServiceContext {
|
||||||
private long schedulerRecoveryWaitTime = 0;
|
private long schedulerRecoveryWaitTime = 0;
|
||||||
private boolean printLog = true;
|
private boolean printLog = true;
|
||||||
private boolean isSchedulerReady = false;
|
private boolean isSchedulerReady = false;
|
||||||
|
private PlacementManager queuePlacementManager = null;
|
||||||
|
|
||||||
public RMActiveServiceContext() {
|
public RMActiveServiceContext() {
|
||||||
|
queuePlacementManager = new PlacementManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
|
@ -424,4 +426,16 @@ public class RMActiveServiceContext {
|
||||||
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
|
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
|
||||||
return systemCredentials;
|
return systemCredentials;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public PlacementManager getQueuePlacementManager() {
|
||||||
|
return queuePlacementManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public void setQueuePlacementManager(PlacementManager placementMgr) {
|
||||||
|
this.queuePlacementManager = placementMgr;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -326,6 +326,15 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
private RMAppImpl createAndPopulateNewRMApp(
|
private RMAppImpl createAndPopulateNewRMApp(
|
||||||
ApplicationSubmissionContext submissionContext, long submitTime,
|
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||||
String user, boolean isRecovery) throws YarnException {
|
String user, boolean isRecovery) throws YarnException {
|
||||||
|
// Do queue mapping
|
||||||
|
if (!isRecovery) {
|
||||||
|
if (rmContext.getQueuePlacementManager() != null) {
|
||||||
|
// We only do queue mapping when it's a new application
|
||||||
|
rmContext.getQueuePlacementManager().placeApplication(
|
||||||
|
submissionContext, user);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ApplicationId applicationId = submissionContext.getApplicationId();
|
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||||
ResourceRequest amReq =
|
ResourceRequest amReq =
|
||||||
validateAndCreateResourceRequest(submissionContext, isRecovery);
|
validateAndCreateResourceRequest(submissionContext, isRecovery);
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
@ -124,4 +125,8 @@ public interface RMContext {
|
||||||
boolean isSchedulerReadyForAllocatingContainers();
|
boolean isSchedulerReadyForAllocatingContainers();
|
||||||
|
|
||||||
Configuration getYarnConfiguration();
|
Configuration getYarnConfiguration();
|
||||||
|
|
||||||
|
PlacementManager getQueuePlacementManager();
|
||||||
|
|
||||||
|
void setQueuePlacementManager(PlacementManager placementMgr);
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
@ -76,7 +77,6 @@ public class RMContextImpl implements RMContext {
|
||||||
* individual fields.
|
* individual fields.
|
||||||
*/
|
*/
|
||||||
public RMContextImpl() {
|
public RMContextImpl() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -438,4 +438,14 @@ public class RMContextImpl implements RMContext {
|
||||||
public void setYarnConfiguration(Configuration yarnConfiguration) {
|
public void setYarnConfiguration(Configuration yarnConfiguration) {
|
||||||
this.yarnConfiguration=yarnConfiguration;
|
this.yarnConfiguration=yarnConfiguration;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PlacementManager getQueuePlacementManager() {
|
||||||
|
return this.activeServiceContext.getQueuePlacementManager();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setQueuePlacementManager(PlacementManager placementMgr) {
|
||||||
|
this.activeServiceContext.setQueuePlacementManager(placementMgr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,95 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
public class PlacementManager {
|
||||||
|
private static final Log LOG = LogFactory.getLog(PlacementManager.class);
|
||||||
|
|
||||||
|
List<PlacementRule> rules;
|
||||||
|
ReadLock readLock;
|
||||||
|
WriteLock writeLock;
|
||||||
|
|
||||||
|
public PlacementManager() {
|
||||||
|
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
|
readLock = lock.readLock();
|
||||||
|
writeLock = lock.writeLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateRules(List<PlacementRule> rules) {
|
||||||
|
try {
|
||||||
|
writeLock.lock();
|
||||||
|
this.rules = rules;
|
||||||
|
} finally {
|
||||||
|
writeLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void placeApplication(ApplicationSubmissionContext asc, String user)
|
||||||
|
throws YarnException {
|
||||||
|
try {
|
||||||
|
readLock.lock();
|
||||||
|
if (null == rules || rules.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String newQueueName = null;
|
||||||
|
for (PlacementRule rule : rules) {
|
||||||
|
newQueueName = rule.getQueueForApp(asc, user);
|
||||||
|
if (newQueueName != null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Failed to get where to place application
|
||||||
|
if (null == newQueueName && null == asc.getQueue()) {
|
||||||
|
String msg = "Failed to get where to place application="
|
||||||
|
+ asc.getApplicationId();
|
||||||
|
LOG.error(msg);
|
||||||
|
throw new YarnException(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set it to ApplicationSubmissionContext
|
||||||
|
if (!StringUtils.equals(asc.getQueue(), newQueueName)) {
|
||||||
|
LOG.info("Placed application=" + asc.getApplicationId() + " to queue="
|
||||||
|
+ newQueueName + ", original queue=" + asc.getQueue());
|
||||||
|
asc.setQueue(newQueueName);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
readLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public List<PlacementRule> getPlacementRules() {
|
||||||
|
return rules;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
|
||||||
|
public abstract class PlacementRule {
|
||||||
|
public String getName() {
|
||||||
|
return this.getClass().getName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void initialize(Map<String, String> parameters, RMContext rmContext)
|
||||||
|
throws YarnException {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get queue for a given application
|
||||||
|
*
|
||||||
|
* @param asc application submission context
|
||||||
|
* @param user userName
|
||||||
|
*
|
||||||
|
* @throws YarnException
|
||||||
|
* if any error happens
|
||||||
|
*
|
||||||
|
* @return <p>
|
||||||
|
* non-null value means it is determined
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* null value means it is undetermined, so next {@link PlacementRule}
|
||||||
|
* in the {@link PlacementManager} will take care
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
public abstract String getQueueForApp(ApplicationSubmissionContext asc,
|
||||||
|
String user) throws YarnException;
|
||||||
|
}
|
|
@ -0,0 +1,164 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.security.Groups;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
public class UserGroupMappingPlacementRule extends PlacementRule {
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(UserGroupMappingPlacementRule.class);
|
||||||
|
|
||||||
|
public static final String CURRENT_USER_MAPPING = "%user";
|
||||||
|
|
||||||
|
public static final String PRIMARY_GROUP_MAPPING = "%primary_group";
|
||||||
|
|
||||||
|
private boolean overrideWithQueueMappings = false;
|
||||||
|
private List<QueueMapping> mappings = null;
|
||||||
|
private Groups groups;
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static class QueueMapping {
|
||||||
|
|
||||||
|
public enum MappingType {
|
||||||
|
|
||||||
|
USER("u"), GROUP("g");
|
||||||
|
private final String type;
|
||||||
|
|
||||||
|
private MappingType(String type) {
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
MappingType type;
|
||||||
|
String source;
|
||||||
|
String queue;
|
||||||
|
|
||||||
|
public QueueMapping(MappingType type, String source, String queue) {
|
||||||
|
this.type = type;
|
||||||
|
this.source = source;
|
||||||
|
this.queue = queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getQueue() {
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return super.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj instanceof QueueMapping) {
|
||||||
|
QueueMapping other = (QueueMapping) obj;
|
||||||
|
return (other.type.equals(type) &&
|
||||||
|
other.source.equals(source) &&
|
||||||
|
other.queue.equals(queue));
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings,
|
||||||
|
List<QueueMapping> newMappings, Groups groups) {
|
||||||
|
this.mappings = newMappings;
|
||||||
|
this.overrideWithQueueMappings = overrideWithQueueMappings;
|
||||||
|
this.groups = groups;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getMappedQueue(String user) throws IOException {
|
||||||
|
for (QueueMapping mapping : mappings) {
|
||||||
|
if (mapping.type == MappingType.USER) {
|
||||||
|
if (mapping.source.equals(CURRENT_USER_MAPPING)) {
|
||||||
|
if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
|
||||||
|
return user;
|
||||||
|
} else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
|
||||||
|
return groups.getGroups(user).get(0);
|
||||||
|
} else {
|
||||||
|
return mapping.queue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (user.equals(mapping.source)) {
|
||||||
|
return mapping.queue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (mapping.type == MappingType.GROUP) {
|
||||||
|
for (String userGroups : groups.getGroups(user)) {
|
||||||
|
if (userGroups.equals(mapping.source)) {
|
||||||
|
return mapping.queue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getQueueForApp(ApplicationSubmissionContext asc, String user)
|
||||||
|
throws YarnException {
|
||||||
|
String queueName = asc.getQueue();
|
||||||
|
ApplicationId applicationId = asc.getApplicationId();
|
||||||
|
if (mappings != null && mappings.size() > 0) {
|
||||||
|
try {
|
||||||
|
String mappedQueue = getMappedQueue(user);
|
||||||
|
if (mappedQueue != null) {
|
||||||
|
// We have a mapping, should we use it?
|
||||||
|
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
|
||||||
|
|| overrideWithQueueMappings) {
|
||||||
|
LOG.info("Application " + applicationId + " user " + user
|
||||||
|
+ " mapping [" + queueName + "] to [" + mappedQueue
|
||||||
|
+ "] override " + overrideWithQueueMappings);
|
||||||
|
return mappedQueue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException ioex) {
|
||||||
|
String message = "Failed to submit application " + applicationId +
|
||||||
|
" submitted by user " + user + " reason: " + ioex.getMessage();
|
||||||
|
throw new YarnException(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return queueName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public List<QueueMapping> getQueueMappings() {
|
||||||
|
return mappings;
|
||||||
|
}
|
||||||
|
}
|
|
@ -69,6 +69,9 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||||
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
|
||||||
|
@ -98,8 +101,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
@ -229,16 +230,6 @@ public class CapacityScheduler extends
|
||||||
+ ".scheduling-interval-ms";
|
+ ".scheduling-interval-ms";
|
||||||
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
|
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
|
||||||
|
|
||||||
private boolean overrideWithQueueMappings = false;
|
|
||||||
private List<QueueMapping> mappings = null;
|
|
||||||
private Groups groups;
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public synchronized String getMappedQueueForTest(String user)
|
|
||||||
throws IOException {
|
|
||||||
return getMappedQueue(user);
|
|
||||||
}
|
|
||||||
|
|
||||||
public CapacityScheduler() {
|
public CapacityScheduler() {
|
||||||
super(CapacityScheduler.class.getName());
|
super(CapacityScheduler.class.getName());
|
||||||
}
|
}
|
||||||
|
@ -447,29 +438,52 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
private static final QueueHook noop = new QueueHook();
|
private static final QueueHook noop = new QueueHook();
|
||||||
|
|
||||||
private void initializeQueueMappings() throws IOException {
|
@VisibleForTesting
|
||||||
overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
|
public synchronized UserGroupMappingPlacementRule
|
||||||
|
getUserGroupMappingPlacementRule() throws IOException {
|
||||||
|
boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
|
||||||
LOG.info("Initialized queue mappings, override: "
|
LOG.info("Initialized queue mappings, override: "
|
||||||
+ overrideWithQueueMappings);
|
+ overrideWithQueueMappings);
|
||||||
|
|
||||||
// Get new user/group mappings
|
// Get new user/group mappings
|
||||||
List<QueueMapping> newMappings = conf.getQueueMappings();
|
List<UserGroupMappingPlacementRule.QueueMapping> newMappings =
|
||||||
//check if mappings refer to valid queues
|
conf.getQueueMappings();
|
||||||
|
// check if mappings refer to valid queues
|
||||||
for (QueueMapping mapping : newMappings) {
|
for (QueueMapping mapping : newMappings) {
|
||||||
if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
|
String mappingQueue = mapping.getQueue();
|
||||||
!mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
|
if (!mappingQueue
|
||||||
CSQueue queue = queues.get(mapping.queue);
|
.equals(UserGroupMappingPlacementRule.CURRENT_USER_MAPPING)
|
||||||
|
&& !mappingQueue
|
||||||
|
.equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
|
||||||
|
CSQueue queue = queues.get(mappingQueue);
|
||||||
if (queue == null || !(queue instanceof LeafQueue)) {
|
if (queue == null || !(queue instanceof LeafQueue)) {
|
||||||
throw new IOException(
|
throw new IOException("mapping contains invalid or non-leaf queue "
|
||||||
"mapping contains invalid or non-leaf queue " + mapping.queue);
|
+ mappingQueue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//apply the new mappings since they are valid
|
|
||||||
mappings = newMappings;
|
|
||||||
// initialize groups if mappings are present
|
// initialize groups if mappings are present
|
||||||
if (mappings.size() > 0) {
|
if (newMappings.size() > 0) {
|
||||||
groups = new Groups(conf);
|
Groups groups = new Groups(conf);
|
||||||
|
return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
|
||||||
|
newMappings, groups);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updatePlacementRules() throws IOException {
|
||||||
|
List<PlacementRule> placementRules = new ArrayList<>();
|
||||||
|
|
||||||
|
// Initialize UserGroupMappingPlacementRule
|
||||||
|
// TODO, need make this defineable by configuration.
|
||||||
|
UserGroupMappingPlacementRule ugRule = getUserGroupMappingPlacementRule();
|
||||||
|
if (null != ugRule) {
|
||||||
|
placementRules.add(ugRule);
|
||||||
|
}
|
||||||
|
|
||||||
|
rmContext.getQueuePlacementManager().updateRules(placementRules);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Lock(CapacityScheduler.class)
|
@Lock(CapacityScheduler.class)
|
||||||
|
@ -481,7 +495,7 @@ public class CapacityScheduler extends
|
||||||
queues, queues, noop);
|
queues, queues, noop);
|
||||||
labelManager.reinitializeQueueLabels(getQueueToLabels());
|
labelManager.reinitializeQueueLabels(getQueueToLabels());
|
||||||
LOG.info("Initialized root queue " + root);
|
LOG.info("Initialized root queue " + root);
|
||||||
initializeQueueMappings();
|
updatePlacementRules();
|
||||||
setQueueAcls(authorizer, queues);
|
setQueueAcls(authorizer, queues);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -502,7 +516,7 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
// Re-configure queues
|
// Re-configure queues
|
||||||
root.reinitialize(newRoot, clusterResource);
|
root.reinitialize(newRoot, clusterResource);
|
||||||
initializeQueueMappings();
|
updatePlacementRules();
|
||||||
|
|
||||||
// Re-calculate headroom for active applications
|
// Re-calculate headroom for active applications
|
||||||
root.updateClusterResource(clusterResource, new ResourceLimits(
|
root.updateClusterResource(clusterResource, new ResourceLimits(
|
||||||
|
@ -647,66 +661,8 @@ public class CapacityScheduler extends
|
||||||
return queues.get(queueName);
|
return queues.get(queueName);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final String CURRENT_USER_MAPPING = "%user";
|
|
||||||
|
|
||||||
private static final String PRIMARY_GROUP_MAPPING = "%primary_group";
|
|
||||||
|
|
||||||
private String getMappedQueue(String user) throws IOException {
|
|
||||||
for (QueueMapping mapping : mappings) {
|
|
||||||
if (mapping.type == MappingType.USER) {
|
|
||||||
if (mapping.source.equals(CURRENT_USER_MAPPING)) {
|
|
||||||
if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
|
|
||||||
return user;
|
|
||||||
}
|
|
||||||
else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
|
|
||||||
return groups.getGroups(user).get(0);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
return mapping.queue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (user.equals(mapping.source)) {
|
|
||||||
return mapping.queue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (mapping.type == MappingType.GROUP) {
|
|
||||||
for (String userGroups : groups.getGroups(user)) {
|
|
||||||
if (userGroups.equals(mapping.source)) {
|
|
||||||
return mapping.queue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private synchronized void addApplication(ApplicationId applicationId,
|
private synchronized void addApplication(ApplicationId applicationId,
|
||||||
String queueName, String user, boolean isAppRecovering, Priority priority) {
|
String queueName, String user, boolean isAppRecovering, Priority priority) {
|
||||||
|
|
||||||
if (mappings != null && mappings.size() > 0) {
|
|
||||||
try {
|
|
||||||
String mappedQueue = getMappedQueue(user);
|
|
||||||
if (mappedQueue != null) {
|
|
||||||
// We have a mapping, should we use it?
|
|
||||||
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
|
|
||||||
|| overrideWithQueueMappings) {
|
|
||||||
LOG.info("Application " + applicationId + " user " + user
|
|
||||||
+ " mapping [" + queueName + "] to [" + mappedQueue
|
|
||||||
+ "] override " + overrideWithQueueMappings);
|
|
||||||
queueName = mappedQueue;
|
|
||||||
RMApp rmApp = rmContext.getRMApps().get(applicationId);
|
|
||||||
rmApp.setQueue(queueName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException ioex) {
|
|
||||||
String message = "Failed to submit application " + applicationId +
|
|
||||||
" submitted by user " + user + " reason: " + ioex.getMessage();
|
|
||||||
this.rmContext.getDispatcher().getEventHandler()
|
|
||||||
.handle(new RMAppRejectedEvent(applicationId, message));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// sanity checks.
|
// sanity checks.
|
||||||
CSQueue queue = getQueue(queueName);
|
CSQueue queue = getQueue(queueName);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.security.AccessType;
|
import org.apache.hadoop.yarn.security.AccessType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
|
||||||
|
@ -212,35 +213,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
@Private
|
@Private
|
||||||
public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = 0;
|
public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = 0;
|
||||||
|
|
||||||
@Private
|
|
||||||
public static class QueueMapping {
|
|
||||||
|
|
||||||
public enum MappingType {
|
|
||||||
|
|
||||||
USER("u"),
|
|
||||||
GROUP("g");
|
|
||||||
private final String type;
|
|
||||||
private MappingType(String type) {
|
|
||||||
this.type = type;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String toString() {
|
|
||||||
return type;
|
|
||||||
}
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
MappingType type;
|
|
||||||
String source;
|
|
||||||
String queue;
|
|
||||||
|
|
||||||
public QueueMapping(MappingType type, String source, String queue) {
|
|
||||||
this.type = type;
|
|
||||||
this.source = source;
|
|
||||||
this.queue = queue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public static final String AVERAGE_CAPACITY = "average-capacity";
|
public static final String AVERAGE_CAPACITY = "average-capacity";
|
||||||
|
|
||||||
|
@ -747,7 +719,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
*/
|
*/
|
||||||
public List<QueueMapping> getQueueMappings() {
|
public List<QueueMapping> getQueueMappings() {
|
||||||
List<QueueMapping> mappings =
|
List<QueueMapping> mappings =
|
||||||
new ArrayList<CapacitySchedulerConfiguration.QueueMapping>();
|
new ArrayList<QueueMapping>();
|
||||||
Collection<String> mappingsString =
|
Collection<String> mappingsString =
|
||||||
getTrimmedStringCollection(QUEUE_MAPPING);
|
getTrimmedStringCollection(QUEUE_MAPPING);
|
||||||
for (String mappingValue : mappingsString) {
|
for (String mappingValue : mappingsString) {
|
||||||
|
|
|
@ -19,16 +19,10 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
|
||||||
import org.apache.hadoop.security.Credentials;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
|
||||||
|
|
||||||
import static org.mockito.Matchers.isA;
|
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyLong;
|
import static org.mockito.Matchers.anyLong;
|
||||||
|
import static org.mockito.Matchers.isA;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
|
@ -40,8 +34,11 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.service.Service;
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.yarn.MockApps;
|
import org.apache.hadoop.yarn.MockApps;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
|
@ -57,11 +54,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||||
|
@ -73,8 +73,11 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
@ -659,6 +662,39 @@ public class TestAppManager{
|
||||||
Assert.assertTrue(msg.contains("applicationType=MAPREDUCE"));
|
Assert.assertTrue(msg.contains("applicationType=MAPREDUCE"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMAppSubmitWithQueueChanged() throws Exception {
|
||||||
|
// Setup a PlacementManager returns a new queue
|
||||||
|
PlacementManager placementMgr = mock(PlacementManager.class);
|
||||||
|
doAnswer(new Answer<Void>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
ApplicationSubmissionContext ctx =
|
||||||
|
(ApplicationSubmissionContext) invocation.getArguments()[0];
|
||||||
|
ctx.setQueue("newQueue");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}).when(placementMgr).placeApplication(any(ApplicationSubmissionContext.class),
|
||||||
|
any(String.class));
|
||||||
|
rmContext.setQueuePlacementManager(placementMgr);
|
||||||
|
|
||||||
|
asContext.setQueue("oldQueue");
|
||||||
|
appMonitor.submitApplication(asContext, "test");
|
||||||
|
RMApp app = rmContext.getRMApps().get(appId);
|
||||||
|
Assert.assertNotNull("app is null", app);
|
||||||
|
Assert.assertEquals("newQueue", asContext.getQueue());
|
||||||
|
|
||||||
|
// wait for event to be processed
|
||||||
|
int timeoutSecs = 0;
|
||||||
|
while ((getAppEventType() == RMAppEventType.KILL) && timeoutSecs++ < 20) {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
|
||||||
|
getAppEventType());
|
||||||
|
}
|
||||||
|
|
||||||
private static ResourceScheduler mockResourceScheduler() {
|
private static ResourceScheduler mockResourceScheduler() {
|
||||||
ResourceScheduler scheduler = mock(ResourceScheduler.class);
|
ResourceScheduler scheduler = mock(ResourceScheduler.class);
|
||||||
when(scheduler.getMinimumResourceCapability()).thenReturn(
|
when(scheduler.getMinimumResourceCapability()).thenReturn(
|
||||||
|
|
|
@ -0,0 +1,89 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.security.GroupMappingServiceProvider;
|
||||||
|
import org.apache.hadoop.security.Groups;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestUserGroupMappingPlacementRule {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
||||||
|
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyQueueMapping(QueueMapping queueMapping, String inputUser,
|
||||||
|
String expectedQueue) throws YarnException {
|
||||||
|
verifyQueueMapping(queueMapping, inputUser,
|
||||||
|
YarnConfiguration.DEFAULT_QUEUE_NAME, expectedQueue, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyQueueMapping(QueueMapping queueMapping, String inputUser,
|
||||||
|
String inputQueue, String expectedQueue, boolean overwrite) throws YarnException {
|
||||||
|
Groups groups = new Groups(conf);
|
||||||
|
UserGroupMappingPlacementRule rule =
|
||||||
|
new UserGroupMappingPlacementRule(overwrite, Arrays.asList(queueMapping),
|
||||||
|
groups);
|
||||||
|
ApplicationSubmissionContext asc =
|
||||||
|
Records.newRecord(ApplicationSubmissionContext.class);
|
||||||
|
asc.setQueue(inputQueue);
|
||||||
|
String queue = rule.getQueueForApp(asc, inputUser);
|
||||||
|
Assert.assertEquals(expectedQueue, queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMapping() throws YarnException {
|
||||||
|
// simple base case for mapping user to queue
|
||||||
|
verifyQueueMapping(new QueueMapping(MappingType.USER, "a", "q1"), "a", "q1");
|
||||||
|
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "agroup", "q1"),
|
||||||
|
"a", "q1");
|
||||||
|
verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", "q2"), "a",
|
||||||
|
"q2");
|
||||||
|
verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", "%user"),
|
||||||
|
"a", "a");
|
||||||
|
verifyQueueMapping(new QueueMapping(MappingType.USER, "%user",
|
||||||
|
"%primary_group"), "a", "agroup");
|
||||||
|
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "asubgroup1", "q1"),
|
||||||
|
"a", "q1");
|
||||||
|
|
||||||
|
// specify overwritten, and see if user specified a queue, and it will be
|
||||||
|
// overridden
|
||||||
|
verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"),
|
||||||
|
"user", "q2", "q1", true);
|
||||||
|
|
||||||
|
// if overwritten not specified, it should be which user specified
|
||||||
|
verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"),
|
||||||
|
"user", "q2", "q2", false);
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,22 +18,16 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
import org.apache.hadoop.security.GroupMappingServiceProvider;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestQueueMappings {
|
public class TestQueueMappings {
|
||||||
|
@ -48,14 +42,22 @@ public class TestQueueMappings {
|
||||||
private final static String Q2_PATH =
|
private final static String Q2_PATH =
|
||||||
CapacitySchedulerConfiguration.ROOT + "." + Q2;
|
CapacitySchedulerConfiguration.ROOT + "." + Q2;
|
||||||
|
|
||||||
private MockRM resourceManager;
|
private CapacityScheduler cs;
|
||||||
|
private YarnConfiguration conf;
|
||||||
|
|
||||||
@After
|
@Before
|
||||||
public void tearDown() throws Exception {
|
public void setup() {
|
||||||
if (resourceManager != null) {
|
CapacitySchedulerConfiguration csConf =
|
||||||
LOG.info("Stopping the resource manager");
|
new CapacitySchedulerConfiguration();
|
||||||
resourceManager.stop();
|
setupQueueConfiguration(csConf);
|
||||||
}
|
conf = new YarnConfiguration(csConf);
|
||||||
|
cs = new CapacityScheduler();
|
||||||
|
|
||||||
|
RMContext rmContext = TestUtils.getMockRMContext();
|
||||||
|
cs.setConf(conf);
|
||||||
|
cs.setRMContext(rmContext);
|
||||||
|
cs.init(conf);
|
||||||
|
cs.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
||||||
|
@ -68,25 +70,31 @@ public class TestQueueMappings {
|
||||||
LOG.info("Setup top-level queues q1 and q2");
|
LOG.info("Setup top-level queues q1 and q2");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueueMappingSpecifyingNotExistedQueue() {
|
||||||
|
// if the mapping specifies a queue that does not exist, reinitialize will
|
||||||
|
// be failed
|
||||||
|
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
|
||||||
|
"u:user:non_existent_queue");
|
||||||
|
boolean fail = false;
|
||||||
|
try {
|
||||||
|
cs.reinitialize(conf, null);
|
||||||
|
} catch (IOException ioex) {
|
||||||
|
fail = true;
|
||||||
|
}
|
||||||
|
Assert.assertTrue("queue initialization failed for non-existent q", fail);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueueMappingTrimSpaces() throws IOException {
|
||||||
|
// space trimming
|
||||||
|
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1);
|
||||||
|
cs.reinitialize(conf, null);
|
||||||
|
checkQMapping(new QueueMapping(MappingType.USER, "a", Q1));
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
public void testQueueMapping() throws Exception {
|
public void testQueueMappingParsingInvalidCases() throws Exception {
|
||||||
CapacitySchedulerConfiguration csConf =
|
|
||||||
new CapacitySchedulerConfiguration();
|
|
||||||
setupQueueConfiguration(csConf);
|
|
||||||
YarnConfiguration conf = new YarnConfiguration(csConf);
|
|
||||||
CapacityScheduler cs = new CapacityScheduler();
|
|
||||||
|
|
||||||
RMContext rmContext = TestUtils.getMockRMContext();
|
|
||||||
cs.setConf(conf);
|
|
||||||
cs.setRMContext(rmContext);
|
|
||||||
cs.init(conf);
|
|
||||||
cs.start();
|
|
||||||
|
|
||||||
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
|
||||||
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
|
||||||
conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
|
|
||||||
"true");
|
|
||||||
|
|
||||||
// configuration parsing tests - negative test cases
|
// configuration parsing tests - negative test cases
|
||||||
checkInvalidQMapping(conf, cs, "x:a:b", "invalid specifier");
|
checkInvalidQMapping(conf, cs, "x:a:b", "invalid specifier");
|
||||||
checkInvalidQMapping(conf, cs, "u:a", "no queue specified");
|
checkInvalidQMapping(conf, cs, "u:a", "no queue specified");
|
||||||
|
@ -97,119 +105,6 @@ public class TestQueueMappings {
|
||||||
checkInvalidQMapping(conf, cs, "u::", "empty source and queue");
|
checkInvalidQMapping(conf, cs, "u::", "empty source and queue");
|
||||||
checkInvalidQMapping(conf, cs, "u:", "missing source missing queue");
|
checkInvalidQMapping(conf, cs, "u:", "missing source missing queue");
|
||||||
checkInvalidQMapping(conf, cs, "u:a:", "empty source missing q");
|
checkInvalidQMapping(conf, cs, "u:a:", "empty source missing q");
|
||||||
|
|
||||||
// simple base case for mapping user to queue
|
|
||||||
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:a:" + Q1);
|
|
||||||
cs.reinitialize(conf, null);
|
|
||||||
checkQMapping("a", Q1, cs);
|
|
||||||
|
|
||||||
// group mapping test
|
|
||||||
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:agroup:" + Q1);
|
|
||||||
cs.reinitialize(conf, null);
|
|
||||||
checkQMapping("a", Q1, cs);
|
|
||||||
|
|
||||||
// %user tests
|
|
||||||
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:" + Q2);
|
|
||||||
cs.reinitialize(conf, null);
|
|
||||||
checkQMapping("a", Q2, cs);
|
|
||||||
|
|
||||||
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:%user");
|
|
||||||
cs.reinitialize(conf, null);
|
|
||||||
checkQMapping("a", "a", cs);
|
|
||||||
|
|
||||||
// %primary_group tests
|
|
||||||
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
|
|
||||||
"u:%user:%primary_group");
|
|
||||||
cs.reinitialize(conf, null);
|
|
||||||
checkQMapping("a", "agroup", cs);
|
|
||||||
|
|
||||||
// non-primary group mapping
|
|
||||||
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
|
|
||||||
"g:asubgroup1:" + Q1);
|
|
||||||
cs.reinitialize(conf, null);
|
|
||||||
checkQMapping("a", Q1, cs);
|
|
||||||
|
|
||||||
// space trimming
|
|
||||||
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1);
|
|
||||||
cs.reinitialize(conf, null);
|
|
||||||
checkQMapping("a", Q1, cs);
|
|
||||||
|
|
||||||
csConf = new CapacitySchedulerConfiguration();
|
|
||||||
csConf.set(YarnConfiguration.RM_SCHEDULER,
|
|
||||||
CapacityScheduler.class.getName());
|
|
||||||
setupQueueConfiguration(csConf);
|
|
||||||
conf = new YarnConfiguration(csConf);
|
|
||||||
|
|
||||||
resourceManager = new MockRM(csConf);
|
|
||||||
resourceManager.start();
|
|
||||||
|
|
||||||
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
|
||||||
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
|
||||||
conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
|
|
||||||
"true");
|
|
||||||
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
|
|
||||||
resourceManager.getResourceScheduler().reinitialize(conf, null);
|
|
||||||
|
|
||||||
// ensure that if the user specifies a Q that is still overriden
|
|
||||||
checkAppQueue(resourceManager, "user", Q2, Q1);
|
|
||||||
|
|
||||||
// toggle admin override and retry
|
|
||||||
conf.setBoolean(
|
|
||||||
CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
|
|
||||||
false);
|
|
||||||
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
|
|
||||||
setupQueueConfiguration(csConf);
|
|
||||||
resourceManager.getResourceScheduler().reinitialize(conf, null);
|
|
||||||
|
|
||||||
checkAppQueue(resourceManager, "user", Q2, Q2);
|
|
||||||
|
|
||||||
// ensure that if a user does not specify a Q, the user mapping is used
|
|
||||||
checkAppQueue(resourceManager, "user", null, Q1);
|
|
||||||
|
|
||||||
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:usergroup:" + Q2);
|
|
||||||
setupQueueConfiguration(csConf);
|
|
||||||
resourceManager.getResourceScheduler().reinitialize(conf, null);
|
|
||||||
|
|
||||||
// ensure that if a user does not specify a Q, the group mapping is used
|
|
||||||
checkAppQueue(resourceManager, "user", null, Q2);
|
|
||||||
|
|
||||||
// if the mapping specifies a queue that does not exist, the job is rejected
|
|
||||||
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
|
|
||||||
"u:user:non_existent_queue");
|
|
||||||
setupQueueConfiguration(csConf);
|
|
||||||
|
|
||||||
boolean fail = false;
|
|
||||||
try {
|
|
||||||
resourceManager.getResourceScheduler().reinitialize(conf, null);
|
|
||||||
}
|
|
||||||
catch (IOException ioex) {
|
|
||||||
fail = true;
|
|
||||||
}
|
|
||||||
Assert.assertTrue("queue initialization failed for non-existent q", fail);
|
|
||||||
resourceManager.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checkAppQueue(MockRM resourceManager, String user,
|
|
||||||
String submissionQueue, String expected)
|
|
||||||
throws Exception {
|
|
||||||
RMApp app = resourceManager.submitApp(200, "name", user,
|
|
||||||
new HashMap<ApplicationAccessType, String>(), false, submissionQueue, -1,
|
|
||||||
null, "MAPREDUCE", false);
|
|
||||||
RMAppState expectedState = expected.isEmpty() ? RMAppState.FAILED
|
|
||||||
: RMAppState.ACCEPTED;
|
|
||||||
resourceManager.waitForState(app.getApplicationId(), expectedState);
|
|
||||||
// get scheduler app
|
|
||||||
CapacityScheduler cs = (CapacityScheduler)
|
|
||||||
resourceManager.getResourceScheduler();
|
|
||||||
SchedulerApplication schedulerApp =
|
|
||||||
cs.getSchedulerApplications().get(app.getApplicationId());
|
|
||||||
String queue = "";
|
|
||||||
if (schedulerApp != null) {
|
|
||||||
queue = schedulerApp.getQueue().getQueueName();
|
|
||||||
}
|
|
||||||
Assert.assertTrue("expected " + expected + " actual " + queue,
|
|
||||||
expected.equals(queue));
|
|
||||||
Assert.assertEquals(expected, app.getQueue());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkInvalidQMapping(YarnConfiguration conf,
|
private void checkInvalidQMapping(YarnConfiguration conf,
|
||||||
|
@ -227,10 +122,12 @@ public class TestQueueMappings {
|
||||||
fail);
|
fail);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkQMapping(String user, String expected, CapacityScheduler cs)
|
private void checkQMapping(QueueMapping expected)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
String actual = cs.getMappedQueueForTest(user);
|
UserGroupMappingPlacementRule rule =
|
||||||
Assert.assertTrue("expected " + expected + " actual " + actual,
|
(UserGroupMappingPlacementRule) cs.getRMContext()
|
||||||
expected.equals(actual));
|
.getQueuePlacementManager().getPlacementRules().get(0);
|
||||||
|
QueueMapping queueMapping = rule.getQueueMappings().get(0);
|
||||||
|
Assert.assertEquals(queueMapping, expected);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue