YARN-5949. Add pluggable configuration ACL policy interface and implementation. (Jonathan Hung via wangda)

Change-Id: Ib98e82ff753bede21fcab2e6ca9ec1e7a5a2008f
This commit is contained in:
Wangda Tan 2017-05-22 13:38:31 -07:00 committed by Jonathan Hung
parent d1514bacd1
commit 5fd1668a27
13 changed files with 610 additions and 150 deletions

View File

@ -651,6 +651,9 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_CONFIGURATION_STORE = public static final String DEFAULT_CONFIGURATION_STORE =
MEMORY_CONFIGURATION_STORE; MEMORY_CONFIGURATION_STORE;
public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS =
YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class";
public static final String YARN_AUTHORIZATION_PROVIDER = YARN_PREFIX public static final String YARN_AUTHORIZATION_PROVIDER = YARN_PREFIX
+ "authorization-provider"; + "authorization-provider";
private static final List<String> RM_SERVICES_ADDRESS_CONF_KEYS_HTTP = private static final List<String> RM_SERVICES_ADDRESS_CONF_KEYS_HTTP =

View File

@ -3352,4 +3352,15 @@
<value>memory</value> <value>memory</value>
</property> </property>
<property>
<description>
The class to use for configuration mutation ACL policy if using a mutable
configuration provider. Controls whether a mutation request is allowed.
The DefaultConfigurationMutationACLPolicy checks if the requestor is a
YARN admin.
</description>
<name>yarn.scheduler.configuration.mutation.acl-policy.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.DefaultConfigurationMutationACLPolicy</value>
</property>
</configuration> </configuration>

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
/**
* Interface for determining whether configuration mutations are allowed.
*/
public interface ConfigurationMutationACLPolicy {
/**
* Initialize ACL policy with configuration and RMContext.
* @param conf Configuration to initialize with.
* @param rmContext rmContext
*/
void init(Configuration conf, RMContext rmContext);
/**
* Check if mutation is allowed.
* @param user User issuing the request
* @param confUpdate configurations to be updated
* @return whether provided mutation is allowed or not
*/
boolean isMutationAllowed(UserGroupInformation user, QueueConfigsUpdateInfo
confUpdate);
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* Factory class for creating instances of
* {@link ConfigurationMutationACLPolicy}.
*/
public final class ConfigurationMutationACLPolicyFactory {
private static final Log LOG = LogFactory.getLog(
ConfigurationMutationACLPolicyFactory.class);
private ConfigurationMutationACLPolicyFactory() {
// Unused.
}
public static ConfigurationMutationACLPolicy getPolicy(Configuration conf) {
Class<? extends ConfigurationMutationACLPolicy> policyClass =
conf.getClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS,
DefaultConfigurationMutationACLPolicy.class,
ConfigurationMutationACLPolicy.class);
LOG.info("Using ConfigurationMutationACLPolicy implementation - " +
policyClass);
return ReflectionUtils.newInstance(policyClass, conf);
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
/**
* Default configuration mutation ACL policy. Checks if user is YARN admin.
*/
public class DefaultConfigurationMutationACLPolicy implements
ConfigurationMutationACLPolicy {
private YarnAuthorizationProvider authorizer;
@Override
public void init(Configuration conf, RMContext rmContext) {
authorizer = YarnAuthorizationProvider.getInstance(conf);
}
@Override
public boolean isMutationAllowed(UserGroupInformation user,
QueueConfigsUpdateInfo confUpdate) {
return authorizer.isAdmin(user);
}
}

View File

@ -17,10 +17,11 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
/** /**
* Interface for a scheduler that supports changing configuration at runtime. * Interface for a scheduler that supports changing configuration at runtime.
@ -31,10 +32,22 @@ public interface MutableConfScheduler extends ResourceScheduler {
/** /**
* Update the scheduler's configuration. * Update the scheduler's configuration.
* @param user Caller of this update * @param user Caller of this update
* @param confUpdate key-value map of the configuration update * @param confUpdate configuration update
* @throws IOException if update is invalid * @throws IOException if update is invalid
*/ */
void updateConfiguration(UserGroupInformation user, void updateConfiguration(UserGroupInformation user,
Map<String, String> confUpdate) throws IOException; QueueConfigsUpdateInfo confUpdate) throws IOException;
/**
* Get the scheduler configuration.
* @return the scheduler configuration
*/
Configuration getConfiguration();
/**
* Get queue object based on queue name.
* @param queueName the queue name
* @return the queue object
*/
Queue getQueue(String queueName);
} }

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
/** /**
* Interface for allowing changing scheduler configurations. * Interface for allowing changing scheduler configurations.
@ -32,7 +34,7 @@ public interface MutableConfigurationProvider {
* @param confUpdate Key-value pairs for configurations to be updated. * @param confUpdate Key-value pairs for configurations to be updated.
* @throws IOException if scheduler could not be reinitialized * @throws IOException if scheduler could not be reinitialized
*/ */
void mutateConfiguration(String user, Map<String, String> confUpdate) void mutateConfiguration(UserGroupInformation user, QueueConfigsUpdateInfo
throws IOException; confUpdate) throws IOException;
} }

View File

@ -137,6 +137,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.Placeme
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -649,6 +650,7 @@ public class CapacityScheduler extends
preemptionManager.refreshQueues(null, this.getRootQueue()); preemptionManager.refreshQueues(null, this.getRootQueue());
} }
@Override
public CSQueue getQueue(String queueName) { public CSQueue getQueue(String queueName) {
if (queueName == null) { if (queueName == null) {
return null; return null;
@ -2615,10 +2617,10 @@ public class CapacityScheduler extends
@Override @Override
public void updateConfiguration(UserGroupInformation user, public void updateConfiguration(UserGroupInformation user,
Map<String, String> confUpdate) throws IOException { QueueConfigsUpdateInfo confUpdate) throws IOException {
if (csConfProvider instanceof MutableConfigurationProvider) { if (csConfProvider instanceof MutableConfigurationProvider) {
((MutableConfigurationProvider) csConfProvider).mutateConfiguration( ((MutableConfigurationProvider) csConfProvider).mutateConfiguration(
user.getShortUserName(), confUpdate); user, confUpdate);
} else { } else {
throw new UnsupportedOperationException("Configured CS configuration " + throw new UnsupportedOperationException("Configured CS configuration " +
"provider does not support updating configuration."); "provider does not support updating configuration.");

View File

@ -18,14 +18,27 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import com.google.common.base.Joiner;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicyFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
@ -38,6 +51,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
private Configuration schedConf; private Configuration schedConf;
private YarnConfigurationStore confStore; private YarnConfigurationStore confStore;
private ConfigurationMutationACLPolicy aclMutationPolicy;
private RMContext rmContext; private RMContext rmContext;
private Configuration conf; private Configuration conf;
@ -68,6 +82,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
schedConf.set(kv.getKey(), kv.getValue()); schedConf.set(kv.getKey(), kv.getValue());
} }
confStore.initialize(config, schedConf); confStore.initialize(config, schedConf);
this.aclMutationPolicy = ConfigurationMutationACLPolicyFactory
.getPolicy(config);
aclMutationPolicy.init(config, rmContext);
this.conf = config; this.conf = config;
} }
@ -80,12 +97,17 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
} }
@Override @Override
public void mutateConfiguration(String user, public void mutateConfiguration(UserGroupInformation user,
Map<String, String> confUpdate) throws IOException { QueueConfigsUpdateInfo confUpdate) throws IOException {
if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
throw new AccessControlException("User is not admin of all modified" +
" queues.");
}
Configuration oldConf = new Configuration(schedConf); Configuration oldConf = new Configuration(schedConf);
LogMutation log = new LogMutation(confUpdate, user); Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
long id = confStore.logMutation(log); long id = confStore.logMutation(log);
for (Map.Entry<String, String> kv : confUpdate.entrySet()) { for (Map.Entry<String, String> kv : kvUpdate.entrySet()) {
if (kv.getValue() == null) { if (kv.getValue() == null) {
schedConf.unset(kv.getKey()); schedConf.unset(kv.getKey());
} else { } else {
@ -101,4 +123,125 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
} }
confStore.confirmMutation(id, true); confStore.confirmMutation(id, true);
} }
private Map<String, String> constructKeyValueConfUpdate(
QueueConfigsUpdateInfo mutationInfo) throws IOException {
CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
CapacitySchedulerConfiguration proposedConf =
new CapacitySchedulerConfiguration(cs.getConfiguration(), false);
Map<String, String> confUpdate = new HashMap<>();
for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
removeQueue(queueToRemove, proposedConf, confUpdate);
}
for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) {
addQueue(addQueueInfo, proposedConf, confUpdate);
}
for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
updateQueue(updateQueueInfo, proposedConf, confUpdate);
}
return confUpdate;
}
private void removeQueue(
String queueToRemove, CapacitySchedulerConfiguration proposedConf,
Map<String, String> confUpdate) throws IOException {
if (queueToRemove == null) {
return;
} else {
CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
String queueName = queueToRemove.substring(
queueToRemove.lastIndexOf('.') + 1);
CSQueue queue = cs.getQueue(queueName);
if (queue == null ||
!queue.getQueuePath().equals(queueToRemove)) {
throw new IOException("Queue " + queueToRemove + " not found");
} else if (queueToRemove.lastIndexOf('.') == -1) {
throw new IOException("Can't remove queue " + queueToRemove);
}
String parentQueuePath = queueToRemove.substring(0, queueToRemove
.lastIndexOf('.'));
String[] siblingQueues = proposedConf.getQueues(parentQueuePath);
List<String> newSiblingQueues = new ArrayList<>();
for (String siblingQueue : siblingQueues) {
if (!siblingQueue.equals(queueName)) {
newSiblingQueues.add(siblingQueue);
}
}
proposedConf.setQueues(parentQueuePath, newSiblingQueues
.toArray(new String[0]));
String queuesConfig = CapacitySchedulerConfiguration.PREFIX
+ parentQueuePath + CapacitySchedulerConfiguration.DOT
+ CapacitySchedulerConfiguration.QUEUES;
if (newSiblingQueues.size() == 0) {
confUpdate.put(queuesConfig, null);
} else {
confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues));
}
for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
.entrySet()) {
proposedConf.unset(confRemove.getKey());
confUpdate.put(confRemove.getKey(), null);
}
}
}
private void addQueue(
QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf,
Map<String, String> confUpdate) throws IOException {
if (addInfo == null) {
return;
} else {
CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
String queuePath = addInfo.getQueue();
String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
if (cs.getQueue(queueName) != null) {
throw new IOException("Can't add existing queue " + queuePath);
} else if (queuePath.lastIndexOf('.') == -1) {
throw new IOException("Can't add invalid queue " + queuePath);
}
String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
String[] siblings = proposedConf.getQueues(parentQueue);
List<String> siblingQueues = siblings == null ? new ArrayList<>() :
new ArrayList<>(Arrays.<String>asList(siblings));
siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1));
proposedConf.setQueues(parentQueue,
siblingQueues.toArray(new String[0]));
confUpdate.put(CapacitySchedulerConfiguration.PREFIX
+ parentQueue + CapacitySchedulerConfiguration.DOT
+ CapacitySchedulerConfiguration.QUEUES,
Joiner.on(',').join(siblingQueues));
String keyPrefix = CapacitySchedulerConfiguration.PREFIX
+ queuePath + CapacitySchedulerConfiguration.DOT;
for (Map.Entry<String, String> kv : addInfo.getParams().entrySet()) {
if (kv.getValue() == null) {
proposedConf.unset(keyPrefix + kv.getKey());
} else {
proposedConf.set(keyPrefix + kv.getKey(), kv.getValue());
}
confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
}
}
}
private void updateQueue(QueueConfigInfo updateInfo,
CapacitySchedulerConfiguration proposedConf,
Map<String, String> confUpdate) {
if (updateInfo == null) {
return;
} else {
String queuePath = updateInfo.getQueue();
String keyPrefix = CapacitySchedulerConfiguration.PREFIX
+ queuePath + CapacitySchedulerConfiguration.DOT;
for (Map.Entry<String, String> kv : updateInfo.getParams().entrySet()) {
if (kv.getValue() == null) {
proposedConf.unset(keyPrefix + kv.getKey());
} else {
proposedConf.set(keyPrefix + kv.getKey(), kv.getValue());
}
confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
}
}
}
} }

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
/**
* A configuration mutation ACL policy which checks that user has admin
* privileges on all queues they are changing.
*/
public class QueueAdminConfigurationMutationACLPolicy implements
ConfigurationMutationACLPolicy {
private RMContext rmContext;
@Override
public void init(Configuration conf, RMContext context) {
this.rmContext = context;
}
@Override
public boolean isMutationAllowed(UserGroupInformation user,
QueueConfigsUpdateInfo confUpdate) {
Set<String> queues = new HashSet<>();
for (QueueConfigInfo addQueueInfo : confUpdate.getAddQueueInfo()) {
queues.add(addQueueInfo.getQueue());
}
for (String removeQueue : confUpdate.getRemoveQueueInfo()) {
queues.add(removeQueue);
}
for (QueueConfigInfo updateQueueInfo : confUpdate.getUpdateQueueInfo()) {
queues.add(updateQueueInfo.getQueue());
}
for (String queuePath : queues) {
String queueName = queuePath.lastIndexOf('.') != -1 ?
queuePath.substring(queuePath.lastIndexOf('.') + 1) : queuePath;
QueueInfo queueInfo = null;
try {
queueInfo = rmContext.getScheduler()
.getQueueInfo(queueName, false, false);
} catch (IOException e) {
// Queue is not found, do nothing.
}
String parentPath = queuePath;
// TODO: handle global config change.
while (queueInfo == null) {
// We are adding a queue (whose parent we are possibly also adding).
// Check ACL of lowest parent queue which already exists.
parentPath = parentPath.substring(0, parentPath.lastIndexOf('.'));
String parentName = parentPath.lastIndexOf('.') != -1 ?
parentPath.substring(parentPath.lastIndexOf('.') + 1) : parentPath;
try {
queueInfo = rmContext.getScheduler()
.getQueueInfo(parentName, false, false);
} catch (IOException e) {
// Queue is not found, do nothing.
}
}
Queue queue = ((MutableConfScheduler) rmContext.getScheduler())
.getQueue(queueInfo.getQueueName());
if (queue != null && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, user)) {
return false;
}
}
return true;
}
}

View File

@ -142,7 +142,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
@ -2484,10 +2483,8 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
callerUGI.doAs(new PrivilegedExceptionAction<Void>() { callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override @Override
public Void run() throws IOException, YarnException { public Void run() throws IOException, YarnException {
Map<String, String> confUpdate = ((MutableConfScheduler) scheduler).updateConfiguration(callerUGI,
constructKeyValueConfUpdate(mutationInfo); mutationInfo);
((CapacityScheduler) scheduler).updateConfiguration(callerUGI,
confUpdate);
return null; return null;
} }
}); });
@ -2499,129 +2496,9 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
"successfully applied.").build(); "successfully applied.").build();
} else { } else {
return Response.status(Status.BAD_REQUEST) return Response.status(Status.BAD_REQUEST)
.entity("Configuration change only supported by CapacityScheduler.") .entity("Configuration change only supported by " +
"MutableConfScheduler.")
.build(); .build();
} }
} }
private Map<String, String> constructKeyValueConfUpdate(
QueueConfigsUpdateInfo mutationInfo) throws IOException {
CapacitySchedulerConfiguration currentConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
CapacitySchedulerConfiguration proposedConf =
new CapacitySchedulerConfiguration(currentConf, false);
Map<String, String> confUpdate = new HashMap<>();
for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
removeQueue(queueToRemove, proposedConf, confUpdate);
}
for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) {
addQueue(addQueueInfo, proposedConf, confUpdate);
}
for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
updateQueue(updateQueueInfo, proposedConf, confUpdate);
}
return confUpdate;
}
private void removeQueue(
String queueToRemove, CapacitySchedulerConfiguration proposedConf,
Map<String, String> confUpdate) throws IOException {
if (queueToRemove == null) {
return;
} else {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
String queueName = queueToRemove.substring(
queueToRemove.lastIndexOf('.') + 1);
CSQueue queue = cs.getQueue(queueName);
if (queue == null ||
!queue.getQueuePath().equals(queueToRemove)) {
throw new IOException("Queue " + queueToRemove + " not found");
} else if (queueToRemove.lastIndexOf('.') == -1) {
throw new IOException("Can't remove queue " + queueToRemove);
}
String parentQueuePath = queueToRemove.substring(0, queueToRemove
.lastIndexOf('.'));
String[] siblingQueues = proposedConf.getQueues(parentQueuePath);
List<String> newSiblingQueues = new ArrayList<>();
for (String siblingQueue : siblingQueues) {
if (!siblingQueue.equals(queueName)) {
newSiblingQueues.add(siblingQueue);
}
}
proposedConf.setQueues(parentQueuePath, newSiblingQueues
.toArray(new String[0]));
String queuesConfig = CapacitySchedulerConfiguration.PREFIX +
parentQueuePath + CapacitySchedulerConfiguration.DOT +
CapacitySchedulerConfiguration.QUEUES;
if (newSiblingQueues.size() == 0) {
confUpdate.put(queuesConfig, null);
} else {
confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues));
}
for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
.entrySet()) {
proposedConf.unset(confRemove.getKey());
confUpdate.put(confRemove.getKey(), null);
}
}
}
private void addQueue(
QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf,
Map<String, String> confUpdate) throws IOException {
if (addInfo == null) {
return;
} else {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
String queuePath = addInfo.getQueue();
String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
if (cs.getQueue(queueName) != null) {
throw new IOException("Can't add existing queue " + queuePath);
} else if (queuePath.lastIndexOf('.') == -1) {
throw new IOException("Can't add invalid queue " + queuePath);
}
String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
String[] siblings = proposedConf.getQueues(parentQueue);
List<String> siblingQueues = siblings == null ? new ArrayList<>() :
new ArrayList<>(Arrays.<String>asList(siblings));
siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1));
proposedConf.setQueues(parentQueue,
siblingQueues.toArray(new String[0]));
confUpdate.put(CapacitySchedulerConfiguration.PREFIX +
parentQueue + CapacitySchedulerConfiguration.DOT +
CapacitySchedulerConfiguration.QUEUES,
Joiner.on(',').join(siblingQueues));
String keyPrefix = CapacitySchedulerConfiguration.PREFIX +
queuePath + CapacitySchedulerConfiguration.DOT;
for (Map.Entry<String, String> kv : addInfo.getParams().entrySet()) {
if (kv.getValue() == null) {
proposedConf.unset(keyPrefix + kv.getKey());
} else {
proposedConf.set(keyPrefix + kv.getKey(), kv.getValue());
}
confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
}
}
}
private void updateQueue(QueueConfigInfo updateInfo,
CapacitySchedulerConfiguration proposedConf,
Map<String, String> confUpdate) {
if (updateInfo == null) {
return;
} else {
String queuePath = updateInfo.getQueue();
String keyPrefix = CapacitySchedulerConfiguration.PREFIX +
queuePath + CapacitySchedulerConfiguration.DOT;
for (Map.Entry<String, String> kv : updateInfo.getParams().entrySet()) {
if (kv.getValue() == null) {
proposedConf.unset(keyPrefix + kv.getKey());
} else {
proposedConf.set(keyPrefix + kv.getKey(), kv.getValue());
}
confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
}
}
}
} }

View File

@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.QueueAdminConfigurationMutationACLPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestConfigurationMutationACLPolicies {
private ConfigurationMutationACLPolicy policy;
private RMContext rmContext;
private MutableConfScheduler scheduler;
private static final UserGroupInformation GOOD_USER = UserGroupInformation
.createUserForTesting("goodUser", new String[] {});
private static final UserGroupInformation BAD_USER = UserGroupInformation
.createUserForTesting("badUser", new String[] {});
private static final Map<String, String> EMPTY_MAP =
Collections.<String, String>emptyMap();
@Before
public void setUp() throws IOException {
rmContext = mock(RMContext.class);
scheduler = mock(MutableConfScheduler.class);
when(rmContext.getScheduler()).thenReturn(scheduler);
mockQueue("a", scheduler);
mockQueue("b", scheduler);
mockQueue("b1", scheduler);
}
private void mockQueue(String queueName, MutableConfScheduler scheduler)
throws IOException {
QueueInfo queueInfo = QueueInfo.newInstance(queueName, 0, 0, 0, null, null,
null, null, null, null, false);
when(scheduler.getQueueInfo(eq(queueName), anyBoolean(), anyBoolean()))
.thenReturn(queueInfo);
Queue queue = mock(Queue.class);
when(queue.hasAccess(eq(QueueACL.ADMINISTER_QUEUE), eq(GOOD_USER)))
.thenReturn(true);
when(queue.hasAccess(eq(QueueACL.ADMINISTER_QUEUE), eq(BAD_USER)))
.thenReturn(false);
when(scheduler.getQueue(eq(queueName))).thenReturn(queue);
}
@Test
public void testDefaultPolicy() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.YARN_ADMIN_ACL, GOOD_USER.getShortUserName());
conf.setClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS,
DefaultConfigurationMutationACLPolicy.class,
ConfigurationMutationACLPolicy.class);
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
policy.init(conf, rmContext);
assertTrue(policy.isMutationAllowed(GOOD_USER, null));
assertFalse(policy.isMutationAllowed(BAD_USER, null));
}
@Test
public void testQueueAdminBasedPolicy() {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS,
QueueAdminConfigurationMutationACLPolicy.class,
ConfigurationMutationACLPolicy.class);
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
policy.init(conf, rmContext);
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
QueueConfigInfo configInfo = new QueueConfigInfo("root.a", EMPTY_MAP);
updateInfo.getUpdateQueueInfo().add(configInfo);
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo));
}
@Test
public void testQueueAdminPolicyAddQueue() {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS,
QueueAdminConfigurationMutationACLPolicy.class,
ConfigurationMutationACLPolicy.class);
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
policy.init(conf, rmContext);
// Add root.b.b1. Should check ACL of root.b queue.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
QueueConfigInfo configInfo = new QueueConfigInfo("root.b.b2", EMPTY_MAP);
updateInfo.getAddQueueInfo().add(configInfo);
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo));
}
@Test
public void testQueueAdminPolicyAddNestedQueue() {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS,
QueueAdminConfigurationMutationACLPolicy.class,
ConfigurationMutationACLPolicy.class);
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
policy.init(conf, rmContext);
// Add root.b.b1.b11. Should check ACL of root.b queue.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
QueueConfigInfo configInfo = new QueueConfigInfo("root.b.b2.b21", EMPTY_MAP);
updateInfo.getAddQueueInfo().add(configInfo);
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo));
}
@Test
public void testQueueAdminPolicyRemoveQueue() {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS,
QueueAdminConfigurationMutationACLPolicy.class,
ConfigurationMutationACLPolicy.class);
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
policy.init(conf, rmContext);
// Remove root.b.b1.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
updateInfo.getRemoveQueueInfo().add("root.b.b1");
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo));
}
}

View File

@ -19,8 +19,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -43,22 +47,34 @@ public class TestMutableCSConfigurationProvider {
private MutableCSConfigurationProvider confProvider; private MutableCSConfigurationProvider confProvider;
private RMContext rmContext; private RMContext rmContext;
private Map<String, String> goodUpdate; private QueueConfigsUpdateInfo goodUpdate;
private Map<String, String> badUpdate; private QueueConfigsUpdateInfo badUpdate;
private CapacityScheduler cs; private CapacityScheduler cs;
private static final String TEST_USER = "testUser"; private static final UserGroupInformation TEST_USER = UserGroupInformation
.createUserForTesting("testUser", new String[] {});
@Before @Before
public void setUp() { public void setUp() {
cs = mock(CapacityScheduler.class); cs = mock(CapacityScheduler.class);
rmContext = mock(RMContext.class); rmContext = mock(RMContext.class);
when(rmContext.getScheduler()).thenReturn(cs); when(rmContext.getScheduler()).thenReturn(cs);
when(cs.getConfiguration()).thenReturn(
new CapacitySchedulerConfiguration());
confProvider = new MutableCSConfigurationProvider(rmContext); confProvider = new MutableCSConfigurationProvider(rmContext);
goodUpdate = new HashMap<>(); goodUpdate = new QueueConfigsUpdateInfo();
goodUpdate.put("goodKey", "goodVal"); Map<String, String> goodUpdateMap = new HashMap<>();
badUpdate = new HashMap<>(); goodUpdateMap.put("goodKey", "goodVal");
badUpdate.put("badKey", "badVal"); QueueConfigInfo goodUpdateInfo = new
QueueConfigInfo("root.a", goodUpdateMap);
goodUpdate.getUpdateQueueInfo().add(goodUpdateInfo);
badUpdate = new QueueConfigsUpdateInfo();
Map<String, String> badUpdateMap = new HashMap<>();
badUpdateMap.put("badKey", "badVal");
QueueConfigInfo badUpdateInfo = new
QueueConfigInfo("root.a", badUpdateMap);
badUpdate.getUpdateQueueInfo().add(badUpdateInfo);
} }
@Test @Test
@ -66,15 +82,16 @@ public class TestMutableCSConfigurationProvider {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
confProvider.init(conf); confProvider.init(conf);
assertNull(confProvider.loadConfiguration(conf) assertNull(confProvider.loadConfiguration(conf)
.get("goodKey")); .get("yarn.scheduler.capacity.root.a.goodKey"));
doNothing().when(cs).reinitialize(any(Configuration.class), doNothing().when(cs).reinitialize(any(Configuration.class),
any(RMContext.class)); any(RMContext.class));
confProvider.mutateConfiguration(TEST_USER, goodUpdate); confProvider.mutateConfiguration(TEST_USER, goodUpdate);
assertEquals("goodVal", confProvider.loadConfiguration(conf) assertEquals("goodVal", confProvider.loadConfiguration(conf)
.get("goodKey")); .get("yarn.scheduler.capacity.root.a.goodKey"));
assertNull(confProvider.loadConfiguration(conf).get("badKey")); assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
doThrow(new IOException()).when(cs).reinitialize(any(Configuration.class), doThrow(new IOException()).when(cs).reinitialize(any(Configuration.class),
any(RMContext.class)); any(RMContext.class));
try { try {
@ -82,6 +99,7 @@ public class TestMutableCSConfigurationProvider {
} catch (IOException e) { } catch (IOException e) {
// Expected exception. // Expected exception.
} }
assertNull(confProvider.loadConfiguration(conf).get("badKey")); assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
} }
} }