YARN-4997. Update fair scheduler to use pluggable auth provider (Contributed by Tao Jie via Daniel Templeton)

Author: Daniel Templeton
Date:   2016-11-30 09:50:33 -08:00
parent 625df87c7b
commit b3befc021b
6 changed files with 149 additions and 43 deletions
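For orientation: the provider that the fair scheduler now consults is whatever implementation class the yarn.authorization-provider setting names, and it is obtained through YarnAuthorizationProvider.getInstance(). Below is a minimal sketch of that selection, assuming the built-in ConfiguredYarnAuthorizer as the configured class; the wrapper class here is purely illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;

public class ProviderLookupSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // yarn.authorization-provider names the YarnAuthorizationProvider
    // implementation to use; ConfiguredYarnAuthorizer is the built-in default.
    conf.set(YarnConfiguration.YARN_AUTHORIZATION_PROVIDER,
        ConfiguredYarnAuthorizer.class.getName());
    // getInstance() creates the configured class once and caches it; the
    // FairScheduler change below makes the same call during scheduler init.
    YarnAuthorizationProvider authorizer =
        YarnAuthorizationProvider.getInstance(conf);
    System.out.println("Using " + authorizer.getClass().getName());
  }
}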

File: YarnAuthorizationProvider.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
/**
@@ -60,6 +61,20 @@ public abstract class YarnAuthorizationProvider {
return authorizer;
}
/**
* Destroy the {@link YarnAuthorizationProvider} instance.
* This method is called only in Tests.
*/
@VisibleForTesting
public static void destroy() {
synchronized (YarnAuthorizationProvider.class) {
if (authorizer != null) {
LOG.debug(authorizer.getClass().getName() + " is destroyed.");
authorizer = null;
}
}
}
/**
* Initialize the provider. Invoked on daemon startup. DefaultYarnAuthorizer is
* initialized based on configurations.
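The new destroy() method is there so tests can drop the cached singleton between cases. A short sketch of the reset pattern it enables (not part of the patch, and the wrapper class is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;

public class AuthorizerResetSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // getInstance() caches the provider, so repeated calls return the same object.
    YarnAuthorizationProvider first = YarnAuthorizationProvider.getInstance(conf);
    YarnAuthorizationProvider second = YarnAuthorizationProvider.getInstance(conf);
    System.out.println(first == second);   // true
    // destroy() clears the cache so the next test, possibly configured with a
    // different provider, starts clean; TestFairScheduler's tearDown (last
    // file below) now calls this.
    YarnAuthorizationProvider.destroy();
    System.out.println(first == YarnAuthorizationProvider.getInstance(conf));   // false
  }
}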

File: AllocationConfiguration.java

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -25,13 +26,14 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -69,7 +71,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
private final float queueMaxAMShareDefault;
// ACL's for each queue. Only specifies non-default ACL's from configuration.
private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
private final Map<String, Map<AccessType, AccessControlList>> queueAcls;
// Reservation ACL's for each queue. Only specifies non-default ACL's from
// configuration.
@@ -123,7 +125,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Float> fairSharePreemptionThresholds,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<String, Map<AccessType, AccessControlList>> queueAcls,
Map<String, Map<ReservationACL, AccessControlList>> resAcls,
QueuePlacementPolicy placementPolicy,
Map<FSQueueType, Set<String>> configuredQueues,
@@ -191,9 +193,10 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
* nobody ("")
*/
public AccessControlList getQueueAcl(String queue, QueueACL operation) {
Map<QueueACL, AccessControlList> queueAcls = this.queueAcls.get(queue);
if (queueAcls != null) {
AccessControlList operationAcl = queueAcls.get(operation);
Map<AccessType, AccessControlList> acls = this.queueAcls.get(queue);
if (acls != null) {
AccessControlList operationAcl =
acls.get(SchedulerUtils.toAccessType(operation));
if (operationAcl != null) {
return operationAcl;
}
@@ -201,6 +204,14 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
return (queue.equals("root")) ? EVERYBODY_ACL : NOBODY_ACL;
}
/**
* Get the map of ACLs of all queues.
* @return the map of ACLs of all queues
*/
public Map<String, Map<AccessType, AccessControlList>> getQueueAcls() {
return Collections.unmodifiableMap(this.queueAcls);
}
@Override
/**
* Get the map of reservation ACLs to {@link AccessControlList} for the
@@ -315,21 +326,6 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
return maxChildQueueResources.get(queue);
}
public boolean hasAccess(String queueName, QueueACL acl,
UserGroupInformation user) {
int lastPeriodIndex = queueName.length();
while (lastPeriodIndex != -1) {
String queue = queueName.substring(0, lastPeriodIndex);
if (getQueueAcl(queue, acl).isUserAllowed(user)) {
return true;
}
lastPeriodIndex = queueName.lastIndexOf('.', lastPeriodIndex - 1);
}
return false;
}
@VisibleForTesting
SchedulingPolicy getSchedulingPolicy(String queueName) {
SchedulingPolicy policy = schedulingPolicies.get(queueName);
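The per-queue ACL map is now keyed by the provider-facing AccessType instead of QueueACL, so getQueueAcl() translates the caller's QueueACL with SchedulerUtils.toAccessType() before looking it up. A small sketch of that translation (the wrapper class is illustrative):

import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;

public class AclKeyMappingSketch {
  public static void main(String[] args) {
    // QueueACL is the public API enum; AccessType is the key understood by
    // YarnAuthorizationProvider and by the reworked queueAcls map.
    for (QueueACL acl : QueueACL.values()) {
      System.out.println(acl + " -> " + SchedulerUtils.toAccessType(acl));
    }
    // Expected pairs: SUBMIT_APPLICATIONS -> SUBMIT_APP and
    // ADMINISTER_QUEUE -> ADMINISTER_QUEUE.
  }
}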

File: AllocationFileLoaderService.java

@@ -41,8 +41,13 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -74,6 +79,12 @@ public class AllocationFileLoaderService extends AbstractService {
public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
private static final String ROOT = "root";
private static final AccessControlList EVERYBODY_ACL =
new AccessControlList("*");
private static final AccessControlList NOBODY_ACL =
new AccessControlList(" ");
private final Clock clock;
private long lastSuccessfulReload; // Last time we successfully reloaded queues
@@ -93,7 +104,9 @@ public class AllocationFileLoaderService extends AbstractService {
public AllocationFileLoaderService() {
this(SystemClock.getInstance());
}
private List<Permission> defaultPermissions;
public AllocationFileLoaderService(Clock clock) {
super(AllocationFileLoaderService.class.getName());
this.clock = clock;
@@ -208,6 +221,7 @@
ParserConfigurationException, SAXException,
AllocationConfigurationException {
if (allocFile == null) {
reloadListener.onReload(null);
return;
}
LOG.info("Loading allocation file " + allocFile);
@@ -224,9 +238,10 @@
Map<String, Long> minSharePreemptionTimeouts = new HashMap<>();
Map<String, Long> fairSharePreemptionTimeouts = new HashMap<>();
Map<String, Float> fairSharePreemptionThresholds = new HashMap<>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<>();
Map<String, Map<AccessType, AccessControlList>> queueAcls =
new HashMap<>();
Map<String, Map<ReservationACL, AccessControlList>> reservationAcls =
new HashMap<>();
new HashMap<>();
Set<String> reservableQueues = new HashSet<>();
Set<String> nonPreemptableQueues = new HashSet<>();
int userMaxAppsDefault = Integer.MAX_VALUE;
@@ -444,7 +459,7 @@
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Float> fairSharePreemptionThresholds,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<String, Map<AccessType, AccessControlList>> queueAcls,
Map<String, Map<ReservationACL, AccessControlList>> resAcls,
Map<FSQueueType, Set<String>> configuredQueues,
Set<String> reservableQueues,
@@ -468,7 +483,7 @@
queueName = parentName + "." + queueName;
}
Map<QueueACL, AccessControlList> acls = new HashMap<>();
Map<AccessType, AccessControlList> acls = new HashMap<>();
Map<ReservationACL, AccessControlList> racls = new HashMap<>();
NodeList fields = element.getChildNodes();
boolean isLeaf = true;
@@ -526,10 +541,10 @@
queuePolicies.put(queueName, policy);
} else if ("aclSubmitApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData();
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
acls.put(AccessType.SUBMIT_APP, new AccessControlList(text));
} else if ("aclAdministerApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData();
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
acls.put(AccessType.ADMINISTER_QUEUE, new AccessControlList(text));
} else if ("aclAdministerReservations".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData();
racls.put(ReservationACL.ADMINISTER_RESERVATIONS,
@@ -578,6 +593,17 @@
}
configuredQueues.get(FSQueueType.PARENT).add(queueName);
}
// Set default acls if not defined
// The root queue defaults to all access
for (QueueACL acl : QueueACL.values()) {
AccessType accessType = SchedulerUtils.toAccessType(acl);
if (acls.get(accessType) == null){
AccessControlList defaultAcl = queueName.equals(ROOT) ?
EVERYBODY_ACL : NOBODY_ACL;
acls.put(accessType, defaultAcl);
}
}
queueAcls.put(queueName, acls);
resAcls.put(queueName, racls);
if (maxQueueResources.containsKey(queueName) &&
@@ -590,8 +616,30 @@
minQueueResources.get(queueName)));
}
}
public interface Listener {
public void onReload(AllocationConfiguration info);
/**
* Returns the list of default permissions.
* The default permission for the root queue is everybody ("*")
* and the default permission for all other queues is nobody ("").
* The default permission list would be loaded before the permissions
* from allocation file.
* @return default permission list
*/
protected List<Permission> getDefaultPermissions() {
if (defaultPermissions == null) {
defaultPermissions = new ArrayList<>();
Map<AccessType, AccessControlList> acls =
new HashMap<>();
for (QueueACL acl : QueueACL.values()) {
acls.put(SchedulerUtils.toAccessType(acl), EVERYBODY_ACL);
}
defaultPermissions.add(new Permission(
new PrivilegedEntity(EntityType.QUEUE, ROOT), acls));
}
return defaultPermissions;
}
interface Listener {
void onReload(AllocationConfiguration info) throws IOException;
}
}
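The loader now makes the ACL defaults explicit: any ACL the allocation file leaves unset becomes everybody ("*") on the root queue and nobody (" ") on every other queue, and getDefaultPermissions() provides an all-access root permission that is handed to the provider before the per-queue ACLs. A rough stand-alone equivalent of that defaulting step (the class and helper names are illustrative):

import java.util.EnumMap;
import java.util.Map;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;

public class DefaultAclSketch {
  // Mirrors the defaulting loop added to loadQueue(): for every QueueACL whose
  // AccessType key is missing, install "*" (everybody) on root and " " (nobody)
  // on any other queue.
  static Map<AccessType, AccessControlList> withDefaults(
      String queueName, Map<AccessType, AccessControlList> configured) {
    Map<AccessType, AccessControlList> acls = new EnumMap<>(AccessType.class);
    acls.putAll(configured);
    for (QueueACL acl : QueueACL.values()) {
      AccessType key = SchedulerUtils.toAccessType(acl);
      if (!acls.containsKey(key)) {
        acls.put(key, "root".equals(queueName)
            ? new AccessControlList("*") : new AccessControlList(" "));
      }
    }
    return acls;
  }

  public static void main(String[] args) {
    System.out.println(withDefaults("root", new EnumMap<>(AccessType.class)));
    System.out.println(withDefaults("root.q1", new EnumMap<>(AccessType.class)));
  }
}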

File: FSQueue.java

@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -37,8 +38,13 @@ import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessRequest;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@@ -53,6 +59,8 @@ public abstract class FSQueue implements Queue, Schedulable {
private Resource steadyFairShare = Resources.createResource(0, 0);
private final String name;
protected final FairScheduler scheduler;
private final YarnAuthorizationProvider authorizer;
private final PrivilegedEntity queueEntity;
private final FSQueueMetrics metrics;
protected final FSParentQueue parent;
@@ -78,6 +86,9 @@ public abstract class FSQueue implements Queue, Schedulable {
public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
this.name = name;
this.scheduler = scheduler;
this.authorizer =
YarnAuthorizationProvider.getInstance(scheduler.getConf());
this.queueEntity = new PrivilegedEntity(EntityType.QUEUE, name);
this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
this.parent = parent;
}
@@ -96,16 +107,16 @@ public abstract class FSQueue implements Queue, Schedulable {
public String getName() {
return name;
}
@Override
public String getQueueName() {
return name;
}
public SchedulingPolicy getPolicy() {
return policy;
}
public FSParentQueue getParent() {
return parent;
}
@@ -266,7 +277,10 @@
}
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
return scheduler.getAllocationConfiguration().hasAccess(name, acl, user);
return authorizer.checkPermission(
new AccessRequest(queueEntity, user,
SchedulerUtils.toAccessType(acl), null, null,
Server.getRemoteAddress(), null));
}
long getFairSharePreemptionTimeout() {
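FSQueue.hasAccess() now delegates the decision to the provider by building an AccessRequest rather than consulting the ACL map directly. A stand-alone sketch of an equivalent check, with the queue name invented for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.security.AccessRequest;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;

public class QueueAccessCheckSketch {
  public static void main(String[] args) throws Exception {
    YarnAuthorizationProvider authorizer =
        YarnAuthorizationProvider.getInstance(new Configuration());
    UserGroupInformation user = UserGroupInformation.getCurrentUser();
    // The queue is presented to the provider as a QUEUE-type entity by name.
    PrivilegedEntity queue = new PrivilegedEntity(EntityType.QUEUE, "root.q1");
    // Same argument shape as the new FSQueue.hasAccess(): application id,
    // application name and forwarded addresses are unknown here, so null.
    boolean allowed = authorizer.checkPermission(new AccessRequest(
        queue, user, SchedulerUtils.toAccessType(QueueACL.SUBMIT_APPLICATIONS),
        null, null, Server.getRemoteAddress(), null));
    // Nothing has been registered through setPermission() in this sketch, so
    // the default provider would normally deny; in the scheduler the ACLs are
    // registered when the allocation file is (re)loaded.
    System.out.println(allowed);
  }
}

Note that the removed AllocationConfiguration.hasAccess() walked the dotted queue name up toward root itself; after this change a queue's check is a single checkPermission() call against that queue's entity.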

File: FairScheduler.java

@@ -25,6 +25,8 @@ import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -53,6 +56,11 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -124,6 +132,7 @@ public class FairScheduler extends
private FairSchedulerConfiguration conf;
private FSContext context;
private YarnAuthorizationProvider authorizer;
private Resource incrAllocation;
private QueueManager queueMgr;
private boolean usePortForNodeName;
@@ -1209,6 +1218,7 @@
writeLock.lock();
this.conf = new FairSchedulerConfiguration(conf);
validateConf(this.conf);
authorizer = YarnAuthorizationProvider.getInstance(conf);
minimumAllocation = this.conf.getMinimumAllocation();
initMaximumResourceCapability(this.conf.getMaximumAllocation());
incrAllocation = this.conf.getIncrementAllocation();
@@ -1417,23 +1427,46 @@
AllocationFileLoaderService.Listener {
@Override
public void onReload(AllocationConfiguration queueInfo) {
public void onReload(AllocationConfiguration queueInfo)
throws IOException {
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.
writeLock.lock();
try {
allocConf = queueInfo;
allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource());
queueMgr.updateAllocationConfiguration(allocConf);
applyChildDefaults();
maxRunningEnforcer.updateRunnabilityOnReload();
if (queueInfo == null) {
authorizer.setPermission(allocsLoader.getDefaultPermissions(),
UserGroupInformation.getCurrentUser());
} else {
allocConf = queueInfo;
setQueueAcls(allocConf.getQueueAcls());
allocConf.getDefaultSchedulingPolicy().initialize(
getClusterResource());
queueMgr.updateAllocationConfiguration(allocConf);
applyChildDefaults();
maxRunningEnforcer.updateRunnabilityOnReload();
}
} finally {
writeLock.unlock();
}
}
}
private void setQueueAcls(
Map<String, Map<AccessType, AccessControlList>> queueAcls)
throws IOException {
authorizer.setPermission(allocsLoader.getDefaultPermissions(),
UserGroupInformation.getCurrentUser());
List<Permission> permissions = new ArrayList<>();
for (Entry<String, Map<AccessType, AccessControlList>> queueAcl : queueAcls
.entrySet()) {
permissions.add(new Permission(new PrivilegedEntity(EntityType.QUEUE,
queueAcl.getKey()), queueAcl.getValue()));
}
authorizer.setPermission(permissions,
UserGroupInformation.getCurrentUser());
}
/**
* After reloading the allocation config, the max resource settings for any
* ad hoc queues will be missing. This method goes through the queue manager's
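On reload the scheduler now publishes queue ACLs to the provider instead of keeping them to itself: the defaults from getDefaultPermissions() are set first, then one Permission per queue is built from the AccessType-keyed map (and when no allocation file exists, onReload(null) installs just the defaults). A condensed sketch of that conversion, with queue names and ACL strings invented for illustration:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;

public class QueuePermissionPushSketch {
  public static void main(String[] args) throws Exception {
    YarnAuthorizationProvider authorizer =
        YarnAuthorizationProvider.getInstance(new Configuration());

    // Per-queue ACLs in the shape AllocationConfiguration.getQueueAcls()
    // returns: queue name -> (AccessType -> AccessControlList).
    Map<String, Map<AccessType, AccessControlList>> queueAcls = new HashMap<>();
    Map<AccessType, AccessControlList> q1Acls = new HashMap<>();
    q1Acls.put(AccessType.SUBMIT_APP, new AccessControlList("alice,bob"));
    q1Acls.put(AccessType.ADMINISTER_QUEUE, new AccessControlList("ops"));
    queueAcls.put("root.q1", q1Acls);

    // Wrap each queue's map in a Permission keyed by a QUEUE entity and push
    // the batch to the provider, as the new FairScheduler.setQueueAcls() does.
    List<Permission> permissions = new ArrayList<>();
    for (Map.Entry<String, Map<AccessType, AccessControlList>> entry
        : queueAcls.entrySet()) {
      permissions.add(new Permission(
          new PrivilegedEntity(EntityType.QUEUE, entry.getKey()),
          entry.getValue()));
    }
    authorizer.setPermission(permissions, UserGroupInformation.getCurrentUser());
  }
}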

File: TestFairScheduler.java

@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -94,10 +95,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdate
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
@@ -153,6 +152,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
QueueMetrics.clearQueueMetrics();
DefaultMetricsSystem.shutdown();
YarnAuthorizationProvider.destroy();
}