YARN-4349. Support CallerContext in YARN. Contributed by Wangda Tan

(cherry picked from commit 8676a118a1)
This commit is contained in:
Jian He 2015-11-23 17:18:59 -08:00
parent 75b007260b
commit b339a4b8d6
27 changed files with 318 additions and 61 deletions

View File

@ -23,6 +23,7 @@ import java.io.PrintStream;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.CallerContext;
/** /**
* A utility to help run {@link Tool}s. * A utility to help run {@link Tool}s.
@ -58,6 +59,11 @@ public class ToolRunner {
*/ */
public static int run(Configuration conf, Tool tool, String[] args) public static int run(Configuration conf, Tool tool, String[] args)
throws Exception{ throws Exception{
if (CallerContext.getCurrent() == null) {
CallerContext ctx = new CallerContext.Builder("CLI").build();
CallerContext.setCurrent(ctx);
}
if(conf == null) { if(conf == null) {
conf = new Configuration(); conf = new Configuration();
} }

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
@ -86,6 +87,9 @@ class YarnChild {
JVMId jvmId = new JVMId(firstTaskid.getJobID(), JVMId jvmId = new JVMId(firstTaskid.getJobID(),
firstTaskid.getTaskType() == TaskType.MAP, jvmIdLong); firstTaskid.getTaskType() == TaskType.MAP, jvmIdLong);
CallerContext.setCurrent(
new CallerContext.Builder("mr_" + firstTaskid.toString()).build());
// initialize metrics // initialize metrics
DefaultMetricsSystem.initialize( DefaultMetricsSystem.initialize(
StringUtils.camelize(firstTaskid.getTaskType().name()) +"Task"); StringUtils.camelize(firstTaskid.getTaskType().name()) +"Task");

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.mapred.FileOutputCommitter; import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher; import org.apache.hadoop.mapred.LocalContainerLauncher;
@ -1525,6 +1526,10 @@ public class MRAppMaster extends CompositeService {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
ApplicationAttemptId applicationAttemptId = ApplicationAttemptId applicationAttemptId =
containerId.getApplicationAttemptId(); containerId.getApplicationAttemptId();
if (applicationAttemptId != null) {
CallerContext.setCurrent(new CallerContext.Builder(
"mr_appmaster_" + applicationAttemptId.toString()).build());
}
long appSubmitTime = Long.parseLong(appSubmitTimeStr); long appSubmitTime = Long.parseLong(appSubmitTimeStr);

View File

@ -198,6 +198,8 @@ Release 2.8.0 - UNRELEASED
YARN-4184. Remove update reservation state api from state store as its not used by YARN-4184. Remove update reservation state api from state store as its not used by
ReservationSystem (Sean Po via asuresh) ReservationSystem (Sean Po via asuresh)
YARN-4349. Support CallerContext in YARN. (wtan via jianhe)
IMPROVEMENTS IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -76,6 +76,12 @@ public class ApplicationMetricsConstants {
public static final String LATEST_APP_ATTEMPT_EVENT_INFO = public static final String LATEST_APP_ATTEMPT_EVENT_INFO =
"YARN_APPLICATION_LATEST_APP_ATTEMPT"; "YARN_APPLICATION_LATEST_APP_ATTEMPT";
public static final String YARN_APP_CALLER_CONTEXT =
"YARN_APPLICATION_CALLER_CONTEXT";
public static final String YARN_APP_CALLER_SIGNATURE =
"YARN_APPLICATION_CALLER_SIGNATURE";
public static final String APP_TAGS_INFO = "YARN_APPLICATION_TAGS"; public static final String APP_TAGS_INFO = "YARN_APPLICATION_TAGS";
public static final String UNMANAGED_APPLICATION_ENTITY_INFO = public static final String UNMANAGED_APPLICATION_ENTITY_INFO =

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -551,6 +552,7 @@ public class ClientRMService extends AbstractService implements
ApplicationSubmissionContext submissionContext = request ApplicationSubmissionContext submissionContext = request
.getApplicationSubmissionContext(); .getApplicationSubmissionContext();
ApplicationId applicationId = submissionContext.getApplicationId(); ApplicationId applicationId = submissionContext.getApplicationId();
CallerContext callerContext = CallerContext.getCurrent();
// ApplicationSubmissionContext needs to be validated for safety - only // ApplicationSubmissionContext needs to be validated for safety - only
// those fields that are independent of the RM's configuration will be // those fields that are independent of the RM's configuration will be
@ -565,7 +567,7 @@ public class ClientRMService extends AbstractService implements
LOG.warn("Unable to get the current user.", ie); LOG.warn("Unable to get the current user.", ie);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
ie.getMessage(), "ClientRMService", ie.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId); "Exception in submitting application", applicationId, callerContext);
throw RPCUtil.getRemoteException(ie); throw RPCUtil.getRemoteException(ie);
} }
@ -602,13 +604,13 @@ public class ClientRMService extends AbstractService implements
LOG.info("Application with id " + applicationId.getId() + LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user); " submitted by user " + user);
RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST, RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
"ClientRMService", applicationId); "ClientRMService", applicationId, callerContext);
} catch (YarnException e) { } catch (YarnException e) {
LOG.info("Exception in submitting application with id " + LOG.info("Exception in submitting application with id " +
applicationId.getId(), e); applicationId.getId(), e);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
e.getMessage(), "ClientRMService", e.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId); "Exception in submitting application", applicationId, callerContext);
throw e; throw e;
} }
@ -693,6 +695,7 @@ public class ClientRMService extends AbstractService implements
KillApplicationRequest request) throws YarnException { KillApplicationRequest request) throws YarnException {
ApplicationId applicationId = request.getApplicationId(); ApplicationId applicationId = request.getApplicationId();
CallerContext callerContext = CallerContext.getCurrent();
UserGroupInformation callerUGI; UserGroupInformation callerUGI;
try { try {
@ -701,7 +704,7 @@ public class ClientRMService extends AbstractService implements
LOG.info("Error getting UGI ", ie); LOG.info("Error getting UGI ", ie);
RMAuditLogger.logFailure("UNKNOWN", AuditConstants.KILL_APP_REQUEST, RMAuditLogger.logFailure("UNKNOWN", AuditConstants.KILL_APP_REQUEST,
"UNKNOWN", "ClientRMService" , "Error getting UGI", "UNKNOWN", "ClientRMService" , "Error getting UGI",
applicationId); applicationId, callerContext);
throw RPCUtil.getRemoteException(ie); throw RPCUtil.getRemoteException(ie);
} }
@ -709,7 +712,7 @@ public class ClientRMService extends AbstractService implements
if (application == null) { if (application == null) {
RMAuditLogger.logFailure(callerUGI.getUserName(), RMAuditLogger.logFailure(callerUGI.getUserName(),
AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService", AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService",
"Trying to kill an absent application", applicationId); "Trying to kill an absent application", applicationId, callerContext);
throw new ApplicationNotFoundException("Trying to kill an absent" throw new ApplicationNotFoundException("Trying to kill an absent"
+ " application " + applicationId); + " application " + applicationId);
} }
@ -720,7 +723,7 @@ public class ClientRMService extends AbstractService implements
AuditConstants.KILL_APP_REQUEST, AuditConstants.KILL_APP_REQUEST,
"User doesn't have permissions to " "User doesn't have permissions to "
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService", + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
AuditConstants.UNAUTHORIZED_USER, applicationId); AuditConstants.UNAUTHORIZED_USER, applicationId, callerContext);
throw RPCUtil.getRemoteException(new AccessControlException("User " throw RPCUtil.getRemoteException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation " + callerUGI.getShortUserName() + " cannot perform operation "
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
@ -728,7 +731,8 @@ public class ClientRMService extends AbstractService implements
if (application.isAppFinalStateStored()) { if (application.isAppFinalStateStored()) {
RMAuditLogger.logSuccess(callerUGI.getShortUserName(), RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId); AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId,
callerContext);
return KillApplicationResponse.newInstance(true); return KillApplicationResponse.newInstance(true);
} }

View File

@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;

View File

@ -17,10 +17,12 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress; import java.net.InetAddress;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -35,7 +37,8 @@ public class RMAuditLogger {
private static final Log LOG = LogFactory.getLog(RMAuditLogger.class); private static final Log LOG = LogFactory.getLog(RMAuditLogger.class);
static enum Keys {USER, OPERATION, TARGET, RESULT, IP, PERMISSIONS, static enum Keys {USER, OPERATION, TARGET, RESULT, IP, PERMISSIONS,
DESCRIPTION, APPID, APPATTEMPTID, CONTAINERID} DESCRIPTION, APPID, APPATTEMPTID, CONTAINERID,
CALLERCONTEXT, CALLERSIGNATURE}
public static class AuditConstants { public static class AuditConstants {
static final String SUCCESS = "SUCCESS"; static final String SUCCESS = "SUCCESS";
@ -70,11 +73,19 @@ public class RMAuditLogger {
public static final String DELETE_RESERVATION_REQUEST = "Delete Reservation Request"; public static final String DELETE_RESERVATION_REQUEST = "Delete Reservation Request";
} }
static String createSuccessLog(String user, String operation, String target,
ApplicationId appId, ApplicationAttemptId attemptId,
ContainerId containerId) {
return createSuccessLog(user, operation, target, appId, attemptId,
containerId, null);
}
/** /**
* A helper api for creating an audit log for a successful event. * A helper api for creating an audit log for a successful event.
*/ */
static String createSuccessLog(String user, String operation, String target, static String createSuccessLog(String user, String operation, String target,
ApplicationId appId, ApplicationAttemptId attemptId, ContainerId containerId) { ApplicationId appId, ApplicationAttemptId attemptId,
ContainerId containerId, CallerContext callerContext) {
StringBuilder b = new StringBuilder(); StringBuilder b = new StringBuilder();
start(Keys.USER, user, b); start(Keys.USER, user, b);
addRemoteIP(b); addRemoteIP(b);
@ -90,9 +101,33 @@ public class RMAuditLogger {
if (containerId != null) { if (containerId != null) {
add(Keys.CONTAINERID, containerId.toString(), b); add(Keys.CONTAINERID, containerId.toString(), b);
} }
appendCallerContext(b, callerContext);
return b.toString(); return b.toString();
} }
private static void appendCallerContext(StringBuilder sb, CallerContext callerContext) {
String context = null;
byte[] signature = null;
if (callerContext != null) {
context = callerContext.getContext();
signature = callerContext.getSignature();
}
if (context != null) {
add(Keys.CALLERCONTEXT, context, sb);
}
if (signature != null) {
try {
String sigStr = new String(signature, "UTF-8");
add(Keys.CALLERSIGNATURE, sigStr, sb);
} catch (UnsupportedEncodingException e) {
// ignore this signature
}
}
}
/** /**
* Create a readable and parseable audit log string for a successful event. * Create a readable and parseable audit log string for a successful event.
* *
@ -135,6 +170,14 @@ public class RMAuditLogger {
} }
} }
public static void logSuccess(String user, String operation, String target,
ApplicationId appId, CallerContext callerContext) {
if (LOG.isInfoEnabled()) {
LOG.info(createSuccessLog(user, operation, target, appId, null, null,
callerContext));
}
}
/** /**
* Create a readable and parseable audit log string for a successful event. * Create a readable and parseable audit log string for a successful event.
@ -172,12 +215,10 @@ public class RMAuditLogger {
} }
} }
/**
* A helper api for creating an audit log for a failure event.
*/
static String createFailureLog(String user, String operation, String perm, static String createFailureLog(String user, String operation, String perm,
String target, String description, ApplicationId appId, String target, String description, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId) { ApplicationAttemptId attemptId, ContainerId containerId,
CallerContext callerContext) {
StringBuilder b = new StringBuilder(); StringBuilder b = new StringBuilder();
start(Keys.USER, user, b); start(Keys.USER, user, b);
addRemoteIP(b); addRemoteIP(b);
@ -195,9 +236,20 @@ public class RMAuditLogger {
if (containerId != null) { if (containerId != null) {
add(Keys.CONTAINERID, containerId.toString(), b); add(Keys.CONTAINERID, containerId.toString(), b);
} }
appendCallerContext(b, callerContext);
return b.toString(); return b.toString();
} }
/**
* A helper api for creating an audit log for a failure event.
*/
static String createFailureLog(String user, String operation, String perm,
String target, String description, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId) {
return createFailureLog(user, operation, perm, target, description, appId,
attemptId, containerId, null);
}
/** /**
* Create a readable and parseable audit log string for a failed event. * Create a readable and parseable audit log string for a failed event.
* *
@ -247,6 +299,14 @@ public class RMAuditLogger {
} }
} }
public static void logFailure(String user, String operation, String perm,
String target, String description, ApplicationId appId,
CallerContext callerContext) {
if (LOG.isWarnEnabled()) {
LOG.warn(createFailureLog(user, operation, perm, target, description,
appId, null, null, callerContext));
}
}
/** /**
* Create a readable and parseable audit log string for a failed event. * Create a readable and parseable audit log string for a failed event.

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
@ -37,6 +38,8 @@ public class ApplicationCreatedEvent extends
private Priority applicationPriority; private Priority applicationPriority;
private String appNodeLabelsExpression; private String appNodeLabelsExpression;
private String amNodeLabelsExpression; private String amNodeLabelsExpression;
private final CallerContext callerContext;
public ApplicationCreatedEvent(ApplicationId appId, public ApplicationCreatedEvent(ApplicationId appId,
String name, String name,
@ -49,7 +52,8 @@ public class ApplicationCreatedEvent extends
boolean unmanagedApplication, boolean unmanagedApplication,
Priority applicationPriority, Priority applicationPriority,
String appNodeLabelsExpression, String appNodeLabelsExpression,
String amNodeLabelsExpression) { String amNodeLabelsExpression,
CallerContext callerContext) {
super(SystemMetricsEventType.APP_CREATED, createdTime); super(SystemMetricsEventType.APP_CREATED, createdTime);
this.appId = appId; this.appId = appId;
this.name = name; this.name = name;
@ -62,6 +66,7 @@ public class ApplicationCreatedEvent extends
this.applicationPriority = applicationPriority; this.applicationPriority = applicationPriority;
this.appNodeLabelsExpression = appNodeLabelsExpression; this.appNodeLabelsExpression = appNodeLabelsExpression;
this.amNodeLabelsExpression = amNodeLabelsExpression; this.amNodeLabelsExpression = amNodeLabelsExpression;
this.callerContext = callerContext;
} }
@Override @Override
@ -112,4 +117,8 @@ public class ApplicationCreatedEvent extends
public String getAmNodeLabelsExpression() { public String getAmNodeLabelsExpression() {
return amNodeLabelsExpression; return amNodeLabelsExpression;
} }
public CallerContext getCallerContext() {
return callerContext;
}
} }

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -114,7 +113,8 @@ public class SystemMetricsPublisher extends CompositeService {
appSubmissionContext.getUnmanagedAM(), appSubmissionContext.getUnmanagedAM(),
appSubmissionContext.getPriority(), appSubmissionContext.getPriority(),
app.getAppNodeLabelExpression(), app.getAppNodeLabelExpression(),
app.getAmNodeLabelExpression())); app.getAmNodeLabelExpression(),
app.getCallerContext()));
} }
} }
@ -122,10 +122,9 @@ public class SystemMetricsPublisher extends CompositeService {
public void appUpdated(RMApp app, long updatedTime) { public void appUpdated(RMApp app, long updatedTime) {
if (publishSystemMetrics) { if (publishSystemMetrics) {
dispatcher.getEventHandler() dispatcher.getEventHandler()
.handle( .handle(new ApplicationUpdatedEvent(app.getApplicationId(),
new ApplicationUpdatedEvent(app.getApplicationId(), app app.getQueue(), updatedTime,
.getQueue(), updatedTime, app app.getApplicationSubmissionContext().getPriority()));
.getApplicationSubmissionContext().getPriority()));
} }
} }
@ -284,6 +283,16 @@ public class SystemMetricsPublisher extends CompositeService {
event.getAppNodeLabelsExpression()); event.getAppNodeLabelsExpression());
entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
event.getAmNodeLabelsExpression()); event.getAmNodeLabelsExpression());
if (event.getCallerContext() != null) {
if (event.getCallerContext().getContext() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT,
event.getCallerContext().getContext());
}
if (event.getCallerContext().getSignature() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE,
event.getCallerContext().getSignature());
}
}
entity.setOtherInfo(entityInfo); entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent(); TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType( tEvent.setEventType(

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
@ -741,8 +742,8 @@ public abstract class RMStateStore extends AbstractService {
.getApplicationSubmissionContext(); .getApplicationSubmissionContext();
assert context instanceof ApplicationSubmissionContextPBImpl; assert context instanceof ApplicationSubmissionContextPBImpl;
ApplicationStateData appState = ApplicationStateData appState =
ApplicationStateData.newInstance( ApplicationStateData.newInstance(app.getSubmitTime(),
app.getSubmitTime(), app.getStartTime(), context, app.getUser()); app.getStartTime(), context, app.getUser(), app.getCallerContext());
dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
} }
@ -964,9 +965,9 @@ public abstract class RMStateStore extends AbstractService {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public synchronized void removeApplication(RMApp app) { public synchronized void removeApplication(RMApp app) {
ApplicationStateData appState = ApplicationStateData appState =
ApplicationStateData.newInstance( ApplicationStateData.newInstance(app.getSubmitTime(),
app.getSubmitTime(), app.getStartTime(), app.getStartTime(), app.getApplicationSubmissionContext(),
app.getApplicationSubmissionContext(), app.getUser()); app.getUser(), app.getCallerContext());
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
appState.attempts.put(appAttempt.getAppAttemptId(), null); appState.attempts.put(appAttempt.getAppAttemptId(), null);
} }

View File

@ -20,10 +20,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -43,8 +45,8 @@ public abstract class ApplicationStateData {
public static ApplicationStateData newInstance(long submitTime, public static ApplicationStateData newInstance(long submitTime,
long startTime, String user, long startTime, String user,
ApplicationSubmissionContext submissionContext, ApplicationSubmissionContext submissionContext, RMAppState state,
RMAppState state, String diagnostics, long finishTime) { String diagnostics, long finishTime, CallerContext callerContext) {
ApplicationStateData appState = Records.newRecord(ApplicationStateData.class); ApplicationStateData appState = Records.newRecord(ApplicationStateData.class);
appState.setSubmitTime(submitTime); appState.setSubmitTime(submitTime);
appState.setStartTime(startTime); appState.setStartTime(startTime);
@ -53,12 +55,20 @@ public abstract class ApplicationStateData {
appState.setState(state); appState.setState(state);
appState.setDiagnostics(diagnostics); appState.setDiagnostics(diagnostics);
appState.setFinishTime(finishTime); appState.setFinishTime(finishTime);
appState.setCallerContext(callerContext);
return appState; return appState;
} }
public static ApplicationStateData newInstance(long submitTime,
long startTime, ApplicationSubmissionContext context, String user,
CallerContext callerContext) {
return newInstance(submitTime, startTime, user, context, null, "", 0,
callerContext);
}
public static ApplicationStateData newInstance(long submitTime, public static ApplicationStateData newInstance(long submitTime,
long startTime, ApplicationSubmissionContext context, String user) { long startTime, ApplicationSubmissionContext context, String user) {
return newInstance(submitTime, startTime, user, context, null, "", 0); return newInstance(submitTime, startTime, context, user, null);
} }
public int getAttemptCount() { public int getAttemptCount() {
@ -144,4 +154,8 @@ public abstract class ApplicationStateData {
public abstract long getFinishTime(); public abstract long getFinishTime();
public abstract void setFinishTime(long finishTime); public abstract void setFinishTime(long finishTime);
public abstract CallerContext getCallerContext();
public abstract void setCallerContext(CallerContext callerContext);
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMAp
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat;
public class ApplicationStateDataPBImpl extends ApplicationStateData { public class ApplicationStateDataPBImpl extends ApplicationStateData {
@ -210,6 +213,37 @@ public class ApplicationStateDataPBImpl extends ApplicationStateData {
return false; return false;
} }
@Override
public CallerContext getCallerContext() {
ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
RpcHeaderProtos.RPCCallerContextProto pbContext = p.getCallerContext();
if (pbContext != null) {
CallerContext context = new CallerContext.Builder(pbContext.getContext())
.setSignature(pbContext.getSignature().toByteArray()).build();
return context;
}
return null;
}
@Override
public void setCallerContext(CallerContext callerContext) {
if (callerContext != null) {
maybeInitBuilder();
RpcHeaderProtos.RPCCallerContextProto.Builder b = RpcHeaderProtos.RPCCallerContextProto
.newBuilder();
if (callerContext.getContext() != null) {
b.setContext(callerContext.getContext());
}
if (callerContext.getSignature() != null) {
b.setSignature(ByteString.copyFrom(callerContext.getSignature()));
}
builder.setCallerContext(b);
}
}
@Override @Override
public String toString() { public String toString() {
return TextFormat.shortDebugString(getProto()); return TextFormat.shortDebugString(getProto());

View File

@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -254,4 +255,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
String getAmNodeLabelExpression(); String getAmNodeLabelExpression();
String getAppNodeLabelExpression(); String getAppNodeLabelExpression();
CallerContext getCallerContext();
} }

View File

@ -45,6 +45,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -178,6 +179,8 @@ public class RMAppImpl implements RMApp, Recoverable {
private RMAppState recoveredFinalState; private RMAppState recoveredFinalState;
private ResourceRequest amReq; private ResourceRequest amReq;
private CallerContext callerContext;
Object transitionTodo; Object transitionTodo;
private static final StateMachineFactory<RMAppImpl, private static final StateMachineFactory<RMAppImpl,
@ -439,6 +442,8 @@ public class RMAppImpl implements RMApp, Recoverable {
this.stateMachine = stateMachineFactory.make(this); this.stateMachine = stateMachineFactory.make(this);
this.callerContext = CallerContext.getCurrent();
rmContext.getRMApplicationHistoryWriter().applicationStarted(this); rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
rmContext.getSystemMetricsPublisher().appCreated(this, startTime); rmContext.getSystemMetricsPublisher().appCreated(this, startTime);
@ -806,6 +811,7 @@ public class RMAppImpl implements RMApp, Recoverable {
.getDiagnostics()); .getDiagnostics());
this.storedFinishTime = appState.getFinishTime(); this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime(); this.startTime = appState.getStartTime();
this.callerContext = appState.getCallerContext();
for(int i=0; i<appState.getAttemptCount(); ++i) { for(int i=0; i<appState.getAttemptCount(); ++i) {
// create attempt // create attempt
@ -1115,10 +1121,11 @@ public class RMAppImpl implements RMApp, Recoverable {
default: default:
break; break;
} }
ApplicationStateData appState = ApplicationStateData appState =
ApplicationStateData.newInstance(this.submitTime, this.startTime, ApplicationStateData.newInstance(this.submitTime, this.startTime,
this.user, this.submissionContext, this.user, this.submissionContext,
stateToBeStored, diags, this.storedFinishTime); stateToBeStored, diags, this.storedFinishTime, this.callerContext);
this.rmContext.getStateStore().updateApplicationState(appState); this.rmContext.getStateStore().updateApplicationState(appState);
} }
@ -1722,4 +1729,9 @@ public class RMAppImpl implements RMApp, Recoverable {
} }
return amNodeLabelExpression; return amNodeLabelExpression;
} }
@Override
public CallerContext getCallerContext() {
return callerContext;
}
} }

View File

@ -1981,9 +1981,10 @@ public class CapacityScheduler extends
rmApp.getApplicationSubmissionContext().setPriority(appPriority); rmApp.getApplicationSubmissionContext().setPriority(appPriority);
// Update to state store // Update to state store
ApplicationStateData appState = ApplicationStateData.newInstance( ApplicationStateData appState =
rmApp.getSubmitTime(), rmApp.getStartTime(), ApplicationStateData.newInstance(rmApp.getSubmitTime(),
rmApp.getApplicationSubmissionContext(), rmApp.getUser()); rmApp.getStartTime(), rmApp.getApplicationSubmissionContext(),
rmApp.getUser(), rmApp.getCallerContext());
rmContext.getStateStore().updateApplicationStateSynchronously(appState, rmContext.getStateStore().updateApplicationStateSynchronously(appState,
false); false);

View File

@ -25,6 +25,7 @@ package hadoop.yarn;
import "yarn_server_common_protos.proto"; import "yarn_server_common_protos.proto";
import "yarn_protos.proto"; import "yarn_protos.proto";
import "yarn_security_token.proto"; import "yarn_security_token.proto";
import "RpcHeader.proto";
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
////// RM recovery related records ///////////////////////////////////// ////// RM recovery related records /////////////////////////////////////
@ -67,6 +68,7 @@ message ApplicationStateDataProto {
optional RMAppStateProto application_state = 5; optional RMAppStateProto application_state = 5;
optional string diagnostics = 6 [default = "N/A"]; optional string diagnostics = 6 [default = "N/A"];
optional int64 finish_time = 7; optional int64 finish_time = 7;
optional hadoop.common.RPCCallerContextProto caller_context = 8;
} }
message ApplicationAttemptStateDataProto { message ApplicationAttemptStateDataProto {

View File

@ -25,6 +25,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.TestRPC.TestImpl; import org.apache.hadoop.ipc.TestRPC.TestImpl;
@ -50,6 +51,8 @@ public class TestRMAuditLogger {
private static final ApplicationId APPID = mock(ApplicationId.class); private static final ApplicationId APPID = mock(ApplicationId.class);
private static final ApplicationAttemptId ATTEMPTID = mock(ApplicationAttemptId.class); private static final ApplicationAttemptId ATTEMPTID = mock(ApplicationAttemptId.class);
private static final ContainerId CONTAINERID = mock(ContainerId.class); private static final ContainerId CONTAINERID = mock(ContainerId.class);
private static final String CALLER_CONTEXT = "context";
private static final byte[] CALLER_SIGNATURE = "signature".getBytes();
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -87,14 +90,19 @@ public class TestRMAuditLogger {
assertEquals(expLog.toString(), actLog.toString()); assertEquals(expLog.toString(), actLog.toString());
} }
private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId) {
testSuccessLogFormatHelper(checkIP, appId, attemptId, containerId, null);
}
/** /**
* Test the AuditLog format for successful events. * Test the AuditLog format for successful events.
*/ */
private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId, private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId) { ApplicationAttemptId attemptId, ContainerId containerId,
CallerContext callerContext) {
String sLog = RMAuditLogger.createSuccessLog(USER, OPERATION, TARGET, String sLog = RMAuditLogger.createSuccessLog(USER, OPERATION, TARGET,
appId, attemptId, containerId); appId, attemptId, containerId, callerContext);
StringBuilder expLog = new StringBuilder(); StringBuilder expLog = new StringBuilder();
expLog.append("USER=test\t"); expLog.append("USER=test\t");
if (checkIP) { if (checkIP) {
@ -112,6 +120,14 @@ public class TestRMAuditLogger {
if (containerId != null) { if (containerId != null) {
expLog.append("\tCONTAINERID=container_1"); expLog.append("\tCONTAINERID=container_1");
} }
if (callerContext != null) {
if (callerContext.getContext() != null) {
expLog.append("\tCALLERCONTEXT=context");
}
if (callerContext.getSignature() != null) {
expLog.append("\tCALLERSIGNATURE=signature");
}
}
assertEquals(expLog.toString(), sLog); assertEquals(expLog.toString(), sLog);
} }
@ -144,18 +160,33 @@ public class TestRMAuditLogger {
testSuccessLogFormatHelper(checkIP, APPID, null, CONTAINERID); testSuccessLogFormatHelper(checkIP, APPID, null, CONTAINERID);
testSuccessLogFormatHelper(checkIP, null, ATTEMPTID, CONTAINERID); testSuccessLogFormatHelper(checkIP, null, ATTEMPTID, CONTAINERID);
testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID); testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID);
testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID, null);
testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(null).setSignature(null).build());
testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(CALLER_CONTEXT).setSignature(null).build());
testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(null).setSignature(CALLER_SIGNATURE).build());
testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(CALLER_CONTEXT).setSignature(CALLER_SIGNATURE)
.build());
testSuccessLogNulls(checkIP); testSuccessLogNulls(checkIP);
} }
private void testFailureLogFormatHelper(boolean checkIP, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId) {
testFailureLogFormatHelper(checkIP, appId, attemptId, containerId, null);
}
/** /**
* Test the AuditLog format for failure events. * Test the AuditLog format for failure events.
*/ */
private void testFailureLogFormatHelper(boolean checkIP, ApplicationId appId, private void testFailureLogFormatHelper(boolean checkIP, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId) { ApplicationAttemptId attemptId, ContainerId containerId,
CallerContext callerContext) {
String fLog = String fLog =
RMAuditLogger.createFailureLog(USER, OPERATION, PERM, TARGET, DESC, RMAuditLogger.createFailureLog(USER, OPERATION, PERM, TARGET, DESC,
appId, attemptId, containerId); appId, attemptId, containerId, callerContext);
StringBuilder expLog = new StringBuilder(); StringBuilder expLog = new StringBuilder();
expLog.append("USER=test\t"); expLog.append("USER=test\t");
if (checkIP) { if (checkIP) {
@ -174,6 +205,14 @@ public class TestRMAuditLogger {
if (containerId != null) { if (containerId != null) {
expLog.append("\tCONTAINERID=container_1"); expLog.append("\tCONTAINERID=container_1");
} }
if (callerContext != null) {
if (callerContext.getContext() != null) {
expLog.append("\tCALLERCONTEXT=context");
}
if (callerContext.getSignature() != null) {
expLog.append("\tCALLERSIGNATURE=signature");
}
}
assertEquals(expLog.toString(), fLog); assertEquals(expLog.toString(), fLog);
} }
@ -190,6 +229,16 @@ public class TestRMAuditLogger {
testFailureLogFormatHelper(checkIP, APPID, null, CONTAINERID); testFailureLogFormatHelper(checkIP, APPID, null, CONTAINERID);
testFailureLogFormatHelper(checkIP, null, ATTEMPTID, CONTAINERID); testFailureLogFormatHelper(checkIP, null, ATTEMPTID, CONTAINERID);
testFailureLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID); testFailureLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID);
testFailureLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(null).setSignature(null).build());
testFailureLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(CALLER_CONTEXT).setSignature(null).build());
testFailureLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(null).setSignature(CALLER_SIGNATURE).build());
testFailureLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(CALLER_CONTEXT).setSignature(CALLER_SIGNATURE)
.build());
} }
/** /**

View File

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -212,6 +213,10 @@ public abstract class MockAsm extends MockApps {
public String getAppNodeLabelExpression() { public String getAppNodeLabelExpression() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
public CallerContext getCallerContext() {
throw new UnsupportedOperationException("Not supported yet.");
}
} }
public static RMApp newApplication(int i) { public static RMApp newApplication(int i) {

View File

@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -205,6 +206,8 @@ public class TestSystemMetricsPublisher {
Long.parseLong(entity.getOtherInfo() Long.parseLong(entity.getOtherInfo()
.get(ApplicationMetricsConstants.APP_CPU_METRICS).toString())); .get(ApplicationMetricsConstants.APP_CPU_METRICS).toString()));
} }
Assert.assertEquals("context", entity.getOtherInfo()
.get(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT));
boolean hasCreatedEvent = false; boolean hasCreatedEvent = false;
boolean hasUpdatedEvent = false; boolean hasUpdatedEvent = false;
boolean hasFinishedEvent = false; boolean hasFinishedEvent = false;
@ -426,6 +429,8 @@ public class TestSystemMetricsPublisher {
when(amReq.getNodeLabelExpression()).thenReturn("high-mem"); when(amReq.getNodeLabelExpression()).thenReturn("high-mem");
when(app.getAMResourceRequest()).thenReturn(amReq); when(app.getAMResourceRequest()).thenReturn(amReq);
when(app.getAmNodeLabelExpression()).thenCallRealMethod(); when(app.getAmNodeLabelExpression()).thenCallRealMethod();
when(app.getCallerContext())
.thenReturn(new CallerContext.Builder("context").build());
return app; return app;
} }

View File

@ -18,18 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery; package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -39,16 +35,11 @@ import java.util.Map;
import javax.crypto.SecretKey; import javax.crypto.SecretKey;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -57,33 +48,43 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
public class RMStateStoreTestBase { public class RMStateStoreTestBase {
@ -165,6 +166,8 @@ public class RMStateStoreTestBase {
when(mockApp.getStartTime()).thenReturn(startTime); when(mockApp.getStartTime()).thenReturn(startTime);
when(mockApp.getApplicationSubmissionContext()).thenReturn(context); when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
when(mockApp.getUser()).thenReturn("test"); when(mockApp.getUser()).thenReturn("test");
when(mockApp.getCallerContext())
.thenReturn(new CallerContext.Builder("context").build());
store.storeNewApplication(mockApp); store.storeNewApplication(mockApp);
return mockApp; return mockApp;
} }
@ -322,6 +325,7 @@ public class RMStateStoreTestBase {
clientTokenKey1.getEncoded(), clientTokenKey1.getEncoded(),
attemptState.getAppAttemptTokens() attemptState.getAppAttemptTokens()
.getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
assertEquals("context", appState.getCallerContext().getContext());
attemptState = appState.getAttempt(attemptId2); attemptState = appState.getAttempt(attemptId2);
// attempt2 is loaded correctly // attempt2 is loaded correctly
@ -340,7 +344,7 @@ public class RMStateStoreTestBase {
ApplicationStateData.newInstance(appState.getSubmitTime(), ApplicationStateData.newInstance(appState.getSubmitTime(),
appState.getStartTime(), appState.getUser(), appState.getStartTime(), appState.getUser(),
appState.getApplicationSubmissionContext(), RMAppState.FINISHED, appState.getApplicationSubmissionContext(), RMAppState.FINISHED,
"appDiagnostics", 1234); "appDiagnostics", 1234, appState.getCallerContext());
appState2.attempts.putAll(appState.attempts); appState2.attempts.putAll(appState.attempts);
store.updateApplicationState(appState2); store.updateApplicationState(appState2);
@ -365,7 +369,7 @@ public class RMStateStoreTestBase {
ApplicationStateData dummyApp = ApplicationStateData dummyApp =
ApplicationStateData.newInstance(appState.getSubmitTime(), ApplicationStateData.newInstance(appState.getSubmitTime(),
appState.getStartTime(), appState.getUser(), dummyContext, appState.getStartTime(), appState.getUser(), dummyContext,
RMAppState.FINISHED, "appDiagnostics", 1234); RMAppState.FINISHED, "appDiagnostics", 1234, null);
store.updateApplicationState(dummyApp); store.updateApplicationState(dummyApp);
ApplicationAttemptId dummyAttemptId = ApplicationAttemptId dummyAttemptId =

View File

@ -392,7 +392,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
store.storeApplicationStateInternal( store.storeApplicationStateInternal(
ApplicationId.newInstance(100L, 1), ApplicationId.newInstance(100L, 1),
ApplicationStateData.newInstance(111, 111, "user", null, ApplicationStateData.newInstance(111, 111, "user", null,
RMAppState.ACCEPTED, "diagnostics", 333)); RMAppState.ACCEPTED, "diagnostics", 333, null));
} catch (Exception e) { } catch (Exception e) {
assertionFailedInThread.set(true); assertionFailedInThread.set(true);
e.printStackTrace(); e.printStackTrace();

View File

@ -488,6 +488,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
ApplicationSubmissionContext context = ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl(); new ApplicationSubmissionContextPBImpl();
context.setApplicationId(appIdRemoved); context.setApplicationId(appIdRemoved);
ApplicationStateData appStateRemoved = ApplicationStateData appStateRemoved =
ApplicationStateData.newInstance( ApplicationStateData.newInstance(
submitTime, startTime, context, "user1"); submitTime, startTime, context, "user1");

View File

@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -296,4 +297,8 @@ public class MockRMApp implements RMApp {
public String getAppNodeLabelExpression() { public String getAppNodeLabelExpression() {
return null; return null;
} }
public CallerContext getCallerContext() {
throw new UnsupportedOperationException("Not supported yet.");
}
} }

View File

@ -397,7 +397,7 @@ public class TestRMAppTransitions {
// NEW => SUBMITTED event RMAppEventType.RECOVER // NEW => SUBMITTED event RMAppEventType.RECOVER
RMState state = new RMState(); RMState state = new RMState();
ApplicationStateData appState = ApplicationStateData appState =
ApplicationStateData.newInstance(123, 123, null, "user"); ApplicationStateData.newInstance(123, 123, null, "user", null);
state.getApplicationState().put(application.getApplicationId(), appState); state.getApplicationState().put(application.getApplicationId(), appState);
RMAppEvent event = RMAppEvent event =
new RMAppRecoverEvent(application.getApplicationId(), state); new RMAppRecoverEvent(application.getApplicationId(), state);
@ -1011,7 +1011,7 @@ public class TestRMAppTransitions {
ApplicationStateData appState = ApplicationStateData appState =
ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(), ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(),
app.getUser(), app.getApplicationSubmissionContext(), rmAppState, app.getUser(), app.getApplicationSubmissionContext(), rmAppState,
null, app.getFinishTime()); null, app.getFinishTime(), null);
applicationState.put(app.getApplicationId(), appState); applicationState.put(app.getApplicationId(), appState);
} }

View File

@ -108,7 +108,11 @@ public class TestRMContainerImpl {
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getRMApps()).thenReturn(rmApps); when(rmContext.getRMApps()).thenReturn(rmApps);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration()); YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
true);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, "user", rmContext); nodeId, "user", rmContext);
@ -202,8 +206,14 @@ public class TestRMContainerImpl {
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
true);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
when(rmContext.getRMApps()).thenReturn(appMap); when(rmContext.getRMApps()).thenReturn(appMap);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, "user", rmContext); nodeId, "user", rmContext);
@ -388,6 +398,9 @@ public class TestRMContainerImpl {
public void testStoreAllContainerMetrics() throws Exception { public void testStoreAllContainerMetrics() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
conf.setBoolean(
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
true);
MockRM rm1 = new MockRM(conf); MockRM rm1 = new MockRM(conf);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);

View File

@ -177,6 +177,7 @@ public class TestFifoScheduler {
mock(SystemMetricsPublisher.class)); mock(SystemMetricsPublisher.class));
Configuration conf = new Configuration(); Configuration conf = new Configuration();
((RMContextImpl) rmContext).setScheduler(scheduler);
scheduler.setRMContext(rmContext); scheduler.setRMContext(rmContext);
scheduler.init(conf); scheduler.init(conf);
scheduler.start(); scheduler.start();
@ -309,6 +310,7 @@ public class TestFifoScheduler {
rmContext.setNodeLabelManager(nlm); rmContext.setNodeLabelManager(nlm);
scheduler.setRMContext(rmContext); scheduler.setRMContext(rmContext);
((RMContextImpl) rmContext).setScheduler(scheduler);
scheduler.init(conf); scheduler.init(conf);
scheduler.start(); scheduler.start();
scheduler.reinitialize(new Configuration(), rmContext); scheduler.reinitialize(new Configuration(), rmContext);