YARN-7225. Add queue and partition info to RM audit log. Contributed by Eric Payne

This commit is contained in:
Jonathan Hung 2018-11-01 14:13:15 -07:00
parent c19baf19da
commit 62ec800bd4
6 changed files with 129 additions and 21 deletions

View File

@ -558,7 +558,8 @@ public class ClientRMService extends AbstractService implements
LOG.warn("Unable to get the current user.", ie); LOG.warn("Unable to get the current user.", ie);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
ie.getMessage(), "ClientRMService", ie.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId, callerContext); "Exception in submitting application", applicationId, callerContext,
submissionContext.getQueue());
throw RPCUtil.getRemoteException(ie); throw RPCUtil.getRemoteException(ie);
} }
@ -580,7 +581,8 @@ public class ClientRMService extends AbstractService implements
". Flow run should be a long integer", e); ". Flow run should be a long integer", e);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
e.getMessage(), "ClientRMService", e.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId); "Exception in submitting application", applicationId,
submissionContext.getQueue());
throw RPCUtil.getRemoteException(e); throw RPCUtil.getRemoteException(e);
} }
} }
@ -639,12 +641,14 @@ public class ClientRMService extends AbstractService implements
LOG.info("Application with id " + applicationId.getId() + LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user); " submitted by user " + user);
RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST, RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
"ClientRMService", applicationId, callerContext); "ClientRMService", applicationId, callerContext,
submissionContext.getQueue());
} catch (YarnException e) { } catch (YarnException e) {
LOG.info("Exception in submitting " + applicationId, e); LOG.info("Exception in submitting " + applicationId, e);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
e.getMessage(), "ClientRMService", e.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId, callerContext); "Exception in submitting application", applicationId, callerContext,
submissionContext.getQueue());
throw e; throw e;
} }

View File

@ -40,7 +40,7 @@ public class RMAuditLogger {
static enum Keys {USER, OPERATION, TARGET, RESULT, IP, PERMISSIONS, static enum Keys {USER, OPERATION, TARGET, RESULT, IP, PERMISSIONS,
DESCRIPTION, APPID, APPATTEMPTID, CONTAINERID, DESCRIPTION, APPID, APPATTEMPTID, CONTAINERID,
CALLERCONTEXT, CALLERSIGNATURE, RESOURCE, QUEUENAME, CALLERCONTEXT, CALLERSIGNATURE, RESOURCE, QUEUENAME,
INCLUDEAPPS, INCLUDECHILDQUEUES, RECURSIVE} INCLUDEAPPS, INCLUDECHILDQUEUES, RECURSIVE, NODELABEL}
public static class AuditConstants { public static class AuditConstants {
static final String SUCCESS = "SUCCESS"; static final String SUCCESS = "SUCCESS";
@ -98,7 +98,7 @@ public class RMAuditLogger {
ApplicationId appId, ApplicationAttemptId attemptId, ApplicationId appId, ApplicationAttemptId attemptId,
ContainerId containerId, Resource resource) { ContainerId containerId, Resource resource) {
return createSuccessLog(user, operation, target, appId, attemptId, return createSuccessLog(user, operation, target, appId, attemptId,
containerId, resource, null, Server.getRemoteIp()); containerId, resource, null, Server.getRemoteIp(), null, null);
} }
/** /**
@ -124,7 +124,7 @@ public class RMAuditLogger {
static String createSuccessLog(String user, String operation, String target, static String createSuccessLog(String user, String operation, String target,
ApplicationId appId, ApplicationAttemptId attemptId, ApplicationId appId, ApplicationAttemptId attemptId,
ContainerId containerId, Resource resource, CallerContext callerContext, ContainerId containerId, Resource resource, CallerContext callerContext,
InetAddress ip) { InetAddress ip, String queueName, String partition) {
StringBuilder b = StringBuilder b =
createStringBuilderForSuccessEvent(user, operation, target, ip); createStringBuilderForSuccessEvent(user, operation, target, ip);
if (appId != null) { if (appId != null) {
@ -140,6 +140,12 @@ public class RMAuditLogger {
add(Keys.RESOURCE, resource.toString(), b); add(Keys.RESOURCE, resource.toString(), b);
} }
appendCallerContext(b, callerContext); appendCallerContext(b, callerContext);
if (queueName != null) {
add(Keys.QUEUENAME, queueName, b);
}
if (partition != null) {
add(Keys.NODELABEL, partition, b);
}
return b.toString(); return b.toString();
} }
@ -202,6 +208,32 @@ public class RMAuditLogger {
} }
} }
/**
* Create a readable and parseable audit log string for a successful event.
*
* @param user User who made the service request to the ResourceManager
* @param operation Operation requested by the user.
* @param target The target on which the operation is being performed.
* @param appId Application Id in which operation was performed.
* @param containerId Container Id in which operation was performed.
* @param resource Resource associated with container.
* @param queueName Name of queue.
* @param partition Name of labeled partition.
*
* <br><br>
* Note that the {@link RMAuditLogger} uses tabs ('\t') as a key-val delimiter
* and hence the value fields should not contains tabs ('\t').
*/
public static void logSuccess(String user, String operation, String target,
ApplicationId appId, ContainerId containerId, Resource resource,
String queueName, String partition) {
if (LOG.isInfoEnabled()) {
LOG.info(createSuccessLog(user, operation, target, appId, null,
containerId, resource, null, Server.getRemoteIp(), queueName,
partition));
}
}
/** /**
* Create a general readable and parseable audit log string for a successful * Create a general readable and parseable audit log string for a successful
* event. * event.
@ -268,7 +300,15 @@ public class RMAuditLogger {
ApplicationId appId, CallerContext callerContext) { ApplicationId appId, CallerContext callerContext) {
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
LOG.info(createSuccessLog(user, operation, target, appId, null, null, LOG.info(createSuccessLog(user, operation, target, appId, null, null,
null, callerContext, Server.getRemoteIp())); null, callerContext, Server.getRemoteIp(), null, null));
}
}
public static void logSuccess(String user, String operation, String target,
ApplicationId appId, CallerContext callerContext, String queueName) {
if (LOG.isInfoEnabled()) {
LOG.info(createSuccessLog(user, operation, target, appId, null, null,
null, callerContext, Server.getRemoteIp(), queueName, null));
} }
} }
@ -296,7 +336,7 @@ public class RMAuditLogger {
ApplicationId appId, InetAddress ip) { ApplicationId appId, InetAddress ip) {
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
LOG.info(createSuccessLog(user, operation, target, appId, null, null, LOG.info(createSuccessLog(user, operation, target, appId, null, null,
null, null, ip)); null, null, ip, null, null));
} }
} }
@ -355,7 +395,7 @@ public class RMAuditLogger {
static String createFailureLog(String user, String operation, String perm, static String createFailureLog(String user, String operation, String perm,
String target, String description, ApplicationId appId, String target, String description, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId, ApplicationAttemptId attemptId, ContainerId containerId,
Resource resource, CallerContext callerContext) { Resource resource, CallerContext callerContext, String queueName) {
StringBuilder b = createStringBuilderForFailureLog(user, StringBuilder b = createStringBuilderForFailureLog(user,
operation, target, description, perm); operation, target, description, perm);
if (appId != null) { if (appId != null) {
@ -371,6 +411,9 @@ public class RMAuditLogger {
add(Keys.RESOURCE, resource.toString(), b); add(Keys.RESOURCE, resource.toString(), b);
} }
appendCallerContext(b, callerContext); appendCallerContext(b, callerContext);
if (queueName != null) {
add(Keys.QUEUENAME, queueName, b);
}
return b.toString(); return b.toString();
} }
@ -381,7 +424,7 @@ public class RMAuditLogger {
String target, String description, ApplicationId appId, String target, String description, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId, Resource resource) { ApplicationAttemptId attemptId, ContainerId containerId, Resource resource) {
return createFailureLog(user, operation, perm, target, description, appId, return createFailureLog(user, operation, perm, target, description, appId,
attemptId, containerId, resource, null); attemptId, containerId, resource, null, null);
} }
/** /**
@ -453,7 +496,16 @@ public class RMAuditLogger {
CallerContext callerContext) { CallerContext callerContext) {
if (LOG.isWarnEnabled()) { if (LOG.isWarnEnabled()) {
LOG.warn(createFailureLog(user, operation, perm, target, description, LOG.warn(createFailureLog(user, operation, perm, target, description,
appId, null, null, null, callerContext)); appId, null, null, null, callerContext, null));
}
}
public static void logFailure(String user, String operation, String perm,
String target, String description, ApplicationId appId,
CallerContext callerContext, String queueName) {
if (LOG.isWarnEnabled()) {
LOG.warn(createFailureLog(user, operation, perm, target, description,
appId, null, null, null, callerContext, queueName));
} }
} }
@ -480,6 +532,15 @@ public class RMAuditLogger {
} }
} }
public static void logFailure(String user, String operation, String perm,
String target, String description, ApplicationId appId,
String queueName) {
if (LOG.isWarnEnabled()) {
LOG.warn(createFailureLog(user, operation, perm, target, description,
appId, null, null, null, null, queueName));
}
}
/** /**
* Create a readable and parseable audit log string for a failed event. * Create a readable and parseable audit log string for a failed event.
* *

View File

@ -191,9 +191,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
containersToPreempt.remove(containerId); containersToPreempt.remove(containerId);
// In order to save space in the audit log, only include the partition
// if it is not the default partition.
String containerPartition = null;
if (partition != null && !partition.isEmpty()) {
containerPartition = partition;
}
Resource containerResource = rmContainer.getContainer().getResource(); Resource containerResource = rmContainer.getContainer().getResource();
RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER, RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
"SchedulerApp", getApplicationId(), containerId, containerResource); "SchedulerApp", getApplicationId(), containerId, containerResource,
getQueueName(), containerPartition);
// Update usage metrics // Update usage metrics
queue.getMetrics().releaseResources(partition, queue.getMetrics().releaseResources(partition,
@ -566,9 +573,17 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
+ " host=" + rmContainer.getAllocatedNode().getHost() + " host=" + rmContainer.getAllocatedNode().getHost()
+ " type=" + allocation.getAllocationLocalityType()); + " type=" + allocation.getAllocationLocalityType());
} }
// In order to save space in the audit log, only include the partition
// if it is not the default partition.
String partition =
schedulerContainer.getSchedulerNode().getPartition();
if (partition != null && partition.isEmpty()) {
partition = null;
}
RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
"SchedulerApp", getApplicationId(), containerId, "SchedulerApp", getApplicationId(), containerId,
allocation.getAllocatedOrReservedResource()); allocation.getAllocatedOrReservedResource(), getQueueName(),
partition);
} else { } else {
// If the rmContainer's state is already updated to RESERVED, this is // If the rmContainer's state is already updated to RESERVED, this is
// a reReservation // a reReservation

View File

@ -168,7 +168,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Resource containerResource = rmContainer.getContainer().getResource(); Resource containerResource = rmContainer.getContainer().getResource();
RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER, RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
"SchedulerApp", getApplicationId(), containerId, containerResource); "SchedulerApp", getApplicationId(), containerId, containerResource,
rmContainer.getQueueName(), null);
// Update usage metrics // Update usage metrics
queue.getMetrics().releaseResources( queue.getMetrics().releaseResources(
@ -486,7 +487,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
} }
RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
"SchedulerApp", getApplicationId(), container.getId(), "SchedulerApp", getApplicationId(), container.getId(),
container.getResource()); container.getResource(), getQueueName(), null);
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }

View File

@ -98,9 +98,17 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
.getApplicationAttemptId() + " container=" + containerId + " host=" .getApplicationAttemptId() + " container=" + containerId + " host="
+ container.getNodeId().getHost() + " type=" + type); + container.getNodeId().getHost() + " type=" + type);
} }
// In order to save space in the audit log, only include the partition
// if it is not the default partition.
String partition = null;
if (appAMNodePartitionName != null &&
!appAMNodePartitionName.isEmpty()) {
partition = appAMNodePartitionName;
}
RMAuditLogger.logSuccess(getUser(), RMAuditLogger.logSuccess(getUser(),
RMAuditLogger.AuditConstants.ALLOC_CONTAINER, "SchedulerApp", RMAuditLogger.AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), containerId, container.getResource()); getApplicationId(), containerId, container.getResource(),
getQueueName(), partition);
return rmContainer; return rmContainer;
} finally { } finally {

View File

@ -67,6 +67,7 @@ public class TestRMAuditLogger {
private static final Resource RESOURCE = mock(Resource.class); private static final Resource RESOURCE = mock(Resource.class);
private static final String CALLER_CONTEXT = "context"; private static final String CALLER_CONTEXT = "context";
private static final byte[] CALLER_SIGNATURE = "signature".getBytes(); private static final byte[] CALLER_SIGNATURE = "signature".getBytes();
private static final String PARTITION = "label1";
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -132,6 +133,14 @@ public class TestRMAuditLogger {
ApplicationAttemptId attemptId, ContainerId containerId, ApplicationAttemptId attemptId, ContainerId containerId,
CallerContext callerContext, Resource resource, InetAddress remoteIp, CallerContext callerContext, Resource resource, InetAddress remoteIp,
RMAuditLogger.ArgsBuilder args) { RMAuditLogger.ArgsBuilder args) {
testSuccessLogFormatHelper(checkIP, appId, attemptId, containerId,
callerContext, resource, remoteIp, args, null, null);
}
private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId,
CallerContext callerContext, Resource resource, InetAddress remoteIp,
RMAuditLogger.ArgsBuilder args, String queueName, String partition) {
String sLog; String sLog;
InetAddress tmpIp = checkIP ? remoteIp : null; InetAddress tmpIp = checkIP ? remoteIp : null;
if (args != null) { if (args != null) {
@ -139,7 +148,8 @@ public class TestRMAuditLogger {
tmpIp, args); tmpIp, args);
} else { } else {
sLog = RMAuditLogger.createSuccessLog(USER, OPERATION, TARGET, appId, sLog = RMAuditLogger.createSuccessLog(USER, OPERATION, TARGET, appId,
attemptId, containerId, resource, callerContext, tmpIp); attemptId, containerId, resource, callerContext, tmpIp, queueName,
partition);
} }
StringBuilder expLog = new StringBuilder(); StringBuilder expLog = new StringBuilder();
expLog.append("USER=test\t"); expLog.append("USER=test\t");
@ -177,6 +187,13 @@ public class TestRMAuditLogger {
if (args != null) { if (args != null) {
expLog.append("\tQUEUENAME=root"); expLog.append("\tQUEUENAME=root");
expLog.append("\tRECURSIVE=true"); expLog.append("\tRECURSIVE=true");
} else {
if (queueName != null) {
expLog.append("\tQUEUENAME=" + QUEUE);
}
}
if (partition != null) {
expLog.append("\tNODELABEL=" + PARTITION);
} }
assertEquals(expLog.toString(), sLog); assertEquals(expLog.toString(), sLog);
} }
@ -258,6 +275,8 @@ public class TestRMAuditLogger {
.append(Keys.QUEUENAME, QUEUE).append(Keys.RECURSIVE, "true"); .append(Keys.QUEUENAME, QUEUE).append(Keys.RECURSIVE, "true");
testSuccessLogFormatHelper(checkIP, null, null, null, null, null, testSuccessLogFormatHelper(checkIP, null, null, null, null, null,
Server.getRemoteIp(), args); Server.getRemoteIp(), args);
testSuccessLogFormatHelper(checkIP, null, null, null, null, null,
Server.getRemoteIp(), null, QUEUE, PARTITION);
testSuccessLogFormatHelperWithIP(checkIP, APPID, ATTEMPTID, CONTAINERID); testSuccessLogFormatHelperWithIP(checkIP, APPID, ATTEMPTID, CONTAINERID);
testSuccessLogNulls(checkIP); testSuccessLogNulls(checkIP);
} }
@ -283,7 +302,7 @@ public class TestRMAuditLogger {
RMAuditLogger.ArgsBuilder args) { RMAuditLogger.ArgsBuilder args) {
String fLog = args == null ? String fLog = args == null ?
RMAuditLogger.createFailureLog(USER, OPERATION, PERM, TARGET, DESC, RMAuditLogger.createFailureLog(USER, OPERATION, PERM, TARGET, DESC,
appId, attemptId, containerId, resource, callerContext) : appId, attemptId, containerId, resource, callerContext, null) :
RMAuditLogger.createFailureLog(USER, OPERATION, PERM, TARGET, DESC, RMAuditLogger.createFailureLog(USER, OPERATION, PERM, TARGET, DESC,
args); args);
StringBuilder expLog = new StringBuilder(); StringBuilder expLog = new StringBuilder();