YARN-2575. Create separate ACLs for Reservation create/update/delete/list ops (Sean Po via asuresh)
(cherry picked from commit 23f937e3b7
)
This commit is contained in:
parent
dfd49a027e
commit
6a238e4a6b
|
@ -758,6 +758,8 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-4138. Roll back container resource allocation after resource
|
||||
increase token expires. (Meng Ding via jianhe)
|
||||
|
||||
YARN-2575. Create separate ACLs for Reservation create/update/delete/list
|
||||
ops (Sean Po via asuresh)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
|
||||
/**
|
||||
* {@code ReservationACL} enumerates the various ACLs for reservations.
|
||||
* <p>
|
||||
* The ACL is one of:
|
||||
* <ul>
|
||||
* <li>
|
||||
* {@link #ADMINISTER_RESERVATIONS} - ACL to create, list, update and
|
||||
* delete reservations.
|
||||
* </li>
|
||||
* <li> {@link #LIST_RESERVATIONS} - ACL to list reservations. </li>
|
||||
* <li> {@link #SUBMIT_RESERVATIONS} - ACL to create reservations. </li>
|
||||
* </ul>
|
||||
* Users can always list, update and delete their own reservations.
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public enum ReservationACL {
|
||||
/**
|
||||
* ACL to create, list, update and delete reservations.
|
||||
*/
|
||||
ADMINISTER_RESERVATIONS,
|
||||
|
||||
/**
|
||||
* ACL to list reservations.
|
||||
*/
|
||||
LIST_RESERVATIONS,
|
||||
|
||||
/**
|
||||
* ACL to create reservations.
|
||||
*/
|
||||
SUBMIT_RESERVATIONS
|
||||
|
||||
}
|
|
@ -279,6 +279,11 @@ public class YarnConfiguration extends Configuration {
|
|||
YARN_PREFIX + "acl.enable";
|
||||
public static final boolean DEFAULT_YARN_ACL_ENABLE = false;
|
||||
|
||||
/** Are reservation acls enabled.*/
|
||||
public static final String YARN_RESERVATION_ACL_ENABLE =
|
||||
YARN_PREFIX + "acl.reservation-enable";
|
||||
public static final boolean DEFAULT_YARN_RESERVATION_ACL_ENABLE = false;
|
||||
|
||||
public static boolean isAclEnabled(Configuration conf) {
|
||||
return conf.getBoolean(YARN_ACL_ENABLE, DEFAULT_YARN_ACL_ENABLE);
|
||||
}
|
||||
|
|
|
@ -190,6 +190,12 @@
|
|||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Are reservation acls enabled.</description>
|
||||
<name>yarn.acl.reservation-enable</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>ACL of who can be admin of the YARN cluster.</description>
|
||||
<name>yarn.admin.acl</name>
|
||||
|
|
|
@ -33,6 +33,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.cli.UnrecognizedOptionException;
|
||||
import org.apache.commons.lang.math.LongRange;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -117,6 +118,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
|
|||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationACL;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
|
@ -158,6 +160,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeRepo
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
|
@ -603,6 +606,12 @@ public class ClientRMService extends AbstractService implements
|
|||
}
|
||||
}
|
||||
|
||||
ReservationId reservationId = request.getApplicationSubmissionContext()
|
||||
.getReservationID();
|
||||
|
||||
checkReservationACLs(submissionContext.getQueue(), AuditConstants
|
||||
.SUBMIT_RESERVATION_REQUEST, reservationId);
|
||||
|
||||
try {
|
||||
// call RMAppManager to submit application directly
|
||||
rmAppManager.submitApplication(submissionContext,
|
||||
|
@ -1221,7 +1230,7 @@ public class ClientRMService extends AbstractService implements
|
|||
String queueName = request.getQueue();
|
||||
String user =
|
||||
checkReservationACLs(queueName,
|
||||
AuditConstants.SUBMIT_RESERVATION_REQUEST);
|
||||
AuditConstants.SUBMIT_RESERVATION_REQUEST, null);
|
||||
try {
|
||||
// Try to place the reservation using the agent
|
||||
boolean result =
|
||||
|
@ -1263,7 +1272,7 @@ public class ClientRMService extends AbstractService implements
|
|||
// Check ACLs
|
||||
String user =
|
||||
checkReservationACLs(queueName,
|
||||
AuditConstants.UPDATE_RESERVATION_REQUEST);
|
||||
AuditConstants.UPDATE_RESERVATION_REQUEST, reservationId);
|
||||
// Try to update the reservation using default agent
|
||||
try {
|
||||
boolean result =
|
||||
|
@ -1302,7 +1311,7 @@ public class ClientRMService extends AbstractService implements
|
|||
// Check ACLs
|
||||
String user =
|
||||
checkReservationACLs(queueName,
|
||||
AuditConstants.DELETE_RESERVATION_REQUEST);
|
||||
AuditConstants.DELETE_RESERVATION_REQUEST, reservationId);
|
||||
// Try to update the reservation using default agent
|
||||
try {
|
||||
boolean result =
|
||||
|
@ -1339,8 +1348,15 @@ public class ClientRMService extends AbstractService implements
|
|||
boolean includeResourceAllocations = requestInfo
|
||||
.getIncludeResourceAllocations();
|
||||
|
||||
String user = checkReservationACLs(requestInfo.getQueue(),
|
||||
AuditConstants.LIST_RESERVATION_REQUEST);
|
||||
ReservationId reservationId = null;
|
||||
if (requestInfo.getReservationId() != null && !requestInfo
|
||||
.getReservationId().isEmpty()) {
|
||||
reservationId = ReservationId.parseReservationId(
|
||||
requestInfo.getReservationId());
|
||||
}
|
||||
|
||||
checkReservationACLs(requestInfo.getQueue(),
|
||||
AuditConstants.LIST_RESERVATION_REQUEST, reservationId);
|
||||
|
||||
ReservationId requestedId = null;
|
||||
if (requestInfo.getReservationId() != null
|
||||
|
@ -1353,8 +1369,10 @@ public class ClientRMService extends AbstractService implements
|
|||
long endTime = requestInfo.getEndTime() <= -1? Long.MAX_VALUE : requestInfo
|
||||
.getEndTime();
|
||||
|
||||
Set<ReservationAllocation> reservations = plan.getReservations(
|
||||
requestedId, new ReservationInterval(startTime, endTime), user);
|
||||
Set<ReservationAllocation> reservations;
|
||||
|
||||
reservations = plan.getReservations(requestedId, new ReservationInterval(
|
||||
startTime, endTime));
|
||||
|
||||
List<ReservationAllocationState> info =
|
||||
ReservationSystemUtil.convertAllocationsToReservationInfo(
|
||||
|
@ -1418,8 +1436,9 @@ public class ClientRMService extends AbstractService implements
|
|||
}
|
||||
}
|
||||
|
||||
private String checkReservationACLs(String queueName, String auditConstant)
|
||||
throws YarnException {
|
||||
private String checkReservationACLs(String queueName, String auditConstant,
|
||||
ReservationId reservationId)
|
||||
throws YarnException, IOException {
|
||||
UserGroupInformation callerUGI;
|
||||
try {
|
||||
callerUGI = UserGroupInformation.getCurrentUser();
|
||||
|
@ -1428,20 +1447,79 @@ public class ClientRMService extends AbstractService implements
|
|||
"ClientRMService", "Error getting UGI");
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
// Check if user has access on the managed queue
|
||||
if (!queueACLsManager.checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS,
|
||||
queueName, null, null)) {
|
||||
RMAuditLogger.logFailure(
|
||||
callerUGI.getShortUserName(),
|
||||
auditConstant,
|
||||
"User doesn't have permissions to "
|
||||
+ QueueACL.SUBMIT_APPLICATIONS.toString(), "ClientRMService",
|
||||
AuditConstants.UNAUTHORIZED_USER);
|
||||
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
||||
+ callerUGI.getShortUserName() + " cannot perform operation "
|
||||
+ QueueACL.SUBMIT_APPLICATIONS.name() + " on queue" + queueName));
|
||||
|
||||
if (reservationSystem == null) {
|
||||
return callerUGI.getShortUserName();
|
||||
}
|
||||
return callerUGI.getShortUserName();
|
||||
|
||||
ReservationsACLsManager manager = reservationSystem
|
||||
.getReservationsACLsManager();
|
||||
ReservationACL reservationACL = getReservationACLFromAuditConstant(
|
||||
auditConstant);
|
||||
|
||||
if (manager == null) {
|
||||
return callerUGI.getShortUserName();
|
||||
}
|
||||
|
||||
String reservationCreatorName = "";
|
||||
ReservationAllocation reservation;
|
||||
// Get the user associated with the reservation.
|
||||
Plan plan = reservationSystem.getPlan(queueName);
|
||||
if (reservationId != null && plan != null) {
|
||||
reservation = plan.getReservationById(reservationId);
|
||||
if (reservation != null) {
|
||||
reservationCreatorName = reservation.getUser();
|
||||
}
|
||||
}
|
||||
|
||||
// If the reservation to be altered or listed belongs to the current user,
|
||||
// access will be given.
|
||||
if (reservationCreatorName != null && !reservationCreatorName.isEmpty()
|
||||
&& reservationCreatorName.equals(callerUGI.getUserName())) {
|
||||
return callerUGI.getShortUserName();
|
||||
}
|
||||
|
||||
// Check if the user has access to the specific ACL
|
||||
if (manager.checkAccess(callerUGI, reservationACL, queueName)) {
|
||||
return callerUGI.getShortUserName();
|
||||
}
|
||||
|
||||
// If the user has Administer ACL then access is granted
|
||||
if (manager.checkAccess(callerUGI, ReservationACL
|
||||
.ADMINISTER_RESERVATIONS, queueName)) {
|
||||
return callerUGI.getShortUserName();
|
||||
}
|
||||
|
||||
handleNoAccess(callerUGI.getShortUserName(), queueName, auditConstant,
|
||||
reservationACL.toString(), reservationACL.name());
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
private ReservationACL getReservationACLFromAuditConstant(
|
||||
String auditConstant) throws YarnException{
|
||||
if (auditConstant.equals(AuditConstants.SUBMIT_RESERVATION_REQUEST)) {
|
||||
return ReservationACL.SUBMIT_RESERVATIONS;
|
||||
} else if (auditConstant.equals(AuditConstants.LIST_RESERVATION_REQUEST)) {
|
||||
return ReservationACL.LIST_RESERVATIONS;
|
||||
} else if (auditConstant.equals(AuditConstants.DELETE_RESERVATION_REQUEST)
|
||||
|| auditConstant.equals(AuditConstants.UPDATE_RESERVATION_REQUEST)) {
|
||||
return ReservationACL.ADMINISTER_RESERVATIONS;
|
||||
} else {
|
||||
String error = "Audit Constant " + auditConstant + " is not recognized.";
|
||||
LOG.error(error);
|
||||
throw RPCUtil.getRemoteException(new UnrecognizedOptionException(error));
|
||||
}
|
||||
}
|
||||
|
||||
private void handleNoAccess(String name, String queue, String auditConstant,
|
||||
String acl, String op) throws YarnException {
|
||||
RMAuditLogger.logFailure(
|
||||
name,
|
||||
auditConstant,
|
||||
"User doesn't have permissions to " + acl, "ClientRMService",
|
||||
auditConstant);
|
||||
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
||||
+ name + " cannot perform operation " + op + " on queue " + queue));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.UTCClock;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
|
@ -98,6 +99,8 @@ public abstract class AbstractReservationSystem extends AbstractService
|
|||
|
||||
private PlanFollower planFollower;
|
||||
|
||||
private ReservationsACLsManager reservationsACLsManager;
|
||||
|
||||
private boolean isRecoveryEnabled = false;
|
||||
|
||||
/**
|
||||
|
@ -158,6 +161,13 @@ public abstract class AbstractReservationSystem extends AbstractService
|
|||
isRecoveryEnabled = conf.getBoolean(
|
||||
YarnConfiguration.RECOVERY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
|
||||
|
||||
if (conf.getBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE,
|
||||
YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE) &&
|
||||
conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
|
||||
YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) {
|
||||
reservationsACLsManager = new ReservationsACLsManager(scheduler, conf);
|
||||
}
|
||||
}
|
||||
|
||||
private void loadPlan(String planName,
|
||||
|
@ -475,6 +485,10 @@ public abstract class AbstractReservationSystem extends AbstractService
|
|||
}
|
||||
}
|
||||
|
||||
public ReservationsACLsManager getReservationsACLsManager() {
|
||||
return this.reservationsACLsManager;
|
||||
}
|
||||
|
||||
protected abstract ReservationSchedulerConfiguration
|
||||
getReservationSchedulerConfiguration();
|
||||
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
||||
|
@ -85,14 +84,6 @@ public class CapacityOverTimePolicy implements SharingPolicy {
|
|||
ReservationAllocation oldReservation =
|
||||
plan.getReservationById(reservation.getReservationId());
|
||||
|
||||
// sanity check that the update of a reservation is not changing username
|
||||
if (oldReservation != null
|
||||
&& !oldReservation.getUser().equals(reservation.getUser())) {
|
||||
throw new MismatchedUserException(
|
||||
"Updating an existing reservation with mismatched user:"
|
||||
+ oldReservation.getUser() + " != " + reservation.getUser());
|
||||
}
|
||||
|
||||
long startTime = reservation.getStartTime();
|
||||
long endTime = reservation.getEndTime();
|
||||
long step = plan.getStep();
|
||||
|
|
|
@ -476,7 +476,13 @@ public class InMemoryPlan implements Plan {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Set<ReservationAllocation> getReservations(ReservationId
|
||||
public Set<ReservationAllocation> getReservations(ReservationId
|
||||
reservationID, ReservationInterval interval) {
|
||||
return getReservations(reservationID, interval, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<ReservationAllocation> getReservations(ReservationId
|
||||
reservationID, ReservationInterval interval, String user) {
|
||||
if (reservationID != null) {
|
||||
ReservationAllocation allocation = getReservationById(reservationID);
|
||||
|
|
|
@ -42,12 +42,26 @@ public interface PlanView extends PlanContext {
|
|||
* greater than the interval end time, and end time no less
|
||||
* than the interval start time will be selected.
|
||||
* @param user the user to retrieve the reservation allocation from.
|
||||
* @return {@link ReservationAllocation} identified by the user who
|
||||
* @return a set of {@link ReservationAllocation} identified by the user who
|
||||
* made the reservation
|
||||
*/
|
||||
Set<ReservationAllocation> getReservations(ReservationId
|
||||
reservationID, ReservationInterval interval, String user);
|
||||
|
||||
/**
|
||||
* Return a set of {@link ReservationAllocation} identified by any user.
|
||||
*
|
||||
* @param reservationID the unqiue id to identify the
|
||||
* {@link ReservationAllocation}
|
||||
* @param interval the time interval used to retrieve the reservation
|
||||
* allocations from. Only reservations with start time no
|
||||
* greater than the interval end time, and end time no less
|
||||
* than the interval start time will be selected.
|
||||
* @return a set of {@link ReservationAllocation} identified by any user
|
||||
*/
|
||||
Set<ReservationAllocation> getReservations(ReservationId reservationID,
|
||||
ReservationInterval interval);
|
||||
|
||||
/**
|
||||
* Return a {@link ReservationAllocation} identified by its
|
||||
* {@link ReservationId}
|
||||
|
|
|
@ -20,8 +20,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationACL;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public abstract class ReservationSchedulerConfiguration extends Configuration {
|
||||
|
||||
@InterfaceAudience.Private
|
||||
|
@ -67,6 +71,17 @@ public abstract class ReservationSchedulerConfiguration extends Configuration {
|
|||
*/
|
||||
public abstract boolean isReservable(String queue);
|
||||
|
||||
/**
|
||||
* Gets a map containing the {@link AccessControlList} of users for each
|
||||
* {@link ReservationACL} acl on thee specified queue.
|
||||
*
|
||||
* @param queue the queue with which to check a user's permissions.
|
||||
* @return The a Map of {@link ReservationACL} to {@link AccessControlList}
|
||||
* which contains a list of users that have the specified permission level.
|
||||
*/
|
||||
public abstract Map<ReservationACL, AccessControlList> getReservationAcls(
|
||||
String queue);
|
||||
|
||||
/**
|
||||
* Gets the length of time in milliseconds for which the {@link SharingPolicy}
|
||||
* checks for validity
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -123,4 +124,12 @@ public interface ReservationSystem extends Recoverable {
|
|||
*/
|
||||
void setQueueForReservation(ReservationId reservationId, String queueName);
|
||||
|
||||
/**
|
||||
* Get the {@link ReservationsACLsManager} to use to check for the reservation
|
||||
* access on a user.
|
||||
*
|
||||
* @return the reservation ACL manager to use to check reservation ACLs.
|
||||
*
|
||||
*/
|
||||
ReservationsACLsManager getReservationsACLsManager();
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.util.ReflectionUtils;
|
|||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationACL;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
|
@ -566,6 +567,35 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
set(queuePrefix + getAclKey(acl), aclString);
|
||||
}
|
||||
|
||||
private static String getAclKey(ReservationACL acl) {
|
||||
return "acl_" + StringUtils.toLowerCase(acl.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ReservationACL, AccessControlList> getReservationAcls(String
|
||||
queue) {
|
||||
Map<ReservationACL, AccessControlList> resAcls = new HashMap<>();
|
||||
for (ReservationACL acl : ReservationACL.values()) {
|
||||
resAcls.put(acl, getReservationAcl(queue, acl));
|
||||
}
|
||||
return resAcls;
|
||||
}
|
||||
|
||||
private AccessControlList getReservationAcl(String queue, ReservationACL
|
||||
acl) {
|
||||
String queuePrefix = getQueuePrefix(queue);
|
||||
// The root queue defaults to all access if not defined
|
||||
// Sub queues inherit access if not defined
|
||||
String defaultAcl = ALL_ACL;
|
||||
String aclString = get(queuePrefix + getAclKey(acl), defaultAcl);
|
||||
return new AccessControlList(aclString);
|
||||
}
|
||||
|
||||
private void setAcl(String queue, ReservationACL acl, String aclString) {
|
||||
String queuePrefix = getQueuePrefix(queue);
|
||||
set(queuePrefix + getAclKey(acl), aclString);
|
||||
}
|
||||
|
||||
public Map<AccessType, AccessControlList> getAcls(String queue) {
|
||||
Map<AccessType, AccessControlList> acls =
|
||||
new HashMap<AccessType, AccessControlList>();
|
||||
|
@ -581,6 +611,14 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setReservationAcls(String queue,
|
||||
Map<ReservationACL, AccessControlList> acls) {
|
||||
for (Map.Entry<ReservationACL, AccessControlList> e : acls.entrySet()) {
|
||||
setAcl(queue, e.getKey(), e.getValue().getAclString());
|
||||
}
|
||||
}
|
||||
|
||||
public String[] getQueues(String queue) {
|
||||
LOG.debug("CSConf - getQueues called for: queuePrefix=" + getQueuePrefix(queue));
|
||||
String[] queues = getStrings(getQueuePrefix(queue) + QUEUES);
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationACL;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
|
@ -65,6 +66,10 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
// ACL's for each queue. Only specifies non-default ACL's from configuration.
|
||||
private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
|
||||
|
||||
// Reservation ACL's for each queue. Only specifies non-default ACL's from
|
||||
// configuration.
|
||||
private final Map<String, Map<ReservationACL, AccessControlList>> resAcls;
|
||||
|
||||
// Min share preemption timeout for each queue in seconds. If a job in the queue
|
||||
// waits this long without receiving its guaranteed share, it is allowed to
|
||||
// preempt other jobs' tasks.
|
||||
|
@ -113,6 +118,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
Map<String, Long> fairSharePreemptionTimeouts,
|
||||
Map<String, Float> fairSharePreemptionThresholds,
|
||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
||||
Map<String, Map<ReservationACL, AccessControlList>> resAcls,
|
||||
QueuePlacementPolicy placementPolicy,
|
||||
Map<FSQueueType, Set<String>> configuredQueues,
|
||||
ReservationQueueConfiguration globalReservationQueueConfig,
|
||||
|
@ -134,6 +140,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts;
|
||||
this.fairSharePreemptionThresholds = fairSharePreemptionThresholds;
|
||||
this.queueAcls = queueAcls;
|
||||
this.resAcls = resAcls;
|
||||
this.reservableQueues = reservableQueues;
|
||||
this.globalReservationQueueConfig = globalReservationQueueConfig;
|
||||
this.placementPolicy = placementPolicy;
|
||||
|
@ -153,6 +160,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
queueMaxResourcesDefault = Resources.unbounded();
|
||||
queueMaxAMShareDefault = 0.5f;
|
||||
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
|
||||
resAcls = new HashMap<String, Map<ReservationACL, AccessControlList>>();
|
||||
minSharePreemptionTimeouts = new HashMap<String, Long>();
|
||||
fairSharePreemptionTimeouts = new HashMap<String, Long>();
|
||||
fairSharePreemptionThresholds = new HashMap<String, Float>();
|
||||
|
@ -184,7 +192,17 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
}
|
||||
return (queue.equals("root")) ? EVERYBODY_ACL : NOBODY_ACL;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Get the map of reservation ACLs to {@link AccessControlList} for the
|
||||
* specified queue.
|
||||
*/
|
||||
public Map<ReservationACL, AccessControlList> getReservationAcls(String
|
||||
queue) {
|
||||
return this.resAcls.get(queue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a queue's min share preemption timeout configured in the allocation
|
||||
* file, in milliseconds. Return -1 if not set.
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationACL;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
|
@ -223,6 +224,8 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
new HashMap<String, Float>();
|
||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
|
||||
new HashMap<String, Map<QueueACL, AccessControlList>>();
|
||||
Map<String, Map<ReservationACL, AccessControlList>> reservationAcls =
|
||||
new HashMap<String, Map<ReservationACL, AccessControlList>>();
|
||||
Set<String> reservableQueues = new HashSet<String>();
|
||||
Set<String> nonPreemptableQueues = new HashSet<String>();
|
||||
int userMaxAppsDefault = Integer.MAX_VALUE;
|
||||
|
@ -360,8 +363,8 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
loadQueue(parent, element, minQueueResources, maxQueueResources,
|
||||
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
|
||||
queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
|
||||
fairSharePreemptionThresholds, queueAcls, configuredQueues,
|
||||
reservableQueues, nonPreemptableQueues);
|
||||
fairSharePreemptionThresholds, queueAcls, reservationAcls,
|
||||
configuredQueues, reservableQueues, nonPreemptableQueues);
|
||||
}
|
||||
|
||||
// Load placement policy and pass it configured queues
|
||||
|
@ -409,8 +412,8 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
|
||||
defaultSchedPolicy, minSharePreemptionTimeouts,
|
||||
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
|
||||
newPlacementPolicy, configuredQueues, globalReservationQueueConfig,
|
||||
reservableQueues, nonPreemptableQueues);
|
||||
reservationAcls, newPlacementPolicy, configuredQueues,
|
||||
globalReservationQueueConfig, reservableQueues, nonPreemptableQueues);
|
||||
|
||||
lastSuccessfulReload = clock.getTime();
|
||||
lastReloadAttemptFailed = false;
|
||||
|
@ -431,6 +434,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
Map<String, Long> fairSharePreemptionTimeouts,
|
||||
Map<String, Float> fairSharePreemptionThresholds,
|
||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
||||
Map<String, Map<ReservationACL, AccessControlList>> resAcls,
|
||||
Map<FSQueueType, Set<String>> configuredQueues,
|
||||
Set<String> reservableQueues,
|
||||
Set<String> nonPreemptableQueues)
|
||||
|
@ -453,6 +457,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
}
|
||||
Map<QueueACL, AccessControlList> acls =
|
||||
new HashMap<QueueACL, AccessControlList>();
|
||||
Map<ReservationACL, AccessControlList> racls = new HashMap<>();
|
||||
NodeList fields = element.getChildNodes();
|
||||
boolean isLeaf = true;
|
||||
|
||||
|
@ -506,6 +511,18 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
} else if ("aclAdministerApps".equals(field.getTagName())) {
|
||||
String text = ((Text)field.getFirstChild()).getData();
|
||||
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
|
||||
} else if ("aclAdministerReservations".equals(field.getTagName())) {
|
||||
String text = ((Text)field.getFirstChild()).getData();
|
||||
racls.put(ReservationACL.ADMINISTER_RESERVATIONS,
|
||||
new AccessControlList(text));
|
||||
} else if ("aclListReservations".equals(field.getTagName())) {
|
||||
String text = ((Text)field.getFirstChild()).getData();
|
||||
racls.put(ReservationACL.LIST_RESERVATIONS, new AccessControlList(
|
||||
text));
|
||||
} else if ("aclSubmitReservations".equals(field.getTagName())) {
|
||||
String text = ((Text)field.getFirstChild()).getData();
|
||||
racls.put(ReservationACL.SUBMIT_RESERVATIONS,
|
||||
new AccessControlList(text));
|
||||
} else if ("reservation".equals(field.getTagName())) {
|
||||
isLeaf = false;
|
||||
reservableQueues.add(queueName);
|
||||
|
@ -521,7 +538,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
|
||||
queuePolicies, minSharePreemptionTimeouts,
|
||||
fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
|
||||
queueAcls, configuredQueues, reservableQueues,
|
||||
queueAcls, resAcls, configuredQueues, reservableQueues,
|
||||
nonPreemptableQueues);
|
||||
isLeaf = false;
|
||||
}
|
||||
|
@ -543,6 +560,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
configuredQueues.get(FSQueueType.PARENT).add(queueName);
|
||||
}
|
||||
queueAcls.put(queueName, acls);
|
||||
resAcls.put(queueName, racls);
|
||||
if (maxQueueResources.containsKey(queueName) &&
|
||||
minQueueResources.containsKey(queueName)
|
||||
&& !Resources.fitsIn(minQueueResources.get(queueName),
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationACL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The {@link ReservationsACLsManager} is used to check a specified user's
|
||||
* permissons to perform a reservation operation on the
|
||||
* {@link CapacityScheduler} and the {@link FairScheduler}.
|
||||
* {@link ReservationACL}s are used to specify reservation operations.
|
||||
*/
|
||||
public class ReservationsACLsManager {
|
||||
private boolean isReservationACLsEnable;
|
||||
private Map<String, Map<ReservationACL, AccessControlList>> reservationAcls
|
||||
= new HashMap<>();
|
||||
|
||||
public ReservationsACLsManager(ResourceScheduler scheduler,
|
||||
Configuration conf) throws YarnException {
|
||||
this.isReservationACLsEnable =
|
||||
conf.getBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE,
|
||||
YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE) &&
|
||||
conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
|
||||
YarnConfiguration.DEFAULT_YARN_ACL_ENABLE);
|
||||
if (scheduler instanceof CapacityScheduler) {
|
||||
CapacitySchedulerConfiguration csConf = new
|
||||
CapacitySchedulerConfiguration(conf);
|
||||
|
||||
for (String planQueue : scheduler.getPlanQueues()) {
|
||||
CSQueue queue = ((CapacityScheduler) scheduler).getQueue(planQueue);
|
||||
reservationAcls.put(planQueue, csConf.getReservationAcls(queue
|
||||
.getQueuePath()));
|
||||
}
|
||||
} else if (scheduler instanceof FairScheduler) {
|
||||
AllocationConfiguration aConf = ((FairScheduler) scheduler)
|
||||
.getAllocationConfiguration();
|
||||
for (String planQueue : scheduler.getPlanQueues()) {
|
||||
reservationAcls.put(planQueue, aConf.getReservationAcls(planQueue));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean checkAccess(UserGroupInformation callerUGI,
|
||||
ReservationACL acl, String queueName) {
|
||||
if (!isReservationACLsEnable) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (this.reservationAcls.containsKey(queueName)) {
|
||||
Map<ReservationACL, AccessControlList> acls = this.reservationAcls.get(
|
||||
queueName);
|
||||
if (acls.containsKey(acl)) {
|
||||
return acls.get(acl).isUserAllowed(callerUGI);
|
||||
} else {
|
||||
// Give access if acl is undefined for queue.
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,123 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.junit.Before;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public abstract class ACLsTestBase {
|
||||
|
||||
protected static final String COMMON_USER = "common_user";
|
||||
protected static final String QUEUE_A_USER = "queueA_user";
|
||||
protected static final String QUEUE_B_USER = "queueB_user";
|
||||
protected static final String ROOT_ADMIN = "root_admin";
|
||||
protected static final String QUEUE_A_ADMIN = "queueA_admin";
|
||||
protected static final String QUEUE_B_ADMIN = "queueB_admin";
|
||||
|
||||
protected static final String QUEUEA = "queueA";
|
||||
protected static final String QUEUEB = "queueB";
|
||||
protected static final String QUEUEC = "queueC";
|
||||
|
||||
protected static final Log LOG = LogFactory.getLog(TestApplicationACLs.class);
|
||||
|
||||
MockRM resourceManager;
|
||||
Configuration conf;
|
||||
YarnRPC rpc;
|
||||
InetSocketAddress rmAddress;
|
||||
|
||||
@Before
|
||||
public void setup() throws InterruptedException, IOException {
|
||||
conf = createConfiguration();
|
||||
rpc = YarnRPC.create(conf);
|
||||
rmAddress = conf.getSocketAddr(
|
||||
YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_PORT);
|
||||
|
||||
AccessControlList adminACL = new AccessControlList("");
|
||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminACL.getAclString());
|
||||
|
||||
resourceManager = new MockRM(conf) {
|
||||
protected ClientRMService createClientRMService() {
|
||||
return new ClientRMService(getRMContext(), this.scheduler,
|
||||
this.rmAppManager, this.applicationACLsManager,
|
||||
this.queueACLsManager, getRMContext()
|
||||
.getRMDelegationTokenSecretManager());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Dispatcher createDispatcher() {
|
||||
return new DrainDispatcher();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doSecureLogin() throws IOException {
|
||||
}
|
||||
};
|
||||
new Thread() {
|
||||
public void run() {
|
||||
resourceManager.start();
|
||||
};
|
||||
}.start();
|
||||
int waitCount = 0;
|
||||
while (resourceManager.getServiceState() == STATE.INITED
|
||||
&& waitCount++ < 60) {
|
||||
LOG.info("Waiting for RM to start...");
|
||||
Thread.sleep(1500);
|
||||
}
|
||||
if (resourceManager.getServiceState() != STATE.STARTED) {
|
||||
// RM could have failed.
|
||||
throw new IOException("ResourceManager failed to start. Final state is "
|
||||
+ resourceManager.getServiceState());
|
||||
}
|
||||
}
|
||||
|
||||
protected ApplicationClientProtocol getRMClientForUser(String user)
|
||||
throws IOException, InterruptedException {
|
||||
UserGroupInformation userUGI = UserGroupInformation.createRemoteUser(user);
|
||||
ApplicationClientProtocol userClient =
|
||||
userUGI
|
||||
.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
|
||||
@Override
|
||||
public ApplicationClientProtocol run() throws Exception {
|
||||
return (ApplicationClientProtocol) rpc.getProxy(
|
||||
ApplicationClientProtocol.class, rmAddress, conf);
|
||||
}
|
||||
});
|
||||
return userClient;
|
||||
}
|
||||
|
||||
protected abstract Configuration createConfiguration() throws IOException;
|
||||
}
|
|
@ -18,20 +18,12 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
|
@ -43,73 +35,13 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public abstract class QueueACLsTestBase {
|
||||
|
||||
protected static final String COMMON_USER = "common_user";
|
||||
protected static final String QUEUE_A_USER = "queueA_user";
|
||||
protected static final String QUEUE_B_USER = "queueB_user";
|
||||
protected static final String ROOT_ADMIN = "root_admin";
|
||||
protected static final String QUEUE_A_ADMIN = "queueA_admin";
|
||||
protected static final String QUEUE_B_ADMIN = "queueB_admin";
|
||||
|
||||
protected static final String QUEUEA = "queueA";
|
||||
protected static final String QUEUEB = "queueB";
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestApplicationACLs.class);
|
||||
|
||||
MockRM resourceManager;
|
||||
Configuration conf;
|
||||
YarnRPC rpc;
|
||||
InetSocketAddress rmAddress;
|
||||
|
||||
@Before
|
||||
public void setup() throws InterruptedException, IOException {
|
||||
conf = createConfiguration();
|
||||
rpc = YarnRPC.create(conf);
|
||||
rmAddress = conf.getSocketAddr(
|
||||
YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_PORT);
|
||||
|
||||
AccessControlList adminACL = new AccessControlList("");
|
||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminACL.getAclString());
|
||||
|
||||
resourceManager = new MockRM(conf) {
|
||||
protected ClientRMService createClientRMService() {
|
||||
return new ClientRMService(getRMContext(), this.scheduler,
|
||||
this.rmAppManager, this.applicationACLsManager,
|
||||
this.queueACLsManager, getRMContext().getRMDelegationTokenSecretManager());
|
||||
};
|
||||
|
||||
@Override
|
||||
protected void doSecureLogin() throws IOException {
|
||||
}
|
||||
};
|
||||
new Thread() {
|
||||
public void run() {
|
||||
resourceManager.start();
|
||||
};
|
||||
}.start();
|
||||
int waitCount = 0;
|
||||
while (resourceManager.getServiceState() == STATE.INITED
|
||||
&& waitCount++ < 60) {
|
||||
LOG.info("Waiting for RM to start...");
|
||||
Thread.sleep(1500);
|
||||
}
|
||||
if (resourceManager.getServiceState() != STATE.STARTED) {
|
||||
// RM could have failed.
|
||||
throw new IOException("ResourceManager failed to start. Final state is "
|
||||
+ resourceManager.getServiceState());
|
||||
}
|
||||
}
|
||||
public abstract class QueueACLsTestBase extends ACLsTestBase {
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
|
@ -248,21 +180,4 @@ public abstract class QueueACLsTestBase {
|
|||
acls.put(ApplicationAccessType.MODIFY_APP, modifyACL.getAclString());
|
||||
return acls;
|
||||
}
|
||||
|
||||
private ApplicationClientProtocol getRMClientForUser(String user)
|
||||
throws IOException, InterruptedException {
|
||||
UserGroupInformation userUGI = UserGroupInformation.createRemoteUser(user);
|
||||
ApplicationClientProtocol userClient =
|
||||
userUGI
|
||||
.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
|
||||
@Override
|
||||
public ApplicationClientProtocol run() throws Exception {
|
||||
return (ApplicationClientProtocol) rpc.getProxy(
|
||||
ApplicationClientProtocol.class, rmAddress, conf);
|
||||
}
|
||||
});
|
||||
return userClient;
|
||||
}
|
||||
|
||||
protected abstract Configuration createConfiguration() throws IOException;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,600 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationACL;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequests;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class ReservationACLsTestBase extends ACLsTestBase {
|
||||
|
||||
private final int defaultDuration = 600000;
|
||||
private final ReservationRequest defaultRequest = ReservationRequest
|
||||
.newInstance(BuilderUtils.newResource(1024, 1), 1, 1,
|
||||
defaultDuration);
|
||||
private final ReservationRequests defaultRequests = ReservationRequests
|
||||
.newInstance(Collections.singletonList(defaultRequest),
|
||||
ReservationRequestInterpreter.R_ALL);
|
||||
private Configuration configuration;
|
||||
private boolean useFullQueuePath;
|
||||
|
||||
public ReservationACLsTestBase(Configuration conf, boolean useFullPath) {
|
||||
configuration = conf;
|
||||
useFullQueuePath = useFullPath;
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
if (resourceManager != null) {
|
||||
resourceManager.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<Object[]> data() throws IOException {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{ createCapacitySchedulerConfiguration(), false },
|
||||
{ createFairSchedulerConfiguration(), true }
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testApplicationACLs() throws Exception {
|
||||
registerNode("test:1234", 8192, 8);
|
||||
String queueA = !useFullQueuePath? QUEUEA : CapacitySchedulerConfiguration
|
||||
.ROOT + "." + QUEUEA;
|
||||
String queueB = !useFullQueuePath? QUEUEB : CapacitySchedulerConfiguration
|
||||
.ROOT + "." + QUEUEB;
|
||||
String queueC = !useFullQueuePath? QUEUEC : CapacitySchedulerConfiguration
|
||||
.ROOT + "." + QUEUEC;
|
||||
|
||||
// Submit Reservations
|
||||
|
||||
// Users of queue A can submit reservations on QueueA.
|
||||
verifySubmitReservationSuccess(QUEUE_A_USER, queueA);
|
||||
verifySubmitReservationSuccess(QUEUE_A_ADMIN, queueA);
|
||||
|
||||
// Users of queue B cannot submit reservations on QueueA.
|
||||
verifySubmitReservationFailure(QUEUE_B_USER, queueA);
|
||||
verifySubmitReservationFailure(QUEUE_B_ADMIN, queueA);
|
||||
|
||||
// Users of queue B can submit reservations on QueueB.
|
||||
verifySubmitReservationSuccess(QUEUE_B_USER, queueB);
|
||||
verifySubmitReservationSuccess(QUEUE_B_ADMIN, queueB);
|
||||
|
||||
// Users of queue A cannot submit reservations on QueueB.
|
||||
verifySubmitReservationFailure(QUEUE_A_USER, queueB);
|
||||
verifySubmitReservationFailure(QUEUE_A_ADMIN, queueB);
|
||||
|
||||
// Everyone can submit reservations on QueueC.
|
||||
verifySubmitReservationSuccess(QUEUE_B_USER, queueC);
|
||||
verifySubmitReservationSuccess(QUEUE_B_ADMIN, queueC);
|
||||
verifySubmitReservationSuccess(QUEUE_A_USER, queueC);
|
||||
verifySubmitReservationSuccess(QUEUE_A_ADMIN, queueC);
|
||||
verifySubmitReservationSuccess(COMMON_USER, queueC);
|
||||
|
||||
// List Reservations
|
||||
|
||||
// User with List Reservations, or Admin ACL can list everyone's
|
||||
// reservations.
|
||||
verifyListReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_USER, queueA);
|
||||
verifyListReservationSuccess(COMMON_USER, QUEUE_A_ADMIN, queueA);
|
||||
verifyListReservationSuccess(COMMON_USER, QUEUE_A_USER, queueA);
|
||||
|
||||
// User without Admin or Reservation ACL can only list their own
|
||||
// reservations by id.
|
||||
verifyListReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_ADMIN, queueA);
|
||||
verifyListReservationFailure(QUEUE_A_USER, QUEUE_A_USER, queueA);
|
||||
verifyListReservationFailure(QUEUE_A_USER, QUEUE_A_ADMIN, queueA);
|
||||
verifyListReservationByIdSuccess(QUEUE_A_USER, QUEUE_A_USER, queueA);
|
||||
verifyListReservationByIdFailure(QUEUE_A_USER, QUEUE_A_ADMIN, queueA);
|
||||
|
||||
// User with List Reservations, or Admin ACL can list everyone's
|
||||
// reservations.
|
||||
verifyListReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_USER, queueB);
|
||||
verifyListReservationSuccess(COMMON_USER, QUEUE_B_ADMIN, queueB);
|
||||
verifyListReservationSuccess(COMMON_USER, QUEUE_B_USER, queueB);
|
||||
|
||||
// User without Admin or Reservation ACL can only list their own
|
||||
// reservations by id.
|
||||
verifyListReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_ADMIN, queueB);
|
||||
verifyListReservationFailure(QUEUE_B_USER, QUEUE_B_USER, queueB);
|
||||
verifyListReservationFailure(QUEUE_B_USER, QUEUE_B_ADMIN, queueB);
|
||||
verifyListReservationByIdSuccess(QUEUE_B_USER, QUEUE_B_USER, queueB);
|
||||
verifyListReservationByIdFailure(QUEUE_B_USER, QUEUE_B_ADMIN, queueB);
|
||||
|
||||
// Users with Admin ACL in one queue cannot list reservations in
|
||||
// another queue
|
||||
verifyListReservationFailure(QUEUE_B_ADMIN, QUEUE_A_ADMIN, queueA);
|
||||
verifyListReservationFailure(QUEUE_B_ADMIN, QUEUE_A_USER, queueA);
|
||||
verifyListReservationFailure(QUEUE_A_ADMIN, QUEUE_B_ADMIN, queueB);
|
||||
verifyListReservationFailure(QUEUE_A_ADMIN, QUEUE_B_USER, queueB);
|
||||
|
||||
// All users can list reservations on QueueC because acls are enabled
|
||||
// but not defined.
|
||||
verifyListReservationSuccess(QUEUE_A_USER, QUEUE_A_ADMIN, queueC);
|
||||
verifyListReservationSuccess(QUEUE_B_USER, QUEUE_A_ADMIN, queueC);
|
||||
verifyListReservationSuccess(QUEUE_B_ADMIN, QUEUE_A_ADMIN, queueC);
|
||||
verifyListReservationSuccess(COMMON_USER, QUEUE_A_ADMIN, queueC);
|
||||
verifyListReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_USER, queueC);
|
||||
verifyListReservationSuccess(QUEUE_B_USER, QUEUE_A_USER, queueC);
|
||||
verifyListReservationSuccess(QUEUE_B_ADMIN, QUEUE_A_USER, queueC);
|
||||
verifyListReservationSuccess(COMMON_USER, QUEUE_A_USER, queueC);
|
||||
verifyListReservationByIdSuccess(QUEUE_A_USER, QUEUE_A_ADMIN, queueC);
|
||||
verifyListReservationByIdSuccess(QUEUE_B_USER, QUEUE_A_ADMIN, queueC);
|
||||
verifyListReservationByIdSuccess(QUEUE_B_ADMIN, QUEUE_A_ADMIN, queueC);
|
||||
verifyListReservationByIdSuccess(COMMON_USER, QUEUE_A_ADMIN, queueC);
|
||||
verifyListReservationByIdSuccess(QUEUE_A_ADMIN, QUEUE_A_USER, queueC);
|
||||
verifyListReservationByIdSuccess(QUEUE_B_USER, QUEUE_A_USER, queueC);
|
||||
verifyListReservationByIdSuccess(QUEUE_B_ADMIN, QUEUE_A_USER, queueC);
|
||||
verifyListReservationByIdSuccess(COMMON_USER, QUEUE_A_USER, queueC);
|
||||
|
||||
// Delete Reservations
|
||||
|
||||
// Only the user who made the reservation or an admin can delete it.
|
||||
verifyDeleteReservationSuccess(QUEUE_A_USER, QUEUE_A_USER, queueA);
|
||||
verifyDeleteReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_USER, queueA);
|
||||
|
||||
// A non-admin cannot delete another user's reservation.
|
||||
verifyDeleteReservationFailure(COMMON_USER, QUEUE_A_USER, queueA);
|
||||
verifyDeleteReservationFailure(QUEUE_B_USER, QUEUE_A_USER, queueA);
|
||||
verifyDeleteReservationFailure(QUEUE_B_ADMIN, QUEUE_A_USER, queueA);
|
||||
|
||||
// Only the user who made the reservation or an admin can delete it.
|
||||
verifyDeleteReservationSuccess(QUEUE_B_USER, QUEUE_B_USER, queueB);
|
||||
verifyDeleteReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_USER, queueB);
|
||||
|
||||
// A non-admin cannot delete another user's reservation.
|
||||
verifyDeleteReservationFailure(COMMON_USER, QUEUE_B_USER, queueB);
|
||||
verifyDeleteReservationFailure(QUEUE_A_USER, QUEUE_B_USER, queueB);
|
||||
verifyDeleteReservationFailure(QUEUE_A_ADMIN, QUEUE_B_USER, queueB);
|
||||
|
||||
// All users can delete any reservation on QueueC because acls are enabled
|
||||
// but not defined.
|
||||
verifyDeleteReservationSuccess(COMMON_USER, QUEUE_B_ADMIN, queueC);
|
||||
verifyDeleteReservationSuccess(QUEUE_B_USER, QUEUE_B_ADMIN, queueC);
|
||||
verifyDeleteReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_ADMIN, queueC);
|
||||
verifyDeleteReservationSuccess(QUEUE_A_USER, QUEUE_B_ADMIN, queueC);
|
||||
verifyDeleteReservationSuccess(QUEUE_A_ADMIN, QUEUE_B_ADMIN, queueC);
|
||||
|
||||
// Update Reservation
|
||||
|
||||
// Only the user who made the reservation or an admin can update it.
|
||||
verifyUpdateReservationSuccess(QUEUE_A_USER, QUEUE_A_USER, queueA);
|
||||
verifyUpdateReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_USER, queueA);
|
||||
|
||||
// A non-admin cannot update another user's reservation.
|
||||
verifyUpdateReservationFailure(COMMON_USER, QUEUE_A_USER, queueA);
|
||||
verifyUpdateReservationFailure(QUEUE_B_USER,QUEUE_A_USER, queueA);
|
||||
verifyUpdateReservationFailure(QUEUE_B_ADMIN, QUEUE_A_USER, queueA);
|
||||
|
||||
// Only the user who made the reservation or an admin can update it.
|
||||
verifyUpdateReservationSuccess(QUEUE_B_USER, QUEUE_B_USER, queueB);
|
||||
verifyUpdateReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_USER, queueB);
|
||||
|
||||
// A non-admin cannot update another user's reservation.
|
||||
verifyUpdateReservationFailure(COMMON_USER, QUEUE_B_USER, queueB);
|
||||
verifyUpdateReservationFailure(QUEUE_A_USER, QUEUE_B_USER, queueB);
|
||||
verifyUpdateReservationFailure(QUEUE_A_ADMIN, QUEUE_B_USER, queueB);
|
||||
|
||||
// All users can update any reservation on QueueC because acls are enabled
|
||||
// but not defined.
|
||||
verifyUpdateReservationSuccess(COMMON_USER, QUEUE_B_ADMIN, queueC);
|
||||
verifyUpdateReservationSuccess(QUEUE_B_USER, QUEUE_B_ADMIN, queueC);
|
||||
verifyUpdateReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_ADMIN, queueC);
|
||||
verifyUpdateReservationSuccess(QUEUE_A_USER, QUEUE_B_ADMIN, queueC);
|
||||
verifyUpdateReservationSuccess(QUEUE_A_ADMIN, QUEUE_B_ADMIN, queueC);
|
||||
}
|
||||
|
||||
private void verifySubmitReservationSuccess(String submitter, String
|
||||
queueName) throws Exception {
|
||||
ReservationId reservationId =
|
||||
submitReservation(submitter, queueName);
|
||||
|
||||
deleteReservation(submitter, reservationId);
|
||||
}
|
||||
|
||||
private void verifySubmitReservationFailure(String submitter, String
|
||||
queueName) throws Exception {
|
||||
try {
|
||||
submitReservation(submitter, queueName);
|
||||
Assert.fail("Submit reservation by the enemy should fail!");
|
||||
} catch (YarnException e) {
|
||||
handleAdministerException(e, submitter, queueName, ReservationACL
|
||||
.SUBMIT_RESERVATIONS.name());
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyListReservationSuccess(String lister, String
|
||||
originalSubmitter, String queueName) throws Exception {
|
||||
ReservationId reservationId =
|
||||
submitReservation(originalSubmitter, queueName);
|
||||
|
||||
ReservationListResponse adminResponse = listReservation(lister, queueName);
|
||||
|
||||
assert(adminResponse.getReservationAllocationState().size() == 1);
|
||||
assert(adminResponse.getReservationAllocationState().get(0).getUser()
|
||||
.equals(originalSubmitter));
|
||||
|
||||
deleteReservation(originalSubmitter, reservationId);
|
||||
}
|
||||
|
||||
private void verifyListReservationFailure(String lister,
|
||||
String originalSubmitter, String queueName) throws Exception {
|
||||
ReservationId reservationId =
|
||||
submitReservation(originalSubmitter, queueName);
|
||||
|
||||
try {
|
||||
listReservation(lister, queueName);
|
||||
Assert.fail("List reservation by the enemy should fail!");
|
||||
} catch (YarnException e) {
|
||||
handleAdministerException(e, lister, queueName, ReservationACL
|
||||
.LIST_RESERVATIONS.name());
|
||||
}
|
||||
|
||||
deleteReservation(originalSubmitter, reservationId);
|
||||
}
|
||||
|
||||
private void verifyListReservationByIdSuccess(String lister, String
|
||||
originalSubmitter, String queueName) throws Exception {
|
||||
ReservationId reservationId =
|
||||
submitReservation(originalSubmitter, queueName);
|
||||
|
||||
ReservationListResponse adminResponse = listReservationById(lister,
|
||||
reservationId, queueName);
|
||||
|
||||
assert(adminResponse.getReservationAllocationState().size() == 1);
|
||||
assert(adminResponse.getReservationAllocationState().get(0).getUser()
|
||||
.equals(originalSubmitter));
|
||||
|
||||
deleteReservation(originalSubmitter, reservationId);
|
||||
}
|
||||
|
||||
private void verifyListReservationByIdFailure(String lister,
|
||||
String originalSubmitter, String queueName) throws Exception {
|
||||
ReservationId reservationId =
|
||||
submitReservation(originalSubmitter, queueName);
|
||||
try {
|
||||
listReservationById(lister, reservationId, queueName);
|
||||
Assert.fail("List reservation by the enemy should fail!");
|
||||
} catch (YarnException e) {
|
||||
handleAdministerException(e, lister, queueName, ReservationACL
|
||||
.LIST_RESERVATIONS.name());
|
||||
}
|
||||
|
||||
deleteReservation(originalSubmitter, reservationId);
|
||||
}
|
||||
|
||||
private void verifyDeleteReservationSuccess(String killer,
|
||||
String originalSubmitter, String queueName) throws Exception {
|
||||
ReservationId reservationId =
|
||||
submitReservation(originalSubmitter, queueName);
|
||||
|
||||
deleteReservation(killer, reservationId);
|
||||
}
|
||||
|
||||
private void verifyDeleteReservationFailure(String killer,
|
||||
String originalSubmitter, String queueName) throws Exception {
|
||||
|
||||
ReservationId reservationId =
|
||||
submitReservation(originalSubmitter, queueName);
|
||||
|
||||
try {
|
||||
deleteReservation(killer, reservationId);
|
||||
Assert.fail("Reservation deletion by the enemy should fail!");
|
||||
} catch (YarnException e) {
|
||||
handleAdministerException(e, killer, queueName, ReservationACL
|
||||
.ADMINISTER_RESERVATIONS.name());
|
||||
}
|
||||
|
||||
deleteReservation(originalSubmitter, reservationId);
|
||||
}
|
||||
|
||||
private void verifyUpdateReservationSuccess(String updater,
|
||||
String originalSubmitter, String queueName) throws Exception {
|
||||
ReservationId reservationId =
|
||||
submitReservation(originalSubmitter, queueName);
|
||||
|
||||
final ReservationUpdateRequest updateRequest =
|
||||
ReservationUpdateRequest.newInstance(
|
||||
makeSimpleReservationDefinition(), reservationId);
|
||||
|
||||
ApplicationClientProtocol ownerClient = getRMClientForUser(updater);
|
||||
|
||||
ownerClient.updateReservation(updateRequest);
|
||||
|
||||
deleteReservation(updater, reservationId);
|
||||
}
|
||||
|
||||
private void verifyUpdateReservationFailure(String updater,
|
||||
String originalSubmitter, String queueName) throws Exception {
|
||||
ReservationId reservationId =
|
||||
submitReservation(originalSubmitter, queueName);
|
||||
|
||||
final ReservationUpdateRequest updateRequest =
|
||||
ReservationUpdateRequest.newInstance(
|
||||
makeSimpleReservationDefinition(), reservationId);
|
||||
|
||||
ApplicationClientProtocol unauthorizedClient = getRMClientForUser(updater);
|
||||
try {
|
||||
unauthorizedClient.updateReservation(updateRequest);
|
||||
Assert.fail("Reservation updating by the enemy should fail.");
|
||||
} catch (YarnException e) {
|
||||
handleAdministerException(e, updater, queueName, ReservationACL
|
||||
.ADMINISTER_RESERVATIONS.name());
|
||||
}
|
||||
|
||||
deleteReservation(originalSubmitter, reservationId);
|
||||
}
|
||||
|
||||
private ReservationDefinition makeSimpleReservationDefinition() {
|
||||
long arrival = System.currentTimeMillis();
|
||||
|
||||
String reservationName = UUID.randomUUID().toString();
|
||||
return ReservationDefinition.newInstance
|
||||
(arrival, arrival + (int)(defaultDuration * 1.1), defaultRequests,
|
||||
reservationName);
|
||||
}
|
||||
|
||||
private ReservationListResponse listReservationById(String lister,
|
||||
ReservationId reservationId, String queueName) throws Exception {
|
||||
final ReservationListRequest listRequest =
|
||||
ReservationListRequest.newInstance(queueName, reservationId
|
||||
.toString(), -1, -1, false);
|
||||
|
||||
ApplicationClientProtocol ownerClient = getRMClientForUser(lister);
|
||||
|
||||
return ownerClient.listReservations(listRequest);
|
||||
}
|
||||
|
||||
private ReservationListResponse listReservation(String lister,
|
||||
String queueName) throws Exception {
|
||||
final ReservationListRequest listRequest =
|
||||
ReservationListRequest.newInstance(queueName, null, -1, -1, false);
|
||||
|
||||
ApplicationClientProtocol ownerClient = getRMClientForUser(lister);
|
||||
|
||||
return ownerClient.listReservations(listRequest);
|
||||
}
|
||||
|
||||
private void deleteReservation(String deleter, ReservationId id) throws
|
||||
Exception {
|
||||
|
||||
ApplicationClientProtocol deleteClient = getRMClientForUser(deleter);
|
||||
|
||||
final ReservationDeleteRequest deleteRequest = ReservationDeleteRequest
|
||||
.newInstance(id);
|
||||
|
||||
deleteClient.deleteReservation(deleteRequest);
|
||||
}
|
||||
|
||||
private ReservationId submitReservation(String submitter,
|
||||
String queueName) throws Exception {
|
||||
|
||||
ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
|
||||
ReservationSubmissionRequest reservationSubmissionRequest =
|
||||
ReservationSubmissionRequest.newInstance(
|
||||
makeSimpleReservationDefinition(), queueName);
|
||||
|
||||
ReservationSubmissionResponse response = submitterClient
|
||||
.submitReservation(reservationSubmissionRequest);
|
||||
return response.getReservationId();
|
||||
}
|
||||
|
||||
private void handleAdministerException(Exception e, String user, String
|
||||
queue, String operation) {
|
||||
LOG.info("Got exception while killing app as the enemy", e);
|
||||
Assert.assertTrue(e.getMessage().contains("User " + user
|
||||
+ " cannot perform operation " + operation + " on queue "
|
||||
+ queue));
|
||||
}
|
||||
|
||||
private void registerNode(String host, int memory, int vCores) throws
|
||||
Exception {
|
||||
try {
|
||||
resourceManager.registerNode(host, memory, vCores);
|
||||
int attempts = 10;
|
||||
Collection<Plan> plans;
|
||||
do {
|
||||
DrainDispatcher dispatcher =
|
||||
(DrainDispatcher) resourceManager.getRMContext().getDispatcher();
|
||||
dispatcher.await();
|
||||
LOG.info("Waiting for node capacity to be added to plan");
|
||||
plans = resourceManager.getRMContext().getReservationSystem()
|
||||
.getAllPlans().values();
|
||||
|
||||
if (checkCapacity(plans)) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
} while (attempts-- > 0);
|
||||
if (attempts <= 0) {
|
||||
Assert.fail("Exhausted attempts in checking if node capacity was "
|
||||
+ "added to the plan");
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkCapacity(Collection<Plan> plans) {
|
||||
for (Plan plan : plans) {
|
||||
if (plan.getTotalCapacity().getMemory() > 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static Configuration createCapacitySchedulerConfiguration() {
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
|
||||
QUEUEA, QUEUEB, QUEUEC });
|
||||
|
||||
String absoluteQueueA = CapacitySchedulerConfiguration.ROOT + "." + QUEUEA;
|
||||
String absoluteQueueB = CapacitySchedulerConfiguration.ROOT + "." + QUEUEB;
|
||||
String absoluteQueueC = CapacitySchedulerConfiguration.ROOT + "." + QUEUEC;
|
||||
|
||||
csConf.setCapacity(absoluteQueueA, 50f);
|
||||
csConf.setCapacity(absoluteQueueB, 20f);
|
||||
csConf.setCapacity(absoluteQueueC, 30f);
|
||||
csConf.setReservable(absoluteQueueA, true);
|
||||
csConf.setReservable(absoluteQueueB, true);
|
||||
csConf.setReservable(absoluteQueueC, true);
|
||||
|
||||
// Set up ACLs on Queue A
|
||||
Map<ReservationACL, AccessControlList> reservationAclsOnQueueA =
|
||||
new HashMap<>();
|
||||
|
||||
AccessControlList submitACLonQueueA = new AccessControlList(QUEUE_A_USER);
|
||||
AccessControlList adminACLonQueueA = new AccessControlList(QUEUE_A_ADMIN);
|
||||
AccessControlList listACLonQueueA = new AccessControlList(COMMON_USER);
|
||||
|
||||
reservationAclsOnQueueA.put(ReservationACL.SUBMIT_RESERVATIONS,
|
||||
submitACLonQueueA);
|
||||
reservationAclsOnQueueA.put(ReservationACL.ADMINISTER_RESERVATIONS,
|
||||
adminACLonQueueA);
|
||||
reservationAclsOnQueueA.put(ReservationACL.LIST_RESERVATIONS,
|
||||
listACLonQueueA);
|
||||
|
||||
csConf.setReservationAcls(absoluteQueueA, reservationAclsOnQueueA);
|
||||
|
||||
// Set up ACLs on Queue B
|
||||
Map<ReservationACL, AccessControlList> reservationAclsOnQueueB =
|
||||
new HashMap<>();
|
||||
|
||||
AccessControlList submitACLonQueueB = new AccessControlList(QUEUE_B_USER);
|
||||
AccessControlList adminACLonQueueB = new AccessControlList(QUEUE_B_ADMIN);
|
||||
AccessControlList listACLonQueueB = new AccessControlList(COMMON_USER);
|
||||
|
||||
reservationAclsOnQueueB.put(ReservationACL.SUBMIT_RESERVATIONS,
|
||||
submitACLonQueueB);
|
||||
reservationAclsOnQueueB.put(ReservationACL.ADMINISTER_RESERVATIONS,
|
||||
adminACLonQueueB);
|
||||
reservationAclsOnQueueB.put(ReservationACL.LIST_RESERVATIONS,
|
||||
listACLonQueueB);
|
||||
|
||||
csConf.setReservationAcls(absoluteQueueB, reservationAclsOnQueueB);
|
||||
|
||||
csConf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
|
||||
csConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||
csConf.setBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE, true);
|
||||
csConf.set("yarn.resourcemanager.scheduler.class", CapacityScheduler
|
||||
.class.getName());
|
||||
|
||||
return csConf;
|
||||
}
|
||||
|
||||
private static Configuration createFairSchedulerConfiguration() throws
|
||||
IOException {
|
||||
FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
|
||||
|
||||
final String TEST_DIR = new File(System.getProperty("test.build.data",
|
||||
"/tmp")).getAbsolutePath();
|
||||
final String ALLOC_FILE = new File(TEST_DIR, "test-queues.xml")
|
||||
.getAbsolutePath();
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println(" <queue name=\"queueA\">");
|
||||
out.println(" <aclSubmitReservations>" +
|
||||
"queueA_user,common_user " +
|
||||
"</aclSubmitReservations>");
|
||||
out.println(" <aclAdministerReservations>" +
|
||||
"queueA_admin " +
|
||||
"</aclAdministerReservations>");
|
||||
out.println(" <aclListReservations>common_user </aclListReservations>");
|
||||
out.println(" <aclSubmitApps>queueA_user,common_user </aclSubmitApps>");
|
||||
out.println(" <aclAdministerApps>queueA_admin </aclAdministerApps>");
|
||||
out.println(" <reservation> </reservation>");
|
||||
out.println(" </queue>");
|
||||
out.println(" <queue name=\"queueB\">");
|
||||
out.println(" <aclSubmitApps>queueB_user,common_user </aclSubmitApps>");
|
||||
out.println(" <aclAdministerApps>queueB_admin </aclAdministerApps>");
|
||||
out.println(" <aclSubmitReservations>" +
|
||||
"queueB_user,common_user " +
|
||||
"</aclSubmitReservations>");
|
||||
out.println(" <aclAdministerReservations>" +
|
||||
"queueB_admin " +
|
||||
"</aclAdministerReservations>");
|
||||
out.println(" <aclListReservations>common_user </aclListReservations>");
|
||||
out.println(" <reservation> </reservation>");
|
||||
out.println(" </queue>");
|
||||
out.println(" <queue name=\"queueC\">");
|
||||
out.println(" <reservation> </reservation>");
|
||||
out.println(" </queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
fsConf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
|
||||
fsConf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
|
||||
fsConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||
fsConf.setBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE, true);
|
||||
fsConf.set("yarn.resourcemanager.scheduler.class", FairScheduler.class
|
||||
.getName());
|
||||
|
||||
return fsConf;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return configuration;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue