YARN-5956. Refactor ClientRMService for unify error handling across apis. Contributed by Kai Sasaki.

This commit is contained in:
Sunil G 2017-03-26 17:00:06 +05:30
parent 43f23e5021
commit f51ee482a7
2 changed files with 108 additions and 179 deletions

View File

@ -194,7 +194,7 @@ public class ClientRMService extends AbstractService implements
protected RMDelegationTokenSecretManager rmDTSecretManager;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
InetSocketAddress clientBindAddress;
private InetSocketAddress clientBindAddress;
private final ApplicationACLsManager applicationsACLsManager;
private final QueueACLsManager queueACLsManager;
@ -204,9 +204,6 @@ public class ClientRMService extends AbstractService implements
private ReservationSystem reservationSystem;
private ReservationInputValidator rValidator;
private static final EnumSet<RMAppState> COMPLETED_APP_STATES = EnumSet.of(
RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
private static final EnumSet<RMAppState> ACTIVE_APP_STATES = EnumSet.of(
RMAppState.ACCEPTED, RMAppState.RUNNING);
@ -296,11 +293,12 @@ public class ClientRMService extends AbstractService implements
/**
* check if the calling user has the access to application information.
* @param callerUGI
* @param owner
* @param operationPerformed
* @param application
* @return
* @param callerUGI the user information who submit the request
* @param owner the user of the application
* @param operationPerformed the type of operation defined in
* {@link ApplicationAccessType}
* @param application submitted application
* @return access is permitted or not
*/
private boolean checkAccess(UserGroupInformation callerUGI, String owner,
ApplicationAccessType operationPerformed, RMApp application) {
@ -377,24 +375,15 @@ public class ClientRMService extends AbstractService implements
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
GetApplicationAttemptReportRequest request) throws YarnException,
IOException {
ApplicationId applicationId
= request.getApplicationAttemptId().getApplicationId();
ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie) {
LOG.info("Error getting UGI ", ie);
throw RPCUtil.getRemoteException(ie);
}
RMApp application = this.rmContext.getRMApps().get(
appAttemptId.getApplicationId());
if (application == null) {
// If the RM doesn't have the application, throw
// ApplicationNotFoundException and let client to handle.
throw new ApplicationNotFoundException("Application with id '"
+ request.getApplicationAttemptId().getApplicationId()
+ "' doesn't exist in RM. Please check that the job "
+ "submission was successful.");
}
UserGroupInformation callerUGI = getCallerUgi(applicationId,
AuditConstants.GET_APP_ATTEMPT_REPORT);
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
AuditConstants.GET_APP_ATTEMPT_REPORT, ApplicationAccessType.VIEW_APP,
false);
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, application);
@ -420,21 +409,13 @@ public class ClientRMService extends AbstractService implements
public GetApplicationAttemptsResponse getApplicationAttempts(
GetApplicationAttemptsRequest request) throws YarnException, IOException {
ApplicationId appId = request.getApplicationId();
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie) {
LOG.info("Error getting UGI ", ie);
throw RPCUtil.getRemoteException(ie);
}
RMApp application = this.rmContext.getRMApps().get(appId);
if (application == null) {
// If the RM doesn't have the application, throw
// ApplicationNotFoundException and let client to handle.
throw new ApplicationNotFoundException("Application with id '" + appId
+ "' doesn't exist in RM. Please check that the job submission "
+ "was successful.");
}
UserGroupInformation callerUGI = getCallerUgi(appId,
AuditConstants.GET_APP_ATTEMPTS);
RMApp application = verifyUserAccessForRMApp(appId, callerUGI,
AuditConstants.GET_APP_ATTEMPTS, ApplicationAccessType.VIEW_APP,
false);
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, application);
GetApplicationAttemptsResponse response = null;
@ -469,21 +450,13 @@ public class ClientRMService extends AbstractService implements
ContainerId containerId = request.getContainerId();
ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
ApplicationId appId = appAttemptId.getApplicationId();
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie) {
LOG.info("Error getting UGI ", ie);
throw RPCUtil.getRemoteException(ie);
}
RMApp application = this.rmContext.getRMApps().get(appId);
if (application == null) {
// If the RM doesn't have the application, throw
// ApplicationNotFoundException and let client to handle.
throw new ApplicationNotFoundException("Application with id '" + appId
+ "' doesn't exist in RM. Please check that the job submission "
+ "was successful.");
}
UserGroupInformation callerUGI = getCallerUgi(appId,
AuditConstants.GET_CONTAINER_REPORT);
RMApp application = verifyUserAccessForRMApp(appId, callerUGI,
AuditConstants.GET_CONTAINER_REPORT, ApplicationAccessType.VIEW_APP,
false);
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, application);
GetContainerReportResponse response = null;
@ -494,13 +467,13 @@ public class ClientRMService extends AbstractService implements
"ApplicationAttempt with id '" + appAttemptId +
"' doesn't exist in RM.");
}
RMContainer rmConatiner = this.rmContext.getScheduler().getRMContainer(
RMContainer rmContainer = this.rmContext.getScheduler().getRMContainer(
containerId);
if (rmConatiner == null) {
if (rmContainer == null) {
throw new ContainerNotFoundException("Container with id '" + containerId
+ "' doesn't exist in RM.");
}
response = GetContainerReportResponse.newInstance(rmConatiner
response = GetContainerReportResponse.newInstance(rmContainer
.createContainerReport());
} else {
throw new YarnException("User " + callerUGI.getShortUserName()
@ -520,21 +493,13 @@ public class ClientRMService extends AbstractService implements
throws YarnException, IOException {
ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
ApplicationId appId = appAttemptId.getApplicationId();
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie) {
LOG.info("Error getting UGI ", ie);
throw RPCUtil.getRemoteException(ie);
}
RMApp application = this.rmContext.getRMApps().get(appId);
if (application == null) {
// If the RM doesn't have the application, throw
// ApplicationNotFoundException and let client to handle.
throw new ApplicationNotFoundException("Application with id '" + appId
+ "' doesn't exist in RM. Please check that the job submission "
+ "was successful.");
}
UserGroupInformation callerUGI = getCallerUgi(appId,
AuditConstants.GET_CONTAINERS);
RMApp application = verifyUserAccessForRMApp(appId, callerUGI,
AuditConstants.GET_CONTAINERS, ApplicationAccessType.VIEW_APP,
false);
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, application);
GetContainersResponse response = null;
@ -651,9 +616,8 @@ public class ClientRMService extends AbstractService implements
throw e;
}
SubmitApplicationResponse response = recordFactory
return recordFactory
.newRecordInstance(SubmitApplicationResponse.class);
return response;
}
@SuppressWarnings("unchecked")
@ -664,26 +628,11 @@ public class ClientRMService extends AbstractService implements
ApplicationAttemptId attemptId = request.getApplicationAttemptId();
ApplicationId applicationId = attemptId.getApplicationId();
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie) {
LOG.info("Error getting UGI ", ie);
RMAuditLogger.logFailure("UNKNOWN", AuditConstants.FAIL_ATTEMPT_REQUEST,
"UNKNOWN", "ClientRMService" , "Error getting UGI",
applicationId, attemptId);
throw RPCUtil.getRemoteException(ie);
}
RMApp application = this.rmContext.getRMApps().get(applicationId);
if (application == null) {
RMAuditLogger.logFailure(callerUGI.getUserName(),
AuditConstants.FAIL_ATTEMPT_REQUEST, "UNKNOWN", "ClientRMService",
"Trying to fail an attempt of an absent application", applicationId,
attemptId);
throw new ApplicationNotFoundException("Trying to fail an attempt "
+ attemptId + " of an absent application " + applicationId);
}
UserGroupInformation callerUGI = getCallerUgi(applicationId,
AuditConstants.FAIL_ATTEMPT_REQUEST);
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
AuditConstants.FAIL_ATTEMPT_REQUEST, ApplicationAccessType.MODIFY_APP,
true);
RMAppAttempt appAttempt = application.getAppAttempts().get(attemptId);
if (appAttempt == null) {
@ -691,28 +640,14 @@ public class ClientRMService extends AbstractService implements
"ApplicationAttempt with id '" + attemptId + "' doesn't exist in RM.");
}
if (!checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.MODIFY_APP, application)) {
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
AuditConstants.FAIL_ATTEMPT_REQUEST,
"User doesn't have permissions to "
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
AuditConstants.UNAUTHORIZED_USER, applicationId);
throw RPCUtil.getRemoteException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation "
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
}
FailApplicationAttemptResponse response =
recordFactory.newRecordInstance(FailApplicationAttemptResponse.class);
if (!ACTIVE_APP_STATES.contains(application.getState())) {
if (COMPLETED_APP_STATES.contains(application.getState())) {
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.FAIL_ATTEMPT_REQUEST, "ClientRMService",
applicationId);
return response;
}
if (application.isAppInCompletedStates()) {
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.FAIL_ATTEMPT_REQUEST, "ClientRMService",
applicationId);
return response;
}
this.rmContext.getDispatcher().getEventHandler().handle(
@ -790,7 +725,7 @@ public class ClientRMService extends AbstractService implements
.handle(new RMAppKillByClientEvent(applicationId, message.toString(),
callerUGI, remoteAddress));
// For UnmanagedAMs, return true so they don't retry
// For Unmanaged AMs, return true so they don't retry
return KillApplicationResponse.newInstance(
application.getApplicationSubmissionContext().getUnmanagedAM());
}
@ -1082,15 +1017,15 @@ public class ClientRMService extends AbstractService implements
RMDelegationTokenIdentifier tokenIdentifier =
new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()),
realUser);
Token<RMDelegationTokenIdentifier> realRMDTtoken =
Token<RMDelegationTokenIdentifier> realRMDToken =
new Token<RMDelegationTokenIdentifier>(tokenIdentifier,
this.rmDTSecretManager);
response.setRMDelegationToken(
BuilderUtils.newDelegationToken(
realRMDTtoken.getIdentifier(),
realRMDTtoken.getKind().toString(),
realRMDTtoken.getPassword(),
realRMDTtoken.getService().toString()
realRMDToken.getIdentifier(),
realRMDToken.getKind().toString(),
realRMDToken.getPassword(),
realRMDToken.getService().toString()
));
return response;
} catch(IOException io) {
@ -1150,37 +1085,11 @@ public class ClientRMService extends AbstractService implements
MoveApplicationAcrossQueuesRequest request) throws YarnException {
ApplicationId applicationId = request.getApplicationId();
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie) {
LOG.info("Error getting UGI ", ie);
RMAuditLogger.logFailure("UNKNOWN", AuditConstants.MOVE_APP_REQUEST,
"UNKNOWN", "ClientRMService" , "Error getting UGI",
applicationId);
throw RPCUtil.getRemoteException(ie);
}
RMApp application = this.rmContext.getRMApps().get(applicationId);
if (application == null) {
RMAuditLogger.logFailure(callerUGI.getUserName(),
AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",
"Trying to move an absent application", applicationId);
throw new ApplicationNotFoundException("Trying to move an absent"
+ " application " + applicationId);
}
if (!checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.MODIFY_APP, application)) {
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
AuditConstants.MOVE_APP_REQUEST,
"User doesn't have permissions to "
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
AuditConstants.UNAUTHORIZED_USER, applicationId);
throw RPCUtil.getRemoteException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation "
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
}
UserGroupInformation callerUGI = getCallerUgi(applicationId,
AuditConstants.MOVE_APP_REQUEST);
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
AuditConstants.MOVE_APP_REQUEST, ApplicationAccessType.MODIFY_APP,
true);
// Moves only allowed when app is in a state that means it is tracked by
// the scheduler. Introducing SUBMITTED state also to this list as there
@ -1206,9 +1115,8 @@ public class ClientRMService extends AbstractService implements
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.MOVE_APP_REQUEST, "ClientRMService" , applicationId);
MoveApplicationAcrossQueuesResponse response = recordFactory
return recordFactory
.newRecordInstance(MoveApplicationAcrossQueuesResponse.class);
return response;
}
private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
@ -1247,7 +1155,7 @@ public class ClientRMService extends AbstractService implements
@Override
public GetNewReservationResponse getNewReservation(
GetNewReservationRequest request) throws YarnException, IOException {
checkReservationSytem(AuditConstants.CREATE_NEW_RESERVATION_REQUEST);
checkReservationSystem(AuditConstants.CREATE_NEW_RESERVATION_REQUEST);
GetNewReservationResponse response =
recordFactory.newRecordInstance(GetNewReservationResponse.class);
@ -1261,7 +1169,7 @@ public class ClientRMService extends AbstractService implements
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
// Check if reservation system is enabled
checkReservationSytem(AuditConstants.SUBMIT_RESERVATION_REQUEST);
checkReservationSystem(AuditConstants.SUBMIT_RESERVATION_REQUEST);
ReservationSubmissionResponse response =
recordFactory.newRecordInstance(ReservationSubmissionResponse.class);
ReservationId reservationId = request.getReservationId();
@ -1320,7 +1228,7 @@ public class ClientRMService extends AbstractService implements
public ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException {
// Check if reservation system is enabled
checkReservationSytem(AuditConstants.UPDATE_RESERVATION_REQUEST);
checkReservationSystem(AuditConstants.UPDATE_RESERVATION_REQUEST);
ReservationUpdateResponse response =
recordFactory.newRecordInstance(ReservationUpdateResponse.class);
// Validate the input
@ -1359,7 +1267,7 @@ public class ClientRMService extends AbstractService implements
public ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException {
// Check if reservation system is enabled
checkReservationSytem(AuditConstants.DELETE_RESERVATION_REQUEST);
checkReservationSystem(AuditConstants.DELETE_RESERVATION_REQUEST);
ReservationDeleteResponse response =
recordFactory.newRecordInstance(ReservationDeleteResponse.class);
// Validate the input
@ -1398,7 +1306,7 @@ public class ClientRMService extends AbstractService implements
public ReservationListResponse listReservations(
ReservationListRequest requestInfo) throws YarnException, IOException {
// Check if reservation system is enabled
checkReservationSytem(AuditConstants.LIST_RESERVATION_REQUEST);
checkReservationSystem(AuditConstants.LIST_RESERVATION_REQUEST);
ReservationListResponse response =
recordFactory.newRecordInstance(ReservationListResponse.class);
@ -1473,7 +1381,7 @@ public class ClientRMService extends AbstractService implements
return response;
}
private void checkReservationSytem(String auditConstant) throws YarnException {
private void checkReservationSystem(String auditConstant) throws YarnException {
// Check if reservation is enabled
if (reservationSystem == null) {
throw RPCUtil.getRemoteException("Reservation is not enabled."
@ -1592,7 +1500,8 @@ public class ClientRMService extends AbstractService implements
UserGroupInformation callerUGI =
getCallerUgi(applicationId, AuditConstants.UPDATE_APP_PRIORITY);
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
AuditConstants.UPDATE_APP_PRIORITY);
AuditConstants.UPDATE_APP_PRIORITY, ApplicationAccessType.MODIFY_APP,
true);
UpdateApplicationPriorityResponse response = recordFactory
.newRecordInstance(UpdateApplicationPriorityResponse.class);
@ -1635,9 +1544,14 @@ public class ClientRMService extends AbstractService implements
}
/**
* Signal a container.
* Send a signal to a container.
*
* After the request passes some sanity check, it will be delivered
* to RMNodeImpl so that the next NM heartbeat will pick up the signal request
* @param request request to signal a container
* @return the response of sending signal request
* @throws YarnException rpc related exception
* @throws IOException fail to obtain user group information
*/
@SuppressWarnings("unchecked")
@Override
@ -1709,7 +1623,8 @@ public class ClientRMService extends AbstractService implements
UserGroupInformation callerUGI =
getCallerUgi(applicationId, AuditConstants.UPDATE_APP_TIMEOUTS);
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
AuditConstants.UPDATE_APP_TIMEOUTS);
AuditConstants.UPDATE_APP_TIMEOUTS, ApplicationAccessType.MODIFY_APP,
true);
if (applicationTimeouts.isEmpty()) {
String message =
@ -1728,7 +1643,7 @@ public class ClientRMService extends AbstractService implements
if (!EnumSet
.of(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppState.RUNNING)
.contains(state)) {
if (COMPLETED_APP_STATES.contains(state)) {
if (application.isAppInCompletedStates()) {
// If Application is in any of the final states, update timeout
// can be skipped rather throwing exception.
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
@ -1773,26 +1688,35 @@ public class ClientRMService extends AbstractService implements
}
private RMApp verifyUserAccessForRMApp(ApplicationId applicationId,
UserGroupInformation callerUGI, String operation) throws YarnException {
UserGroupInformation callerUGI, String operation,
ApplicationAccessType accessType,
boolean needCheckAccess) throws YarnException {
RMApp application = this.rmContext.getRMApps().get(applicationId);
if (application == null) {
RMAuditLogger.logFailure(callerUGI.getUserName(), operation, "UNKNOWN",
"ClientRMService",
"Trying to " + operation + " of an absent application",
applicationId);
throw new ApplicationNotFoundException("Trying to " + operation
+ " of an absent application " + applicationId);
"ClientRMService",
"Trying to " + operation + " of an absent application",
applicationId);
// If the RM doesn't have the application, throw
// ApplicationNotFoundException and let client to handle.
throw new ApplicationNotFoundException("Application with id '"
+ applicationId + "' doesn't exist in RM. "
+ "Please check that the job "
+ "submission was successful.");
}
if (!checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.MODIFY_APP, application)) {
RMAuditLogger.logFailure(callerUGI.getShortUserName(), operation,
"User doesn't have permissions to "
+ ApplicationAccessType.MODIFY_APP.toString(),
"ClientRMService", AuditConstants.UNAUTHORIZED_USER, applicationId);
throw RPCUtil.getRemoteException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation "
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
if (needCheckAccess) {
if (!checkAccess(callerUGI, application.getUser(),
accessType, application)) {
RMAuditLogger.logFailure(callerUGI.getShortUserName(), operation,
"User doesn't have permissions to "
+ accessType.toString(),
"ClientRMService", AuditConstants.UNAUTHORIZED_USER,
applicationId);
throw RPCUtil.getRemoteException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation "
+ accessType.name() + " on " + applicationId));
}
}
return application;
}

View File

@ -54,6 +54,11 @@ public class RMAuditLogger {
public static final String GET_APP_STATE = "Get Application State";
public static final String GET_APP_PRIORITY = "Get Application Priority";
public static final String GET_APP_QUEUE = "Get Application Queue";
public static final String GET_APP_ATTEMPTS = "Get Application Attempts";
public static final String GET_APP_ATTEMPT_REPORT
= "Get Application Attempt Report";
public static final String GET_CONTAINERS = "Get Containers";
public static final String GET_CONTAINER_REPORT = "Get Container Report";
public static final String FINISH_SUCCESS_APP = "Application Finished - Succeeded";
public static final String FINISH_FAILED_APP = "Application Finished - Failed";
public static final String FINISH_KILLED_APP = "Application Finished - Killed";