YARN-582. Changed ResourceManager to recover Application token and client tokens for app attempt so that RM can be restarted while preserving current applications. Contributed by Jian He.

svn merge --ignore-ancestry -c 1480168 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1480169 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-08 06:21:02 +00:00
parent e5ce72fb4e
commit 0ec689c9a8
16 changed files with 439 additions and 97 deletions

View File

@ -150,6 +150,10 @@ Release 2.0.5-beta - UNRELEASED
YARN-651. Changed PBClientImpls of ContainerManager and RMAdmin to throw YARN-651. Changed PBClientImpls of ContainerManager and RMAdmin to throw
IOExceptions also. (Xuan Gong via vinodkv) IOExceptions also. (Xuan Gong via vinodkv)
YARN-582. Changed ResourceManager to recover Application token and client
tokens for app attempt so that RM can be restarted while preserving current
applications. (Jian He via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -72,4 +72,5 @@ message ApplicationStateDataProto {
message ApplicationAttemptStateDataProto { message ApplicationAttemptStateDataProto {
optional ApplicationAttemptIdProto attemptId = 1; optional ApplicationAttemptIdProto attemptId = 1;
optional ContainerProto master_container = 2; optional ContainerProto master_container = 2;
optional bytes app_attempt_tokens = 3;
} }

View File

@ -334,10 +334,6 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
LOG.info("Recovering " + appStates.size() + " applications"); LOG.info("Recovering " + appStates.size() + " applications");
for(ApplicationState appState : appStates.values()) { for(ApplicationState appState : appStates.values()) {
boolean shouldRecover = true; boolean shouldRecover = true;
// re-submit the application
// this is going to send an app start event but since the async dispatcher
// has not started that event will be queued until we have completed re
// populating the state
if(appState.getApplicationSubmissionContext().getUnmanagedAM()) { if(appState.getApplicationSubmissionContext().getUnmanagedAM()) {
// do not recover unmanaged applications since current recovery // do not recover unmanaged applications since current recovery
// mechanism of restarting attempts does not work for them. // mechanism of restarting attempts does not work for them.
@ -367,6 +363,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
shouldRecover = false; shouldRecover = false;
} }
// re-submit the application
// this is going to send an app start event but since the async dispatcher
// has not started that event will be queued until we have completed re
// populating the state
if(shouldRecover) { if(shouldRecover) {
LOG.info("Recovering application " + appState.getAppId()); LOG.info("Recovering application " + appState.getAppId());
submitApplication(appState.getApplicationSubmissionContext(), submitApplication(appState.getApplicationSubmissionContext(),

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -203,25 +201,16 @@ public class AMLauncher implements Runnable {
credentials.readTokenStorageStream(dibb); credentials.readTokenStorageStream(dibb);
} }
ApplicationTokenIdentifier id = new ApplicationTokenIdentifier( // Add application token
application.getAppAttemptId()); Token<ApplicationTokenIdentifier> applicationToken =
Token<ApplicationTokenIdentifier> appMasterToken = application.getApplicationToken();
new Token<ApplicationTokenIdentifier>(id, if(applicationToken != null) {
this.rmContext.getApplicationTokenSecretManager()); credentials.addToken(applicationToken.getService(), applicationToken);
InetSocketAddress serviceAddr = conf.getSocketAddr( }
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
// normally the client should set the service after acquiring the token,
// but this token is directly provided to the AMs
SecurityUtil.setTokenService(appMasterToken, serviceAddr);
// Add the ApplicationMaster token
credentials.addToken(appMasterToken.getService(), appMasterToken);
DataOutputBuffer dob = new DataOutputBuffer(); DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob); credentials.writeTokenStorageToStream(dob);
container.setContainerTokens( container.setContainerTokens(ByteBuffer.wrap(dob.getData(), 0,
ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); dob.getLength()));
SecretKey clientSecretKey = SecretKey clientSecretKey =
this.rmContext.getClientToAMTokenSecretManager().getMasterKey( this.rmContext.getClientToAMTokenSecretManager().getMasterKey(

View File

@ -31,13 +31,15 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -114,8 +116,17 @@ public class FileSystemRMStateStore extends RMStateStore {
ApplicationAttemptStateDataPBImpl attemptStateData = ApplicationAttemptStateDataPBImpl attemptStateData =
new ApplicationAttemptStateDataPBImpl( new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(childData)); ApplicationAttemptStateDataProto.parseFrom(childData));
ApplicationAttemptState attemptState = new ApplicationAttemptState( Credentials credentials = null;
attemptId, attemptStateData.getMasterContainer()); if(attemptStateData.getAppAttemptTokens() != null){
credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials);
// assert child node name is same as application attempt id // assert child node name is same as application attempt id
assert attemptId.equals(attemptState.getAttemptId()); assert attemptId.equals(attemptState.getAttemptId());
attempts.add(attemptState); attempts.add(attemptState);

View File

@ -23,10 +23,12 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -79,8 +81,16 @@ public class MemoryRMStateStore extends RMStateStore {
throws Exception { throws Exception {
ApplicationAttemptId attemptId = ConverterUtils ApplicationAttemptId attemptId = ConverterUtils
.toApplicationAttemptId(attemptIdStr); .toApplicationAttemptId(attemptIdStr);
ApplicationAttemptState attemptState = new ApplicationAttemptState( Credentials credentials = null;
attemptId, attemptStateData.getMasterContainer()); if(attemptStateData.getAppAttemptTokens() != null){
DataInputByteBuffer dibb = new DataInputByteBuffer();
credentials = new Credentials();
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials);
ApplicationState appState = state.getApplicationState().get( ApplicationState appState = state.getApplicationState().get(
attemptState.getAttemptId().getApplicationId()); attemptState.getAttemptId().getApplicationId());

View File

@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@Unstable @Unstable
public class NullRMStateStore extends RMStateStore { public class NullRMStateStore extends RMStateStore {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery; package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -26,6 +27,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -34,8 +38,10 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPB
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -58,11 +64,14 @@ public abstract class RMStateStore {
public static class ApplicationAttemptState { public static class ApplicationAttemptState {
final ApplicationAttemptId attemptId; final ApplicationAttemptId attemptId;
final Container masterContainer; final Container masterContainer;
final Credentials appAttemptTokens;
public ApplicationAttemptState(ApplicationAttemptId attemptId, public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer) { Container masterContainer,
Credentials appAttemptTokens) {
this.attemptId = attemptId; this.attemptId = attemptId;
this.masterContainer = masterContainer; this.masterContainer = masterContainer;
this.appAttemptTokens = appAttemptTokens;
} }
public Container getMasterContainer() { public Container getMasterContainer() {
@ -71,6 +80,9 @@ public abstract class RMStateStore {
public ApplicationAttemptId getAttemptId() { public ApplicationAttemptId getAttemptId() {
return attemptId; return attemptId;
} }
public Credentials getAppAttemptTokens() {
return appAttemptTokens;
}
} }
/** /**
@ -199,10 +211,14 @@ public abstract class RMStateStore {
* RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt
*/ */
public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) {
ApplicationAttemptState attemptState = new ApplicationAttemptState( Credentials credentials = getTokensFromAppAttempt(appAttempt);
appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(), credentials);
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState)); new RMStateStoreAppAttemptEvent(attemptState));
} }
/** /**
@ -226,8 +242,10 @@ public abstract class RMStateStore {
ApplicationState appState = new ApplicationState( ApplicationState appState = new ApplicationState(
app.getSubmitTime(), app.getApplicationSubmissionContext()); app.getSubmitTime(), app.getApplicationSubmissionContext());
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
ApplicationAttemptState attemptState = new ApplicationAttemptState( Credentials credentials = getTokensFromAppAttempt(appAttempt);
appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(), credentials);
appState.attempts.put(attemptState.getAttemptId(), attemptState); appState.attempts.put(attemptState.getAttemptId(), attemptState);
} }
@ -250,6 +268,19 @@ public abstract class RMStateStore {
protected abstract void removeApplicationState(ApplicationState appState) protected abstract void removeApplicationState(ApplicationState appState)
throws Exception; throws Exception;
private Credentials getTokensFromAppAttempt(RMAppAttempt appAttempt) {
Credentials credentials = new Credentials();
Token<ApplicationTokenIdentifier> appToken = appAttempt.getApplicationToken();
if(appToken != null){
credentials.addToken(appToken.getService(), appToken);
}
Token<ClientTokenIdentifier> clientToken = appAttempt.getClientToken();
if(clientToken != null){
credentials.addToken(clientToken.getService(), clientToken);
}
return credentials;
}
// Dispatcher related code // Dispatcher related code
private synchronized void handleStoreEvent(RMStateStoreEvent event) { private synchronized void handleStoreEvent(RMStateStoreEvent event) {
@ -283,13 +314,22 @@ public abstract class RMStateStore {
ApplicationAttemptState attemptState = ApplicationAttemptState attemptState =
((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
Exception storedException = null; Exception storedException = null;
ApplicationAttemptStateDataPBImpl attemptStateData =
new ApplicationAttemptStateDataPBImpl();
attemptStateData.setAttemptId(attemptState.getAttemptId());
attemptStateData.setMasterContainer(attemptState.getMasterContainer());
LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); Credentials credentials = attemptState.getAppAttemptTokens();
ByteBuffer appAttemptTokens = null;
try { try {
if(credentials != null){
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
appAttemptTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
ApplicationAttemptStateDataPBImpl attemptStateData =
(ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
.newApplicationAttemptStateData(attemptState.getAttemptId(),
attemptState.getMasterContainer(), appAttemptTokens);
LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
storeApplicationAttemptState(attemptState.getAttemptId().toString(), storeApplicationAttemptState(attemptState.getAttemptId().toString(),
attemptStateData); attemptStateData);
} catch (Exception e) { } catch (Exception e) {
@ -358,7 +398,5 @@ public abstract class RMStateStore {
public void handle(RMStateStoreEvent event) { public void handle(RMStateStoreEvent event) {
handleStoreEvent(event); handleStoreEvent(event);
} }
} }
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -49,4 +51,14 @@ public interface ApplicationAttemptStateData {
public Container getMasterContainer(); public Container getMasterContainer();
public void setMasterContainer(Container container); public void setMasterContainer(Container container);
/**
* The application attempt tokens that belong to this attempt
* @return The application attempt tokens that belong to this attempt
*/
@Public
@Unstable
public ByteBuffer getAppAttemptTokens();
public void setAppAttemptTokens(ByteBuffer attemptTokens);
} }

View File

@ -16,19 +16,26 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
public class ApplicationAttemptStateDataPBImpl public class ApplicationAttemptStateDataPBImpl
extends ProtoBase<ApplicationAttemptStateDataProto> extends ProtoBase<ApplicationAttemptStateDataProto>
implements ApplicationAttemptStateData { implements ApplicationAttemptStateData {
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
ApplicationAttemptStateDataProto proto = ApplicationAttemptStateDataProto proto =
ApplicationAttemptStateDataProto.getDefaultInstance(); ApplicationAttemptStateDataProto.getDefaultInstance();
@ -37,6 +44,7 @@ implements ApplicationAttemptStateData {
private ApplicationAttemptId attemptId = null; private ApplicationAttemptId attemptId = null;
private Container masterContainer = null; private Container masterContainer = null;
private ByteBuffer appAttemptTokens = null;
public ApplicationAttemptStateDataPBImpl() { public ApplicationAttemptStateDataPBImpl() {
builder = ApplicationAttemptStateDataProto.newBuilder(); builder = ApplicationAttemptStateDataProto.newBuilder();
@ -62,6 +70,9 @@ implements ApplicationAttemptStateData {
if(this.masterContainer != null) { if(this.masterContainer != null) {
builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto());
} }
if(this.appAttemptTokens != null) {
builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens));
}
} }
private void mergeLocalToProto() { private void mergeLocalToProto() {
@ -123,4 +134,36 @@ implements ApplicationAttemptStateData {
this.masterContainer = container; this.masterContainer = container;
} }
@Override
public ByteBuffer getAppAttemptTokens() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
if(appAttemptTokens != null) {
return appAttemptTokens;
}
if(!p.hasAppAttemptTokens()) {
return null;
}
this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens());
return appAttemptTokens;
}
@Override
public void setAppAttemptTokens(ByteBuffer attemptTokens) {
maybeInitBuilder();
if(attemptTokens == null) {
builder.clearAppAttemptTokens();
}
this.appAttemptTokens = attemptTokens;
}
public static ApplicationAttemptStateData newApplicationAttemptStateData(
ApplicationAttemptId attemptId, Container container,
ByteBuffer attemptTokens) {
ApplicationAttemptStateData attemptStateData =
recordFactory.newRecordInstance(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId);
attemptStateData.setMasterContainer(container);
attemptStateData.setAppAttemptTokens(attemptTokens);
return attemptStateData;
}
} }

View File

@ -16,13 +16,14 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
public class ApplicationStateDataPBImpl public class ApplicationStateDataPBImpl
extends ProtoBase<ApplicationStateDataProto> extends ProtoBase<ApplicationStateDataProto>

View File

@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@ -443,7 +445,14 @@ public class RMAppImpl implements RMApp, Recoverable {
currentApplicationAttemptId = this.currentAttempt.getAppAttemptId(); currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
trackingUrl = this.currentAttempt.getTrackingUrl(); trackingUrl = this.currentAttempt.getTrackingUrl();
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl(); origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
clientToken = this.currentAttempt.getClientToken(); Token<ClientTokenIdentifier> attemptClientToken =
this.currentAttempt.getClientToken();
if (attemptClientToken != null) {
clientToken =
BuilderUtils.newClientToken(attemptClientToken.getIdentifier(),
attemptClientToken.getKind().toString(), attemptClientToken
.getPassword(), attemptClientToken.getService().toString());
}
host = this.currentAttempt.getHost(); host = this.currentAttempt.getHost();
rpcPort = this.currentAttempt.getRpcPort(); rpcPort = this.currentAttempt.getRpcPort();
appUsageReport = currentAttempt.getApplicationResourceUsageReport(); appUsageReport = currentAttempt.getApplicationResourceUsageReport();

View File

@ -21,16 +21,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ClientToken;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
/** /**
@ -92,7 +94,7 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
* The token required by the clients to talk to the application attempt * The token required by the clients to talk to the application attempt
* @return the token required by the clients to talk to the application attempt * @return the token required by the clients to talk to the application attempt
*/ */
ClientToken getClientToken(); Token<ClientTokenIdentifier> getClientToken();
/** /**
* Diagnostics information for the application attempt. * Diagnostics information for the application attempt.
@ -146,6 +148,12 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
*/ */
ApplicationSubmissionContext getSubmissionContext(); ApplicationSubmissionContext getSubmissionContext();
/**
* The application token belonging to this app attempt
* @return The application token belonging to this app attempt
*/
Token<ApplicationTokenIdentifier> getApplicationToken();
/** /**
* Get application container and resource usage information. * Get application container and resource usage information.
* @return an ApplicationResourceUsageReport object. * @return an ApplicationResourceUsageReport object.

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import static org.apache.hadoop.yarn.util.StringHelper.pjoin; import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
@ -38,6 +39,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
@ -45,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ClientToken;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -58,7 +61,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.ApplicationTokenSelector;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenSelector;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
@ -123,8 +129,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private final WriteLock writeLock; private final WriteLock writeLock;
private final ApplicationAttemptId applicationAttemptId; private final ApplicationAttemptId applicationAttemptId;
private ClientToken clientToken; private Token<ClientTokenIdentifier> clientToken;
private final ApplicationSubmissionContext submissionContext; private final ApplicationSubmissionContext submissionContext;
private Token<ApplicationTokenIdentifier> applicationToken = null;
//nodes on while this attempt's containers ran //nodes on while this attempt's containers ran
private final Set<NodeId> ranNodes = private final Set<NodeId> ranNodes =
@ -366,19 +373,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.scheduler = scheduler; this.scheduler = scheduler;
this.masterService = masterService; this.masterService = masterService;
if (UserGroupInformation.isSecurityEnabled()) {
this.rmContext.getClientToAMTokenSecretManager().registerApplication(
appAttemptId);
Token<ClientTokenIdentifier> token =
new Token<ClientTokenIdentifier>(new ClientTokenIdentifier(
appAttemptId), this.rmContext.getClientToAMTokenSecretManager());
this.clientToken =
BuilderUtils.newClientToken(token.getIdentifier(), token.getKind()
.toString(), token.getPassword(), token.getService().toString());
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock(); this.readLock = lock.readLock();
this.writeLock = lock.writeLock(); this.writeLock = lock.writeLock();
@ -502,10 +496,15 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
} }
@Override @Override
public ClientToken getClientToken() { public Token<ClientTokenIdentifier> getClientToken() {
return this.clientToken; return this.clientToken;
} }
@Override
public Token<ApplicationTokenIdentifier> getApplicationToken() {
return this.applicationToken;
}
@Override @Override
public String getDiagnostics() { public String getDiagnostics() {
this.readLock.lock(); this.readLock.lock();
@ -657,6 +656,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId()); ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
assert attemptState != null; assert attemptState != null;
setMasterContainer(attemptState.getMasterContainer()); setMasterContainer(attemptState.getMasterContainer());
recoverAppAttemptTokens(attemptState.getAppAttemptTokens());
LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId() LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
+ " AttemptId: " + getAppAttemptId() + " AttemptId: " + getAppAttemptId()
+ " MasterContainer: " + masterContainer); + " MasterContainer: " + masterContainer);
@ -665,6 +665,33 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.RECOVER)); RMAppAttemptEventType.RECOVER));
} }
private void recoverAppAttemptTokens(Credentials appAttemptTokens) {
if (appAttemptTokens == null) {
return;
}
if (UserGroupInformation.isSecurityEnabled()) {
ClientTokenSelector clientTokenSelector = new ClientTokenSelector();
this.clientToken =
clientTokenSelector.selectToken(new Text(),
appAttemptTokens.getAllTokens());
InetSocketAddress serviceAddr = conf.getSocketAddr(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
ApplicationTokenSelector appTokenSelector = new ApplicationTokenSelector();
this.applicationToken =
appTokenSelector.selectToken(
SecurityUtil.buildTokenService(serviceAddr),
appAttemptTokens.getAllTokens());
// For now, no need to populate tokens back to
// ApplicationTokenSecretManager, because running attempts are rebooted
// Later in work-preserve restart, we'll create NEW->RUNNING transition
// in which the restored tokens will be added to the secret manager
}
}
private static class BaseTransition implements private static class BaseTransition implements
SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> { SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
@ -686,6 +713,36 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.masterService appAttempt.masterService
.registerAppAttempt(appAttempt.applicationAttemptId); .registerAppAttempt(appAttempt.applicationAttemptId);
if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.rmContext.getClientToAMTokenSecretManager()
.registerApplication(appAttempt.applicationAttemptId);
// create clientToken
appAttempt.clientToken =
new Token<ClientTokenIdentifier>(new ClientTokenIdentifier(
appAttempt.applicationAttemptId),
appAttempt.rmContext.getClientToAMTokenSecretManager());
// create application token
ApplicationTokenIdentifier id =
new ApplicationTokenIdentifier(appAttempt.applicationAttemptId);
Token<ApplicationTokenIdentifier> applicationToken =
new Token<ApplicationTokenIdentifier>(id,
appAttempt.rmContext.getApplicationTokenSecretManager());
InetSocketAddress serviceAddr =
appAttempt.conf.getSocketAddr(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
// normally the client should set the service after acquiring the
// token, but this token is directly provided to the AMs
SecurityUtil.setTokenService(applicationToken, serviceAddr);
appAttempt.applicationToken = applicationToken;
}
// Add the application to the scheduler // Add the application to the scheduler
appAttempt.eventHandler.handle( appAttempt.eventHandler.handle(
new AppAddedSchedulerEvent(appAttempt.applicationAttemptId, new AppAddedSchedulerEvent(appAttempt.applicationAttemptId,
@ -992,7 +1049,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.rmContext.getAMFinishingMonitor().unregister( appAttempt.rmContext.getAMFinishingMonitor().unregister(
appAttempt.getAppAttemptId()); appAttempt.getAppAttemptId());
// Unregister from the ClientTokenSecretManager // Unregister from the ClientTokenSecretManager
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.rmContext.getClientToAMTokenSecretManager() appAttempt.rmContext.getClientToAMTokenSecretManager()

View File

@ -404,7 +404,8 @@ public class TestRMRestart {
} }
@Test @Test
public void testTokenRestoredOnRMrestart() throws Exception { public void testDelegationTokenRestoredInDelegationTokenRenewer()
throws Exception {
Logger rootLogger = LogManager.getRootLogger(); Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG); rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit(); ExitUtil.disableSystemExit();
@ -423,7 +424,7 @@ public class TestRMRestart {
Map<ApplicationId, ApplicationState> rmAppState = Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState(); rmState.getApplicationState();
MockRM rm1 = new MyMockRM(conf, memStore); MockRM rm1 = new TestSecurityMockRM(conf, memStore);
rm1.start(); rm1.start();
HashSet<Token<RMDelegationTokenIdentifier>> tokenSet = HashSet<Token<RMDelegationTokenIdentifier>> tokenSet =
@ -461,21 +462,26 @@ public class TestRMRestart {
ApplicationState appState = rmAppState.get(app.getApplicationId()); ApplicationState appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState); Assert.assertNotNull(appState);
// assert delegation tokens exist in rm1 DelegationTokenRenewr
Assert.assertEquals(tokenSet, rm1.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens());
// assert delegation tokens are saved // assert delegation tokens are saved
DataOutputBuffer dob = new DataOutputBuffer(); DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob); ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens = ByteBuffer securityTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
securityTokens.rewind();
Assert.assertEquals(securityTokens, appState Assert.assertEquals(securityTokens, appState
.getApplicationSubmissionContext().getAMContainerSpec() .getApplicationSubmissionContext().getAMContainerSpec()
.getContainerTokens()); .getContainerTokens());
// start new RM // start new RM
MockRM rm2 = new MyMockRM(conf, memStore); MockRM rm2 = new TestSecurityMockRM(conf, memStore);
rm2.start(); rm2.start();
// verify tokens are properly populated back to DelegationTokenRenewer // verify tokens are properly populated back to rm2 DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm1.getRMContext() Assert.assertEquals(tokenSet, rm2.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens()); .getDelegationTokenRenewer().getDelegationTokens());
// stop the RM // stop the RM
@ -483,9 +489,92 @@ public class TestRMRestart {
rm2.stop(); rm2.stop();
} }
class MyMockRM extends MockRM { @Test
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
public MyMockRM(Configuration conf, RMStateStore store) { YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("0.0.0.0:4321", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// submit an app
RMApp app1 =
rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), "default");
// assert app info is saved
ApplicationState appState = rmAppState.get(app1.getApplicationId());
Assert.assertNotNull(appState);
// Allocate the AM
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
// assert attempt info is saved
ApplicationAttemptState attemptState = appState.getAttempt(attemptId1);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
// the appToken and clientToken that are generated when RMAppAttempt is created,
HashSet<Token<?>> tokenSet = new HashSet<Token<?>>();
tokenSet.add(attempt1.getApplicationToken());
tokenSet.add(attempt1.getClientToken());
// assert application Token is saved
HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens());
Assert.assertEquals(tokenSet, savedTokens);
// start new RM
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
rm2.start();
RMApp loadedApp1 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
RMAppAttempt loadedAttempt1 = loadedApp1.getRMAppAttempt(attemptId1);
// assert loaded attempt recovered attempt tokens
Assert.assertNotNull(loadedAttempt1);
savedTokens.clear();
savedTokens.add(loadedAttempt1.getApplicationToken());
savedTokens.add(loadedAttempt1.getClientToken());
Assert.assertEquals(tokenSet, savedTokens);
// assert clientToken is recovered back to api-versioned clientToken
Assert.assertEquals(attempt1.getClientToken(),
loadedAttempt1.getClientToken());
// Not testing ApplicationTokenSecretManager has the password populated back,
// that is needed in work-preserving restart
rm1.stop();
rm2.stop();
}
class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {
super(conf, store); super(conf, store);
} }

View File

@ -18,14 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery; package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.junit.Test;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -34,6 +39,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -44,13 +51,18 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
public class TestRMStateStore { public class TestRMStateStore {
@ -141,7 +153,7 @@ public class TestRMStateStore {
ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId( ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(
"appattempt_1352994193343_0003_000001"); "appattempt_1352994193343_0003_000001");
storeAttempt(testStore, attemptId, storeAttempt(testStore, attemptId,
"container_1352994193343_0003_01_000001", dispatcher); "container_1352994193343_0003_01_000001", null, null, dispatcher);
} }
@Override @Override
@ -186,14 +198,17 @@ public class TestRMStateStore {
} }
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
String containerIdStr, TestDispatcher dispatcher) String containerIdStr, Token<ApplicationTokenIdentifier> appToken,
throws Exception { Token<ClientTokenIdentifier> clientToken, TestDispatcher dispatcher)
throws Exception {
Container container = new ContainerPBImpl(); Container container = new ContainerPBImpl();
container.setId(ConverterUtils.toContainerId(containerIdStr)); container.setId(ConverterUtils.toContainerId(containerIdStr));
RMAppAttempt mockAttempt = mock(RMAppAttempt.class); RMAppAttempt mockAttempt = mock(RMAppAttempt.class);
when(mockAttempt.getAppAttemptId()).thenReturn(attemptId); when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
when(mockAttempt.getMasterContainer()).thenReturn(container); when(mockAttempt.getMasterContainer()).thenReturn(container);
when(mockAttempt.getApplicationToken()).thenReturn(appToken);
when(mockAttempt.getClientToken()).thenReturn(clientToken);
dispatcher.attemptId = attemptId; dispatcher.attemptId = attemptId;
dispatcher.storedException = null; dispatcher.storedException = null;
store.storeApplicationAttempt(mockAttempt); store.storeApplicationAttempt(mockAttempt);
@ -201,30 +216,58 @@ public class TestRMStateStore {
return container.getId(); return container.getId();
} }
@SuppressWarnings("unchecked")
void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
long submitTime = System.currentTimeMillis(); long submitTime = System.currentTimeMillis();
Configuration conf = new YarnConfiguration();
RMStateStore store = stateStoreHelper.getRMStateStore(); RMStateStore store = stateStoreHelper.getRMStateStore();
TestDispatcher dispatcher = new TestDispatcher(); TestDispatcher dispatcher = new TestDispatcher();
store.setDispatcher(dispatcher); store.setDispatcher(dispatcher);
ApplicationTokenSecretManager appTokenMgr =
new ApplicationTokenSecretManager(conf);
ClientToAMTokenSecretManagerInRM clientTokenMgr =
new ClientToAMTokenSecretManagerInRM();
ApplicationAttemptId attemptId1 = ConverterUtils ApplicationAttemptId attemptId1 = ConverterUtils
.toApplicationAttemptId("appattempt_1352994193343_0001_000001"); .toApplicationAttemptId("appattempt_1352994193343_0001_000001");
ApplicationId appId1 = attemptId1.getApplicationId(); ApplicationId appId1 = attemptId1.getApplicationId();
storeApp(store, appId1, submitTime); storeApp(store, appId1, submitTime);
// create application token1 for attempt1
List<Token<?>> appAttemptToken1 =
generateTokens(attemptId1, appTokenMgr, clientTokenMgr, conf);
HashSet<Token<?>> attemptTokenSet1 = new HashSet<Token<?>>();
attemptTokenSet1.addAll(appAttemptToken1);
ContainerId containerId1 = storeAttempt(store, attemptId1, ContainerId containerId1 = storeAttempt(store, attemptId1,
"container_1352994193343_0001_01_000001", dispatcher); "container_1352994193343_0001_01_000001",
(Token<ApplicationTokenIdentifier>) (appAttemptToken1.get(0)),
(Token<ClientTokenIdentifier>)(appAttemptToken1.get(1)),
dispatcher);
String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002"; String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
ApplicationAttemptId attemptId2 = ApplicationAttemptId attemptId2 =
ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); ConverterUtils.toApplicationAttemptId(appAttemptIdStr2);
// create application token2 for attempt2
List<Token<?>> appAttemptToken2 =
generateTokens(attemptId2, appTokenMgr, clientTokenMgr, conf);
HashSet<Token<?>> attemptTokenSet2 = new HashSet<Token<?>>();
attemptTokenSet2.addAll(appAttemptToken2);
ContainerId containerId2 = storeAttempt(store, attemptId2, ContainerId containerId2 = storeAttempt(store, attemptId2,
"container_1352994193343_0001_02_000001", dispatcher); "container_1352994193343_0001_02_000001",
(Token<ApplicationTokenIdentifier>) (appAttemptToken2.get(0)),
(Token<ClientTokenIdentifier>)(appAttemptToken2.get(1)),
dispatcher);
ApplicationAttemptId attemptIdRemoved = ConverterUtils ApplicationAttemptId attemptIdRemoved = ConverterUtils
.toApplicationAttemptId("appattempt_1352994193343_0002_000001"); .toApplicationAttemptId("appattempt_1352994193343_0002_000001");
ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId(); ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
storeApp(store, appIdRemoved, submitTime); storeApp(store, appIdRemoved, submitTime);
storeAttempt(store, attemptIdRemoved, storeAttempt(store, attemptIdRemoved,
"container_1352994193343_0002_01_000001", dispatcher); "container_1352994193343_0002_01_000001", null, null, dispatcher);
RMApp mockRemovedApp = mock(RMApp.class); RMApp mockRemovedApp = mock(RMApp.class);
HashMap<ApplicationAttemptId, RMAppAttempt> attempts = HashMap<ApplicationAttemptId, RMAppAttempt> attempts =
@ -268,12 +311,21 @@ public class TestRMStateStore {
assertEquals(attemptId1, attemptState.getAttemptId()); assertEquals(attemptId1, attemptState.getAttemptId());
// attempt1 container is loaded correctly // attempt1 container is loaded correctly
assertEquals(containerId1, attemptState.getMasterContainer().getId()); assertEquals(containerId1, attemptState.getMasterContainer().getId());
// attempt1 applicationToken is loaded correctly
HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens());
assertEquals(attemptTokenSet1, savedTokens);
attemptState = appState.getAttempt(attemptId2); attemptState = appState.getAttempt(attemptId2);
// attempt2 is loaded correctly // attempt2 is loaded correctly
assertNotNull(attemptState); assertNotNull(attemptState);
assertEquals(attemptId2, attemptState.getAttemptId()); assertEquals(attemptId2, attemptState.getAttemptId());
// attempt2 container is loaded correctly // attempt2 container is loaded correctly
assertEquals(containerId2, attemptState.getMasterContainer().getId()); assertEquals(containerId2, attemptState.getMasterContainer().getId());
// attempt2 applicationToken is loaded correctly
savedTokens.clear();
savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens());
assertEquals(attemptTokenSet2, savedTokens);
// assert store is in expected state after everything is cleaned // assert store is in expected state after everything is cleaned
assertTrue(stateStoreHelper.isFinalStateValid()); assertTrue(stateStoreHelper.isFinalStateValid());
@ -281,4 +333,23 @@ public class TestRMStateStore {
store.close(); store.close();
} }
private List<Token<?>> generateTokens(ApplicationAttemptId attemptId,
ApplicationTokenSecretManager appTokenMgr,
ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) {
ApplicationTokenIdentifier appTokenId =
new ApplicationTokenIdentifier(attemptId);
Token<ApplicationTokenIdentifier> appToken =
new Token<ApplicationTokenIdentifier>(appTokenId, appTokenMgr);
appToken.setService(new Text("appToken service"));
ClientTokenIdentifier clientTokenId = new ClientTokenIdentifier(attemptId);
clientTokenMgr.registerApplication(attemptId);
Token<ClientTokenIdentifier> clientToken =
new Token<ClientTokenIdentifier>(clientTokenId, clientTokenMgr);
clientToken.setService(new Text("clientToken service"));
List<Token<?>> tokenPair = new ArrayList<Token<?>>();
tokenPair.add(0, appToken);
tokenPair.add(1, clientToken);
return tokenPair;
}
} }