From 74231f027607ff1a6fe7d72fad28108826963cf3 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 8 May 2013 06:20:22 +0000 Subject: [PATCH] YARN-582. Changed ResourceManager to recover Application token and client tokens for app attempt so that RM can be restarted while preserving current applications. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1480168 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 4 + ...erver_resourcemanager_service_protos.proto | 1 + .../server/resourcemanager/RMAppManager.java | 8 +- .../amlauncher/AMLauncher.java | 27 ++--- .../recovery/FileSystemRMStateStore.java | 19 +++- .../recovery/MemoryRMStateStore.java | 18 ++- .../recovery/NullRMStateStore.java | 4 +- .../recovery/RMStateStore.java | 76 +++++++++---- .../records/ApplicationAttemptStateData.java | 12 ++ .../ApplicationAttemptStateDataPBImpl.java | 49 ++++++++- .../pb}/ApplicationStateDataPBImpl.java | 3 +- .../resourcemanager/rmapp/RMAppImpl.java | 11 +- .../rmapp/attempt/RMAppAttempt.java | 12 +- .../rmapp/attempt/RMAppAttemptImpl.java | 96 ++++++++++++---- .../server/resourcemanager/TestRMRestart.java | 103 ++++++++++++++++-- .../recovery/TestRMStateStore.java | 93 ++++++++++++++-- 16 files changed, 439 insertions(+), 97 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/{ => impl/pb}/ApplicationAttemptStateDataPBImpl.java (71%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/{ => impl/pb}/ApplicationStateDataPBImpl.java (97%) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ff21281e923..4ce688532a1 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -225,6 +225,10 @@ Release 2.0.5-beta - UNRELEASED YARN-651. Changed PBClientImpls of ContainerManager and RMAdmin to throw IOExceptions also. (Xuan Gong via vinodkv) + YARN-582. Changed ResourceManager to recover Application token and client + tokens for app attempt so that RM can be restarted while preserving current + applications. (Jian He via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto index 52e21d7fe5a..142dc451bbb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto @@ -72,4 +72,5 @@ message ApplicationStateDataProto { message ApplicationAttemptStateDataProto { optional ApplicationAttemptIdProto attemptId = 1; optional ContainerProto master_container = 2; + optional bytes app_attempt_tokens = 3; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 5cc4de1a667..369c10afa7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -334,10 +334,6 @@ public void recover(RMState state) throws Exception { LOG.info("Recovering " + appStates.size() + " applications"); for(ApplicationState appState : appStates.values()) { boolean shouldRecover = true; - // re-submit the application - // this is going to send an app start event but since the async dispatcher - // has not started that event will be queued until we have completed re - // populating the state if(appState.getApplicationSubmissionContext().getUnmanagedAM()) { // do not recover unmanaged applications since current recovery // mechanism of restarting attempts does not work for them. @@ -367,6 +363,10 @@ public void recover(RMState state) throws Exception { shouldRecover = false; } + // re-submit the application + // this is going to send an app start event but since the async dispatcher + // has not started that event will be queued until we have completed re + // populating the state if(shouldRecover) { LOG.info("Recovering application " + appState.getAppId()); submitApplication(appState.getApplicationSubmissionContext(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index c664475e374..b87046be4ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -34,7 +34,6 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; @@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -203,25 +201,16 @@ private void setupTokensAndEnv( credentials.readTokenStorageStream(dibb); } - ApplicationTokenIdentifier id = new ApplicationTokenIdentifier( - application.getAppAttemptId()); - Token appMasterToken = - new Token(id, - this.rmContext.getApplicationTokenSecretManager()); - InetSocketAddress serviceAddr = conf.getSocketAddr( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - // normally the client should set the service after acquiring the token, - // but this token is directly provided to the AMs - SecurityUtil.setTokenService(appMasterToken, serviceAddr); - - // Add the ApplicationMaster token - credentials.addToken(appMasterToken.getService(), appMasterToken); + // Add application token + Token applicationToken = + application.getApplicationToken(); + if(applicationToken != null) { + credentials.addToken(applicationToken.getService(), applicationToken); + } DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); - container.setContainerTokens( - ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + container.setContainerTokens(ByteBuffer.wrap(dob.getData(), 0, + dob.getLength())); SecretKey clientSecretKey = this.rmContext.getClientToAMTokenSecretManager().getMasterKey( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index c4990daec59..03154b61b80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -31,13 +31,15 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -114,8 +116,17 @@ public synchronized RMState loadState() throws Exception { ApplicationAttemptStateDataPBImpl attemptStateData = new ApplicationAttemptStateDataPBImpl( ApplicationAttemptStateDataProto.parseFrom(childData)); - ApplicationAttemptState attemptState = new ApplicationAttemptState( - attemptId, attemptStateData.getMasterContainer()); + Credentials credentials = null; + if(attemptStateData.getAppAttemptTokens() != null){ + credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(attemptStateData.getAppAttemptTokens()); + credentials.readTokenStorageStream(dibb); + } + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, + attemptStateData.getMasterContainer(), credentials); + // assert child node name is same as application attempt id assert attemptId.equals(attemptState.getAttemptId()); attempts.add(attemptState); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 5fb1167d714..dd6fab50731 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -23,10 +23,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -79,8 +81,16 @@ public synchronized void storeApplicationAttemptState(String attemptIdStr, throws Exception { ApplicationAttemptId attemptId = ConverterUtils .toApplicationAttemptId(attemptIdStr); - ApplicationAttemptState attemptState = new ApplicationAttemptState( - attemptId, attemptStateData.getMasterContainer()); + Credentials credentials = null; + if(attemptStateData.getAppAttemptTokens() != null){ + DataInputByteBuffer dibb = new DataInputByteBuffer(); + credentials = new Credentials(); + dibb.reset(attemptStateData.getAppAttemptTokens()); + credentials.readTokenStorageStream(dibb); + } + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, + attemptStateData.getMasterContainer(), credentials); ApplicationState appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index db044958f0d..29bdbb03ca4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -20,8 +20,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; @Unstable public class NullRMStateStore extends RMStateStore { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index d5c5015cf34..e1400f46a1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -26,6 +27,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -34,8 +38,10 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; +import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -58,19 +64,25 @@ public abstract class RMStateStore { public static class ApplicationAttemptState { final ApplicationAttemptId attemptId; final Container masterContainer; - + final Credentials appAttemptTokens; + public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer) { + Container masterContainer, + Credentials appAttemptTokens) { this.attemptId = attemptId; this.masterContainer = masterContainer; + this.appAttemptTokens = appAttemptTokens; } - + public Container getMasterContainer() { return masterContainer; } public ApplicationAttemptId getAttemptId() { return attemptId; } + public Credentials getAppAttemptTokens() { + return appAttemptTokens; + } } /** @@ -199,10 +211,14 @@ protected abstract void storeApplicationState(String appId, * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt */ public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { - ApplicationAttemptState attemptState = new ApplicationAttemptState( - appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + Credentials credentials = getTokensFromAppAttempt(appAttempt); + + ApplicationAttemptState attemptState = + new ApplicationAttemptState(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), credentials); + dispatcher.getEventHandler().handle( - new RMStateStoreAppAttemptEvent(attemptState)); + new RMStateStoreAppAttemptEvent(attemptState)); } /** @@ -226,8 +242,10 @@ public synchronized void removeApplication(RMApp app) { ApplicationState appState = new ApplicationState( app.getSubmitTime(), app.getApplicationSubmissionContext()); for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { - ApplicationAttemptState attemptState = new ApplicationAttemptState( - appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + Credentials credentials = getTokensFromAppAttempt(appAttempt); + ApplicationAttemptState attemptState = + new ApplicationAttemptState(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), credentials); appState.attempts.put(attemptState.getAttemptId(), attemptState); } @@ -249,7 +267,20 @@ public synchronized void removeApplication(ApplicationState appState) { */ protected abstract void removeApplicationState(ApplicationState appState) throws Exception; - + + private Credentials getTokensFromAppAttempt(RMAppAttempt appAttempt) { + Credentials credentials = new Credentials(); + Token appToken = appAttempt.getApplicationToken(); + if(appToken != null){ + credentials.addToken(appToken.getService(), appToken); + } + Token clientToken = appAttempt.getClientToken(); + if(clientToken != null){ + credentials.addToken(clientToken.getService(), clientToken); + } + return credentials; + } + // Dispatcher related code private synchronized void handleStoreEvent(RMStateStoreEvent event) { @@ -283,13 +314,22 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { ApplicationAttemptState attemptState = ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); Exception storedException = null; - ApplicationAttemptStateDataPBImpl attemptStateData = - new ApplicationAttemptStateDataPBImpl(); - attemptStateData.setAttemptId(attemptState.getAttemptId()); - attemptStateData.setMasterContainer(attemptState.getMasterContainer()); - LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); + Credentials credentials = attemptState.getAppAttemptTokens(); + ByteBuffer appAttemptTokens = null; try { + if(credentials != null){ + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + appAttemptTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } + ApplicationAttemptStateDataPBImpl attemptStateData = + (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl + .newApplicationAttemptStateData(attemptState.getAttemptId(), + attemptState.getMasterContainer(), appAttemptTokens); + + LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); storeApplicationAttemptState(attemptState.getAttemptId().toString(), attemptStateData); } catch (Exception e) { @@ -358,7 +398,5 @@ private final class ForwardingEventHandler public void handle(RMStateStoreEvent event) { handleStoreEvent(event); } - } - -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 64e3ccbfcbd..2622b0ec001 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import java.nio.ByteBuffer; + import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -49,4 +51,14 @@ public interface ApplicationAttemptStateData { public Container getMasterContainer(); public void setMasterContainer(Container container); + + /** + * The application attempt tokens that belong to this attempt + * @return The application attempt tokens that belong to this attempt + */ + @Public + @Unstable + public ByteBuffer getAppAttemptTokens(); + + public void setAppAttemptTokens(ByteBuffer attemptTokens); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java similarity index 71% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java index d033f5c8837..67e434e537e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -16,20 +16,27 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; + +import java.nio.ByteBuffer; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; public class ApplicationAttemptStateDataPBImpl extends ProtoBase implements ApplicationAttemptStateData { - + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + ApplicationAttemptStateDataProto proto = ApplicationAttemptStateDataProto.getDefaultInstance(); ApplicationAttemptStateDataProto.Builder builder = null; @@ -37,7 +44,8 @@ public class ApplicationAttemptStateDataPBImpl private ApplicationAttemptId attemptId = null; private Container masterContainer = null; - + private ByteBuffer appAttemptTokens = null; + public ApplicationAttemptStateDataPBImpl() { builder = ApplicationAttemptStateDataProto.newBuilder(); } @@ -62,6 +70,9 @@ private void mergeLocalToBuilder() { if(this.masterContainer != null) { builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); } + if(this.appAttemptTokens != null) { + builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens)); + } } private void mergeLocalToProto() { @@ -123,4 +134,36 @@ public void setMasterContainer(Container container) { this.masterContainer = container; } + @Override + public ByteBuffer getAppAttemptTokens() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if(appAttemptTokens != null) { + return appAttemptTokens; + } + if(!p.hasAppAttemptTokens()) { + return null; + } + this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens()); + return appAttemptTokens; + } + + @Override + public void setAppAttemptTokens(ByteBuffer attemptTokens) { + maybeInitBuilder(); + if(attemptTokens == null) { + builder.clearAppAttemptTokens(); + } + this.appAttemptTokens = attemptTokens; + } + + public static ApplicationAttemptStateData newApplicationAttemptStateData( + ApplicationAttemptId attemptId, Container container, + ByteBuffer attemptTokens) { + ApplicationAttemptStateData attemptStateData = + recordFactory.newRecordInstance(ApplicationAttemptStateData.class); + attemptStateData.setAttemptId(attemptId); + attemptStateData.setMasterContainer(container); + attemptStateData.setAppAttemptTokens(attemptTokens); + return attemptStateData; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java similarity index 97% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateDataPBImpl.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java index 0aa64b73ec1..c95d6e13344 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -16,13 +16,14 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; public class ApplicationStateDataPBImpl extends ProtoBase diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 874232bb165..7965fe26311 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; @@ -443,7 +445,14 @@ public ApplicationReport createAndGetApplicationReport(boolean allowAccess) { currentApplicationAttemptId = this.currentAttempt.getAppAttemptId(); trackingUrl = this.currentAttempt.getTrackingUrl(); origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl(); - clientToken = this.currentAttempt.getClientToken(); + Token attemptClientToken = + this.currentAttempt.getClientToken(); + if (attemptClientToken != null) { + clientToken = + BuilderUtils.newClientToken(attemptClientToken.getIdentifier(), + attemptClientToken.getKind().toString(), attemptClientToken + .getPassword(), attemptClientToken.getService().toString()); + } host = this.currentAttempt.getHost(); rpcPort = this.currentAttempt.getRpcPort(); appUsageReport = currentAttempt.getApplicationResourceUsageReport(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 510c4fea08b..b9c7eb29371 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -21,16 +21,18 @@ import java.util.List; import java.util.Set; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ClientToken; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; +import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** @@ -92,7 +94,7 @@ public interface RMAppAttempt extends EventHandler { * The token required by the clients to talk to the application attempt * @return the token required by the clients to talk to the application attempt */ - ClientToken getClientToken(); + Token getClientToken(); /** * Diagnostics information for the application attempt. @@ -146,6 +148,12 @@ public interface RMAppAttempt extends EventHandler { */ ApplicationSubmissionContext getSubmissionContext(); + /** + * The application token belonging to this app attempt + * @return The application token belonging to this app attempt + */ + Token getApplicationToken(); + /** * Get application container and resource usage information. * @return an ApplicationResourceUsageReport object. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index eaa15f53875..49db01bb009 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.yarn.util.StringHelper.pjoin; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -38,6 +39,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ExitUtil; @@ -45,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ClientToken; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -58,7 +61,10 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; +import org.apache.hadoop.yarn.security.ApplicationTokenSelector; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; +import org.apache.hadoop.yarn.security.client.ClientTokenSelector; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; @@ -123,8 +129,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { private final WriteLock writeLock; private final ApplicationAttemptId applicationAttemptId; - private ClientToken clientToken; + private Token clientToken; private final ApplicationSubmissionContext submissionContext; + private Token applicationToken = null; //nodes on while this attempt's containers ran private final Set ranNodes = @@ -366,19 +373,6 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, this.scheduler = scheduler; this.masterService = masterService; - if (UserGroupInformation.isSecurityEnabled()) { - - this.rmContext.getClientToAMTokenSecretManager().registerApplication( - appAttemptId); - - Token token = - new Token(new ClientTokenIdentifier( - appAttemptId), this.rmContext.getClientToAMTokenSecretManager()); - this.clientToken = - BuilderUtils.newClientToken(token.getIdentifier(), token.getKind() - .toString(), token.getPassword(), token.getService().toString()); - } - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -502,10 +496,15 @@ private void setTrackingUrlToRMAppPage() { } @Override - public ClientToken getClientToken() { + public Token getClientToken() { return this.clientToken; } + @Override + public Token getApplicationToken() { + return this.applicationToken; + } + @Override public String getDiagnostics() { this.readLock.lock(); @@ -657,14 +656,42 @@ public void recover(RMState state) { ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId()); assert attemptState != null; setMasterContainer(attemptState.getMasterContainer()); - LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId() + recoverAppAttemptTokens(attemptState.getAppAttemptTokens()); + LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId() + " AttemptId: " + getAppAttemptId() + " MasterContainer: " + masterContainer); setDiagnostics("Attempt recovered after RM restart"); handle(new RMAppAttemptEvent(getAppAttemptId(), RMAppAttemptEventType.RECOVER)); } - + + private void recoverAppAttemptTokens(Credentials appAttemptTokens) { + if (appAttemptTokens == null) { + return; + } + if (UserGroupInformation.isSecurityEnabled()) { + + ClientTokenSelector clientTokenSelector = new ClientTokenSelector(); + this.clientToken = + clientTokenSelector.selectToken(new Text(), + appAttemptTokens.getAllTokens()); + + InetSocketAddress serviceAddr = conf.getSocketAddr( + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + ApplicationTokenSelector appTokenSelector = new ApplicationTokenSelector(); + this.applicationToken = + appTokenSelector.selectToken( + SecurityUtil.buildTokenService(serviceAddr), + appAttemptTokens.getAllTokens()); + + // For now, no need to populate tokens back to + // ApplicationTokenSecretManager, because running attempts are rebooted + // Later in work-preserve restart, we'll create NEW->RUNNING transition + // in which the restored tokens will be added to the secret manager + } + } private static class BaseTransition implements SingleArcTransition { @@ -686,6 +713,36 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.masterService .registerAppAttempt(appAttempt.applicationAttemptId); + if (UserGroupInformation.isSecurityEnabled()) { + + appAttempt.rmContext.getClientToAMTokenSecretManager() + .registerApplication(appAttempt.applicationAttemptId); + + // create clientToken + appAttempt.clientToken = + new Token(new ClientTokenIdentifier( + appAttempt.applicationAttemptId), + appAttempt.rmContext.getClientToAMTokenSecretManager()); + + // create application token + ApplicationTokenIdentifier id = + new ApplicationTokenIdentifier(appAttempt.applicationAttemptId); + Token applicationToken = + new Token(id, + appAttempt.rmContext.getApplicationTokenSecretManager()); + InetSocketAddress serviceAddr = + appAttempt.conf.getSocketAddr( + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + // normally the client should set the service after acquiring the + // token, but this token is directly provided to the AMs + SecurityUtil.setTokenService(applicationToken, serviceAddr); + + appAttempt.applicationToken = applicationToken; + + } + // Add the application to the scheduler appAttempt.eventHandler.handle( new AppAddedSchedulerEvent(appAttempt.applicationAttemptId, @@ -992,7 +1049,6 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.rmContext.getAMFinishingMonitor().unregister( appAttempt.getAppAttemptId()); - // Unregister from the ClientTokenSecretManager if (UserGroupInformation.isSecurityEnabled()) { appAttempt.rmContext.getClientToAMTokenSecretManager() @@ -1191,7 +1247,7 @@ public long getStartTime() { this.readLock.unlock(); } } - + private void launchAttempt(){ // Send event to launch the AM Container eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index fded9fbc982..30ae6ce5246 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -404,7 +404,8 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { } @Test - public void testTokenRestoredOnRMrestart() throws Exception { + public void testDelegationTokenRestoredInDelegationTokenRenewer() + throws Exception { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); ExitUtil.disableSystemExit(); @@ -423,7 +424,7 @@ public void testTokenRestoredOnRMrestart() throws Exception { Map rmAppState = rmState.getApplicationState(); - MockRM rm1 = new MyMockRM(conf, memStore); + MockRM rm1 = new TestSecurityMockRM(conf, memStore); rm1.start(); HashSet> tokenSet = @@ -461,21 +462,26 @@ public void testTokenRestoredOnRMrestart() throws Exception { ApplicationState appState = rmAppState.get(app.getApplicationId()); Assert.assertNotNull(appState); + // assert delegation tokens exist in rm1 DelegationTokenRenewr + Assert.assertEquals(tokenSet, rm1.getRMContext() + .getDelegationTokenRenewer().getDelegationTokens()); + // assert delegation tokens are saved DataOutputBuffer dob = new DataOutputBuffer(); ts.writeTokenStorageToStream(dob); ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + securityTokens.rewind(); Assert.assertEquals(securityTokens, appState .getApplicationSubmissionContext().getAMContainerSpec() .getContainerTokens()); // start new RM - MockRM rm2 = new MyMockRM(conf, memStore); + MockRM rm2 = new TestSecurityMockRM(conf, memStore); rm2.start(); - // verify tokens are properly populated back to DelegationTokenRenewer - Assert.assertEquals(tokenSet, rm1.getRMContext() + // verify tokens are properly populated back to rm2 DelegationTokenRenewer + Assert.assertEquals(tokenSet, rm2.getRMContext() .getDelegationTokenRenewer().getDelegationTokens()); // stop the RM @@ -483,9 +489,92 @@ public void testTokenRestoredOnRMrestart() throws Exception { rm2.stop(); } - class MyMockRM extends MockRM { + @Test + public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); - public MyMockRM(Configuration conf, RMStateStore store) { + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmAppState = + rmState.getApplicationState(); + MockRM rm1 = new TestSecurityMockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("0.0.0.0:4321", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // submit an app + RMApp app1 = + rm1.submitApp(200, "name", "user", + new HashMap(), "default"); + + // assert app info is saved + ApplicationState appState = rmAppState.get(app1.getApplicationId()); + Assert.assertNotNull(appState); + + // Allocate the AM + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId(); + rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + + // assert attempt info is saved + ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); + Assert.assertNotNull(attemptState); + Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), + attemptState.getMasterContainer().getId()); + + // the appToken and clientToken that are generated when RMAppAttempt is created, + HashSet> tokenSet = new HashSet>(); + tokenSet.add(attempt1.getApplicationToken()); + tokenSet.add(attempt1.getClientToken()); + + // assert application Token is saved + HashSet> savedTokens = new HashSet>(); + savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens()); + Assert.assertEquals(tokenSet, savedTokens); + + // start new RM + MockRM rm2 = new TestSecurityMockRM(conf, memStore); + rm2.start(); + + RMApp loadedApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + RMAppAttempt loadedAttempt1 = loadedApp1.getRMAppAttempt(attemptId1); + + // assert loaded attempt recovered attempt tokens + Assert.assertNotNull(loadedAttempt1); + savedTokens.clear(); + savedTokens.add(loadedAttempt1.getApplicationToken()); + savedTokens.add(loadedAttempt1.getClientToken()); + Assert.assertEquals(tokenSet, savedTokens); + + // assert clientToken is recovered back to api-versioned clientToken + Assert.assertEquals(attempt1.getClientToken(), + loadedAttempt1.getClientToken()); + + // Not testing ApplicationTokenSecretManager has the password populated back, + // that is needed in work-preserving restart + + rm1.stop(); + rm2.stop(); + } + + class TestSecurityMockRM extends MockRM { + + public TestSecurityMockRM(Configuration conf, RMStateStore store) { super(conf, store); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java index 440908feacb..a245e54692c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java @@ -18,14 +18,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; -import org.junit.Test; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,6 +39,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -44,13 +51,18 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; +import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.Test; public class TestRMStateStore { @@ -141,7 +153,7 @@ public void addOrphanAttemptIfNeeded(RMStateStore testStore, ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId( "appattempt_1352994193343_0003_000001"); storeAttempt(testStore, attemptId, - "container_1352994193343_0003_01_000001", dispatcher); + "container_1352994193343_0003_01_000001", null, null, dispatcher); } @Override @@ -186,14 +198,17 @@ void storeApp(RMStateStore store, ApplicationId appId, long time) } ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, - String containerIdStr, TestDispatcher dispatcher) - throws Exception { + String containerIdStr, Token appToken, + Token clientToken, TestDispatcher dispatcher) + throws Exception { Container container = new ContainerPBImpl(); container.setId(ConverterUtils.toContainerId(containerIdStr)); RMAppAttempt mockAttempt = mock(RMAppAttempt.class); when(mockAttempt.getAppAttemptId()).thenReturn(attemptId); when(mockAttempt.getMasterContainer()).thenReturn(container); + when(mockAttempt.getApplicationToken()).thenReturn(appToken); + when(mockAttempt.getClientToken()).thenReturn(clientToken); dispatcher.attemptId = attemptId; dispatcher.storedException = null; store.storeApplicationAttempt(mockAttempt); @@ -201,30 +216,58 @@ ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, return container.getId(); } + @SuppressWarnings("unchecked") void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { long submitTime = System.currentTimeMillis(); + Configuration conf = new YarnConfiguration(); RMStateStore store = stateStoreHelper.getRMStateStore(); TestDispatcher dispatcher = new TestDispatcher(); store.setDispatcher(dispatcher); + ApplicationTokenSecretManager appTokenMgr = + new ApplicationTokenSecretManager(conf); + ClientToAMTokenSecretManagerInRM clientTokenMgr = + new ClientToAMTokenSecretManagerInRM(); + ApplicationAttemptId attemptId1 = ConverterUtils .toApplicationAttemptId("appattempt_1352994193343_0001_000001"); ApplicationId appId1 = attemptId1.getApplicationId(); storeApp(store, appId1, submitTime); + + // create application token1 for attempt1 + List> appAttemptToken1 = + generateTokens(attemptId1, appTokenMgr, clientTokenMgr, conf); + HashSet> attemptTokenSet1 = new HashSet>(); + attemptTokenSet1.addAll(appAttemptToken1); + ContainerId containerId1 = storeAttempt(store, attemptId1, - "container_1352994193343_0001_01_000001", dispatcher); + "container_1352994193343_0001_01_000001", + (Token) (appAttemptToken1.get(0)), + (Token)(appAttemptToken1.get(1)), + dispatcher); + String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002"; ApplicationAttemptId attemptId2 = - ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + + // create application token2 for attempt2 + List> appAttemptToken2 = + generateTokens(attemptId2, appTokenMgr, clientTokenMgr, conf); + HashSet> attemptTokenSet2 = new HashSet>(); + attemptTokenSet2.addAll(appAttemptToken2); + ContainerId containerId2 = storeAttempt(store, attemptId2, - "container_1352994193343_0001_02_000001", dispatcher); + "container_1352994193343_0001_02_000001", + (Token) (appAttemptToken2.get(0)), + (Token)(appAttemptToken2.get(1)), + dispatcher); ApplicationAttemptId attemptIdRemoved = ConverterUtils .toApplicationAttemptId("appattempt_1352994193343_0002_000001"); ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId(); storeApp(store, appIdRemoved, submitTime); storeAttempt(store, attemptIdRemoved, - "container_1352994193343_0002_01_000001", dispatcher); + "container_1352994193343_0002_01_000001", null, null, dispatcher); RMApp mockRemovedApp = mock(RMApp.class); HashMap attempts = @@ -268,12 +311,21 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { assertEquals(attemptId1, attemptState.getAttemptId()); // attempt1 container is loaded correctly assertEquals(containerId1, attemptState.getMasterContainer().getId()); + // attempt1 applicationToken is loaded correctly + HashSet> savedTokens = new HashSet>(); + savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens()); + assertEquals(attemptTokenSet1, savedTokens); + attemptState = appState.getAttempt(attemptId2); // attempt2 is loaded correctly assertNotNull(attemptState); assertEquals(attemptId2, attemptState.getAttemptId()); // attempt2 container is loaded correctly assertEquals(containerId2, attemptState.getMasterContainer().getId()); + // attempt2 applicationToken is loaded correctly + savedTokens.clear(); + savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens()); + assertEquals(attemptTokenSet2, savedTokens); // assert store is in expected state after everything is cleaned assertTrue(stateStoreHelper.isFinalStateValid()); @@ -281,4 +333,23 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { store.close(); } + private List> generateTokens(ApplicationAttemptId attemptId, + ApplicationTokenSecretManager appTokenMgr, + ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) { + ApplicationTokenIdentifier appTokenId = + new ApplicationTokenIdentifier(attemptId); + Token appToken = + new Token(appTokenId, appTokenMgr); + appToken.setService(new Text("appToken service")); + + ClientTokenIdentifier clientTokenId = new ClientTokenIdentifier(attemptId); + clientTokenMgr.registerApplication(attemptId); + Token clientToken = + new Token(clientTokenId, clientTokenMgr); + clientToken.setService(new Text("clientToken service")); + List> tokenPair = new ArrayList>(); + tokenPair.add(0, appToken); + tokenPair.add(1, clientToken); + return tokenPair; + } }