YARN-2581. Passed LogAggregationContext to NM via ContainerTokenIdentifier. Contributed by Xuan Gong.

(cherry picked from commit c86674a3a4)
This commit is contained in:
Zhijie Shen 2014-09-24 17:50:26 -07:00
parent 835ade7064
commit 3a2e400377
13 changed files with 239 additions and 17 deletions

View File

@ -70,6 +70,9 @@ Release 2.6.0 - UNRELEASED
YARN-2102. Added the concept of a Timeline Domain to handle read/write ACLs YARN-2102. Added the concept of a Timeline Domain to handle read/write ACLs
on Timeline service event data. (Zhijie Shen via vinodkv) on Timeline service event data. (Zhijie Shen via vinodkv)
YARN-2581. Passed LogAggregationContext to NM via ContainerTokenIdentifier.
(Xuan Gong via zjshen)
IMPROVEMENTS IMPROVEMENTS
YARN-2242. Improve exception information on AM launch crashes. (Li Lu YARN-2242. Improve exception information on AM launch crashes. (Li Lu

View File

@ -34,8 +34,11 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
/** /**
* TokenIdentifier for a container. Encodes {@link ContainerId}, * TokenIdentifier for a container. Encodes {@link ContainerId},
@ -59,10 +62,19 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
private long rmIdentifier; private long rmIdentifier;
private Priority priority; private Priority priority;
private long creationTime; private long creationTime;
private LogAggregationContext logAggregationContext;
public ContainerTokenIdentifier(ContainerId containerID, public ContainerTokenIdentifier(ContainerId containerID,
String hostName, String appSubmitter, Resource r, long expiryTimeStamp, String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
int masterKeyId, long rmIdentifier, Priority priority, long creationTime) { int masterKeyId, long rmIdentifier, Priority priority, long creationTime) {
this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
rmIdentifier, priority, creationTime, null);
}
public ContainerTokenIdentifier(ContainerId containerID, String hostName,
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext) {
this.containerId = containerID; this.containerId = containerID;
this.nmHostAddr = hostName; this.nmHostAddr = hostName;
this.appSubmitter = appSubmitter; this.appSubmitter = appSubmitter;
@ -72,6 +84,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
this.rmIdentifier = rmIdentifier; this.rmIdentifier = rmIdentifier;
this.priority = priority; this.priority = priority;
this.creationTime = creationTime; this.creationTime = creationTime;
this.logAggregationContext = logAggregationContext;
} }
/** /**
@ -119,6 +132,10 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
return this.rmIdentifier; return this.rmIdentifier;
} }
public LogAggregationContext getLogAggregationContext() {
return this.logAggregationContext;
}
@Override @Override
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this); LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this);
@ -138,6 +155,15 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
out.writeLong(this.rmIdentifier); out.writeLong(this.rmIdentifier);
out.writeInt(this.priority.getPriority()); out.writeInt(this.priority.getPriority());
out.writeLong(this.creationTime); out.writeLong(this.creationTime);
if (this.logAggregationContext == null) {
out.writeInt(-1);
} else {
byte[] logAggregationContext =
((LogAggregationContextPBImpl) this.logAggregationContext).getProto()
.toByteArray();
out.writeInt(logAggregationContext.length);
out.write(logAggregationContext);
}
} }
@Override @Override
@ -158,6 +184,14 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
this.rmIdentifier = in.readLong(); this.rmIdentifier = in.readLong();
this.priority = Priority.newInstance(in.readInt()); this.priority = Priority.newInstance(in.readInt());
this.creationTime = in.readLong(); this.creationTime = in.readLong();
int size = in.readInt();
if (size != -1) {
byte[] bytes = new byte[size];
in.readFully(bytes);
this.logAggregationContext =
new LogAggregationContextPBImpl(
LogAggregationContextProto.parseFrom(bytes));
}
} }
@Override @Override

View File

@ -72,9 +72,11 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
@ -275,11 +277,17 @@ public class ContainerManagerImpl extends CompositeService implements
aclProto.getAcl()); aclProto.getAcl());
} }
LogAggregationContext logAggregationContext = null;
if (p.getLogAggregationContext() != null) {
logAggregationContext =
new LogAggregationContextPBImpl(p.getLogAggregationContext());
}
LOG.info("Recovering application " + appId); LOG.info("Recovering application " + appId);
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
creds, context); creds, context);
context.getApplications().put(appId, app); context.getApplications().put(appId, app);
app.handle(new ApplicationInitEvent(appId, acls)); app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -719,13 +727,19 @@ public class ContainerManagerImpl extends CompositeService implements
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
String user, Credentials credentials, String user, Credentials credentials,
Map<ApplicationAccessType, String> appAcls) { Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
ContainerManagerApplicationProto.Builder builder = ContainerManagerApplicationProto.Builder builder =
ContainerManagerApplicationProto.newBuilder(); ContainerManagerApplicationProto.newBuilder();
builder.setId(((ApplicationIdPBImpl) appId).getProto()); builder.setId(((ApplicationIdPBImpl) appId).getProto());
builder.setUser(user); builder.setUser(user);
if (logAggregationContext != null) {
builder.setLogAggregationContext((
(LogAggregationContextPBImpl)logAggregationContext).getProto());
}
builder.clearCredentials(); builder.clearCredentials();
if (credentials != null) { if (credentials != null) {
DataOutputBuffer dob = new DataOutputBuffer(); DataOutputBuffer dob = new DataOutputBuffer();
@ -826,12 +840,16 @@ public class ContainerManagerImpl extends CompositeService implements
if (null == context.getApplications().putIfAbsent(applicationID, if (null == context.getApplications().putIfAbsent(applicationID,
application)) { application)) {
LOG.info("Creating a new application reference for app " + applicationID); LOG.info("Creating a new application reference for app " + applicationID);
LogAggregationContext logAggregationContext =
containerTokenIdentifier.getLogAggregationContext();
Map<ApplicationAccessType, String> appAcls = Map<ApplicationAccessType, String> appAcls =
container.getLaunchContext().getApplicationACLs(); container.getLaunchContext().getApplicationACLs();
context.getNMStateStore().storeApplication(applicationID, context.getNMStateStore().storeApplication(applicationID,
buildAppProto(applicationID, user, credentials, appAcls)); buildAppProto(applicationID, user, credentials, appAcls,
logAggregationContext));
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new ApplicationInitEvent(applicationID, appAcls)); new ApplicationInitEvent(applicationID, appAcls,
logAggregationContext));
} }
this.context.getNMStateStore().storeContainer(containerId, request); this.context.getNMStateStore().storeContainer(containerId, request);

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -54,6 +55,8 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.state.StateMachineFactory;
import com.google.common.annotations.VisibleForTesting;
/** /**
* The state machine for the representation of an Application * The state machine for the representation of an Application
* within the NodeManager. * within the NodeManager.
@ -72,6 +75,8 @@ public class ApplicationImpl implements Application {
private static final Log LOG = LogFactory.getLog(Application.class); private static final Log LOG = LogFactory.getLog(Application.class);
private LogAggregationContext logAggregationContext;
Map<ContainerId, Container> containers = Map<ContainerId, Container> containers =
new HashMap<ContainerId, Container>(); new HashMap<ContainerId, Container>();
@ -234,10 +239,11 @@ public class ApplicationImpl implements Application {
app.applicationACLs = initEvent.getApplicationACLs(); app.applicationACLs = initEvent.getApplicationACLs();
app.aclsManager.addApplication(app.getAppId(), app.applicationACLs); app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
// Inform the logAggregator // Inform the logAggregator
app.logAggregationContext = initEvent.getLogAggregationContext();
app.dispatcher.getEventHandler().handle( app.dispatcher.getEventHandler().handle(
new LogHandlerAppStartedEvent(app.appId, app.user, new LogHandlerAppStartedEvent(app.appId, app.user,
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS, app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
app.applicationACLs)); app.applicationACLs, app.logAggregationContext));
} }
} }
@ -467,4 +473,14 @@ public class ApplicationImpl implements Application {
public String toString() { public String toString() {
return appId.toString(); return appId.toString();
} }
@VisibleForTesting
public LogAggregationContext getLogAggregationContext() {
try {
this.readLock.lock();
return this.logAggregationContext;
} finally {
this.readLock.unlock();
}
}
} }

View File

@ -22,18 +22,31 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
public class ApplicationInitEvent extends ApplicationEvent { public class ApplicationInitEvent extends ApplicationEvent {
private final Map<ApplicationAccessType, String> applicationACLs; private final Map<ApplicationAccessType, String> applicationACLs;
private final LogAggregationContext logAggregationContext;
public ApplicationInitEvent(ApplicationId appId, public ApplicationInitEvent(ApplicationId appId,
Map<ApplicationAccessType, String> acls) { Map<ApplicationAccessType, String> acls) {
this(appId, acls, null);
}
public ApplicationInitEvent(ApplicationId appId,
Map<ApplicationAccessType, String> acls,
LogAggregationContext logAggregationContext) {
super(appId, ApplicationEventType.INIT_APPLICATION); super(appId, ApplicationEventType.INIT_APPLICATION);
this.applicationACLs = acls; this.applicationACLs = acls;
this.logAggregationContext = logAggregationContext;
} }
public Map<ApplicationAccessType, String> getApplicationACLs() { public Map<ApplicationAccessType, String> getApplicationACLs() {
return this.applicationACLs; return this.applicationACLs;
} }
public LogAggregationContext getLogAggregationContext() {
return this.logAggregationContext;
}
} }

View File

@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
public class LogHandlerAppStartedEvent extends LogHandlerEvent { public class LogHandlerAppStartedEvent extends LogHandlerEvent {
@ -32,16 +33,25 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent {
private final String user; private final String user;
private final Credentials credentials; private final Credentials credentials;
private final Map<ApplicationAccessType, String> appAcls; private final Map<ApplicationAccessType, String> appAcls;
private final LogAggregationContext logAggregationContext;
public LogHandlerAppStartedEvent(ApplicationId appId, String user, public LogHandlerAppStartedEvent(ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy, Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls) { Map<ApplicationAccessType, String> appAcls) {
this(appId, user, credentials, retentionPolicy, appAcls, null);
}
public LogHandlerAppStartedEvent(ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
super(LogHandlerEventType.APPLICATION_STARTED); super(LogHandlerEventType.APPLICATION_STARTED);
this.applicationId = appId; this.applicationId = appId;
this.user = user; this.user = user;
this.credentials = credentials; this.credentials = credentials;
this.retentionPolicy = retentionPolicy; this.retentionPolicy = retentionPolicy;
this.appAcls = appAcls; this.appAcls = appAcls;
this.logAggregationContext = logAggregationContext;
} }
public ApplicationId getApplicationId() { public ApplicationId getApplicationId() {
@ -64,4 +74,7 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent {
return this.appAcls; return this.appAcls;
} }
public LogAggregationContext getLogAggregationContext() {
return this.logAggregationContext;
}
} }

View File

@ -29,6 +29,7 @@ message ContainerManagerApplicationProto {
optional string user = 2; optional string user = 2;
optional bytes credentials = 3; optional bytes credentials = 3;
repeated ApplicationACLMapProto acls = 4; repeated ApplicationACLMapProto acls = 4;
optional LogAggregationContextProto log_aggregation_context = 5;
} }
message DeletionServiceDeleteTaskProto { message DeletionServiceDeleteTaskProto {

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -795,11 +796,20 @@ public class TestContainerManager extends BaseContainerManagerTest {
NodeId nodeId, String user, NodeId nodeId, String user,
NMContainerTokenSecretManager containerTokenSecretManager) NMContainerTokenSecretManager containerTokenSecretManager)
throws IOException { throws IOException {
return createContainerToken(cId, rmIdentifier, nodeId, user,
containerTokenSecretManager, null);
}
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user,
NMContainerTokenSecretManager containerTokenSecretManager,
LogAggregationContext logAggregationContext)
throws IOException {
Resource r = BuilderUtils.newResource(1024, 1); Resource r = BuilderUtils.newResource(1024, 1);
ContainerTokenIdentifier containerTokenIdentifier = ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(cId, nodeId.toString(), user, r, new ContainerTokenIdentifier(cId, nodeId.toString(), user, r,
System.currentTimeMillis() + 100000L, 123, rmIdentifier, System.currentTimeMillis() + 100000L, 123, rmIdentifier,
Priority.newInstance(0), 0); Priority.newInstance(0), 0, logAggregationContext);
Token containerToken = Token containerToken =
BuilderUtils BuilderUtils
.newContainerToken(nodeId, containerTokenSecretManager .newContainerToken(nodeId, containerTokenSecretManager

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
@ -126,8 +128,12 @@ public class TestContainerManagerRecovery {
ContainerLaunchContext clc = ContainerLaunchContext.newInstance( ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
localResources, containerEnv, containerCmds, serviceData, localResources, containerEnv, containerCmds, serviceData,
containerTokens, acls); containerTokens, acls);
// create the logAggregationContext
LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance("includePattern", "excludePattern",
1000);
StartContainersResponse startResponse = startContainer(context, cm, cid, StartContainersResponse startResponse = startContainer(context, cm, cid,
clc); clc, logAggregationContext);
assertTrue(startResponse.getFailedRequests().isEmpty()); assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size()); assertEquals(1, context.getApplications().size());
Application app = context.getApplications().get(appId); Application app = context.getApplications().get(appId);
@ -157,6 +163,18 @@ public class TestContainerManagerRecovery {
assertEquals(1, context.getApplications().size()); assertEquals(1, context.getApplications().size());
app = context.getApplications().get(appId); app = context.getApplications().get(appId);
assertNotNull(app); assertNotNull(app);
// check whether LogAggregationContext is recovered correctly
LogAggregationContext recovered =
((ApplicationImpl) app).getLogAggregationContext();
assertNotNull(recovered);
assertEquals(logAggregationContext.getRollingIntervalSeconds(),
recovered.getRollingIntervalSeconds());
assertEquals(logAggregationContext.getIncludePattern(),
recovered.getIncludePattern());
assertEquals(logAggregationContext.getExcludePattern(),
recovered.getExcludePattern());
waitForAppState(app, ApplicationState.INITING); waitForAppState(app, ApplicationState.INITING);
assertTrue(context.getApplicationACLsManager().checkAccess( assertTrue(context.getApplicationACLsManager().checkAccess(
UserGroupInformation.createRemoteUser(modUser), UserGroupInformation.createRemoteUser(modUser),
@ -224,13 +242,14 @@ public class TestContainerManagerRecovery {
private StartContainersResponse startContainer(Context context, private StartContainersResponse startContainer(Context context,
final ContainerManagerImpl cm, ContainerId cid, final ContainerManagerImpl cm, ContainerId cid,
ContainerLaunchContext clc) throws Exception { ContainerLaunchContext clc, LogAggregationContext logAggregationContext)
throws Exception {
UserGroupInformation user = UserGroupInformation.createRemoteUser( UserGroupInformation user = UserGroupInformation.createRemoteUser(
cid.getApplicationAttemptId().toString()); cid.getApplicationAttemptId().toString());
StartContainerRequest scReq = StartContainerRequest.newInstance( StartContainerRequest scReq = StartContainerRequest.newInstance(
clc, TestContainerManager.createContainerToken(cid, 0, clc, TestContainerManager.createContainerToken(cid, 0,
context.getNodeId(), user.getShortUserName(), context.getNodeId(), user.getShortUserName(),
context.getContainerTokenSecretManager())); context.getContainerTokenSecretManager(), logAggregationContext));
final List<StartContainerRequest> scReqList = final List<StartContainerRequest> scReqList =
new ArrayList<StartContainerRequest>(); new ArrayList<StartContainerRequest>();
scReqList.add(scReq); scReqList.add(scReq);

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
@ -91,6 +92,7 @@ public class SchedulerApplicationAttempt {
private Resource amResource = Resources.none(); private Resource amResource = Resources.none();
private boolean unmanagedAM = true; private boolean unmanagedAM = true;
private boolean amRunning = false; private boolean amRunning = false;
private LogAggregationContext logAggregationContext;
protected List<RMContainer> newlyAllocatedContainers = protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>(); new ArrayList<RMContainer>();
@ -138,6 +140,8 @@ public class SchedulerApplicationAttempt {
.getApplicationSubmissionContext(); .getApplicationSubmissionContext();
if (appSubmissionContext != null) { if (appSubmissionContext != null) {
unmanagedAM = appSubmissionContext.getUnmanagedAM(); unmanagedAM = appSubmissionContext.getUnmanagedAM();
this.logAggregationContext =
appSubmissionContext.getLogAggregationContext();
} }
} }
} }
@ -444,7 +448,7 @@ public class SchedulerApplicationAttempt {
container.setContainerToken(rmContext.getContainerTokenSecretManager() container.setContainerToken(rmContext.getContainerTokenSecretManager()
.createContainerToken(container.getId(), container.getNodeId(), .createContainerToken(container.getId(), container.getNodeId(),
getUser(), container.getResource(), container.getPriority(), getUser(), container.getResource(), container.getPriority(),
rmContainer.getCreationTime())); rmContainer.getCreationTime(), this.logAggregationContext));
NMToken nmToken = NMToken nmToken =
rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
getApplicationAttemptId(), container); getApplicationAttemptId(), container);

View File

@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -177,6 +178,25 @@ public class RMContainerTokenSecretManager extends
public Token createContainerToken(ContainerId containerId, NodeId nodeId, public Token createContainerToken(ContainerId containerId, NodeId nodeId,
String appSubmitter, Resource capability, Priority priority, String appSubmitter, Resource capability, Priority priority,
long createTime) { long createTime) {
return createContainerToken(containerId, nodeId, appSubmitter, capability,
priority, createTime, null);
}
/**
* Helper function for creating ContainerTokens
*
* @param containerId
* @param nodeId
* @param appSubmitter
* @param capability
* @param priority
* @param createTime
* @param logAggregationContext
* @return the container-token
*/
public Token createContainerToken(ContainerId containerId, NodeId nodeId,
String appSubmitter, Resource capability, Priority priority,
long createTime, LogAggregationContext logAggregationContext) {
byte[] password; byte[] password;
ContainerTokenIdentifier tokenIdentifier; ContainerTokenIdentifier tokenIdentifier;
long expiryTimeStamp = long expiryTimeStamp =
@ -189,7 +209,8 @@ public class RMContainerTokenSecretManager extends
new ContainerTokenIdentifier(containerId, nodeId.toString(), new ContainerTokenIdentifier(containerId, nodeId.toString(),
appSubmitter, capability, expiryTimeStamp, this.currentMasterKey appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
.getMasterKey().getKeyId(), .getMasterKey().getKeyId(),
ResourceManager.getClusterTimeStamp(), priority, createTime); ResourceManager.getClusterTimeStamp(), priority, createTime,
logAggregationContext);
password = this.createPassword(tokenIdentifier); password = this.createPassword(tokenIdentifier);
} finally { } finally {

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -278,7 +279,7 @@ public class MockRM extends ResourceManager {
boolean waitForAccepted, boolean keepContainers) throws Exception { boolean waitForAccepted, boolean keepContainers) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue, return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers, maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
false, null, 0); false, null, 0, null);
} }
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
@ -287,7 +288,7 @@ public class MockRM extends ResourceManager {
.getShortUserName(), null, false, null, .getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
false, null, attemptFailuresValidityInterval); false, null, attemptFailuresValidityInterval, null);
} }
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
@ -297,14 +298,24 @@ public class MockRM extends ResourceManager {
ApplicationId applicationId) throws Exception { ApplicationId applicationId) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue, return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers, maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
isAppIdProvided, applicationId, 0); isAppIdProvided, applicationId, 0, null);
} }
public RMApp submitApp(int masterMemory,
LogAggregationContext logAggregationContext) throws Exception {
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
false, null, 0, logAggregationContext);
}
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue, Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType, int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval) ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext)
throws Exception { throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService(); ApplicationClientProtocol client = getClientRMService();
@ -342,6 +353,9 @@ public class MockRM extends ResourceManager {
} }
sub.setAMContainerSpec(clc); sub.setAMContainerSpec(clc);
sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
if (logAggregationContext != null) {
sub.setLogAggregationContext(logAggregationContext);
}
req.setApplicationSubmissionContext(sub); req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser = UserGroupInformation fakeUser =
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});

View File

@ -28,12 +28,14 @@ import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -195,6 +198,58 @@ public class TestContainerAllocation {
Assert.assertEquals(1, containers.size()); Assert.assertEquals(1, containers.size());
} }
// This is to test whether LogAggregationContext is passed into
// container tokens correctly
@Test
public void testLogAggregationContextPassedIntoContainerToken()
throws Exception {
MockRM rm1 = new MockRM(conf);
rm1.start();
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
MockNM nm2 = rm1.registerNode("127.0.0.1:2345", 8000);
// LogAggregationContext is set as null
Assert
.assertNull(getLogAggregationContextFromContainerToken(rm1, nm1, null));
// create a not-null LogAggregationContext
final int interval = 2000;
LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance(
"includePattern", "excludePattern", interval);
LogAggregationContext returned =
getLogAggregationContextFromContainerToken(rm1, nm2,
logAggregationContext);
Assert.assertEquals("includePattern", returned.getIncludePattern());
Assert.assertEquals("excludePattern", returned.getExcludePattern());
Assert.assertEquals(interval, returned.getRollingIntervalSeconds());
rm1.stop();
}
private LogAggregationContext getLogAggregationContextFromContainerToken(
MockRM rm1, MockNM nm1, LogAggregationContext logAggregationContext)
throws Exception {
RMApp app2 = rm1.submitApp(200, logAggregationContext);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
nm1.nodeHeartbeat(true);
// request a container.
am2.allocate("127.0.0.1", 512, 1, new ArrayList<ContainerId>());
ContainerId containerId =
ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId, RMContainerState.ALLOCATED);
// acquire the container.
List<Container> containers =
am2.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
Assert.assertEquals(containerId, containers.get(0).getId());
// container token is generated.
Assert.assertNotNull(containers.get(0).getContainerToken());
ContainerTokenIdentifier token =
BuilderUtils.newContainerTokenIdentifier(containers.get(0)
.getContainerToken());
return token.getLogAggregationContext();
}
private volatile int numRetries = 0; private volatile int numRetries = 0;
private class TestRMSecretManagerService extends RMSecretManagerService { private class TestRMSecretManagerService extends RMSecretManagerService {
@ -210,10 +265,11 @@ public class TestContainerAllocation {
@Override @Override
public Token createContainerToken(ContainerId containerId, public Token createContainerToken(ContainerId containerId,
NodeId nodeId, String appSubmitter, Resource capability, NodeId nodeId, String appSubmitter, Resource capability,
Priority priority, long createTime) { Priority priority, long createTime,
LogAggregationContext logAggregationContext) {
numRetries++; numRetries++;
return super.createContainerToken(containerId, nodeId, appSubmitter, return super.createContainerToken(containerId, nodeId, appSubmitter,
capability, priority, createTime); capability, priority, createTime, logAggregationContext);
} }
}; };
} }