YARN-4349. Support CallerContext in YARN. Contributed by Wangda Tan

This commit is contained in:
Jian He 2015-11-23 17:18:59 -08:00
parent 4bff073b4d
commit 8676a118a1
27 changed files with 318 additions and 61 deletions

View File

@ -23,6 +23,7 @@ import java.io.PrintStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.CallerContext;
/**
* A utility to help run {@link Tool}s.
@ -58,6 +59,11 @@ public class ToolRunner {
*/
public static int run(Configuration conf, Tool tool, String[] args)
throws Exception{
if (CallerContext.getCurrent() == null) {
CallerContext ctx = new CallerContext.Builder("CLI").build();
CallerContext.setCurrent(ctx);
}
if(conf == null) {
conf = new Configuration();
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
@ -85,6 +86,9 @@ class YarnChild {
long jvmIdLong = Long.parseLong(args[3]);
JVMId jvmId = new JVMId(firstTaskid.getJobID(),
firstTaskid.getTaskType() == TaskType.MAP, jvmIdLong);
CallerContext.setCurrent(
new CallerContext.Builder("mr_" + firstTaskid.toString()).build());
// initialize metrics
DefaultMetricsSystem.initialize(

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
@ -1544,6 +1545,10 @@ public class MRAppMaster extends CompositeService {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
ApplicationAttemptId applicationAttemptId =
containerId.getApplicationAttemptId();
if (applicationAttemptId != null) {
CallerContext.setCurrent(new CallerContext.Builder(
"mr_appmaster_" + applicationAttemptId.toString()).build());
}
long appSubmitTime = Long.parseLong(appSubmitTimeStr);

View File

@ -253,6 +253,8 @@ Release 2.8.0 - UNRELEASED
YARN-4184. Remove update reservation state api from state store as its not used by
ReservationSystem (Sean Po via asuresh)
YARN-4349. Support CallerContext in YARN. (wtan via jianhe)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -75,6 +75,12 @@ public class ApplicationMetricsConstants {
public static final String LATEST_APP_ATTEMPT_EVENT_INFO =
"YARN_APPLICATION_LATEST_APP_ATTEMPT";
public static final String YARN_APP_CALLER_CONTEXT =
"YARN_APPLICATION_CALLER_CONTEXT";
public static final String YARN_APP_CALLER_SIGNATURE =
"YARN_APPLICATION_CALLER_SIGNATURE";
public static final String APP_TAGS_INFO = "YARN_APPLICATION_TAGS";

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -552,6 +553,7 @@ public class ClientRMService extends AbstractService implements
ApplicationSubmissionContext submissionContext = request
.getApplicationSubmissionContext();
ApplicationId applicationId = submissionContext.getApplicationId();
CallerContext callerContext = CallerContext.getCurrent();
// ApplicationSubmissionContext needs to be validated for safety - only
// those fields that are independent of the RM's configuration will be
@ -566,7 +568,7 @@ public class ClientRMService extends AbstractService implements
LOG.warn("Unable to get the current user.", ie);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
ie.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId);
"Exception in submitting application", applicationId, callerContext);
throw RPCUtil.getRemoteException(ie);
}
@ -603,13 +605,13 @@ public class ClientRMService extends AbstractService implements
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user);
RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
"ClientRMService", applicationId);
"ClientRMService", applicationId, callerContext);
} catch (YarnException e) {
LOG.info("Exception in submitting application with id " +
applicationId.getId(), e);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
e.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId);
"Exception in submitting application", applicationId, callerContext);
throw e;
}
@ -694,6 +696,7 @@ public class ClientRMService extends AbstractService implements
KillApplicationRequest request) throws YarnException {
ApplicationId applicationId = request.getApplicationId();
CallerContext callerContext = CallerContext.getCurrent();
UserGroupInformation callerUGI;
try {
@ -702,7 +705,7 @@ public class ClientRMService extends AbstractService implements
LOG.info("Error getting UGI ", ie);
RMAuditLogger.logFailure("UNKNOWN", AuditConstants.KILL_APP_REQUEST,
"UNKNOWN", "ClientRMService" , "Error getting UGI",
applicationId);
applicationId, callerContext);
throw RPCUtil.getRemoteException(ie);
}
@ -710,7 +713,7 @@ public class ClientRMService extends AbstractService implements
if (application == null) {
RMAuditLogger.logFailure(callerUGI.getUserName(),
AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService",
"Trying to kill an absent application", applicationId);
"Trying to kill an absent application", applicationId, callerContext);
throw new ApplicationNotFoundException("Trying to kill an absent"
+ " application " + applicationId);
}
@ -721,7 +724,7 @@ public class ClientRMService extends AbstractService implements
AuditConstants.KILL_APP_REQUEST,
"User doesn't have permissions to "
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
AuditConstants.UNAUTHORIZED_USER, applicationId);
AuditConstants.UNAUTHORIZED_USER, applicationId, callerContext);
throw RPCUtil.getRemoteException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation "
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
@ -729,7 +732,8 @@ public class ClientRMService extends AbstractService implements
if (application.isAppFinalStateStored()) {
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId);
AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId,
callerContext);
return KillApplicationResponse.newInstance(true);
}

View File

@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;

View File

@ -17,10 +17,12 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -35,7 +37,8 @@ public class RMAuditLogger {
private static final Log LOG = LogFactory.getLog(RMAuditLogger.class);
static enum Keys {USER, OPERATION, TARGET, RESULT, IP, PERMISSIONS,
DESCRIPTION, APPID, APPATTEMPTID, CONTAINERID}
DESCRIPTION, APPID, APPATTEMPTID, CONTAINERID,
CALLERCONTEXT, CALLERSIGNATURE}
public static class AuditConstants {
static final String SUCCESS = "SUCCESS";
@ -69,12 +72,20 @@ public class RMAuditLogger {
public static final String UPDATE_RESERVATION_REQUEST = "Update Reservation Request";
public static final String DELETE_RESERVATION_REQUEST = "Delete Reservation Request";
}
static String createSuccessLog(String user, String operation, String target,
ApplicationId appId, ApplicationAttemptId attemptId,
ContainerId containerId) {
return createSuccessLog(user, operation, target, appId, attemptId,
containerId, null);
}
/**
* A helper api for creating an audit log for a successful event.
*/
static String createSuccessLog(String user, String operation, String target,
ApplicationId appId, ApplicationAttemptId attemptId, ContainerId containerId) {
ApplicationId appId, ApplicationAttemptId attemptId,
ContainerId containerId, CallerContext callerContext) {
StringBuilder b = new StringBuilder();
start(Keys.USER, user, b);
addRemoteIP(b);
@ -90,8 +101,32 @@ public class RMAuditLogger {
if (containerId != null) {
add(Keys.CONTAINERID, containerId.toString(), b);
}
appendCallerContext(b, callerContext);
return b.toString();
}
private static void appendCallerContext(StringBuilder sb, CallerContext callerContext) {
String context = null;
byte[] signature = null;
if (callerContext != null) {
context = callerContext.getContext();
signature = callerContext.getSignature();
}
if (context != null) {
add(Keys.CALLERCONTEXT, context, sb);
}
if (signature != null) {
try {
String sigStr = new String(signature, "UTF-8");
add(Keys.CALLERSIGNATURE, sigStr, sb);
} catch (UnsupportedEncodingException e) {
// ignore this signature
}
}
}
/**
* Create a readable and parseable audit log string for a successful event.
@ -134,6 +169,14 @@ public class RMAuditLogger {
null));
}
}
public static void logSuccess(String user, String operation, String target,
ApplicationId appId, CallerContext callerContext) {
if (LOG.isInfoEnabled()) {
LOG.info(createSuccessLog(user, operation, target, appId, null, null,
callerContext));
}
}
/**
@ -171,13 +214,11 @@ public class RMAuditLogger {
LOG.info(createSuccessLog(user, operation, target, null, null, null));
}
}
/**
* A helper api for creating an audit log for a failure event.
*/
static String createFailureLog(String user, String operation, String perm,
String target, String description, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId) {
ApplicationAttemptId attemptId, ContainerId containerId,
CallerContext callerContext) {
StringBuilder b = new StringBuilder();
start(Keys.USER, user, b);
addRemoteIP(b);
@ -195,9 +236,20 @@ public class RMAuditLogger {
if (containerId != null) {
add(Keys.CONTAINERID, containerId.toString(), b);
}
appendCallerContext(b, callerContext);
return b.toString();
}
/**
* A helper api for creating an audit log for a failure event.
*/
static String createFailureLog(String user, String operation, String perm,
String target, String description, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId) {
return createFailureLog(user, operation, perm, target, description, appId,
attemptId, containerId, null);
}
/**
* Create a readable and parseable audit log string for a failed event.
*
@ -246,7 +298,15 @@ public class RMAuditLogger {
appId, attemptId, null));
}
}
public static void logFailure(String user, String operation, String perm,
String target, String description, ApplicationId appId,
CallerContext callerContext) {
if (LOG.isWarnEnabled()) {
LOG.warn(createFailureLog(user, operation, perm, target, description,
appId, null, null, callerContext));
}
}
/**
* Create a readable and parseable audit log string for a failed event.

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import java.util.Set;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
@ -37,6 +38,8 @@ public class ApplicationCreatedEvent extends
private Priority applicationPriority;
private String appNodeLabelsExpression;
private String amNodeLabelsExpression;
private final CallerContext callerContext;
public ApplicationCreatedEvent(ApplicationId appId,
String name,
@ -49,7 +52,8 @@ public class ApplicationCreatedEvent extends
boolean unmanagedApplication,
Priority applicationPriority,
String appNodeLabelsExpression,
String amNodeLabelsExpression) {
String amNodeLabelsExpression,
CallerContext callerContext) {
super(SystemMetricsEventType.APP_CREATED, createdTime);
this.appId = appId;
this.name = name;
@ -62,6 +66,7 @@ public class ApplicationCreatedEvent extends
this.applicationPriority = applicationPriority;
this.appNodeLabelsExpression = appNodeLabelsExpression;
this.amNodeLabelsExpression = amNodeLabelsExpression;
this.callerContext = callerContext;
}
@Override
@ -112,4 +117,8 @@ public class ApplicationCreatedEvent extends
public String getAmNodeLabelsExpression() {
return amNodeLabelsExpression;
}
public CallerContext getCallerContext() {
return callerContext;
}
}

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -114,7 +113,8 @@ public class SystemMetricsPublisher extends CompositeService {
appSubmissionContext.getUnmanagedAM(),
appSubmissionContext.getPriority(),
app.getAppNodeLabelExpression(),
app.getAmNodeLabelExpression()));
app.getAmNodeLabelExpression(),
app.getCallerContext()));
}
}
@ -122,10 +122,9 @@ public class SystemMetricsPublisher extends CompositeService {
public void appUpdated(RMApp app, long updatedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler()
.handle(
new ApplicationUpdatedEvent(app.getApplicationId(), app
.getQueue(), updatedTime, app
.getApplicationSubmissionContext().getPriority()));
.handle(new ApplicationUpdatedEvent(app.getApplicationId(),
app.getQueue(), updatedTime,
app.getApplicationSubmissionContext().getPriority()));
}
}
@ -284,6 +283,16 @@ public class SystemMetricsPublisher extends CompositeService {
event.getAppNodeLabelsExpression());
entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
event.getAmNodeLabelsExpression());
if (event.getCallerContext() != null) {
if (event.getCallerContext().getContext() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT,
event.getCallerContext().getContext());
}
if (event.getCallerContext().getSignature() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE,
event.getCallerContext().getSignature());
}
}
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.AbstractService;
@ -741,8 +742,8 @@ public abstract class RMStateStore extends AbstractService {
.getApplicationSubmissionContext();
assert context instanceof ApplicationSubmissionContextPBImpl;
ApplicationStateData appState =
ApplicationStateData.newInstance(
app.getSubmitTime(), app.getStartTime(), context, app.getUser());
ApplicationStateData.newInstance(app.getSubmitTime(),
app.getStartTime(), context, app.getUser(), app.getCallerContext());
dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
}
@ -964,9 +965,9 @@ public abstract class RMStateStore extends AbstractService {
@SuppressWarnings("unchecked")
public synchronized void removeApplication(RMApp app) {
ApplicationStateData appState =
ApplicationStateData.newInstance(
app.getSubmitTime(), app.getStartTime(),
app.getApplicationSubmissionContext(), app.getUser());
ApplicationStateData.newInstance(app.getSubmitTime(),
app.getStartTime(), app.getApplicationSubmissionContext(),
app.getUser(), app.getCallerContext());
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
appState.attempts.put(appAttempt.getAppAttemptId(), null);
}

View File

@ -20,10 +20,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -43,8 +45,8 @@ public abstract class ApplicationStateData {
public static ApplicationStateData newInstance(long submitTime,
long startTime, String user,
ApplicationSubmissionContext submissionContext,
RMAppState state, String diagnostics, long finishTime) {
ApplicationSubmissionContext submissionContext, RMAppState state,
String diagnostics, long finishTime, CallerContext callerContext) {
ApplicationStateData appState = Records.newRecord(ApplicationStateData.class);
appState.setSubmitTime(submitTime);
appState.setStartTime(startTime);
@ -53,12 +55,20 @@ public abstract class ApplicationStateData {
appState.setState(state);
appState.setDiagnostics(diagnostics);
appState.setFinishTime(finishTime);
appState.setCallerContext(callerContext);
return appState;
}
public static ApplicationStateData newInstance(long submitTime,
long startTime, ApplicationSubmissionContext context, String user,
CallerContext callerContext) {
return newInstance(submitTime, startTime, user, context, null, "", 0,
callerContext);
}
public static ApplicationStateData newInstance(long submitTime,
long startTime, ApplicationSubmissionContext context, String user) {
return newInstance(submitTime, startTime, user, context, null, "", 0);
return newInstance(submitTime, startTime, context, user, null);
}
public int getAttemptCount() {
@ -144,4 +154,8 @@ public abstract class ApplicationStateData {
public abstract long getFinishTime();
public abstract void setFinishTime(long finishTime);
public abstract CallerContext getCallerContext();
public abstract void setCallerContext(CallerContext callerContext);
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMAp
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;
public class ApplicationStateDataPBImpl extends ApplicationStateData {
@ -209,6 +212,37 @@ public class ApplicationStateDataPBImpl extends ApplicationStateData {
}
return false;
}
@Override
public CallerContext getCallerContext() {
ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
RpcHeaderProtos.RPCCallerContextProto pbContext = p.getCallerContext();
if (pbContext != null) {
CallerContext context = new CallerContext.Builder(pbContext.getContext())
.setSignature(pbContext.getSignature().toByteArray()).build();
return context;
}
return null;
}
@Override
public void setCallerContext(CallerContext callerContext) {
if (callerContext != null) {
maybeInitBuilder();
RpcHeaderProtos.RPCCallerContextProto.Builder b = RpcHeaderProtos.RPCCallerContextProto
.newBuilder();
if (callerContext.getContext() != null) {
b.setContext(callerContext.getContext());
}
if (callerContext.getSignature() != null) {
b.setSignature(ByteString.copyFrom(callerContext.getSignature()));
}
builder.setCallerContext(b);
}
}
@Override
public String toString() {

View File

@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -254,4 +255,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
String getAmNodeLabelExpression();
String getAppNodeLabelExpression();
CallerContext getCallerContext();
}

View File

@ -45,6 +45,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -177,6 +178,8 @@ public class RMAppImpl implements RMApp, Recoverable {
private RMAppState targetedFinalState;
private RMAppState recoveredFinalState;
private ResourceRequest amReq;
private CallerContext callerContext;
Object transitionTodo;
@ -439,6 +442,8 @@ public class RMAppImpl implements RMApp, Recoverable {
this.stateMachine = stateMachineFactory.make(this);
this.callerContext = CallerContext.getCurrent();
rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
rmContext.getSystemMetricsPublisher().appCreated(this, startTime);
@ -806,6 +811,7 @@ public class RMAppImpl implements RMApp, Recoverable {
.getDiagnostics());
this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime();
this.callerContext = appState.getCallerContext();
for(int i=0; i<appState.getAttemptCount(); ++i) {
// create attempt
@ -1115,10 +1121,11 @@ public class RMAppImpl implements RMApp, Recoverable {
default:
break;
}
ApplicationStateData appState =
ApplicationStateData.newInstance(this.submitTime, this.startTime,
this.user, this.submissionContext,
stateToBeStored, diags, this.storedFinishTime);
stateToBeStored, diags, this.storedFinishTime, this.callerContext);
this.rmContext.getStateStore().updateApplicationState(appState);
}
@ -1722,4 +1729,9 @@ public class RMAppImpl implements RMApp, Recoverable {
}
return amNodeLabelExpression;
}
@Override
public CallerContext getCallerContext() {
return callerContext;
}
}

View File

@ -1981,9 +1981,10 @@ public class CapacityScheduler extends
rmApp.getApplicationSubmissionContext().setPriority(appPriority);
// Update to state store
ApplicationStateData appState = ApplicationStateData.newInstance(
rmApp.getSubmitTime(), rmApp.getStartTime(),
rmApp.getApplicationSubmissionContext(), rmApp.getUser());
ApplicationStateData appState =
ApplicationStateData.newInstance(rmApp.getSubmitTime(),
rmApp.getStartTime(), rmApp.getApplicationSubmissionContext(),
rmApp.getUser(), rmApp.getCallerContext());
rmContext.getStateStore().updateApplicationStateSynchronously(appState,
false);

View File

@ -25,6 +25,7 @@ package hadoop.yarn;
import "yarn_server_common_protos.proto";
import "yarn_protos.proto";
import "yarn_security_token.proto";
import "RpcHeader.proto";
////////////////////////////////////////////////////////////////////////
////// RM recovery related records /////////////////////////////////////
@ -67,6 +68,7 @@ message ApplicationStateDataProto {
optional RMAppStateProto application_state = 5;
optional string diagnostics = 6 [default = "N/A"];
optional int64 finish_time = 7;
optional hadoop.common.RPCCallerContextProto caller_context = 8;
}
message ApplicationAttemptStateDataProto {

View File

@ -25,6 +25,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.TestRPC.TestImpl;
@ -50,6 +51,8 @@ public class TestRMAuditLogger {
private static final ApplicationId APPID = mock(ApplicationId.class);
private static final ApplicationAttemptId ATTEMPTID = mock(ApplicationAttemptId.class);
private static final ContainerId CONTAINERID = mock(ContainerId.class);
private static final String CALLER_CONTEXT = "context";
private static final byte[] CALLER_SIGNATURE = "signature".getBytes();
@Before
public void setUp() throws Exception {
@ -86,15 +89,20 @@ public class TestRMAuditLogger {
expLog.append("\tTARGET=tgt");
assertEquals(expLog.toString(), actLog.toString());
}
private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId) {
testSuccessLogFormatHelper(checkIP, appId, attemptId, containerId, null);
}
/**
* Test the AuditLog format for successful events.
*/
private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId) {
ApplicationAttemptId attemptId, ContainerId containerId,
CallerContext callerContext) {
String sLog = RMAuditLogger.createSuccessLog(USER, OPERATION, TARGET,
appId, attemptId, containerId);
appId, attemptId, containerId, callerContext);
StringBuilder expLog = new StringBuilder();
expLog.append("USER=test\t");
if (checkIP) {
@ -112,6 +120,14 @@ public class TestRMAuditLogger {
if (containerId != null) {
expLog.append("\tCONTAINERID=container_1");
}
if (callerContext != null) {
if (callerContext.getContext() != null) {
expLog.append("\tCALLERCONTEXT=context");
}
if (callerContext.getSignature() != null) {
expLog.append("\tCALLERSIGNATURE=signature");
}
}
assertEquals(expLog.toString(), sLog);
}
@ -144,18 +160,33 @@ public class TestRMAuditLogger {
testSuccessLogFormatHelper(checkIP, APPID, null, CONTAINERID);
testSuccessLogFormatHelper(checkIP, null, ATTEMPTID, CONTAINERID);
testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID);
testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID, null);
testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(null).setSignature(null).build());
testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(CALLER_CONTEXT).setSignature(null).build());
testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(null).setSignature(CALLER_SIGNATURE).build());
testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(CALLER_CONTEXT).setSignature(CALLER_SIGNATURE)
.build());
testSuccessLogNulls(checkIP);
}
private void testFailureLogFormatHelper(boolean checkIP, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId) {
testFailureLogFormatHelper(checkIP, appId, attemptId, containerId, null);
}
/**
* Test the AuditLog format for failure events.
*/
private void testFailureLogFormatHelper(boolean checkIP, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId) {
ApplicationAttemptId attemptId, ContainerId containerId,
CallerContext callerContext) {
String fLog =
RMAuditLogger.createFailureLog(USER, OPERATION, PERM, TARGET, DESC,
appId, attemptId, containerId);
appId, attemptId, containerId, callerContext);
StringBuilder expLog = new StringBuilder();
expLog.append("USER=test\t");
if (checkIP) {
@ -174,6 +205,14 @@ public class TestRMAuditLogger {
if (containerId != null) {
expLog.append("\tCONTAINERID=container_1");
}
if (callerContext != null) {
if (callerContext.getContext() != null) {
expLog.append("\tCALLERCONTEXT=context");
}
if (callerContext.getSignature() != null) {
expLog.append("\tCALLERSIGNATURE=signature");
}
}
assertEquals(expLog.toString(), fLog);
}
@ -190,6 +229,16 @@ public class TestRMAuditLogger {
testFailureLogFormatHelper(checkIP, APPID, null, CONTAINERID);
testFailureLogFormatHelper(checkIP, null, ATTEMPTID, CONTAINERID);
testFailureLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID);
testFailureLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(null).setSignature(null).build());
testFailureLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(CALLER_CONTEXT).setSignature(null).build());
testFailureLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(null).setSignature(CALLER_SIGNATURE).build());
testFailureLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(CALLER_CONTEXT).setSignature(CALLER_SIGNATURE)
.build());
}
/**

View File

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -212,6 +213,10 @@ public abstract class MockAsm extends MockApps {
public String getAppNodeLabelExpression() {
throw new UnsupportedOperationException("Not supported yet.");
}
public CallerContext getCallerContext() {
throw new UnsupportedOperationException("Not supported yet.");
}
}
public static RMApp newApplication(int i) {

View File

@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -205,6 +206,8 @@ public class TestSystemMetricsPublisher {
Long.parseLong(entity.getOtherInfo()
.get(ApplicationMetricsConstants.APP_CPU_METRICS).toString()));
}
Assert.assertEquals("context", entity.getOtherInfo()
.get(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT));
boolean hasCreatedEvent = false;
boolean hasUpdatedEvent = false;
boolean hasFinishedEvent = false;
@ -426,6 +429,8 @@ public class TestSystemMetricsPublisher {
when(amReq.getNodeLabelExpression()).thenReturn("high-mem");
when(app.getAMResourceRequest()).thenReturn(amReq);
when(app.getAmNodeLabelExpression()).thenCallRealMethod();
when(app.getCallerContext())
.thenReturn(new CallerContext.Builder("context").build());
return app;
}

View File

@ -18,18 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
@ -39,16 +35,11 @@ import java.util.Map;
import javax.crypto.SecretKey;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -57,33 +48,43 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
public class RMStateStoreTestBase {
@ -165,6 +166,8 @@ public class RMStateStoreTestBase {
when(mockApp.getStartTime()).thenReturn(startTime);
when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
when(mockApp.getUser()).thenReturn("test");
when(mockApp.getCallerContext())
.thenReturn(new CallerContext.Builder("context").build());
store.storeNewApplication(mockApp);
return mockApp;
}
@ -322,6 +325,7 @@ public class RMStateStoreTestBase {
clientTokenKey1.getEncoded(),
attemptState.getAppAttemptTokens()
.getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
assertEquals("context", appState.getCallerContext().getContext());
attemptState = appState.getAttempt(attemptId2);
// attempt2 is loaded correctly
@ -340,7 +344,7 @@ public class RMStateStoreTestBase {
ApplicationStateData.newInstance(appState.getSubmitTime(),
appState.getStartTime(), appState.getUser(),
appState.getApplicationSubmissionContext(), RMAppState.FINISHED,
"appDiagnostics", 1234);
"appDiagnostics", 1234, appState.getCallerContext());
appState2.attempts.putAll(appState.attempts);
store.updateApplicationState(appState2);
@ -365,7 +369,7 @@ public class RMStateStoreTestBase {
ApplicationStateData dummyApp =
ApplicationStateData.newInstance(appState.getSubmitTime(),
appState.getStartTime(), appState.getUser(), dummyContext,
RMAppState.FINISHED, "appDiagnostics", 1234);
RMAppState.FINISHED, "appDiagnostics", 1234, null);
store.updateApplicationState(dummyApp);
ApplicationAttemptId dummyAttemptId =

View File

@ -392,7 +392,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
store.storeApplicationStateInternal(
ApplicationId.newInstance(100L, 1),
ApplicationStateData.newInstance(111, 111, "user", null,
RMAppState.ACCEPTED, "diagnostics", 333));
RMAppState.ACCEPTED, "diagnostics", 333, null));
} catch (Exception e) {
assertionFailedInThread.set(true);
e.printStackTrace();

View File

@ -488,6 +488,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
context.setApplicationId(appIdRemoved);
ApplicationStateData appStateRemoved =
ApplicationStateData.newInstance(
submitTime, startTime, context, "user1");

View File

@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -296,4 +297,8 @@ public class MockRMApp implements RMApp {
public String getAppNodeLabelExpression() {
return null;
}
public CallerContext getCallerContext() {
throw new UnsupportedOperationException("Not supported yet.");
}
}

View File

@ -397,7 +397,7 @@ public class TestRMAppTransitions {
// NEW => SUBMITTED event RMAppEventType.RECOVER
RMState state = new RMState();
ApplicationStateData appState =
ApplicationStateData.newInstance(123, 123, null, "user");
ApplicationStateData.newInstance(123, 123, null, "user", null);
state.getApplicationState().put(application.getApplicationId(), appState);
RMAppEvent event =
new RMAppRecoverEvent(application.getApplicationId(), state);
@ -1011,7 +1011,7 @@ public class TestRMAppTransitions {
ApplicationStateData appState =
ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(),
app.getUser(), app.getApplicationSubmissionContext(), rmAppState,
null, app.getFinishTime());
null, app.getFinishTime(), null);
applicationState.put(app.getApplicationId(), appState);
}

View File

@ -108,7 +108,11 @@ public class TestRMContainerImpl {
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getRMApps()).thenReturn(rmApps);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
true);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, "user", rmContext);
@ -202,8 +206,14 @@ public class TestRMContainerImpl {
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
true);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
when(rmContext.getRMApps()).thenReturn(appMap);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, "user", rmContext);
@ -388,6 +398,9 @@ public class TestRMContainerImpl {
public void testStoreAllContainerMetrics() throws Exception {
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
conf.setBoolean(
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
true);
MockRM rm1 = new MockRM(conf);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);

View File

@ -177,6 +177,7 @@ public class TestFifoScheduler {
mock(SystemMetricsPublisher.class));
Configuration conf = new Configuration();
((RMContextImpl) rmContext).setScheduler(scheduler);
scheduler.setRMContext(rmContext);
scheduler.init(conf);
scheduler.start();
@ -309,6 +310,7 @@ public class TestFifoScheduler {
rmContext.setNodeLabelManager(nlm);
scheduler.setRMContext(rmContext);
((RMContextImpl) rmContext).setScheduler(scheduler);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(new Configuration(), rmContext);