YARN-2575. Create separate ACLs for Reservation create/update/delete/list ops (Sean Po via asuresh)

This commit is contained in:
Arun Suresh 2016-02-11 10:47:43 -08:00
parent 0aa8c82894
commit 23f937e3b7
18 changed files with 1125 additions and 125 deletions

View File

@ -813,6 +813,8 @@ Release 2.8.0 - UNRELEASED
YARN-4138. Roll back container resource allocation after resource
increase token expires. (Meng Ding via jianhe)
YARN-2575. Create separate ACLs for Reservation create/update/delete/list
ops (Sean Po via asuresh)
OPTIMIZATIONS

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
/**
* {@code ReservationACL} enumerates the various ACLs for reservations.
* <p>
* The ACL is one of:
* <ul>
* <li>
* {@link #ADMINISTER_RESERVATIONS} - ACL to create, list, update and
* delete reservations.
* </li>
* <li> {@link #LIST_RESERVATIONS} - ACL to list reservations. </li>
* <li> {@link #SUBMIT_RESERVATIONS} - ACL to create reservations. </li>
* </ul>
* Users can always list, update and delete their own reservations.
*/
@Public
@Stable
public enum ReservationACL {
/**
* ACL to create, list, update and delete reservations.
*/
ADMINISTER_RESERVATIONS,
/**
* ACL to list reservations.
*/
LIST_RESERVATIONS,
/**
* ACL to create reservations.
*/
SUBMIT_RESERVATIONS
}

View File

@ -279,6 +279,11 @@ public class YarnConfiguration extends Configuration {
YARN_PREFIX + "acl.enable";
public static final boolean DEFAULT_YARN_ACL_ENABLE = false;
/** Are reservation acls enabled.*/
public static final String YARN_RESERVATION_ACL_ENABLE =
YARN_PREFIX + "acl.reservation-enable";
public static final boolean DEFAULT_YARN_RESERVATION_ACL_ENABLE = false;
public static boolean isAclEnabled(Configuration conf) {
return conf.getBoolean(YARN_ACL_ENABLE, DEFAULT_YARN_ACL_ENABLE);
}

View File

@ -190,6 +190,12 @@
<value>false</value>
</property>
<property>
<description>Are reservation acls enabled.</description>
<name>yarn.acl.reservation-enable</name>
<value>false</value>
</property>
<property>
<description>ACL of who can be admin of the YARN cluster.</description>
<name>yarn.admin.acl</name>

View File

@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.UnrecognizedOptionException;
import org.apache.commons.lang.math.LongRange;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -117,6 +118,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
@ -158,6 +160,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeRepo
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -604,6 +607,12 @@ public class ClientRMService extends AbstractService implements
}
}
ReservationId reservationId = request.getApplicationSubmissionContext()
.getReservationID();
checkReservationACLs(submissionContext.getQueue(), AuditConstants
.SUBMIT_RESERVATION_REQUEST, reservationId);
try {
// call RMAppManager to submit application directly
rmAppManager.submitApplication(submissionContext,
@ -1222,7 +1231,7 @@ public class ClientRMService extends AbstractService implements
String queueName = request.getQueue();
String user =
checkReservationACLs(queueName,
AuditConstants.SUBMIT_RESERVATION_REQUEST);
AuditConstants.SUBMIT_RESERVATION_REQUEST, null);
try {
// Try to place the reservation using the agent
boolean result =
@ -1264,7 +1273,7 @@ public class ClientRMService extends AbstractService implements
// Check ACLs
String user =
checkReservationACLs(queueName,
AuditConstants.UPDATE_RESERVATION_REQUEST);
AuditConstants.UPDATE_RESERVATION_REQUEST, reservationId);
// Try to update the reservation using default agent
try {
boolean result =
@ -1303,7 +1312,7 @@ public class ClientRMService extends AbstractService implements
// Check ACLs
String user =
checkReservationACLs(queueName,
AuditConstants.DELETE_RESERVATION_REQUEST);
AuditConstants.DELETE_RESERVATION_REQUEST, reservationId);
// Try to update the reservation using default agent
try {
boolean result =
@ -1340,8 +1349,15 @@ public class ClientRMService extends AbstractService implements
boolean includeResourceAllocations = requestInfo
.getIncludeResourceAllocations();
String user = checkReservationACLs(requestInfo.getQueue(),
AuditConstants.LIST_RESERVATION_REQUEST);
ReservationId reservationId = null;
if (requestInfo.getReservationId() != null && !requestInfo
.getReservationId().isEmpty()) {
reservationId = ReservationId.parseReservationId(
requestInfo.getReservationId());
}
checkReservationACLs(requestInfo.getQueue(),
AuditConstants.LIST_RESERVATION_REQUEST, reservationId);
ReservationId requestedId = null;
if (requestInfo.getReservationId() != null
@ -1354,8 +1370,10 @@ public class ClientRMService extends AbstractService implements
long endTime = requestInfo.getEndTime() <= -1? Long.MAX_VALUE : requestInfo
.getEndTime();
Set<ReservationAllocation> reservations = plan.getReservations(
requestedId, new ReservationInterval(startTime, endTime), user);
Set<ReservationAllocation> reservations;
reservations = plan.getReservations(requestedId, new ReservationInterval(
startTime, endTime));
List<ReservationAllocationState> info =
ReservationSystemUtil.convertAllocationsToReservationInfo(
@ -1419,8 +1437,9 @@ public class ClientRMService extends AbstractService implements
}
}
private String checkReservationACLs(String queueName, String auditConstant)
throws YarnException {
private String checkReservationACLs(String queueName, String auditConstant,
ReservationId reservationId)
throws YarnException, IOException {
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
@ -1429,20 +1448,79 @@ public class ClientRMService extends AbstractService implements
"ClientRMService", "Error getting UGI");
throw RPCUtil.getRemoteException(ie);
}
// Check if user has access on the managed queue
if (!queueACLsManager.checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS,
queueName, null, null)) {
RMAuditLogger.logFailure(
callerUGI.getShortUserName(),
auditConstant,
"User doesn't have permissions to "
+ QueueACL.SUBMIT_APPLICATIONS.toString(), "ClientRMService",
AuditConstants.UNAUTHORIZED_USER);
throw RPCUtil.getRemoteException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation "
+ QueueACL.SUBMIT_APPLICATIONS.name() + " on queue" + queueName));
if (reservationSystem == null) {
return callerUGI.getShortUserName();
}
return callerUGI.getShortUserName();
ReservationsACLsManager manager = reservationSystem
.getReservationsACLsManager();
ReservationACL reservationACL = getReservationACLFromAuditConstant(
auditConstant);
if (manager == null) {
return callerUGI.getShortUserName();
}
String reservationCreatorName = "";
ReservationAllocation reservation;
// Get the user associated with the reservation.
Plan plan = reservationSystem.getPlan(queueName);
if (reservationId != null && plan != null) {
reservation = plan.getReservationById(reservationId);
if (reservation != null) {
reservationCreatorName = reservation.getUser();
}
}
// If the reservation to be altered or listed belongs to the current user,
// access will be given.
if (reservationCreatorName != null && !reservationCreatorName.isEmpty()
&& reservationCreatorName.equals(callerUGI.getUserName())) {
return callerUGI.getShortUserName();
}
// Check if the user has access to the specific ACL
if (manager.checkAccess(callerUGI, reservationACL, queueName)) {
return callerUGI.getShortUserName();
}
// If the user has Administer ACL then access is granted
if (manager.checkAccess(callerUGI, ReservationACL
.ADMINISTER_RESERVATIONS, queueName)) {
return callerUGI.getShortUserName();
}
handleNoAccess(callerUGI.getShortUserName(), queueName, auditConstant,
reservationACL.toString(), reservationACL.name());
throw new IllegalStateException();
}
private ReservationACL getReservationACLFromAuditConstant(
String auditConstant) throws YarnException{
if (auditConstant.equals(AuditConstants.SUBMIT_RESERVATION_REQUEST)) {
return ReservationACL.SUBMIT_RESERVATIONS;
} else if (auditConstant.equals(AuditConstants.LIST_RESERVATION_REQUEST)) {
return ReservationACL.LIST_RESERVATIONS;
} else if (auditConstant.equals(AuditConstants.DELETE_RESERVATION_REQUEST)
|| auditConstant.equals(AuditConstants.UPDATE_RESERVATION_REQUEST)) {
return ReservationACL.ADMINISTER_RESERVATIONS;
} else {
String error = "Audit Constant " + auditConstant + " is not recognized.";
LOG.error(error);
throw RPCUtil.getRemoteException(new UnrecognizedOptionException(error));
}
}
private void handleNoAccess(String name, String queue, String auditConstant,
String acl, String op) throws YarnException {
RMAuditLogger.logFailure(
name,
auditConstant,
"User doesn't have permissions to " + acl, "ClientRMService",
auditConstant);
throw RPCUtil.getRemoteException(new AccessControlException("User "
+ name + " cannot perform operation " + op + " on queue " + queue));
}
@Override

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -98,6 +99,8 @@ public abstract class AbstractReservationSystem extends AbstractService
private PlanFollower planFollower;
private ReservationsACLsManager reservationsACLsManager;
private boolean isRecoveryEnabled = false;
/**
@ -158,6 +161,13 @@ public abstract class AbstractReservationSystem extends AbstractService
isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
if (conf.getBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE,
YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE) &&
conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) {
reservationsACLsManager = new ReservationsACLsManager(scheduler, conf);
}
}
private void loadPlan(String planName,
@ -475,6 +485,10 @@ public abstract class AbstractReservationSystem extends AbstractService
}
}
public ReservationsACLsManager getReservationsACLsManager() {
return this.reservationsACLsManager;
}
protected abstract ReservationSchedulerConfiguration
getReservationSchedulerConfiguration();

View File

@ -26,7 +26,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
@ -85,14 +84,6 @@ public class CapacityOverTimePolicy implements SharingPolicy {
ReservationAllocation oldReservation =
plan.getReservationById(reservation.getReservationId());
// sanity check that the update of a reservation is not changing username
if (oldReservation != null
&& !oldReservation.getUser().equals(reservation.getUser())) {
throw new MismatchedUserException(
"Updating an existing reservation with mismatched user:"
+ oldReservation.getUser() + " != " + reservation.getUser());
}
long startTime = reservation.getStartTime();
long endTime = reservation.getEndTime();
long step = plan.getStep();

View File

@ -476,7 +476,13 @@ public class InMemoryPlan implements Plan {
}
@Override
public Set<ReservationAllocation> getReservations(ReservationId
public Set<ReservationAllocation> getReservations(ReservationId
reservationID, ReservationInterval interval) {
return getReservations(reservationID, interval, null);
}
@Override
public Set<ReservationAllocation> getReservations(ReservationId
reservationID, ReservationInterval interval, String user) {
if (reservationID != null) {
ReservationAllocation allocation = getReservationById(reservationID);

View File

@ -42,12 +42,26 @@ public interface PlanView extends PlanContext {
* greater than the interval end time, and end time no less
* than the interval start time will be selected.
* @param user the user to retrieve the reservation allocation from.
* @return {@link ReservationAllocation} identified by the user who
* @return a set of {@link ReservationAllocation} identified by the user who
* made the reservation
*/
Set<ReservationAllocation> getReservations(ReservationId
reservationID, ReservationInterval interval, String user);
/**
* Return a set of {@link ReservationAllocation} identified by any user.
*
* @param reservationID the unqiue id to identify the
* {@link ReservationAllocation}
* @param interval the time interval used to retrieve the reservation
* allocations from. Only reservations with start time no
* greater than the interval end time, and end time no less
* than the interval start time will be selected.
* @return a set of {@link ReservationAllocation} identified by any user
*/
Set<ReservationAllocation> getReservations(ReservationId reservationID,
ReservationInterval interval);
/**
* Return a {@link ReservationAllocation} identified by its
* {@link ReservationId}

View File

@ -20,8 +20,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import java.util.Map;
public abstract class ReservationSchedulerConfiguration extends Configuration {
@InterfaceAudience.Private
@ -67,6 +71,17 @@ public abstract class ReservationSchedulerConfiguration extends Configuration {
*/
public abstract boolean isReservable(String queue);
/**
* Gets a map containing the {@link AccessControlList} of users for each
* {@link ReservationACL} acl on thee specified queue.
*
* @param queue the queue with which to check a user's permissions.
* @return The a Map of {@link ReservationACL} to {@link AccessControlList}
* which contains a list of users that have the specified permission level.
*/
public abstract Map<ReservationACL, AccessControlList> getReservationAcls(
String queue);
/**
* Gets the length of time in milliseconds for which the {@link SharingPolicy}
* checks for validity

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager;
import java.util.Map;
@ -123,4 +124,12 @@ public interface ReservationSystem extends Recoverable {
*/
void setQueueForReservation(ReservationId reservationId, String queueName);
/**
* Get the {@link ReservationsACLsManager} to use to check for the reservation
* access on a user.
*
* @return the reservation ACL manager to use to check reservation ACLs.
*
*/
ReservationsACLsManager getReservationsACLsManager();
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
@ -566,6 +567,35 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
set(queuePrefix + getAclKey(acl), aclString);
}
private static String getAclKey(ReservationACL acl) {
return "acl_" + StringUtils.toLowerCase(acl.toString());
}
@Override
public Map<ReservationACL, AccessControlList> getReservationAcls(String
queue) {
Map<ReservationACL, AccessControlList> resAcls = new HashMap<>();
for (ReservationACL acl : ReservationACL.values()) {
resAcls.put(acl, getReservationAcl(queue, acl));
}
return resAcls;
}
private AccessControlList getReservationAcl(String queue, ReservationACL
acl) {
String queuePrefix = getQueuePrefix(queue);
// The root queue defaults to all access if not defined
// Sub queues inherit access if not defined
String defaultAcl = ALL_ACL;
String aclString = get(queuePrefix + getAclKey(acl), defaultAcl);
return new AccessControlList(aclString);
}
private void setAcl(String queue, ReservationACL acl, String aclString) {
String queuePrefix = getQueuePrefix(queue);
set(queuePrefix + getAclKey(acl), aclString);
}
public Map<AccessType, AccessControlList> getAcls(String queue) {
Map<AccessType, AccessControlList> acls =
new HashMap<AccessType, AccessControlList>();
@ -581,6 +611,14 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
}
}
@VisibleForTesting
public void setReservationAcls(String queue,
Map<ReservationACL, AccessControlList> acls) {
for (Map.Entry<ReservationACL, AccessControlList> e : acls.entrySet()) {
setAcl(queue, e.getKey(), e.getValue().getAclString());
}
}
public String[] getQueues(String queue) {
LOG.debug("CSConf - getQueues called for: queuePrefix=" + getQueuePrefix(queue));
String[] queues = getStrings(getQueuePrefix(queue) + QUEUES);

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
@ -65,6 +66,10 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
// ACL's for each queue. Only specifies non-default ACL's from configuration.
private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
// Reservation ACL's for each queue. Only specifies non-default ACL's from
// configuration.
private final Map<String, Map<ReservationACL, AccessControlList>> resAcls;
// Min share preemption timeout for each queue in seconds. If a job in the queue
// waits this long without receiving its guaranteed share, it is allowed to
// preempt other jobs' tasks.
@ -113,6 +118,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Float> fairSharePreemptionThresholds,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<String, Map<ReservationACL, AccessControlList>> resAcls,
QueuePlacementPolicy placementPolicy,
Map<FSQueueType, Set<String>> configuredQueues,
ReservationQueueConfiguration globalReservationQueueConfig,
@ -134,6 +140,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts;
this.fairSharePreemptionThresholds = fairSharePreemptionThresholds;
this.queueAcls = queueAcls;
this.resAcls = resAcls;
this.reservableQueues = reservableQueues;
this.globalReservationQueueConfig = globalReservationQueueConfig;
this.placementPolicy = placementPolicy;
@ -153,6 +160,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
queueMaxResourcesDefault = Resources.unbounded();
queueMaxAMShareDefault = 0.5f;
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
resAcls = new HashMap<String, Map<ReservationACL, AccessControlList>>();
minSharePreemptionTimeouts = new HashMap<String, Long>();
fairSharePreemptionTimeouts = new HashMap<String, Long>();
fairSharePreemptionThresholds = new HashMap<String, Float>();
@ -184,7 +192,17 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
}
return (queue.equals("root")) ? EVERYBODY_ACL : NOBODY_ACL;
}
@Override
/**
* Get the map of reservation ACLs to {@link AccessControlList} for the
* specified queue.
*/
public Map<ReservationACL, AccessControlList> getReservationAcls(String
queue) {
return this.resAcls.get(queue);
}
/**
* Get a queue's min share preemption timeout configured in the allocation
* file, in milliseconds. Return -1 if not set.

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.util.Clock;
@ -223,6 +224,8 @@ public class AllocationFileLoaderService extends AbstractService {
new HashMap<String, Float>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
new HashMap<String, Map<QueueACL, AccessControlList>>();
Map<String, Map<ReservationACL, AccessControlList>> reservationAcls =
new HashMap<String, Map<ReservationACL, AccessControlList>>();
Set<String> reservableQueues = new HashSet<String>();
Set<String> nonPreemptableQueues = new HashSet<String>();
int userMaxAppsDefault = Integer.MAX_VALUE;
@ -360,8 +363,8 @@ public class AllocationFileLoaderService extends AbstractService {
loadQueue(parent, element, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
fairSharePreemptionThresholds, queueAcls, configuredQueues,
reservableQueues, nonPreemptableQueues);
fairSharePreemptionThresholds, queueAcls, reservationAcls,
configuredQueues, reservableQueues, nonPreemptableQueues);
}
// Load placement policy and pass it configured queues
@ -409,8 +412,8 @@ public class AllocationFileLoaderService extends AbstractService {
queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
defaultSchedPolicy, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
newPlacementPolicy, configuredQueues, globalReservationQueueConfig,
reservableQueues, nonPreemptableQueues);
reservationAcls, newPlacementPolicy, configuredQueues,
globalReservationQueueConfig, reservableQueues, nonPreemptableQueues);
lastSuccessfulReload = clock.getTime();
lastReloadAttemptFailed = false;
@ -431,6 +434,7 @@ public class AllocationFileLoaderService extends AbstractService {
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Float> fairSharePreemptionThresholds,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<String, Map<ReservationACL, AccessControlList>> resAcls,
Map<FSQueueType, Set<String>> configuredQueues,
Set<String> reservableQueues,
Set<String> nonPreemptableQueues)
@ -453,6 +457,7 @@ public class AllocationFileLoaderService extends AbstractService {
}
Map<QueueACL, AccessControlList> acls =
new HashMap<QueueACL, AccessControlList>();
Map<ReservationACL, AccessControlList> racls = new HashMap<>();
NodeList fields = element.getChildNodes();
boolean isLeaf = true;
@ -506,6 +511,18 @@ public class AllocationFileLoaderService extends AbstractService {
} else if ("aclAdministerApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData();
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
} else if ("aclAdministerReservations".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData();
racls.put(ReservationACL.ADMINISTER_RESERVATIONS,
new AccessControlList(text));
} else if ("aclListReservations".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData();
racls.put(ReservationACL.LIST_RESERVATIONS, new AccessControlList(
text));
} else if ("aclSubmitReservations".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData();
racls.put(ReservationACL.SUBMIT_RESERVATIONS,
new AccessControlList(text));
} else if ("reservation".equals(field.getTagName())) {
isLeaf = false;
reservableQueues.add(queueName);
@ -521,7 +538,7 @@ public class AllocationFileLoaderService extends AbstractService {
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
queueAcls, configuredQueues, reservableQueues,
queueAcls, resAcls, configuredQueues, reservableQueues,
nonPreemptableQueues);
isLeaf = false;
}
@ -543,6 +560,7 @@ public class AllocationFileLoaderService extends AbstractService {
configuredQueues.get(FSQueueType.PARENT).add(queueName);
}
queueAcls.put(queueName, acls);
resAcls.put(queueName, racls);
if (maxQueueResources.containsKey(queueName) &&
minQueueResources.containsKey(queueName)
&& !Resources.fitsIn(minQueueResources.get(queueName),

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.security;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import java.util.HashMap;
import java.util.Map;
/**
* The {@link ReservationsACLsManager} is used to check a specified user's
* permissons to perform a reservation operation on the
* {@link CapacityScheduler} and the {@link FairScheduler}.
* {@link ReservationACL}s are used to specify reservation operations.
*/
public class ReservationsACLsManager {
private boolean isReservationACLsEnable;
private Map<String, Map<ReservationACL, AccessControlList>> reservationAcls
= new HashMap<>();
public ReservationsACLsManager(ResourceScheduler scheduler,
Configuration conf) throws YarnException {
this.isReservationACLsEnable =
conf.getBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE,
YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE) &&
conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
YarnConfiguration.DEFAULT_YARN_ACL_ENABLE);
if (scheduler instanceof CapacityScheduler) {
CapacitySchedulerConfiguration csConf = new
CapacitySchedulerConfiguration(conf);
for (String planQueue : scheduler.getPlanQueues()) {
CSQueue queue = ((CapacityScheduler) scheduler).getQueue(planQueue);
reservationAcls.put(planQueue, csConf.getReservationAcls(queue
.getQueuePath()));
}
} else if (scheduler instanceof FairScheduler) {
AllocationConfiguration aConf = ((FairScheduler) scheduler)
.getAllocationConfiguration();
for (String planQueue : scheduler.getPlanQueues()) {
reservationAcls.put(planQueue, aConf.getReservationAcls(planQueue));
}
}
}
public boolean checkAccess(UserGroupInformation callerUGI,
ReservationACL acl, String queueName) {
if (!isReservationACLsEnable) {
return true;
}
if (this.reservationAcls.containsKey(queueName)) {
Map<ReservationACL, AccessControlList> acls = this.reservationAcls.get(
queueName);
if (acls.containsKey(acl)) {
return acls.get(acl).isUserAllowed(callerUGI);
} else {
// Give access if acl is undefined for queue.
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.junit.Before;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public abstract class ACLsTestBase {
protected static final String COMMON_USER = "common_user";
protected static final String QUEUE_A_USER = "queueA_user";
protected static final String QUEUE_B_USER = "queueB_user";
protected static final String ROOT_ADMIN = "root_admin";
protected static final String QUEUE_A_ADMIN = "queueA_admin";
protected static final String QUEUE_B_ADMIN = "queueB_admin";
protected static final String QUEUEA = "queueA";
protected static final String QUEUEB = "queueB";
protected static final String QUEUEC = "queueC";
protected static final Log LOG = LogFactory.getLog(TestApplicationACLs.class);
MockRM resourceManager;
Configuration conf;
YarnRPC rpc;
InetSocketAddress rmAddress;
@Before
public void setup() throws InterruptedException, IOException {
conf = createConfiguration();
rpc = YarnRPC.create(conf);
rmAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
AccessControlList adminACL = new AccessControlList("");
conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminACL.getAclString());
resourceManager = new MockRM(conf) {
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), this.scheduler,
this.rmAppManager, this.applicationACLsManager,
this.queueACLsManager, getRMContext()
.getRMDelegationTokenSecretManager());
}
@Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}
@Override
protected void doSecureLogin() throws IOException {
}
};
new Thread() {
public void run() {
resourceManager.start();
};
}.start();
int waitCount = 0;
while (resourceManager.getServiceState() == STATE.INITED
&& waitCount++ < 60) {
LOG.info("Waiting for RM to start...");
Thread.sleep(1500);
}
if (resourceManager.getServiceState() != STATE.STARTED) {
// RM could have failed.
throw new IOException("ResourceManager failed to start. Final state is "
+ resourceManager.getServiceState());
}
}
protected ApplicationClientProtocol getRMClientForUser(String user)
throws IOException, InterruptedException {
UserGroupInformation userUGI = UserGroupInformation.createRemoteUser(user);
ApplicationClientProtocol userClient =
userUGI
.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
@Override
public ApplicationClientProtocol run() throws Exception {
return (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
}
});
return userClient;
}
protected abstract Configuration createConfiguration() throws IOException;
}

View File

@ -18,20 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@ -43,73 +35,13 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public abstract class QueueACLsTestBase {
protected static final String COMMON_USER = "common_user";
protected static final String QUEUE_A_USER = "queueA_user";
protected static final String QUEUE_B_USER = "queueB_user";
protected static final String ROOT_ADMIN = "root_admin";
protected static final String QUEUE_A_ADMIN = "queueA_admin";
protected static final String QUEUE_B_ADMIN = "queueB_admin";
protected static final String QUEUEA = "queueA";
protected static final String QUEUEB = "queueB";
private static final Log LOG = LogFactory.getLog(TestApplicationACLs.class);
MockRM resourceManager;
Configuration conf;
YarnRPC rpc;
InetSocketAddress rmAddress;
@Before
public void setup() throws InterruptedException, IOException {
conf = createConfiguration();
rpc = YarnRPC.create(conf);
rmAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
AccessControlList adminACL = new AccessControlList("");
conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminACL.getAclString());
resourceManager = new MockRM(conf) {
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), this.scheduler,
this.rmAppManager, this.applicationACLsManager,
this.queueACLsManager, getRMContext().getRMDelegationTokenSecretManager());
};
@Override
protected void doSecureLogin() throws IOException {
}
};
new Thread() {
public void run() {
resourceManager.start();
};
}.start();
int waitCount = 0;
while (resourceManager.getServiceState() == STATE.INITED
&& waitCount++ < 60) {
LOG.info("Waiting for RM to start...");
Thread.sleep(1500);
}
if (resourceManager.getServiceState() != STATE.STARTED) {
// RM could have failed.
throw new IOException("ResourceManager failed to start. Final state is "
+ resourceManager.getServiceState());
}
}
public abstract class QueueACLsTestBase extends ACLsTestBase {
@After
public void tearDown() {
@ -248,21 +180,4 @@ public abstract class QueueACLsTestBase {
acls.put(ApplicationAccessType.MODIFY_APP, modifyACL.getAclString());
return acls;
}
private ApplicationClientProtocol getRMClientForUser(String user)
throws IOException, InterruptedException {
UserGroupInformation userUGI = UserGroupInformation.createRemoteUser(user);
ApplicationClientProtocol userClient =
userUGI
.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
@Override
public ApplicationClientProtocol run() throws Exception {
return (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
}
});
return userClient;
}
protected abstract Configuration createConfiguration() throws IOException;
}

View File

@ -0,0 +1,600 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class ReservationACLsTestBase extends ACLsTestBase {
private final int defaultDuration = 600000;
private final ReservationRequest defaultRequest = ReservationRequest
.newInstance(BuilderUtils.newResource(1024, 1), 1, 1,
defaultDuration);
private final ReservationRequests defaultRequests = ReservationRequests
.newInstance(Collections.singletonList(defaultRequest),
ReservationRequestInterpreter.R_ALL);
private Configuration configuration;
private boolean useFullQueuePath;
public ReservationACLsTestBase(Configuration conf, boolean useFullPath) {
configuration = conf;
useFullQueuePath = useFullPath;
}
@After
public void tearDown() {
if (resourceManager != null) {
resourceManager.stop();
}
}
@Parameterized.Parameters
public static Collection<Object[]> data() throws IOException {
return Arrays.asList(new Object[][] {
{ createCapacitySchedulerConfiguration(), false },
{ createFairSchedulerConfiguration(), true }
});
}
@Test
public void testApplicationACLs() throws Exception {
registerNode("test:1234", 8192, 8);
String queueA = !useFullQueuePath? QUEUEA : CapacitySchedulerConfiguration
.ROOT + "." + QUEUEA;
String queueB = !useFullQueuePath? QUEUEB : CapacitySchedulerConfiguration
.ROOT + "." + QUEUEB;
String queueC = !useFullQueuePath? QUEUEC : CapacitySchedulerConfiguration
.ROOT + "." + QUEUEC;
// Submit Reservations
// Users of queue A can submit reservations on QueueA.
verifySubmitReservationSuccess(QUEUE_A_USER, queueA);
verifySubmitReservationSuccess(QUEUE_A_ADMIN, queueA);
// Users of queue B cannot submit reservations on QueueA.
verifySubmitReservationFailure(QUEUE_B_USER, queueA);
verifySubmitReservationFailure(QUEUE_B_ADMIN, queueA);
// Users of queue B can submit reservations on QueueB.
verifySubmitReservationSuccess(QUEUE_B_USER, queueB);
verifySubmitReservationSuccess(QUEUE_B_ADMIN, queueB);
// Users of queue A cannot submit reservations on QueueB.
verifySubmitReservationFailure(QUEUE_A_USER, queueB);
verifySubmitReservationFailure(QUEUE_A_ADMIN, queueB);
// Everyone can submit reservations on QueueC.
verifySubmitReservationSuccess(QUEUE_B_USER, queueC);
verifySubmitReservationSuccess(QUEUE_B_ADMIN, queueC);
verifySubmitReservationSuccess(QUEUE_A_USER, queueC);
verifySubmitReservationSuccess(QUEUE_A_ADMIN, queueC);
verifySubmitReservationSuccess(COMMON_USER, queueC);
// List Reservations
// User with List Reservations, or Admin ACL can list everyone's
// reservations.
verifyListReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_USER, queueA);
verifyListReservationSuccess(COMMON_USER, QUEUE_A_ADMIN, queueA);
verifyListReservationSuccess(COMMON_USER, QUEUE_A_USER, queueA);
// User without Admin or Reservation ACL can only list their own
// reservations by id.
verifyListReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_ADMIN, queueA);
verifyListReservationFailure(QUEUE_A_USER, QUEUE_A_USER, queueA);
verifyListReservationFailure(QUEUE_A_USER, QUEUE_A_ADMIN, queueA);
verifyListReservationByIdSuccess(QUEUE_A_USER, QUEUE_A_USER, queueA);
verifyListReservationByIdFailure(QUEUE_A_USER, QUEUE_A_ADMIN, queueA);
// User with List Reservations, or Admin ACL can list everyone's
// reservations.
verifyListReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_USER, queueB);
verifyListReservationSuccess(COMMON_USER, QUEUE_B_ADMIN, queueB);
verifyListReservationSuccess(COMMON_USER, QUEUE_B_USER, queueB);
// User without Admin or Reservation ACL can only list their own
// reservations by id.
verifyListReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_ADMIN, queueB);
verifyListReservationFailure(QUEUE_B_USER, QUEUE_B_USER, queueB);
verifyListReservationFailure(QUEUE_B_USER, QUEUE_B_ADMIN, queueB);
verifyListReservationByIdSuccess(QUEUE_B_USER, QUEUE_B_USER, queueB);
verifyListReservationByIdFailure(QUEUE_B_USER, QUEUE_B_ADMIN, queueB);
// Users with Admin ACL in one queue cannot list reservations in
// another queue
verifyListReservationFailure(QUEUE_B_ADMIN, QUEUE_A_ADMIN, queueA);
verifyListReservationFailure(QUEUE_B_ADMIN, QUEUE_A_USER, queueA);
verifyListReservationFailure(QUEUE_A_ADMIN, QUEUE_B_ADMIN, queueB);
verifyListReservationFailure(QUEUE_A_ADMIN, QUEUE_B_USER, queueB);
// All users can list reservations on QueueC because acls are enabled
// but not defined.
verifyListReservationSuccess(QUEUE_A_USER, QUEUE_A_ADMIN, queueC);
verifyListReservationSuccess(QUEUE_B_USER, QUEUE_A_ADMIN, queueC);
verifyListReservationSuccess(QUEUE_B_ADMIN, QUEUE_A_ADMIN, queueC);
verifyListReservationSuccess(COMMON_USER, QUEUE_A_ADMIN, queueC);
verifyListReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_USER, queueC);
verifyListReservationSuccess(QUEUE_B_USER, QUEUE_A_USER, queueC);
verifyListReservationSuccess(QUEUE_B_ADMIN, QUEUE_A_USER, queueC);
verifyListReservationSuccess(COMMON_USER, QUEUE_A_USER, queueC);
verifyListReservationByIdSuccess(QUEUE_A_USER, QUEUE_A_ADMIN, queueC);
verifyListReservationByIdSuccess(QUEUE_B_USER, QUEUE_A_ADMIN, queueC);
verifyListReservationByIdSuccess(QUEUE_B_ADMIN, QUEUE_A_ADMIN, queueC);
verifyListReservationByIdSuccess(COMMON_USER, QUEUE_A_ADMIN, queueC);
verifyListReservationByIdSuccess(QUEUE_A_ADMIN, QUEUE_A_USER, queueC);
verifyListReservationByIdSuccess(QUEUE_B_USER, QUEUE_A_USER, queueC);
verifyListReservationByIdSuccess(QUEUE_B_ADMIN, QUEUE_A_USER, queueC);
verifyListReservationByIdSuccess(COMMON_USER, QUEUE_A_USER, queueC);
// Delete Reservations
// Only the user who made the reservation or an admin can delete it.
verifyDeleteReservationSuccess(QUEUE_A_USER, QUEUE_A_USER, queueA);
verifyDeleteReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_USER, queueA);
// A non-admin cannot delete another user's reservation.
verifyDeleteReservationFailure(COMMON_USER, QUEUE_A_USER, queueA);
verifyDeleteReservationFailure(QUEUE_B_USER, QUEUE_A_USER, queueA);
verifyDeleteReservationFailure(QUEUE_B_ADMIN, QUEUE_A_USER, queueA);
// Only the user who made the reservation or an admin can delete it.
verifyDeleteReservationSuccess(QUEUE_B_USER, QUEUE_B_USER, queueB);
verifyDeleteReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_USER, queueB);
// A non-admin cannot delete another user's reservation.
verifyDeleteReservationFailure(COMMON_USER, QUEUE_B_USER, queueB);
verifyDeleteReservationFailure(QUEUE_A_USER, QUEUE_B_USER, queueB);
verifyDeleteReservationFailure(QUEUE_A_ADMIN, QUEUE_B_USER, queueB);
// All users can delete any reservation on QueueC because acls are enabled
// but not defined.
verifyDeleteReservationSuccess(COMMON_USER, QUEUE_B_ADMIN, queueC);
verifyDeleteReservationSuccess(QUEUE_B_USER, QUEUE_B_ADMIN, queueC);
verifyDeleteReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_ADMIN, queueC);
verifyDeleteReservationSuccess(QUEUE_A_USER, QUEUE_B_ADMIN, queueC);
verifyDeleteReservationSuccess(QUEUE_A_ADMIN, QUEUE_B_ADMIN, queueC);
// Update Reservation
// Only the user who made the reservation or an admin can update it.
verifyUpdateReservationSuccess(QUEUE_A_USER, QUEUE_A_USER, queueA);
verifyUpdateReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_USER, queueA);
// A non-admin cannot update another user's reservation.
verifyUpdateReservationFailure(COMMON_USER, QUEUE_A_USER, queueA);
verifyUpdateReservationFailure(QUEUE_B_USER,QUEUE_A_USER, queueA);
verifyUpdateReservationFailure(QUEUE_B_ADMIN, QUEUE_A_USER, queueA);
// Only the user who made the reservation or an admin can update it.
verifyUpdateReservationSuccess(QUEUE_B_USER, QUEUE_B_USER, queueB);
verifyUpdateReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_USER, queueB);
// A non-admin cannot update another user's reservation.
verifyUpdateReservationFailure(COMMON_USER, QUEUE_B_USER, queueB);
verifyUpdateReservationFailure(QUEUE_A_USER, QUEUE_B_USER, queueB);
verifyUpdateReservationFailure(QUEUE_A_ADMIN, QUEUE_B_USER, queueB);
// All users can update any reservation on QueueC because acls are enabled
// but not defined.
verifyUpdateReservationSuccess(COMMON_USER, QUEUE_B_ADMIN, queueC);
verifyUpdateReservationSuccess(QUEUE_B_USER, QUEUE_B_ADMIN, queueC);
verifyUpdateReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_ADMIN, queueC);
verifyUpdateReservationSuccess(QUEUE_A_USER, QUEUE_B_ADMIN, queueC);
verifyUpdateReservationSuccess(QUEUE_A_ADMIN, QUEUE_B_ADMIN, queueC);
}
private void verifySubmitReservationSuccess(String submitter, String
queueName) throws Exception {
ReservationId reservationId =
submitReservation(submitter, queueName);
deleteReservation(submitter, reservationId);
}
private void verifySubmitReservationFailure(String submitter, String
queueName) throws Exception {
try {
submitReservation(submitter, queueName);
Assert.fail("Submit reservation by the enemy should fail!");
} catch (YarnException e) {
handleAdministerException(e, submitter, queueName, ReservationACL
.SUBMIT_RESERVATIONS.name());
}
}
private void verifyListReservationSuccess(String lister, String
originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
ReservationListResponse adminResponse = listReservation(lister, queueName);
assert(adminResponse.getReservationAllocationState().size() == 1);
assert(adminResponse.getReservationAllocationState().get(0).getUser()
.equals(originalSubmitter));
deleteReservation(originalSubmitter, reservationId);
}
private void verifyListReservationFailure(String lister,
String originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
try {
listReservation(lister, queueName);
Assert.fail("List reservation by the enemy should fail!");
} catch (YarnException e) {
handleAdministerException(e, lister, queueName, ReservationACL
.LIST_RESERVATIONS.name());
}
deleteReservation(originalSubmitter, reservationId);
}
private void verifyListReservationByIdSuccess(String lister, String
originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
ReservationListResponse adminResponse = listReservationById(lister,
reservationId, queueName);
assert(adminResponse.getReservationAllocationState().size() == 1);
assert(adminResponse.getReservationAllocationState().get(0).getUser()
.equals(originalSubmitter));
deleteReservation(originalSubmitter, reservationId);
}
private void verifyListReservationByIdFailure(String lister,
String originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
try {
listReservationById(lister, reservationId, queueName);
Assert.fail("List reservation by the enemy should fail!");
} catch (YarnException e) {
handleAdministerException(e, lister, queueName, ReservationACL
.LIST_RESERVATIONS.name());
}
deleteReservation(originalSubmitter, reservationId);
}
private void verifyDeleteReservationSuccess(String killer,
String originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
deleteReservation(killer, reservationId);
}
private void verifyDeleteReservationFailure(String killer,
String originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
try {
deleteReservation(killer, reservationId);
Assert.fail("Reservation deletion by the enemy should fail!");
} catch (YarnException e) {
handleAdministerException(e, killer, queueName, ReservationACL
.ADMINISTER_RESERVATIONS.name());
}
deleteReservation(originalSubmitter, reservationId);
}
private void verifyUpdateReservationSuccess(String updater,
String originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
final ReservationUpdateRequest updateRequest =
ReservationUpdateRequest.newInstance(
makeSimpleReservationDefinition(), reservationId);
ApplicationClientProtocol ownerClient = getRMClientForUser(updater);
ownerClient.updateReservation(updateRequest);
deleteReservation(updater, reservationId);
}
private void verifyUpdateReservationFailure(String updater,
String originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
final ReservationUpdateRequest updateRequest =
ReservationUpdateRequest.newInstance(
makeSimpleReservationDefinition(), reservationId);
ApplicationClientProtocol unauthorizedClient = getRMClientForUser(updater);
try {
unauthorizedClient.updateReservation(updateRequest);
Assert.fail("Reservation updating by the enemy should fail.");
} catch (YarnException e) {
handleAdministerException(e, updater, queueName, ReservationACL
.ADMINISTER_RESERVATIONS.name());
}
deleteReservation(originalSubmitter, reservationId);
}
private ReservationDefinition makeSimpleReservationDefinition() {
long arrival = System.currentTimeMillis();
String reservationName = UUID.randomUUID().toString();
return ReservationDefinition.newInstance
(arrival, arrival + (int)(defaultDuration * 1.1), defaultRequests,
reservationName);
}
private ReservationListResponse listReservationById(String lister,
ReservationId reservationId, String queueName) throws Exception {
final ReservationListRequest listRequest =
ReservationListRequest.newInstance(queueName, reservationId
.toString(), -1, -1, false);
ApplicationClientProtocol ownerClient = getRMClientForUser(lister);
return ownerClient.listReservations(listRequest);
}
private ReservationListResponse listReservation(String lister,
String queueName) throws Exception {
final ReservationListRequest listRequest =
ReservationListRequest.newInstance(queueName, null, -1, -1, false);
ApplicationClientProtocol ownerClient = getRMClientForUser(lister);
return ownerClient.listReservations(listRequest);
}
private void deleteReservation(String deleter, ReservationId id) throws
Exception {
ApplicationClientProtocol deleteClient = getRMClientForUser(deleter);
final ReservationDeleteRequest deleteRequest = ReservationDeleteRequest
.newInstance(id);
deleteClient.deleteReservation(deleteRequest);
}
private ReservationId submitReservation(String submitter,
String queueName) throws Exception {
ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
ReservationSubmissionRequest reservationSubmissionRequest =
ReservationSubmissionRequest.newInstance(
makeSimpleReservationDefinition(), queueName);
ReservationSubmissionResponse response = submitterClient
.submitReservation(reservationSubmissionRequest);
return response.getReservationId();
}
private void handleAdministerException(Exception e, String user, String
queue, String operation) {
LOG.info("Got exception while killing app as the enemy", e);
Assert.assertTrue(e.getMessage().contains("User " + user
+ " cannot perform operation " + operation + " on queue "
+ queue));
}
private void registerNode(String host, int memory, int vCores) throws
Exception {
try {
resourceManager.registerNode(host, memory, vCores);
int attempts = 10;
Collection<Plan> plans;
do {
DrainDispatcher dispatcher =
(DrainDispatcher) resourceManager.getRMContext().getDispatcher();
dispatcher.await();
LOG.info("Waiting for node capacity to be added to plan");
plans = resourceManager.getRMContext().getReservationSystem()
.getAllPlans().values();
if (checkCapacity(plans)) {
break;
}
Thread.sleep(100);
} while (attempts-- > 0);
if (attempts <= 0) {
Assert.fail("Exhausted attempts in checking if node capacity was "
+ "added to the plan");
}
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
private boolean checkCapacity(Collection<Plan> plans) {
for (Plan plan : plans) {
if (plan.getTotalCapacity().getMemory() > 0) {
return true;
}
}
return false;
}
private static Configuration createCapacitySchedulerConfiguration() {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
QUEUEA, QUEUEB, QUEUEC });
String absoluteQueueA = CapacitySchedulerConfiguration.ROOT + "." + QUEUEA;
String absoluteQueueB = CapacitySchedulerConfiguration.ROOT + "." + QUEUEB;
String absoluteQueueC = CapacitySchedulerConfiguration.ROOT + "." + QUEUEC;
csConf.setCapacity(absoluteQueueA, 50f);
csConf.setCapacity(absoluteQueueB, 20f);
csConf.setCapacity(absoluteQueueC, 30f);
csConf.setReservable(absoluteQueueA, true);
csConf.setReservable(absoluteQueueB, true);
csConf.setReservable(absoluteQueueC, true);
// Set up ACLs on Queue A
Map<ReservationACL, AccessControlList> reservationAclsOnQueueA =
new HashMap<>();
AccessControlList submitACLonQueueA = new AccessControlList(QUEUE_A_USER);
AccessControlList adminACLonQueueA = new AccessControlList(QUEUE_A_ADMIN);
AccessControlList listACLonQueueA = new AccessControlList(COMMON_USER);
reservationAclsOnQueueA.put(ReservationACL.SUBMIT_RESERVATIONS,
submitACLonQueueA);
reservationAclsOnQueueA.put(ReservationACL.ADMINISTER_RESERVATIONS,
adminACLonQueueA);
reservationAclsOnQueueA.put(ReservationACL.LIST_RESERVATIONS,
listACLonQueueA);
csConf.setReservationAcls(absoluteQueueA, reservationAclsOnQueueA);
// Set up ACLs on Queue B
Map<ReservationACL, AccessControlList> reservationAclsOnQueueB =
new HashMap<>();
AccessControlList submitACLonQueueB = new AccessControlList(QUEUE_B_USER);
AccessControlList adminACLonQueueB = new AccessControlList(QUEUE_B_ADMIN);
AccessControlList listACLonQueueB = new AccessControlList(COMMON_USER);
reservationAclsOnQueueB.put(ReservationACL.SUBMIT_RESERVATIONS,
submitACLonQueueB);
reservationAclsOnQueueB.put(ReservationACL.ADMINISTER_RESERVATIONS,
adminACLonQueueB);
reservationAclsOnQueueB.put(ReservationACL.LIST_RESERVATIONS,
listACLonQueueB);
csConf.setReservationAcls(absoluteQueueB, reservationAclsOnQueueB);
csConf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
csConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
csConf.setBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE, true);
csConf.set("yarn.resourcemanager.scheduler.class", CapacityScheduler
.class.getName());
return csConf;
}
private static Configuration createFairSchedulerConfiguration() throws
IOException {
FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
final String TEST_DIR = new File(System.getProperty("test.build.data",
"/tmp")).getAbsolutePath();
final String ALLOC_FILE = new File(TEST_DIR, "test-queues.xml")
.getAbsolutePath();
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println(" <queue name=\"queueA\">");
out.println(" <aclSubmitReservations>" +
"queueA_user,common_user " +
"</aclSubmitReservations>");
out.println(" <aclAdministerReservations>" +
"queueA_admin " +
"</aclAdministerReservations>");
out.println(" <aclListReservations>common_user </aclListReservations>");
out.println(" <aclSubmitApps>queueA_user,common_user </aclSubmitApps>");
out.println(" <aclAdministerApps>queueA_admin </aclAdministerApps>");
out.println(" <reservation> </reservation>");
out.println(" </queue>");
out.println(" <queue name=\"queueB\">");
out.println(" <aclSubmitApps>queueB_user,common_user </aclSubmitApps>");
out.println(" <aclAdministerApps>queueB_admin </aclAdministerApps>");
out.println(" <aclSubmitReservations>" +
"queueB_user,common_user " +
"</aclSubmitReservations>");
out.println(" <aclAdministerReservations>" +
"queueB_admin " +
"</aclAdministerReservations>");
out.println(" <aclListReservations>common_user </aclListReservations>");
out.println(" <reservation> </reservation>");
out.println(" </queue>");
out.println(" <queue name=\"queueC\">");
out.println(" <reservation> </reservation>");
out.println(" </queue>");
out.println("</allocations>");
out.close();
fsConf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
fsConf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
fsConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
fsConf.setBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE, true);
fsConf.set("yarn.resourcemanager.scheduler.class", FairScheduler.class
.getName());
return fsConf;
}
@Override
protected Configuration createConfiguration() {
return configuration;
}
}