YARN-3100. Made YARN authorization pluggable. Contributed by Jian He.

This commit is contained in:
Zhijie Shen 2015-02-09 20:34:56 -08:00
parent 02340a24f2
commit 23bf6c7207
19 changed files with 445 additions and 108 deletions

View File

@ -258,6 +258,8 @@ Release 2.7.0 - UNRELEASED
YARN-3155. Refactor the exception handling code for TimelineClientImpl's
retryOn method (Li Lu via wangda)
YARN-3100. Made YARN authorization pluggable. (Jian He via zjshen)
OPTIMIZATIONS
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and

View File

@ -435,6 +435,8 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS =
"org.apache.hadoop.yarn.LocalConfigurationProvider";
public static final String YARN_AUTHORIZATION_PROVIDER = YARN_PREFIX
+ "authorization-provider";
private static final List<String> RM_SERVICES_ADDRESS_CONF_KEYS_HTTP =
Collections.unmodifiableList(Arrays.asList(
RM_ADDRESS,

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.security;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Access types for a queue or an application.
*/
@Private
@Unstable
public enum AccessType {
// queue
SUBMIT_APP,
ADMINISTER_QUEUE,
}

View File

@ -98,15 +98,6 @@ public class AdminACLsManager {
return aclsEnabled;
}
/**
* Returns the internal structure used to maintain administrator ACLs
*
* @return Structure used to maintain administrator access
*/
public AccessControlList getAdminAcl() {
return adminAcl;
}
/**
* Returns whether the specified user/group is an administrator
*
@ -117,26 +108,4 @@ public class AdminACLsManager {
public boolean isAdmin(UserGroupInformation callerUGI) {
return adminAcl.isUserAllowed(callerUGI);
}
/**
* Returns whether the specified user/group has administrator access
*
* @param callerUGI user/group to to check
* @return <tt>true</tt> if the UserGroupInformation specified
* is a member of the access control list for administrators
* and ACLs are enabled for this cluster
*
* @see #getAdminAcl
* @see #areACLsEnabled
*/
public boolean checkAccess(UserGroupInformation callerUGI) {
// Any user may perform this operation if authorization is not enabled
if (!areACLsEnabled()) {
return true;
}
// Administrators may perform any operation
return isAdmin(callerUGI);
}
}

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.security;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
/**
* A YarnAuthorizationProvider implementation based on configuration files.
*
*/
@Private
@Unstable
public class ConfiguredYarnAuthorizer extends YarnAuthorizationProvider {
private final ConcurrentMap<PrivilegedEntity, Map<AccessType, AccessControlList>> allAcls =
new ConcurrentHashMap<>();
private volatile AccessControlList adminAcl = null;
@Override
public void init(Configuration conf) {
adminAcl =
new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
}
@Override
public void setPermission(PrivilegedEntity target,
Map<AccessType, AccessControlList> acls, UserGroupInformation ugi) {
allAcls.put(target, acls);
}
@Override
public boolean checkPermission(AccessType accessType,
PrivilegedEntity target, UserGroupInformation user) {
boolean ret = false;
Map<AccessType, AccessControlList> acls = allAcls.get(target);
if (acls != null) {
AccessControlList list = acls.get(accessType);
if (list != null) {
ret = list.isUserAllowed(user);
}
}
// recursively look up the queue to see if parent queue has the permission.
if (target.getType() == EntityType.QUEUE && !ret) {
String queueName = target.getName();
if (!queueName.contains(".")) {
return ret;
}
String parentQueueName = queueName.substring(0, queueName.lastIndexOf("."));
return checkPermission(accessType, new PrivilegedEntity(target.getType(),
parentQueueName), user);
}
return ret;
}
@Override
public void setAdmins(AccessControlList acls, UserGroupInformation ugi) {
adminAcl = acls;
}
@Override
public boolean isAdmin(UserGroupInformation ugi) {
return adminAcl.isUserAllowed(ugi);
}
public AccessControlList getAdminAcls() {
return this.adminAcl;
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.security;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.QueueACL;
/**
* An entity in YARN that can be guarded with ACLs. The entity could be an
* application or a queue etc. An application entity has access types defined in
* {@link ApplicationAccessType}, a queue entity has access types defined in
* {@link QueueACL}.
*/
@Private
@Unstable
public class PrivilegedEntity {
public enum EntityType {
QUEUE
}
EntityType type;
String name;
public PrivilegedEntity(EntityType type, String name) {
this.type = type;
this.name = name;
}
public EntityType getType() {
return type;
}
public String getName() {
return name;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((name == null) ? 0 : name.hashCode());
result = prime * result + ((type == null) ? 0 : type.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
PrivilegedEntity other = (PrivilegedEntity) obj;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
if (type != other.type)
return false;
return true;
}
}

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.security;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* An implementation of the interface will provide authorization related
* information and enforce permission check. It is excepted that any of the
* methods defined in this interface should be non-blocking call and should not
* involve expensive computation as these method could be invoked in RPC.
*/
@Private
@Unstable
public abstract class YarnAuthorizationProvider {
private static final Log LOG = LogFactory.getLog(YarnAuthorizationProvider.class);
private static YarnAuthorizationProvider authorizer = null;
public static YarnAuthorizationProvider getInstance(Configuration conf) {
synchronized (YarnAuthorizationProvider.class) {
if (authorizer == null) {
Class<?> authorizerClass =
conf.getClass(YarnConfiguration.YARN_AUTHORIZATION_PROVIDER,
ConfiguredYarnAuthorizer.class);
authorizer =
(YarnAuthorizationProvider) ReflectionUtils.newInstance(
authorizerClass, conf);
authorizer.init(conf);
LOG.info(authorizerClass.getName() + " is instiantiated.");
}
}
return authorizer;
}
/**
* Initialize the provider. Invoked on daemon startup. DefaultYarnAuthorizer is
* initialized based on configurations.
*/
public abstract void init(Configuration conf);
/**
* Check if user has the permission to access the target object.
*
* @param accessType
* The type of accessing method.
* @param target
* The target object being accessed, e.g. app/queue
* @param user
* User who access the target
* @return true if user can access the object, otherwise false.
*/
public abstract boolean checkPermission(AccessType accessType,
PrivilegedEntity target, UserGroupInformation user);
/**
* Set ACLs for the target object. AccessControlList class encapsulate the
* users and groups who can access the target.
*
* @param target
* The target object.
* @param acls
* A map from access method to a list of users and/or groups who has
* permission to do the access.
* @param ugi User who sets the permissions.
*/
public abstract void setPermission(PrivilegedEntity target,
Map<AccessType, AccessControlList> acls, UserGroupInformation ugi);
/**
* Set a list of users/groups who have admin access
*
* @param acls users/groups who have admin access
* @param ugi User who sets the admin acls.
*/
public abstract void setAdmins(AccessControlList acls, UserGroupInformation ugi);
/**
* Check if the user is an admin.
*
* @param ugi the user to be determined if it is an admin
* @return true if the given user is an admin
*/
public abstract boolean isAdmin(UserGroupInformation ugi);
}

View File

@ -36,11 +36,10 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig.Policy;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AdminACLsManager;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -240,7 +239,9 @@ public class WebApps {
.addEndpoint(
URI.create(httpScheme + bindAddress
+ ":" + port)).setConf(conf).setFindPort(findPort)
.setACL(new AdminACLsManager(conf).getAdminAcl())
.setACL(new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)))
.setPathSpec(pathList.toArray(new String[0]));
boolean hasSpnegoConf = spnegoPrincipalKey != null

View File

@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
@ -101,7 +103,8 @@ public class AdminService extends CompositeService implements
// Address to use for binding. May be a wildcard address.
private InetSocketAddress masterServiceBindAddress;
private AccessControlList adminAcl;
private YarnAuthorizationProvider authorizer;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@ -129,10 +132,11 @@ public class AdminService extends CompositeService implements
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
adminAcl = new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
authorizer = YarnAuthorizationProvider.getInstance(conf);
authorizer.setAdmins(new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)), UserGroupInformation
.getCurrentUser());
rmId = conf.get(YarnConfiguration.RM_HA_ID);
super.serviceInit(conf);
}
@ -206,7 +210,7 @@ public class AdminService extends CompositeService implements
}
private UserGroupInformation checkAccess(String method) throws IOException {
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
return RMServerUtils.verifyAdminAccess(authorizer, method, LOG);
}
private UserGroupInformation checkAcls(String method) throws YarnException {
@ -293,7 +297,7 @@ public class AdminService extends CompositeService implements
"transitionToActive", "RMHAProtocolService");
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
adminAcl.toString(), "RMHAProtocolService",
"", "RMHAProtocolService",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
@ -318,7 +322,7 @@ public class AdminService extends CompositeService implements
"transitionToStandby", "RMHAProtocolService");
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
adminAcl.toString(), "RMHAProtocolService",
"", "RMHAProtocolService",
"Exception transitioning to standby");
throw new ServiceFailedException(
"Error when transitioning to Standby mode", e);
@ -446,9 +450,10 @@ public class AdminService extends CompositeService implements
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
adminAcl = new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
authorizer.setAdmins(new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)), UserGroupInformation
.getCurrentUser());
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
@ -584,9 +589,10 @@ public class AdminService extends CompositeService implements
}
}
// only for testing
@VisibleForTesting
public AccessControlList getAccessControlList() {
return this.adminAcl;
return ((ConfiguredYarnAuthorizer)authorizer).getAdminAcls();
}
@VisibleForTesting
@ -661,7 +667,7 @@ public class AdminService extends CompositeService implements
private void checkRMStatus(String user, String argName, String msg)
throws StandbyException {
if (!isRMActive()) {
RMAuditLogger.logFailure(user, argName, adminAcl.toString(),
RMAuditLogger.logFailure(user, argName, "",
"AdminService", "ResourceManager is not active. Can not " + msg);
throwStandbyException();
}
@ -670,7 +676,7 @@ public class AdminService extends CompositeService implements
private YarnException logAndWrapException(IOException ioe, String user,
String argName, String msg) throws YarnException {
LOG.info("Exception " + msg, ioe);
RMAuditLogger.logFailure(user, argName, adminAcl.toString(),
RMAuditLogger.logFailure(user, argName, "",
"AdminService", "Exception " + msg);
return RPCUtil.getRemoteException(ioe);
}

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -140,43 +141,43 @@ public class RMServerUtils {
}
}
public static UserGroupInformation verifyAccess(
AccessControlList acl, String method, final Log LOG)
public static UserGroupInformation verifyAdminAccess(
YarnAuthorizationProvider authorizer, String method, final Log LOG)
throws IOException {
// by default, this method will use AdminService as module name
return verifyAccess(acl, method, "AdminService", LOG);
return verifyAdminAccess(authorizer, method, "AdminService", LOG);
}
/**
* Utility method to verify if the current user has access based on the
* passed {@link AccessControlList}
* @param acl the {@link AccessControlList} to check against
* @param authorizer the {@link AccessControlList} to check against
* @param method the method name to be logged
* @param module, like AdminService or NodeLabelManager
* @param module like AdminService or NodeLabelManager
* @param LOG the logger to use
* @return {@link UserGroupInformation} of the current user
* @throws IOException
*/
public static UserGroupInformation verifyAccess(
AccessControlList acl, String method, String module, final Log LOG)
public static UserGroupInformation verifyAdminAccess(
YarnAuthorizationProvider authorizer, String method, String module,
final Log LOG)
throws IOException {
UserGroupInformation user;
try {
user = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
LOG.warn("Couldn't get current user", ioe);
RMAuditLogger.logFailure("UNKNOWN", method, acl.toString(),
RMAuditLogger.logFailure("UNKNOWN", method, "",
"AdminService", "Couldn't get current user");
throw ioe;
}
if (!acl.isUserAllowed(user)) {
if (!authorizer.isAdmin(user)) {
LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
" to call '" + method + "'");
RMAuditLogger.logFailure(user.getShortUserName(), method,
acl.toString(), module,
RMAuditLogger.AuditConstants.UNAUTHORIZED_USER);
RMAuditLogger.logFailure(user.getShortUserName(), method, "", module,
RMAuditLogger.AuditConstants.UNAUTHORIZED_USER);
throw new AccessControlException("User " + user.getShortUserName() +
" doesn't have permission" +

View File

@ -33,12 +33,11 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabel;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -60,16 +59,13 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
ConcurrentMap<String, Queue> queueCollections =
new ConcurrentHashMap<String, Queue>();
protected AccessControlList adminAcl;
private YarnAuthorizationProvider authorizer;
private RMContext rmContext = null;
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
adminAcl =
new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
authorizer = YarnAuthorizationProvider.getInstance(conf);
}
@Override
@ -479,7 +475,7 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
public boolean checkAccess(UserGroupInformation user) {
// make sure only admin can invoke
// this method
if (adminAcl.isUserAllowed(user)) {
if (authorizer.isAdmin(user)) {
return true;
}
return false;

View File

@ -28,12 +28,14 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -348,4 +350,15 @@ public class SchedulerUtils {
}
return true;
}
public static AccessType toAccessType(QueueACL acl) {
switch (acl) {
case ADMINISTER_QUEUE:
return AccessType.ADMINISTER_QUEUE;
case SUBMIT_APPLICATIONS:
return AccessType.SUBMIT_APP;
}
return null;
}
}

View File

@ -34,12 +34,16 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
@ -60,7 +64,8 @@ public abstract class AbstractCSQueue implements CSQueue {
Resource maximumAllocation;
QueueState state;
final QueueMetrics metrics;
protected final PrivilegedEntity queueEntity;
final ResourceCalculator resourceCalculator;
Set<String> accessibleLabels;
RMNodeLabelsManager labelManager;
@ -70,8 +75,8 @@ public abstract class AbstractCSQueue implements CSQueue {
Map<String, Float> absoluteMaxCapacityByNodeLabels;
Map<String, Float> maxCapacityByNodeLabels;
Map<QueueACL, AccessControlList> acls =
new HashMap<QueueACL, AccessControlList>();
Map<AccessType, AccessControlList> acls =
new HashMap<AccessType, AccessControlList>();
boolean reservationsContinueLooking;
private boolean preemptionDisabled;
@ -81,6 +86,7 @@ public abstract class AbstractCSQueue implements CSQueue {
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private CapacitySchedulerContext csContext;
protected YarnAuthorizationProvider authorizer = null;
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
@ -126,6 +132,8 @@ public abstract class AbstractCSQueue implements CSQueue {
accessibleLabels, labelManager);
this.csContext = cs;
queueUsage = new ResourceUsage();
queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());
authorizer = YarnAuthorizationProvider.getInstance(cs.getConf());
}
@Override
@ -181,7 +189,11 @@ public abstract class AbstractCSQueue implements CSQueue {
public String getQueueName() {
return queueName;
}
public PrivilegedEntity getPrivilegedEntity() {
return queueEntity;
}
@Override
public synchronized CSQueue getParent() {
return parent;
@ -195,22 +207,13 @@ public abstract class AbstractCSQueue implements CSQueue {
public Set<String> getAccessibleNodeLabels() {
return accessibleLabels;
}
@Override
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
synchronized (this) {
if (acls.get(acl).isUserAllowed(user)) {
return true;
}
}
if (parent != null) {
return parent.hasAccess(acl, user);
}
return false;
return authorizer.checkPermission(SchedulerUtils.toAccessType(acl),
queueEntity, user);
}
@Override
public synchronized void setUsedCapacity(float usedCapacity) {
this.usedCapacity = usedCapacity;
@ -251,7 +254,7 @@ public abstract class AbstractCSQueue implements CSQueue {
synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity,
QueueState state, Map<QueueACL, AccessControlList> acls,
QueueState state, Map<AccessType, AccessControlList> acls,
Set<String> labels, String defaultLabelExpression,
Map<String, Float> nodeLabelCapacities,
Map<String, Float> maximumNodeLabelCapacities,
@ -436,7 +439,7 @@ public abstract class AbstractCSQueue implements CSQueue {
}
@Private
public Map<QueueACL, AccessControlList> getACLs() {
public Map<AccessType, AccessControlList> getACLs() {
return acls;
}

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@ -124,7 +125,8 @@ public class CapacityScheduler extends
PreemptableResourceScheduler, CapacitySchedulerContext, Configurable {
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
private YarnAuthorizationProvider authorizer;
private CSQueue root;
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
@ -297,7 +299,7 @@ public class CapacityScheduler extends
new ConcurrentHashMap<ApplicationId,
SchedulerApplication<FiCaSchedulerApp>>();
this.labelManager = rmContext.getNodeLabelManager();
authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
initializeQueues(this.conf);
scheduleAsynchronously = this.conf.getScheduleAynschronously();
@ -474,6 +476,7 @@ public class CapacityScheduler extends
labelManager.reinitializeQueueLabels(getQueueToLabels());
LOG.info("Initialized root queue " + root);
initializeQueueMappings();
setQueueAcls(authorizer, queues);
}
@Lock(CapacityScheduler.class)
@ -499,8 +502,19 @@ public class CapacityScheduler extends
root.updateClusterResource(clusterResource);
labelManager.reinitializeQueueLabels(getQueueToLabels());
setQueueAcls(authorizer, queues);
}
@VisibleForTesting
public static void setQueueAcls(YarnAuthorizationProvider authorizer,
Map<String, CSQueue> queues) throws IOException {
for (CSQueue queue : queues.values()) {
AbstractCSQueue csQueue = (AbstractCSQueue) queue;
authorizer.setPermission(csQueue.getPrivilegedEntity(),
csQueue.getACLs(), UserGroupInformation.getCurrentUser());
}
}
private Map<String, Set<String>> getQueueToLabels() {
Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>();
for (CSQueue queue : queues.values()) {

View File

@ -40,8 +40,10 @@ import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -530,11 +532,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
set(queuePrefix + getAclKey(acl), aclString);
}
public Map<QueueACL, AccessControlList> getAcls(String queue) {
Map<QueueACL, AccessControlList> acls =
new HashMap<QueueACL, AccessControlList>();
public Map<AccessType, AccessControlList> getAcls(String queue) {
Map<AccessType, AccessControlList> acls =
new HashMap<AccessType, AccessControlList>();
for (QueueACL acl : QueueACL.values()) {
acls.put(acl, getAcl(queue, acl));
acls.put(SchedulerUtils.toAccessType(acl), getAcl(queue, acl));
}
return acls;
}

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@ -153,7 +154,7 @@ public class LeafQueue extends AbstractCSQueue {
QueueState state = cs.getConfiguration().getState(getQueuePath());
Map<QueueACL, AccessControlList> acls =
Map<AccessType, AccessControlList> acls =
cs.getConfiguration().getAcls(getQueuePath());
setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
@ -189,7 +190,7 @@ public class LeafQueue extends AbstractCSQueue {
int userLimit, float userLimitFactor,
int maxApplications, float maxAMResourcePerQueuePercent,
int maxApplicationsPerUser, QueueState state,
Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay,
Map<AccessType, AccessControlList> acls, int nodeLocalityDelay,
Set<String> labels, String defaultLabelExpression,
Map<String, Float> capacitieByLabel,
Map<String, Float> maximumCapacitiesByLabel,
@ -247,7 +248,7 @@ public class LeafQueue extends AbstractCSQueue {
maximumAllocation);
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
}

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@ -107,7 +108,7 @@ public class ParentQueue extends AbstractCSQueue {
QueueState state = cs.getConfiguration().getState(getQueuePath());
Map<QueueACL, AccessControlList> acls =
Map<AccessType, AccessControlList> acls =
cs.getConfiguration().getAcls(getQueuePath());
setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
@ -124,7 +125,7 @@ public class ParentQueue extends AbstractCSQueue {
synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity,
QueueState state, Map<QueueACL, AccessControlList> acls,
QueueState state, Map<AccessType, AccessControlList> acls,
Set<String> accessibleLabels, String defaultLabelExpression,
Map<String, Float> nodeLabelCapacities,
Map<String, Float> maximumCapacitiesByLabel,
@ -134,7 +135,7 @@ public class ParentQueue extends AbstractCSQueue {
defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel,
reservationContinueLooking, maximumAllocation);
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
}

View File

@ -37,7 +37,6 @@ import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -726,6 +726,9 @@ public class TestParentQueue {
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
YarnAuthorizationProvider authorizer =
YarnAuthorizationProvider.getInstance(conf);
CapacityScheduler.setQueueAcls(authorizer, queues);
UserGroupInformation user = UserGroupInformation.getCurrentUser();
// Setup queue configs

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.api.SCMAdminProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
@ -58,8 +59,7 @@ public class SCMAdminProtocolService extends AbstractService implements
private Server server;
InetSocketAddress clientBindAddress;
private final CleanerService cleanerService;
private AccessControlList adminAcl;
private YarnAuthorizationProvider authorizer;
public SCMAdminProtocolService(CleanerService cleanerService) {
super(SCMAdminProtocolService.class.getName());
this.cleanerService = cleanerService;
@ -68,9 +68,7 @@ public class SCMAdminProtocolService extends AbstractService implements
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.clientBindAddress = getBindAddress(conf);
adminAcl = new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
authorizer = YarnAuthorizationProvider.getInstance(conf);
super.serviceInit(conf);
}
@ -119,7 +117,7 @@ public class SCMAdminProtocolService extends AbstractService implements
throw RPCUtil.getRemoteException(ioe);
}
if (!adminAcl.isUserAllowed(user)) {
if (!authorizer.isAdmin(user)) {
LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
" to call '" + method + "'");