YARN-582. Changed ResourceManager to recover Application token and client tokens for app attempt so that RM can be restarted while preserving current applications. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1480168 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-08 06:20:22 +00:00
parent 33ed6e2838
commit 74231f0276
16 changed files with 439 additions and 97 deletions

View File

@ -225,6 +225,10 @@ Release 2.0.5-beta - UNRELEASED
YARN-651. Changed PBClientImpls of ContainerManager and RMAdmin to throw
IOExceptions also. (Xuan Gong via vinodkv)
YARN-582. Changed ResourceManager to recover Application token and client
tokens for app attempt so that RM can be restarted while preserving current
applications. (Jian He via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -72,4 +72,5 @@ message ApplicationStateDataProto {
message ApplicationAttemptStateDataProto {
optional ApplicationAttemptIdProto attemptId = 1;
optional ContainerProto master_container = 2;
optional bytes app_attempt_tokens = 3;
}

View File

@ -334,10 +334,6 @@ public void recover(RMState state) throws Exception {
LOG.info("Recovering " + appStates.size() + " applications");
for(ApplicationState appState : appStates.values()) {
boolean shouldRecover = true;
// re-submit the application
// this is going to send an app start event but since the async dispatcher
// has not started that event will be queued until we have completed re
// populating the state
if(appState.getApplicationSubmissionContext().getUnmanagedAM()) {
// do not recover unmanaged applications since current recovery
// mechanism of restarting attempts does not work for them.
@ -367,6 +363,10 @@ public void recover(RMState state) throws Exception {
shouldRecover = false;
}
// re-submit the application
// this is going to send an app start event but since the async dispatcher
// has not started that event will be queued until we have completed re
// populating the state
if(shouldRecover) {
LOG.info("Recovering application " + appState.getAppId());
submitApplication(appState.getApplicationSubmissionContext(),

View File

@ -34,7 +34,6 @@
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
@ -48,7 +47,6 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -203,25 +201,16 @@ private void setupTokensAndEnv(
credentials.readTokenStorageStream(dibb);
}
ApplicationTokenIdentifier id = new ApplicationTokenIdentifier(
application.getAppAttemptId());
Token<ApplicationTokenIdentifier> appMasterToken =
new Token<ApplicationTokenIdentifier>(id,
this.rmContext.getApplicationTokenSecretManager());
InetSocketAddress serviceAddr = conf.getSocketAddr(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
// normally the client should set the service after acquiring the token,
// but this token is directly provided to the AMs
SecurityUtil.setTokenService(appMasterToken, serviceAddr);
// Add the ApplicationMaster token
credentials.addToken(appMasterToken.getService(), appMasterToken);
// Add application token
Token<ApplicationTokenIdentifier> applicationToken =
application.getApplicationToken();
if(applicationToken != null) {
credentials.addToken(applicationToken.getService(), applicationToken);
}
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
container.setContainerTokens(
ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
container.setContainerTokens(ByteBuffer.wrap(dob.getData(), 0,
dob.getLength()));
SecretKey clientSecretKey =
this.rmContext.getClientToAMTokenSecretManager().getMasterKey(

View File

@ -31,13 +31,15 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@ -114,8 +116,17 @@ public synchronized RMState loadState() throws Exception {
ApplicationAttemptStateDataPBImpl attemptStateData =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(childData));
ApplicationAttemptState attemptState = new ApplicationAttemptState(
attemptId, attemptStateData.getMasterContainer());
Credentials credentials = null;
if(attemptStateData.getAppAttemptTokens() != null){
credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials);
// assert child node name is same as application attempt id
assert attemptId.equals(attemptState.getAttemptId());
attempts.add(attemptState);

View File

@ -23,10 +23,12 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@ -79,8 +81,16 @@ public synchronized void storeApplicationAttemptState(String attemptIdStr,
throws Exception {
ApplicationAttemptId attemptId = ConverterUtils
.toApplicationAttemptId(attemptIdStr);
ApplicationAttemptState attemptState = new ApplicationAttemptState(
attemptId, attemptStateData.getMasterContainer());
Credentials credentials = null;
if(attemptStateData.getAppAttemptTokens() != null){
DataInputByteBuffer dibb = new DataInputByteBuffer();
credentials = new Credentials();
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials);
ApplicationState appState = state.getApplicationState().get(
attemptState.getAttemptId().getApplicationId());

View File

@ -20,8 +20,8 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@Unstable
public class NullRMStateStore extends RMStateStore {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@ -26,6 +27,9 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -34,8 +38,10 @@
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -58,19 +64,25 @@ public abstract class RMStateStore {
public static class ApplicationAttemptState {
final ApplicationAttemptId attemptId;
final Container masterContainer;
final Credentials appAttemptTokens;
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer) {
Container masterContainer,
Credentials appAttemptTokens) {
this.attemptId = attemptId;
this.masterContainer = masterContainer;
this.appAttemptTokens = appAttemptTokens;
}
public Container getMasterContainer() {
return masterContainer;
}
public ApplicationAttemptId getAttemptId() {
return attemptId;
}
public Credentials getAppAttemptTokens() {
return appAttemptTokens;
}
}
/**
@ -199,10 +211,14 @@ protected abstract void storeApplicationState(String appId,
* RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt
*/
public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) {
ApplicationAttemptState attemptState = new ApplicationAttemptState(
appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
Credentials credentials = getTokensFromAppAttempt(appAttempt);
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(), credentials);
dispatcher.getEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState));
new RMStateStoreAppAttemptEvent(attemptState));
}
/**
@ -226,8 +242,10 @@ public synchronized void removeApplication(RMApp app) {
ApplicationState appState = new ApplicationState(
app.getSubmitTime(), app.getApplicationSubmissionContext());
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
ApplicationAttemptState attemptState = new ApplicationAttemptState(
appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
Credentials credentials = getTokensFromAppAttempt(appAttempt);
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(), credentials);
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
@ -249,7 +267,20 @@ public synchronized void removeApplication(ApplicationState appState) {
*/
protected abstract void removeApplicationState(ApplicationState appState)
throws Exception;
/**
 * Gathers the tokens held by an application attempt into one
 * {@link Credentials} object, keyed by each token's service.
 *
 * @param appAttempt the attempt whose application token and client token
 *          are collected
 * @return a newly created Credentials object; a token the attempt does not
 *         have (getter returned null) is simply left out
 */
private Credentials getTokensFromAppAttempt(RMAppAttempt appAttempt) {
  final Credentials attemptCredentials = new Credentials();

  // Application token first, then client token: insertion order is kept
  // so that a later entry with the same service key wins, as before.
  final Token<ApplicationTokenIdentifier> applicationToken =
      appAttempt.getApplicationToken();
  if (applicationToken != null) {
    attemptCredentials.addToken(applicationToken.getService(),
        applicationToken);
  }

  final Token<ClientTokenIdentifier> attemptClientToken =
      appAttempt.getClientToken();
  if (attemptClientToken != null) {
    attemptCredentials.addToken(attemptClientToken.getService(),
        attemptClientToken);
  }

  return attemptCredentials;
}
// Dispatcher related code
private synchronized void handleStoreEvent(RMStateStoreEvent event) {
@ -283,13 +314,22 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) {
ApplicationAttemptState attemptState =
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
Exception storedException = null;
ApplicationAttemptStateDataPBImpl attemptStateData =
new ApplicationAttemptStateDataPBImpl();
attemptStateData.setAttemptId(attemptState.getAttemptId());
attemptStateData.setMasterContainer(attemptState.getMasterContainer());
LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
Credentials credentials = attemptState.getAppAttemptTokens();
ByteBuffer appAttemptTokens = null;
try {
if(credentials != null){
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
appAttemptTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
ApplicationAttemptStateDataPBImpl attemptStateData =
(ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
.newApplicationAttemptStateData(attemptState.getAttemptId(),
attemptState.getMasterContainer(), appAttemptTokens);
LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
storeApplicationAttemptState(attemptState.getAttemptId().toString(),
attemptStateData);
} catch (Exception e) {
@ -358,7 +398,5 @@ private final class ForwardingEventHandler
public void handle(RMStateStoreEvent event) {
handleStoreEvent(event);
}
}
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -49,4 +51,14 @@ public interface ApplicationAttemptStateData {
public Container getMasterContainer();
public void setMasterContainer(Container container);
/**
* The application attempt tokens that belong to this attempt
* @return The application attempt tokens that belong to this attempt
*/
@Public
@Unstable
public ByteBuffer getAppAttemptTokens();
public void setAppAttemptTokens(ByteBuffer attemptTokens);
}

View File

@ -16,20 +16,27 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
public class ApplicationAttemptStateDataPBImpl
extends ProtoBase<ApplicationAttemptStateDataProto>
implements ApplicationAttemptStateData {
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
ApplicationAttemptStateDataProto proto =
ApplicationAttemptStateDataProto.getDefaultInstance();
ApplicationAttemptStateDataProto.Builder builder = null;
@ -37,7 +44,8 @@ public class ApplicationAttemptStateDataPBImpl
private ApplicationAttemptId attemptId = null;
private Container masterContainer = null;
private ByteBuffer appAttemptTokens = null;
public ApplicationAttemptStateDataPBImpl() {
builder = ApplicationAttemptStateDataProto.newBuilder();
}
@ -62,6 +70,9 @@ private void mergeLocalToBuilder() {
if(this.masterContainer != null) {
builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto());
}
if(this.appAttemptTokens != null) {
builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens));
}
}
private void mergeLocalToProto() {
@ -123,4 +134,36 @@ public void setMasterContainer(Container container) {
this.masterContainer = container;
}
/**
 * Returns the serialized attempt tokens. A locally held value takes
 * precedence; otherwise the value is read from the proto (or the builder,
 * depending on {@code viaProto}), cached locally and returned.
 *
 * @return the attempt tokens, or null when neither a local value nor a
 *         proto field is present
 */
@Override
public ByteBuffer getAppAttemptTokens() {
  ApplicationAttemptStateDataProtoOrBuilder source =
      viaProto ? proto : builder;
  if (this.appAttemptTokens == null && source.hasAppAttemptTokens()) {
    // First access: convert the proto field once and remember the result.
    this.appAttemptTokens =
        convertFromProtoFormat(source.getAppAttemptTokens());
  }
  return this.appAttemptTokens;
}
/**
 * Sets the serialized attempt tokens held by this record.
 *
 * @param attemptTokens the tokens to keep; passing null also clears the
 *          field on the builder
 */
@Override
public void setAppAttemptTokens(ByteBuffer attemptTokens) {
// NOTE(review): maybeInitBuilder() is defined outside this hunk; presumably
// it makes sure 'builder' is usable before it is touched below - confirm.
maybeInitBuilder();
if(attemptTokens == null) {
builder.clearAppAttemptTokens();
}
// Only the local copy is updated here; a non-null value is written to the
// builder later by mergeLocalToBuilder().
this.appAttemptTokens = attemptTokens;
}
/**
 * Creates an {@link ApplicationAttemptStateData} record through the record
 * factory and fills in its three fields.
 *
 * @param attemptId id of the application attempt
 * @param container master container of the attempt
 * @param attemptTokens serialized tokens of the attempt
 * @return the populated record
 */
public static ApplicationAttemptStateData newApplicationAttemptStateData(
    ApplicationAttemptId attemptId, Container container,
    ByteBuffer attemptTokens) {
  final ApplicationAttemptStateData data =
      recordFactory.newRecordInstance(ApplicationAttemptStateData.class);
  data.setAttemptId(attemptId);
  data.setMasterContainer(container);
  data.setAppAttemptTokens(attemptTokens);
  return data;
}
}

View File

@ -16,13 +16,14 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
public class ApplicationStateDataPBImpl
extends ProtoBase<ApplicationStateDataProto>

View File

@ -32,6 +32,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -47,6 +48,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@ -443,7 +445,14 @@ public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
trackingUrl = this.currentAttempt.getTrackingUrl();
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
clientToken = this.currentAttempt.getClientToken();
Token<ClientTokenIdentifier> attemptClientToken =
this.currentAttempt.getClientToken();
if (attemptClientToken != null) {
clientToken =
BuilderUtils.newClientToken(attemptClientToken.getIdentifier(),
attemptClientToken.getKind().toString(), attemptClientToken
.getPassword(), attemptClientToken.getService().toString());
}
host = this.currentAttempt.getHost();
rpcPort = this.currentAttempt.getRpcPort();
appUsageReport = currentAttempt.getApplicationResourceUsageReport();

View File

@ -21,16 +21,18 @@
import java.util.List;
import java.util.Set;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ClientToken;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
/**
@ -92,7 +94,7 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
* The token required by the clients to talk to the application attempt
* @return the token required by the clients to talk to the application attempt
*/
ClientToken getClientToken();
Token<ClientTokenIdentifier> getClientToken();
/**
* Diagnostics information for the application attempt.
@ -146,6 +148,12 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
*/
ApplicationSubmissionContext getSubmissionContext();
/**
* The application token belonging to this app attempt
* @return The application token belonging to this app attempt
*/
Token<ApplicationTokenIdentifier> getApplicationToken();
/**
* Get application container and resource usage information.
* @return an ApplicationResourceUsageReport object.

View File

@ -20,6 +20,7 @@
import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@ -38,6 +39,9 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
@ -45,7 +49,6 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ClientToken;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -58,7 +61,10 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.ApplicationTokenSelector;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenSelector;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
@ -123,8 +129,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private final WriteLock writeLock;
private final ApplicationAttemptId applicationAttemptId;
private ClientToken clientToken;
private Token<ClientTokenIdentifier> clientToken;
private final ApplicationSubmissionContext submissionContext;
private Token<ApplicationTokenIdentifier> applicationToken = null;
//nodes on which this attempt's containers ran
private final Set<NodeId> ranNodes =
@ -366,19 +373,6 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
this.scheduler = scheduler;
this.masterService = masterService;
if (UserGroupInformation.isSecurityEnabled()) {
this.rmContext.getClientToAMTokenSecretManager().registerApplication(
appAttemptId);
Token<ClientTokenIdentifier> token =
new Token<ClientTokenIdentifier>(new ClientTokenIdentifier(
appAttemptId), this.rmContext.getClientToAMTokenSecretManager());
this.clientToken =
BuilderUtils.newClientToken(token.getIdentifier(), token.getKind()
.toString(), token.getPassword(), token.getService().toString());
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@ -502,10 +496,15 @@ private void setTrackingUrlToRMAppPage() {
}
@Override
public ClientToken getClientToken() {
public Token<ClientTokenIdentifier> getClientToken() {
return this.clientToken;
}
/**
 * Returns the application token currently held by this attempt. The backing
 * field is initialized to null, so null is returned until a token has been
 * assigned.
 */
@Override
public Token<ApplicationTokenIdentifier> getApplicationToken() {
return this.applicationToken;
}
@Override
public String getDiagnostics() {
this.readLock.lock();
@ -657,14 +656,42 @@ public void recover(RMState state) {
ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
assert attemptState != null;
setMasterContainer(attemptState.getMasterContainer());
LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
recoverAppAttemptTokens(attemptState.getAppAttemptTokens());
LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
+ " AttemptId: " + getAppAttemptId()
+ " MasterContainer: " + masterContainer);
setDiagnostics("Attempt recovered after RM restart");
handle(new RMAppAttemptEvent(getAppAttemptId(),
RMAppAttemptEventType.RECOVER));
}
/**
 * Restores this attempt's client token and application token from the
 * credentials that were saved with the attempt state.
 *
 * @param appAttemptTokens the stored credentials; when null, or when
 *          security is not enabled, nothing is restored
 */
private void recoverAppAttemptTokens(Credentials appAttemptTokens) {
  if (appAttemptTokens == null
      || !UserGroupInformation.isSecurityEnabled()) {
    return;
  }

  // The client token is looked up with an empty service name.
  this.clientToken =
      new ClientTokenSelector().selectToken(new Text(),
          appAttemptTokens.getAllTokens());

  // The application token is looked up by the scheduler address service.
  InetSocketAddress schedulerAddr = conf.getSocketAddr(
      YarnConfiguration.RM_SCHEDULER_ADDRESS,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
  this.applicationToken =
      new ApplicationTokenSelector().selectToken(
          SecurityUtil.buildTokenService(schedulerAddr),
          appAttemptTokens.getAllTokens());

  // For now, no need to populate tokens back to
  // ApplicationTokenSecretManager, because running attempts are rebooted
  // Later in work-preserve restart, we'll create NEW->RUNNING transition
  // in which the restored tokens will be added to the secret manager
}
private static class BaseTransition implements
SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
@ -686,6 +713,36 @@ public void transition(RMAppAttemptImpl appAttempt,
appAttempt.masterService
.registerAppAttempt(appAttempt.applicationAttemptId);
if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.rmContext.getClientToAMTokenSecretManager()
.registerApplication(appAttempt.applicationAttemptId);
// create clientToken
appAttempt.clientToken =
new Token<ClientTokenIdentifier>(new ClientTokenIdentifier(
appAttempt.applicationAttemptId),
appAttempt.rmContext.getClientToAMTokenSecretManager());
// create application token
ApplicationTokenIdentifier id =
new ApplicationTokenIdentifier(appAttempt.applicationAttemptId);
Token<ApplicationTokenIdentifier> applicationToken =
new Token<ApplicationTokenIdentifier>(id,
appAttempt.rmContext.getApplicationTokenSecretManager());
InetSocketAddress serviceAddr =
appAttempt.conf.getSocketAddr(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
// normally the client should set the service after acquiring the
// token, but this token is directly provided to the AMs
SecurityUtil.setTokenService(applicationToken, serviceAddr);
appAttempt.applicationToken = applicationToken;
}
// Add the application to the scheduler
appAttempt.eventHandler.handle(
new AppAddedSchedulerEvent(appAttempt.applicationAttemptId,
@ -992,7 +1049,6 @@ public void transition(RMAppAttemptImpl appAttempt,
appAttempt.rmContext.getAMFinishingMonitor().unregister(
appAttempt.getAppAttemptId());
// Unregister from the ClientTokenSecretManager
if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.rmContext.getClientToAMTokenSecretManager()
@ -1191,7 +1247,7 @@ public long getStartTime() {
this.readLock.unlock();
}
}
private void launchAttempt(){
// Send event to launch the AM Container
eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));

View File

@ -404,7 +404,8 @@ public void testRMRestartOnMaxAppAttempts() throws Exception {
}
@Test
public void testTokenRestoredOnRMrestart() throws Exception {
public void testDelegationTokenRestoredInDelegationTokenRenewer()
throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
@ -423,7 +424,7 @@ public void testTokenRestoredOnRMrestart() throws Exception {
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
MockRM rm1 = new MyMockRM(conf, memStore);
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
rm1.start();
HashSet<Token<RMDelegationTokenIdentifier>> tokenSet =
@ -461,21 +462,26 @@ public void testTokenRestoredOnRMrestart() throws Exception {
ApplicationState appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
// assert delegation tokens exist in rm1 DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm1.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens());
// assert delegation tokens are saved
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
securityTokens.rewind();
Assert.assertEquals(securityTokens, appState
.getApplicationSubmissionContext().getAMContainerSpec()
.getContainerTokens());
// start new RM
MockRM rm2 = new MyMockRM(conf, memStore);
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
rm2.start();
// verify tokens are properly populated back to DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm1.getRMContext()
// verify tokens are properly populated back to rm2 DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm2.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens());
// stop the RM
@ -483,9 +489,92 @@ public void testTokenRestoredOnRMrestart() throws Exception {
rm2.stop();
}
class MyMockRM extends MockRM {
/**
 * Checks that the application token and client token of an app attempt are
 * written to the state store by the first RM and are available again on the
 * attempt loaded by a second RM that is started from the same store.
 */
@Test
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
public MyMockRM(Configuration conf, RMStateStore store) {
// recovery is enabled on top of the in-memory state store, at most two AM
// attempts are allowed, and authentication is switched to kerberos
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
// the same store instance is handed to both rm1 and rm2 below; its
// application-state map is looked up again after the app is submitted
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
// start the first RM and register one node manager with it
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("0.0.0.0:4321", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// submit an app
RMApp app1 =
rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), "default");
// assert app info is saved
ApplicationState appState = rmAppState.get(app1.getApplicationId());
Assert.assertNotNull(appState);
// Allocate the AM
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
// assert attempt info is saved; the master container is expected to be
// container 1 of the attempt
ApplicationAttemptState attemptState = appState.getAttempt(attemptId1);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
// collect the application token and client token held by the live attempt
// in rm1; this set is the expected value for both comparisons below
HashSet<Token<?>> tokenSet = new HashSet<Token<?>>();
tokenSet.add(attempt1.getApplicationToken());
tokenSet.add(attempt1.getClientToken());
// assert both attempt tokens are saved in the stored attempt state
HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens());
Assert.assertEquals(tokenSet, savedTokens);
// start new RM
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
rm2.start();
// look up the same app and attempt in rm2
RMApp loadedApp1 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
RMAppAttempt loadedAttempt1 = loadedApp1.getRMAppAttempt(attemptId1);
// assert loaded attempt recovered attempt tokens
Assert.assertNotNull(loadedAttempt1);
savedTokens.clear();
savedTokens.add(loadedAttempt1.getApplicationToken());
savedTokens.add(loadedAttempt1.getClientToken());
Assert.assertEquals(tokenSet, savedTokens);
// assert clientToken is recovered back to api-versioned clientToken
Assert.assertEquals(attempt1.getClientToken(),
loadedAttempt1.getClientToken());
// Not testing ApplicationTokenSecretManager has the password populated back,
// that is needed in work-preserving restart
rm1.stop();
rm2.stop();
}
class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {
super(conf, store);
}

View File

@ -18,14 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.junit.Test;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -34,6 +39,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -44,13 +51,18 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
public class TestRMStateStore {
@ -141,7 +153,7 @@ public void addOrphanAttemptIfNeeded(RMStateStore testStore,
ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(
"appattempt_1352994193343_0003_000001");
storeAttempt(testStore, attemptId,
"container_1352994193343_0003_01_000001", dispatcher);
"container_1352994193343_0003_01_000001", null, null, dispatcher);
}
@Override
@ -186,14 +198,17 @@ void storeApp(RMStateStore store, ApplicationId appId, long time)
}
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
String containerIdStr, TestDispatcher dispatcher)
throws Exception {
String containerIdStr, Token<ApplicationTokenIdentifier> appToken,
Token<ClientTokenIdentifier> clientToken, TestDispatcher dispatcher)
throws Exception {
Container container = new ContainerPBImpl();
container.setId(ConverterUtils.toContainerId(containerIdStr));
RMAppAttempt mockAttempt = mock(RMAppAttempt.class);
when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
when(mockAttempt.getMasterContainer()).thenReturn(container);
when(mockAttempt.getApplicationToken()).thenReturn(appToken);
when(mockAttempt.getClientToken()).thenReturn(clientToken);
dispatcher.attemptId = attemptId;
dispatcher.storedException = null;
store.storeApplicationAttempt(mockAttempt);
@ -201,30 +216,58 @@ ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
return container.getId();
}
@SuppressWarnings("unchecked")
void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
long submitTime = System.currentTimeMillis();
Configuration conf = new YarnConfiguration();
RMStateStore store = stateStoreHelper.getRMStateStore();
TestDispatcher dispatcher = new TestDispatcher();
store.setDispatcher(dispatcher);
ApplicationTokenSecretManager appTokenMgr =
new ApplicationTokenSecretManager(conf);
ClientToAMTokenSecretManagerInRM clientTokenMgr =
new ClientToAMTokenSecretManagerInRM();
ApplicationAttemptId attemptId1 = ConverterUtils
.toApplicationAttemptId("appattempt_1352994193343_0001_000001");
ApplicationId appId1 = attemptId1.getApplicationId();
storeApp(store, appId1, submitTime);
// create application token1 for attempt1
List<Token<?>> appAttemptToken1 =
generateTokens(attemptId1, appTokenMgr, clientTokenMgr, conf);
HashSet<Token<?>> attemptTokenSet1 = new HashSet<Token<?>>();
attemptTokenSet1.addAll(appAttemptToken1);
ContainerId containerId1 = storeAttempt(store, attemptId1,
"container_1352994193343_0001_01_000001", dispatcher);
"container_1352994193343_0001_01_000001",
(Token<ApplicationTokenIdentifier>) (appAttemptToken1.get(0)),
(Token<ClientTokenIdentifier>)(appAttemptToken1.get(1)),
dispatcher);
String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
ApplicationAttemptId attemptId2 =
ConverterUtils.toApplicationAttemptId(appAttemptIdStr2);
ConverterUtils.toApplicationAttemptId(appAttemptIdStr2);
// create application token2 for attempt2
List<Token<?>> appAttemptToken2 =
generateTokens(attemptId2, appTokenMgr, clientTokenMgr, conf);
HashSet<Token<?>> attemptTokenSet2 = new HashSet<Token<?>>();
attemptTokenSet2.addAll(appAttemptToken2);
ContainerId containerId2 = storeAttempt(store, attemptId2,
"container_1352994193343_0001_02_000001", dispatcher);
"container_1352994193343_0001_02_000001",
(Token<ApplicationTokenIdentifier>) (appAttemptToken2.get(0)),
(Token<ClientTokenIdentifier>)(appAttemptToken2.get(1)),
dispatcher);
ApplicationAttemptId attemptIdRemoved = ConverterUtils
.toApplicationAttemptId("appattempt_1352994193343_0002_000001");
ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
storeApp(store, appIdRemoved, submitTime);
storeAttempt(store, attemptIdRemoved,
"container_1352994193343_0002_01_000001", dispatcher);
"container_1352994193343_0002_01_000001", null, null, dispatcher);
RMApp mockRemovedApp = mock(RMApp.class);
HashMap<ApplicationAttemptId, RMAppAttempt> attempts =
@ -268,12 +311,21 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
assertEquals(attemptId1, attemptState.getAttemptId());
// attempt1 container is loaded correctly
assertEquals(containerId1, attemptState.getMasterContainer().getId());
// attempt1 applicationToken is loaded correctly
HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens());
assertEquals(attemptTokenSet1, savedTokens);
attemptState = appState.getAttempt(attemptId2);
// attempt2 is loaded correctly
assertNotNull(attemptState);
assertEquals(attemptId2, attemptState.getAttemptId());
// attempt2 container is loaded correctly
assertEquals(containerId2, attemptState.getMasterContainer().getId());
// attempt2 applicationToken is loaded correctly
savedTokens.clear();
savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens());
assertEquals(attemptTokenSet2, savedTokens);
// assert store is in expected state after everything is cleaned
assertTrue(stateStoreHelper.isFinalStateValid());
@ -281,4 +333,23 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
store.close();
}
/**
 * Creates the two tokens used by the state-store test for one application
 * attempt: an application token followed by a client token.
 *
 * @param attemptId the attempt both token identifiers are created for
 * @param appTokenMgr secret manager passed to the application token
 * @param clientTokenMgr secret manager the attempt is registered with and
 *          that is passed to the client token
 * @param conf not used by this method
 * @return a list holding the application token at index 0 and the client
 *         token at index 1
 */
private List<Token<?>> generateTokens(ApplicationAttemptId attemptId,
    ApplicationTokenSecretManager appTokenMgr,
    ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) {
  // application token, tagged with a fixed service name
  Token<ApplicationTokenIdentifier> applicationToken =
      new Token<ApplicationTokenIdentifier>(
          new ApplicationTokenIdentifier(attemptId), appTokenMgr);
  applicationToken.setService(new Text("appToken service"));

  // client token: the attempt is registered with the client secret manager
  // before the token is constructed
  ClientTokenIdentifier clientIdentifier =
      new ClientTokenIdentifier(attemptId);
  clientTokenMgr.registerApplication(attemptId);
  Token<ClientTokenIdentifier> clientToAmToken =
      new Token<ClientTokenIdentifier>(clientIdentifier, clientTokenMgr);
  clientToAmToken.setService(new Text("clientToken service"));

  // callers read the tokens back by position, so the order matters
  List<Token<?>> result = new ArrayList<Token<?>>();
  result.add(applicationToken);
  result.add(clientToAmToken);
  return result;
}
}