YARN-8028. Support authorizeUserAccessToQueue in RMWebServices. Contributed by Wangda Tan.
This commit is contained in:
parent
c962371430
commit
21717db6a0
|
@ -114,7 +114,6 @@ public class QueueACLsManager {
|
|||
// version is added for the moving the application case. The check has
|
||||
// extra logging to distinguish between the queue not existing in the
|
||||
// application move request case and the real access denied case.
|
||||
|
||||
if (scheduler instanceof CapacityScheduler) {
|
||||
CSQueue queue = ((CapacityScheduler) scheduler).getQueue(targetQueue);
|
||||
if (queue == null) {
|
||||
|
|
|
@ -174,6 +174,12 @@ public final class RMWSConsts {
|
|||
public static final String GET_CONTAINER =
|
||||
"/apps/{appid}/appattempts/{appattemptid}/containers/{containerid}";
|
||||
|
||||
/**
|
||||
* Path for {code checkUserAccessToQueue#}
|
||||
*/
|
||||
public static final String CHECK_USER_ACCESS_TO_QUEUE =
|
||||
"/queues/{queue}/access";
|
||||
|
||||
// ----------------QueryParams for RMWebServiceProtocol----------------
|
||||
|
||||
public static final String TIME = "time";
|
||||
|
@ -183,6 +189,7 @@ public final class RMWSConsts {
|
|||
public static final String FINAL_STATUS = "finalStatus";
|
||||
public static final String USER = "user";
|
||||
public static final String QUEUE = "queue";
|
||||
public static final String QUEUES = "queues";
|
||||
public static final String LIMIT = "limit";
|
||||
public static final String STARTED_TIME_BEGIN = "startedTimeBegin";
|
||||
public static final String STARTED_TIME_END = "startedTimeEnd";
|
||||
|
@ -209,6 +216,7 @@ public final class RMWSConsts {
|
|||
public static final String GET_LABELS = "get-labels";
|
||||
public static final String DESELECTS = "deSelects";
|
||||
public static final String CONTAINERS = "containers";
|
||||
public static final String QUEUE_ACL_TYPE = "queue-acl-type";
|
||||
|
||||
private RMWSConsts() {
|
||||
// not called
|
||||
|
|
|
@ -658,4 +658,22 @@ public interface RMWebServiceProtocol {
|
|||
* @return all the attempts info for a specific application
|
||||
*/
|
||||
AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId);
|
||||
|
||||
/**
|
||||
* This method verifies if an user has access to a specified queue.
|
||||
*
|
||||
* @return Response containing the status code.
|
||||
*
|
||||
* @param queue queue
|
||||
* @param username user
|
||||
* @param queueAclType acl type of queue, it could be
|
||||
* SUBMIT_APPLICATIONS/ADMINISTER_QUEUE
|
||||
* @param hsr request
|
||||
*
|
||||
* @throws AuthorizationException if the user is not authorized to invoke this
|
||||
* method.
|
||||
*/
|
||||
Response checkUserAccessToQueue(String queue, String username,
|
||||
String queueAclType, HttpServletRequest hsr)
|
||||
throws AuthorizationException;
|
||||
}
|
||||
|
|
|
@ -55,8 +55,7 @@ import javax.ws.rs.core.HttpHeaders;
|
|||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import com.google.common.base.Joiner;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -140,7 +139,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurat
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
|
@ -2520,4 +2518,55 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
|||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path(RMWSConsts.CHECK_USER_ACCESS_TO_QUEUE)
|
||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||
public Response checkUserAccessToQueue(
|
||||
@PathParam(RMWSConsts.QUEUE) String queue,
|
||||
@QueryParam(RMWSConsts.USER) String username,
|
||||
@QueryParam(RMWSConsts.QUEUE_ACL_TYPE)
|
||||
@DefaultValue("SUBMIT_APPLICATIONS") String queueAclType,
|
||||
@Context HttpServletRequest hsr) throws AuthorizationException {
|
||||
init();
|
||||
|
||||
// Check if the specified queue acl is valid.
|
||||
QueueACL queueACL;
|
||||
try {
|
||||
queueACL = QueueACL.valueOf(queueAclType);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return Response.status(Status.BAD_REQUEST).entity(
|
||||
"Specified queueAclType=" + queueAclType
|
||||
+ " is not a valid type, valid queue acl types={"
|
||||
+ "SUBMIT_APPLICATIONS/ADMINISTER_QUEUE}").build();
|
||||
}
|
||||
|
||||
// For the user who invokes this REST call, he/she should have admin access
|
||||
// to the queue. Otherwise we will reject the call.
|
||||
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
|
||||
if (callerUGI != null && !this.rm.getResourceScheduler().checkAccess(
|
||||
callerUGI, QueueACL.ADMINISTER_QUEUE, queue)) {
|
||||
return Response.status(Status.FORBIDDEN).entity(
|
||||
"User=" + callerUGI.getUserName() + " doesn't haven access to queue="
|
||||
+ queue + " so it cannot check ACLs for other users.")
|
||||
.build();
|
||||
}
|
||||
|
||||
// Create UGI for the to-be-checked user.
|
||||
UserGroupInformation user = UserGroupInformation.createRemoteUser(username);
|
||||
if (user == null) {
|
||||
return Response.status(Status.FORBIDDEN).entity(
|
||||
"Failed to retrieve UserGroupInformation for user=" + username)
|
||||
.build();
|
||||
}
|
||||
|
||||
if (!this.rm.getResourceScheduler().checkAccess(user, queueACL, queue)) {
|
||||
return Response.status(Status.FORBIDDEN).entity(
|
||||
"User=" + username + " doesn't have access to queue=" + queue
|
||||
+ " with acl-type=" + queueAclType).build();
|
||||
}
|
||||
|
||||
return Response.status(Status.OK).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,15 +39,18 @@ import javax.ws.rs.core.MediaType;
|
|||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.http.JettyUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||
|
@ -72,6 +75,8 @@ import org.apache.hadoop.yarn.webapp.JerseyTestBase;
|
|||
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import org.eclipse.jetty.server.Response;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
@ -752,4 +757,83 @@ public class TestRMWebServices extends JerseyTestBase {
|
|||
tickcount--;
|
||||
}
|
||||
}
|
||||
|
||||
private HttpServletRequest mockHttpServletRequestByUserName(String username) {
|
||||
HttpServletRequest mockHsr = mock(HttpServletRequest.class);
|
||||
when(mockHsr.getRemoteUser()).thenReturn(username);
|
||||
Principal principal = mock(Principal.class);
|
||||
when(principal.getName()).thenReturn(username);
|
||||
when(mockHsr.getUserPrincipal()).thenReturn(principal);
|
||||
return mockHsr;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckUserAccessToQueue() throws Exception {
|
||||
|
||||
ResourceManager mockRM = mock(ResourceManager.class);
|
||||
Configuration conf = new YarnConfiguration();
|
||||
|
||||
// Inject a mock scheduler implementation.
|
||||
// Only admin user has ADMINISTER_QUEUE access.
|
||||
// For SUBMIT_APPLICATION ACL, both of admin/yarn user have acess
|
||||
ResourceScheduler mockScheduler = new FifoScheduler() {
|
||||
@Override
|
||||
public synchronized boolean checkAccess(UserGroupInformation callerUGI,
|
||||
QueueACL acl, String queueName) {
|
||||
if (acl == QueueACL.ADMINISTER_QUEUE) {
|
||||
if (callerUGI.getUserName().equals("admin")) {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if (ImmutableSet.of("admin", "yarn").contains(callerUGI.getUserName())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
when(mockRM.getResourceScheduler()).thenReturn(mockScheduler);
|
||||
|
||||
RMWebServices webSvc =
|
||||
new RMWebServices(mockRM, conf, mock(HttpServletResponse.class));
|
||||
|
||||
// Case 1: Only queue admin user can access other user's information
|
||||
HttpServletRequest mockHsr = mockHttpServletRequestByUserName("non-admin");
|
||||
Assert.assertEquals(webSvc.checkUserAccessToQueue("queue", "jack",
|
||||
QueueACL.SUBMIT_APPLICATIONS.name(), mockHsr).getStatus(),
|
||||
Response.SC_FORBIDDEN);
|
||||
|
||||
// Case 2: request an unknown ACL causes BAD_REQUEST
|
||||
mockHsr = mockHttpServletRequestByUserName("admin");
|
||||
Assert.assertEquals(webSvc.checkUserAccessToQueue("queue", "jack",
|
||||
"XYZ_ACL", mockHsr).getStatus(), Response.SC_BAD_REQUEST);
|
||||
|
||||
// Case 3: get FORBIDDEN for rejected ACL
|
||||
mockHsr = mockHttpServletRequestByUserName("admin");
|
||||
Assert.assertEquals(webSvc.checkUserAccessToQueue("queue", "jack",
|
||||
QueueACL.SUBMIT_APPLICATIONS.name(), mockHsr).getStatus(),
|
||||
Response.SC_FORBIDDEN);
|
||||
Assert.assertEquals(webSvc.checkUserAccessToQueue("queue", "jack",
|
||||
QueueACL.ADMINISTER_QUEUE.name(), mockHsr).getStatus(),
|
||||
Response.SC_FORBIDDEN);
|
||||
|
||||
// Case 4: get OK for listed ACLs
|
||||
mockHsr = mockHttpServletRequestByUserName("admin");
|
||||
Assert.assertEquals(webSvc.checkUserAccessToQueue("queue", "admin",
|
||||
QueueACL.SUBMIT_APPLICATIONS.name(), mockHsr).getStatus(),
|
||||
Response.SC_OK);
|
||||
Assert.assertEquals(webSvc.checkUserAccessToQueue("queue", "admin",
|
||||
QueueACL.ADMINISTER_QUEUE.name(), mockHsr).getStatus(),
|
||||
Response.SC_OK);
|
||||
|
||||
// Case 5: get OK only for SUBMIT_APP acl for "yarn" user
|
||||
mockHsr = mockHttpServletRequestByUserName("admin");
|
||||
Assert.assertEquals(webSvc.checkUserAccessToQueue("queue", "yarn",
|
||||
QueueACL.SUBMIT_APPLICATIONS.name(), mockHsr).getStatus(),
|
||||
Response.SC_OK);
|
||||
Assert.assertEquals(webSvc.checkUserAccessToQueue("queue", "yarn",
|
||||
QueueACL.ADMINISTER_QUEUE.name(), mockHsr).getStatus(),
|
||||
Response.SC_FORBIDDEN);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -470,6 +470,15 @@ public class DefaultRequestInterceptorREST
|
|||
null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response checkUserAccessToQueue(String queue, String username,
|
||||
String queueAclType, HttpServletRequest hsr) {
|
||||
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
|
||||
Response.class, HTTPMethods.GET,
|
||||
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.QUEUES + "/" + queue
|
||||
+ "/access", null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AppAttemptInfo getAppAttempt(HttpServletRequest req,
|
||||
HttpServletResponse res, String appId, String appAttemptId) {
|
||||
|
|
|
@ -1183,6 +1183,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response checkUserAccessToQueue(String queue, String username,
|
||||
String queueAclType, HttpServletRequest hsr) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AppAttemptInfo getAppAttempt(HttpServletRequest req,
|
||||
HttpServletResponse res, String appId, String appAttemptId) {
|
||||
|
|
|
@ -832,6 +832,23 @@ public class RouterWebServices implements RMWebServiceProtocol {
|
|||
return pipeline.getRootInterceptor().getAppAttempts(hsr, appId);
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path(RMWSConsts.CHECK_USER_ACCESS_TO_QUEUE)
|
||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||
@Override
|
||||
public Response checkUserAccessToQueue(
|
||||
@PathParam(RMWSConsts.QUEUE) String queue,
|
||||
@QueryParam(RMWSConsts.USER) String username,
|
||||
@QueryParam(RMWSConsts.QUEUE_ACL_TYPE)
|
||||
@DefaultValue("SUBMIT_APPLICATIONS") String queueAclType,
|
||||
@Context HttpServletRequest hsr) throws AuthorizationException {
|
||||
init();
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
|
||||
return pipeline.getRootInterceptor().checkUserAccessToQueue(queue,
|
||||
username, queueAclType, hsr);
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path(RMWSConsts.APPS_APPID_APPATTEMPTS_APPATTEMPTID)
|
||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||
|
|
|
@ -318,6 +318,12 @@ public class MockRESTRequestInterceptor extends AbstractRESTRequestInterceptor {
|
|||
return new AppAttemptsInfo();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response checkUserAccessToQueue(String queue, String username,
|
||||
String queueAclType, HttpServletRequest hsr) {
|
||||
return Response.status(Status.OK).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AppAttemptInfo getAppAttempt(HttpServletRequest req,
|
||||
HttpServletResponse res, String appId, String appAttemptId) {
|
||||
|
|
|
@ -68,6 +68,14 @@ public class PassThroughRESTRequestInterceptor
|
|||
return getNextInterceptor().getAppAttempts(hsr, appId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response checkUserAccessToQueue(String queue, String username,
|
||||
String queueAclType, HttpServletRequest hsr)
|
||||
throws AuthorizationException {
|
||||
return getNextInterceptor().checkUserAccessToQueue(queue, username,
|
||||
queueAclType, hsr);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AppAttemptInfo getAppAttempt(HttpServletRequest req,
|
||||
HttpServletResponse res, String appId, String appAttemptId) {
|
||||
|
|
Loading…
Reference in New Issue