YARN-4997. Update fair scheduler to use pluggable auth provider (Contributed by Tao Jie via Daniel Templeton)
(cherry picked from commit b3befc021b
)
This commit is contained in:
parent
4a48f0e702
commit
443f2803ac
|
@ -28,6 +28,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -60,6 +61,20 @@ public abstract class YarnAuthorizationProvider {
|
||||||
return authorizer;
|
return authorizer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destroy the {@link YarnAuthorizationProvider} instance.
|
||||||
|
* This method is called only in Tests.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public static void destroy() {
|
||||||
|
synchronized (YarnAuthorizationProvider.class) {
|
||||||
|
if (authorizer != null) {
|
||||||
|
LOG.debug(authorizer.getClass().getName() + " is destroyed.");
|
||||||
|
authorizer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the provider. Invoked on daemon startup. DefaultYarnAuthorizer is
|
* Initialize the provider. Invoked on daemon startup. DefaultYarnAuthorizer is
|
||||||
* initialized based on configurations.
|
* initialized based on configurations.
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -25,13 +26,14 @@ import java.util.Set;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationACL;
|
import org.apache.hadoop.yarn.api.records.ReservationACL;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.security.AccessType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
@ -69,7 +71,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||||
private final float queueMaxAMShareDefault;
|
private final float queueMaxAMShareDefault;
|
||||||
|
|
||||||
// ACL's for each queue. Only specifies non-default ACL's from configuration.
|
// ACL's for each queue. Only specifies non-default ACL's from configuration.
|
||||||
private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
|
private final Map<String, Map<AccessType, AccessControlList>> queueAcls;
|
||||||
|
|
||||||
// Reservation ACL's for each queue. Only specifies non-default ACL's from
|
// Reservation ACL's for each queue. Only specifies non-default ACL's from
|
||||||
// configuration.
|
// configuration.
|
||||||
|
@ -123,7 +125,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||||
Map<String, Long> minSharePreemptionTimeouts,
|
Map<String, Long> minSharePreemptionTimeouts,
|
||||||
Map<String, Long> fairSharePreemptionTimeouts,
|
Map<String, Long> fairSharePreemptionTimeouts,
|
||||||
Map<String, Float> fairSharePreemptionThresholds,
|
Map<String, Float> fairSharePreemptionThresholds,
|
||||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
Map<String, Map<AccessType, AccessControlList>> queueAcls,
|
||||||
Map<String, Map<ReservationACL, AccessControlList>> resAcls,
|
Map<String, Map<ReservationACL, AccessControlList>> resAcls,
|
||||||
QueuePlacementPolicy placementPolicy,
|
QueuePlacementPolicy placementPolicy,
|
||||||
Map<FSQueueType, Set<String>> configuredQueues,
|
Map<FSQueueType, Set<String>> configuredQueues,
|
||||||
|
@ -191,9 +193,10 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||||
* nobody ("")
|
* nobody ("")
|
||||||
*/
|
*/
|
||||||
public AccessControlList getQueueAcl(String queue, QueueACL operation) {
|
public AccessControlList getQueueAcl(String queue, QueueACL operation) {
|
||||||
Map<QueueACL, AccessControlList> queueAcls = this.queueAcls.get(queue);
|
Map<AccessType, AccessControlList> acls = this.queueAcls.get(queue);
|
||||||
if (queueAcls != null) {
|
if (acls != null) {
|
||||||
AccessControlList operationAcl = queueAcls.get(operation);
|
AccessControlList operationAcl =
|
||||||
|
acls.get(SchedulerUtils.toAccessType(operation));
|
||||||
if (operationAcl != null) {
|
if (operationAcl != null) {
|
||||||
return operationAcl;
|
return operationAcl;
|
||||||
}
|
}
|
||||||
|
@ -201,6 +204,14 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||||
return (queue.equals("root")) ? EVERYBODY_ACL : NOBODY_ACL;
|
return (queue.equals("root")) ? EVERYBODY_ACL : NOBODY_ACL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the map of ACLs of all queues.
|
||||||
|
* @return the map of ACLs of all queues
|
||||||
|
*/
|
||||||
|
public Map<String, Map<AccessType, AccessControlList>> getQueueAcls() {
|
||||||
|
return Collections.unmodifiableMap(this.queueAcls);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
/**
|
/**
|
||||||
* Get the map of reservation ACLs to {@link AccessControlList} for the
|
* Get the map of reservation ACLs to {@link AccessControlList} for the
|
||||||
|
@ -315,21 +326,6 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||||
return maxChildQueueResources.get(queue);
|
return maxChildQueueResources.get(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasAccess(String queueName, QueueACL acl,
|
|
||||||
UserGroupInformation user) {
|
|
||||||
int lastPeriodIndex = queueName.length();
|
|
||||||
while (lastPeriodIndex != -1) {
|
|
||||||
String queue = queueName.substring(0, lastPeriodIndex);
|
|
||||||
if (getQueueAcl(queue, acl).isUserAllowed(user)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
lastPeriodIndex = queueName.lastIndexOf('.', lastPeriodIndex - 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
SchedulingPolicy getSchedulingPolicy(String queueName) {
|
SchedulingPolicy getSchedulingPolicy(String queueName) {
|
||||||
SchedulingPolicy policy = schedulingPolicies.get(queueName);
|
SchedulingPolicy policy = schedulingPolicies.get(queueName);
|
||||||
|
|
|
@ -41,8 +41,13 @@ import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationACL;
|
import org.apache.hadoop.yarn.api.records.ReservationACL;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.security.AccessType;
|
||||||
|
import org.apache.hadoop.yarn.security.Permission;
|
||||||
|
import org.apache.hadoop.yarn.security.PrivilegedEntity;
|
||||||
|
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.SystemClock;
|
import org.apache.hadoop.yarn.util.SystemClock;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
@ -74,6 +79,12 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
|
|
||||||
public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
|
public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
|
||||||
|
|
||||||
|
private static final String ROOT = "root";
|
||||||
|
private static final AccessControlList EVERYBODY_ACL =
|
||||||
|
new AccessControlList("*");
|
||||||
|
private static final AccessControlList NOBODY_ACL =
|
||||||
|
new AccessControlList(" ");
|
||||||
|
|
||||||
private final Clock clock;
|
private final Clock clock;
|
||||||
|
|
||||||
private long lastSuccessfulReload; // Last time we successfully reloaded queues
|
private long lastSuccessfulReload; // Last time we successfully reloaded queues
|
||||||
|
@ -93,7 +104,9 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
public AllocationFileLoaderService() {
|
public AllocationFileLoaderService() {
|
||||||
this(SystemClock.getInstance());
|
this(SystemClock.getInstance());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<Permission> defaultPermissions;
|
||||||
|
|
||||||
public AllocationFileLoaderService(Clock clock) {
|
public AllocationFileLoaderService(Clock clock) {
|
||||||
super(AllocationFileLoaderService.class.getName());
|
super(AllocationFileLoaderService.class.getName());
|
||||||
this.clock = clock;
|
this.clock = clock;
|
||||||
|
@ -208,6 +221,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
ParserConfigurationException, SAXException,
|
ParserConfigurationException, SAXException,
|
||||||
AllocationConfigurationException {
|
AllocationConfigurationException {
|
||||||
if (allocFile == null) {
|
if (allocFile == null) {
|
||||||
|
reloadListener.onReload(null);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LOG.info("Loading allocation file " + allocFile);
|
LOG.info("Loading allocation file " + allocFile);
|
||||||
|
@ -224,9 +238,10 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
Map<String, Long> minSharePreemptionTimeouts = new HashMap<>();
|
Map<String, Long> minSharePreemptionTimeouts = new HashMap<>();
|
||||||
Map<String, Long> fairSharePreemptionTimeouts = new HashMap<>();
|
Map<String, Long> fairSharePreemptionTimeouts = new HashMap<>();
|
||||||
Map<String, Float> fairSharePreemptionThresholds = new HashMap<>();
|
Map<String, Float> fairSharePreemptionThresholds = new HashMap<>();
|
||||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<>();
|
Map<String, Map<AccessType, AccessControlList>> queueAcls =
|
||||||
|
new HashMap<>();
|
||||||
Map<String, Map<ReservationACL, AccessControlList>> reservationAcls =
|
Map<String, Map<ReservationACL, AccessControlList>> reservationAcls =
|
||||||
new HashMap<>();
|
new HashMap<>();
|
||||||
Set<String> reservableQueues = new HashSet<>();
|
Set<String> reservableQueues = new HashSet<>();
|
||||||
Set<String> nonPreemptableQueues = new HashSet<>();
|
Set<String> nonPreemptableQueues = new HashSet<>();
|
||||||
int userMaxAppsDefault = Integer.MAX_VALUE;
|
int userMaxAppsDefault = Integer.MAX_VALUE;
|
||||||
|
@ -444,7 +459,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
Map<String, Long> minSharePreemptionTimeouts,
|
Map<String, Long> minSharePreemptionTimeouts,
|
||||||
Map<String, Long> fairSharePreemptionTimeouts,
|
Map<String, Long> fairSharePreemptionTimeouts,
|
||||||
Map<String, Float> fairSharePreemptionThresholds,
|
Map<String, Float> fairSharePreemptionThresholds,
|
||||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
Map<String, Map<AccessType, AccessControlList>> queueAcls,
|
||||||
Map<String, Map<ReservationACL, AccessControlList>> resAcls,
|
Map<String, Map<ReservationACL, AccessControlList>> resAcls,
|
||||||
Map<FSQueueType, Set<String>> configuredQueues,
|
Map<FSQueueType, Set<String>> configuredQueues,
|
||||||
Set<String> reservableQueues,
|
Set<String> reservableQueues,
|
||||||
|
@ -468,7 +483,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
queueName = parentName + "." + queueName;
|
queueName = parentName + "." + queueName;
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<QueueACL, AccessControlList> acls = new HashMap<>();
|
Map<AccessType, AccessControlList> acls = new HashMap<>();
|
||||||
Map<ReservationACL, AccessControlList> racls = new HashMap<>();
|
Map<ReservationACL, AccessControlList> racls = new HashMap<>();
|
||||||
NodeList fields = element.getChildNodes();
|
NodeList fields = element.getChildNodes();
|
||||||
boolean isLeaf = true;
|
boolean isLeaf = true;
|
||||||
|
@ -526,10 +541,10 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
queuePolicies.put(queueName, policy);
|
queuePolicies.put(queueName, policy);
|
||||||
} else if ("aclSubmitApps".equals(field.getTagName())) {
|
} else if ("aclSubmitApps".equals(field.getTagName())) {
|
||||||
String text = ((Text)field.getFirstChild()).getData();
|
String text = ((Text)field.getFirstChild()).getData();
|
||||||
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
|
acls.put(AccessType.SUBMIT_APP, new AccessControlList(text));
|
||||||
} else if ("aclAdministerApps".equals(field.getTagName())) {
|
} else if ("aclAdministerApps".equals(field.getTagName())) {
|
||||||
String text = ((Text)field.getFirstChild()).getData();
|
String text = ((Text)field.getFirstChild()).getData();
|
||||||
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
|
acls.put(AccessType.ADMINISTER_QUEUE, new AccessControlList(text));
|
||||||
} else if ("aclAdministerReservations".equals(field.getTagName())) {
|
} else if ("aclAdministerReservations".equals(field.getTagName())) {
|
||||||
String text = ((Text)field.getFirstChild()).getData();
|
String text = ((Text)field.getFirstChild()).getData();
|
||||||
racls.put(ReservationACL.ADMINISTER_RESERVATIONS,
|
racls.put(ReservationACL.ADMINISTER_RESERVATIONS,
|
||||||
|
@ -578,6 +593,17 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
}
|
}
|
||||||
configuredQueues.get(FSQueueType.PARENT).add(queueName);
|
configuredQueues.get(FSQueueType.PARENT).add(queueName);
|
||||||
}
|
}
|
||||||
|
// Set default acls if not defined
|
||||||
|
// The root queue defaults to all access
|
||||||
|
for (QueueACL acl : QueueACL.values()) {
|
||||||
|
AccessType accessType = SchedulerUtils.toAccessType(acl);
|
||||||
|
if (acls.get(accessType) == null){
|
||||||
|
AccessControlList defaultAcl = queueName.equals(ROOT) ?
|
||||||
|
EVERYBODY_ACL : NOBODY_ACL;
|
||||||
|
acls.put(accessType, defaultAcl);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
queueAcls.put(queueName, acls);
|
queueAcls.put(queueName, acls);
|
||||||
resAcls.put(queueName, racls);
|
resAcls.put(queueName, racls);
|
||||||
if (maxQueueResources.containsKey(queueName) &&
|
if (maxQueueResources.containsKey(queueName) &&
|
||||||
|
@ -590,8 +616,30 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
minQueueResources.get(queueName)));
|
minQueueResources.get(queueName)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface Listener {
|
/**
|
||||||
public void onReload(AllocationConfiguration info);
|
* Returns the list of default permissions.
|
||||||
|
* The default permission for the root queue is everybody ("*")
|
||||||
|
* and the default permission for all other queues is nobody ("").
|
||||||
|
* The default permission list would be loaded before the permissions
|
||||||
|
* from allocation file.
|
||||||
|
* @return default permission list
|
||||||
|
*/
|
||||||
|
protected List<Permission> getDefaultPermissions() {
|
||||||
|
if (defaultPermissions == null) {
|
||||||
|
defaultPermissions = new ArrayList<>();
|
||||||
|
Map<AccessType, AccessControlList> acls =
|
||||||
|
new HashMap<>();
|
||||||
|
for (QueueACL acl : QueueACL.values()) {
|
||||||
|
acls.put(SchedulerUtils.toAccessType(acl), EVERYBODY_ACL);
|
||||||
|
}
|
||||||
|
defaultPermissions.add(new Permission(
|
||||||
|
new PrivilegedEntity(EntityType.QUEUE, ROOT), acls));
|
||||||
|
}
|
||||||
|
return defaultPermissions;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Listener {
|
||||||
|
void onReload(AllocationConfiguration info) throws IOException;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
@ -37,8 +38,13 @@ import org.apache.hadoop.yarn.api.records.QueueStatistics;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.security.AccessRequest;
|
||||||
|
import org.apache.hadoop.yarn.security.PrivilegedEntity;
|
||||||
|
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
|
||||||
|
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -53,6 +59,8 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
private Resource steadyFairShare = Resources.createResource(0, 0);
|
private Resource steadyFairShare = Resources.createResource(0, 0);
|
||||||
private final String name;
|
private final String name;
|
||||||
protected final FairScheduler scheduler;
|
protected final FairScheduler scheduler;
|
||||||
|
private final YarnAuthorizationProvider authorizer;
|
||||||
|
private final PrivilegedEntity queueEntity;
|
||||||
private final FSQueueMetrics metrics;
|
private final FSQueueMetrics metrics;
|
||||||
|
|
||||||
protected final FSParentQueue parent;
|
protected final FSParentQueue parent;
|
||||||
|
@ -78,6 +86,9 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
|
public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
|
this.authorizer =
|
||||||
|
YarnAuthorizationProvider.getInstance(scheduler.getConf());
|
||||||
|
this.queueEntity = new PrivilegedEntity(EntityType.QUEUE, name);
|
||||||
this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
|
this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
|
||||||
this.parent = parent;
|
this.parent = parent;
|
||||||
reinit(false);
|
reinit(false);
|
||||||
|
@ -106,16 +117,16 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getQueueName() {
|
public String getQueueName() {
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SchedulingPolicy getPolicy() {
|
public SchedulingPolicy getPolicy() {
|
||||||
return policy;
|
return policy;
|
||||||
}
|
}
|
||||||
|
|
||||||
public FSParentQueue getParent() {
|
public FSParentQueue getParent() {
|
||||||
return parent;
|
return parent;
|
||||||
}
|
}
|
||||||
|
@ -276,7 +287,10 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
|
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
|
||||||
return scheduler.getAllocationConfiguration().hasAccess(name, acl, user);
|
return authorizer.checkPermission(
|
||||||
|
new AccessRequest(queueEntity, user,
|
||||||
|
SchedulerUtils.toAccessType(acl), null, null,
|
||||||
|
Server.getRemoteAddress(), null));
|
||||||
}
|
}
|
||||||
|
|
||||||
long getFairSharePreemptionTimeout() {
|
long getFairSharePreemptionTimeout() {
|
||||||
|
|
|
@ -28,6 +28,8 @@ import java.util.Comparator;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
@ -37,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
@ -55,6 +58,11 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||||
|
import org.apache.hadoop.yarn.security.AccessType;
|
||||||
|
import org.apache.hadoop.yarn.security.Permission;
|
||||||
|
import org.apache.hadoop.yarn.security.PrivilegedEntity;
|
||||||
|
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
|
||||||
|
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
|
@ -123,6 +131,7 @@ public class FairScheduler extends
|
||||||
private FairSchedulerConfiguration conf;
|
private FairSchedulerConfiguration conf;
|
||||||
|
|
||||||
private FSContext context;
|
private FSContext context;
|
||||||
|
private YarnAuthorizationProvider authorizer;
|
||||||
private Resource incrAllocation;
|
private Resource incrAllocation;
|
||||||
private QueueManager queueMgr;
|
private QueueManager queueMgr;
|
||||||
private boolean usePortForNodeName;
|
private boolean usePortForNodeName;
|
||||||
|
@ -1214,6 +1223,7 @@ public class FairScheduler extends
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
this.conf = new FairSchedulerConfiguration(conf);
|
this.conf = new FairSchedulerConfiguration(conf);
|
||||||
validateConf(this.conf);
|
validateConf(this.conf);
|
||||||
|
authorizer = YarnAuthorizationProvider.getInstance(conf);
|
||||||
minimumAllocation = this.conf.getMinimumAllocation();
|
minimumAllocation = this.conf.getMinimumAllocation();
|
||||||
initMaximumResourceCapability(this.conf.getMaximumAllocation());
|
initMaximumResourceCapability(this.conf.getMaximumAllocation());
|
||||||
incrAllocation = this.conf.getIncrementAllocation();
|
incrAllocation = this.conf.getIncrementAllocation();
|
||||||
|
@ -1422,23 +1432,46 @@ public class FairScheduler extends
|
||||||
AllocationFileLoaderService.Listener {
|
AllocationFileLoaderService.Listener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onReload(AllocationConfiguration queueInfo) {
|
public void onReload(AllocationConfiguration queueInfo)
|
||||||
|
throws IOException {
|
||||||
// Commit the reload; also create any queue defined in the alloc file
|
// Commit the reload; also create any queue defined in the alloc file
|
||||||
// if it does not already exist, so it can be displayed on the web UI.
|
// if it does not already exist, so it can be displayed on the web UI.
|
||||||
|
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
allocConf = queueInfo;
|
if (queueInfo == null) {
|
||||||
allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource());
|
authorizer.setPermission(allocsLoader.getDefaultPermissions(),
|
||||||
queueMgr.updateAllocationConfiguration(allocConf);
|
UserGroupInformation.getCurrentUser());
|
||||||
applyChildDefaults();
|
} else {
|
||||||
maxRunningEnforcer.updateRunnabilityOnReload();
|
allocConf = queueInfo;
|
||||||
|
setQueueAcls(allocConf.getQueueAcls());
|
||||||
|
allocConf.getDefaultSchedulingPolicy().initialize(
|
||||||
|
getClusterResource());
|
||||||
|
queueMgr.updateAllocationConfiguration(allocConf);
|
||||||
|
applyChildDefaults();
|
||||||
|
maxRunningEnforcer.updateRunnabilityOnReload();
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setQueueAcls(
|
||||||
|
Map<String, Map<AccessType, AccessControlList>> queueAcls)
|
||||||
|
throws IOException {
|
||||||
|
authorizer.setPermission(allocsLoader.getDefaultPermissions(),
|
||||||
|
UserGroupInformation.getCurrentUser());
|
||||||
|
List<Permission> permissions = new ArrayList<>();
|
||||||
|
for (Entry<String, Map<AccessType, AccessControlList>> queueAcl : queueAcls
|
||||||
|
.entrySet()) {
|
||||||
|
permissions.add(new Permission(new PrivilegedEntity(EntityType.QUEUE,
|
||||||
|
queueAcl.getKey()), queueAcl.getValue()));
|
||||||
|
}
|
||||||
|
authorizer.setPermission(permissions,
|
||||||
|
UserGroupInformation.getCurrentUser());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* After reloading the allocation config, the max resource settings for any
|
* After reloading the allocation config, the max resource settings for any
|
||||||
* ad hoc queues will be missing. This method goes through the queue manager's
|
* ad hoc queues will be missing. This method goes through the queue manager's
|
||||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.event.Event;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
|
@ -94,10 +95,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdate
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||||
|
@ -153,6 +152,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
}
|
}
|
||||||
QueueMetrics.clearQueueMetrics();
|
QueueMetrics.clearQueueMetrics();
|
||||||
DefaultMetricsSystem.shutdown();
|
DefaultMetricsSystem.shutdown();
|
||||||
|
YarnAuthorizationProvider.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue