From dfa5f198f0dc8eabf11cca3918d46362e24a8506 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 30 May 2013 04:15:05 +0000 Subject: [PATCH] YARN-638. Modified ResourceManager to restore RMDelegationTokens after restarting. Contributed by Jian He. svn merge --ignore-ancestry -c 1487720 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1487721 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/ResourceManager.java | 9 +- .../recovery/FileSystemRMStateStore.java | 173 ++++++++++++--- .../recovery/MemoryRMStateStore.java | 59 +++++ .../recovery/NullRMStateStore.java | 25 +++ .../recovery/RMStateStore.java | 115 +++++++++- .../RMDelegationTokenSecretManager.java | 146 ++++++++++++- .../yarn/server/resourcemanager/MockRM.java | 2 +- .../resourcemanager/TestClientRMService.java | 5 +- .../resourcemanager/TestClientRMTokens.java | 15 +- .../server/resourcemanager/TestRMRestart.java | 184 +++++++++++++--- .../recovery/TestRMStateStore.java | 41 +++- .../security/TestRMDelegationTokens.java | 206 ++++++++++++++++++ 13 files changed, 909 insertions(+), 74 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 90782d8fe3b..b8c3b82021c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -204,6 +204,9 @@ Release 2.0.5-beta - UNRELEASED YARN-714. Added NMTokens to be sent to AMs as part of heart-beat response. (Omkar Vinit Joshi via vinodkv) + YARN-638. Modified ResourceManager to restore RMDelegationTokens after + restarting. (Jian He via vinodkv) + OPTIMIZATIONS YARN-512. Log aggregation root directory check is more expensive than it diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index e923b1e933b..b8208a29518 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -239,7 +239,7 @@ public class ResourceManager extends CompositeService implements Recoverable { // Register event handler for RMAppManagerEvents this.rmDispatcher.register(RMAppManagerEventType.class, this.rmAppManager); - this.rmDTSecretManager = createRMDelegationTokenSecretManager(); + this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext); clientRM = createClientRMService(); addService(clientRM); @@ -666,7 +666,7 @@ public class ResourceManager extends CompositeService implements Recoverable { } protected RMDelegationTokenSecretManager - createRMDelegationTokenSecretManager() { + createRMDelegationTokenSecretManager(RMContext rmContext) { long secretKeyInterval = conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY, YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); @@ -678,7 +678,7 @@ public class ResourceManager extends CompositeService implements Recoverable { YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); return new RMDelegationTokenSecretManager(secretKeyInterval, - tokenMaxLifetime, tokenRenewInterval, 3600000); + tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext); } protected ClientRMService createClientRMService() { @@ -745,6 +745,9 @@ public class ResourceManager extends CompositeService implements Recoverable { @Override public void recover(RMState state) throws Exception { + // recover RMdelegationTokenSecretManager + rmDTSecretManager.recover(state); + // recover applications rmAppManager.recover(state); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index db85c535b37..8da33737351 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -18,6 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.util.ArrayList; import java.util.List; @@ -33,11 +37,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -57,11 +63,19 @@ public class FileSystemRMStateStore extends RMStateStore { public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); private static final String ROOT_DIR_NAME = "FSRMStateRoot"; - + private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot"; + private static final String RM_APP_ROOT = "RMAppRoot"; + private static final String DELEGATION_KEY_PREFIX = "DelegationKey_"; + private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_"; + private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX = + "RMDTSequenceNumber_"; private FileSystem fs; - private Path fsRootDirPath; + private Path rootDirPath; + private Path rmDTSecretManagerRoot; + private Path rmAppRoot; + private Path dtSequenceNumberPath = null; @VisibleForTesting Path fsWorkingPath; @@ -70,11 +84,14 @@ public class FileSystemRMStateStore extends RMStateStore { throws Exception{ fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI)); - fsRootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT); + rmAppRoot = new Path(rootDirPath, RM_APP_ROOT); // create filesystem fs = fsWorkingPath.getFileSystem(conf); - fs.mkdirs(fsRootDirPath); + fs.mkdirs(rmDTSecretManagerRoot); + fs.mkdirs(rmAppRoot); } @Override @@ -84,15 +101,23 @@ public class FileSystemRMStateStore extends RMStateStore { @Override public synchronized RMState loadState() throws Exception { + RMState rmState = new RMState(); + // recover DelegationTokenSecretManager + loadRMDTSecretManagerState(rmState); + // recover RM applications + loadRMAppState(rmState); + return rmState; + } + + private void loadRMAppState(RMState rmState) throws Exception { try { - RMState state = new RMState(); - FileStatus[] childNodes = fs.listStatus(fsRootDirPath); + FileStatus[] childNodes = fs.listStatus(rmAppRoot); List attempts = new ArrayList(); for(FileStatus childNodeStatus : childNodes) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); - Path childNodePath = getNodePath(childNodeName); + Path childNodePath = getNodePath(rmAppRoot, childNodeName); byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){ // application @@ -107,7 +132,7 @@ public class FileSystemRMStateStore extends RMStateStore { appStateData.getUser()); // assert child node name is same as actual applicationId assert appId.equals(appState.context.getApplicationId()); - state.appState.put(appId, appState); + rmState.appState.put(appId, appState); } else if(childNodeName.startsWith( ApplicationAttemptId.appAttemptIdStrPrefix)) { // attempt @@ -139,7 +164,7 @@ public class FileSystemRMStateStore extends RMStateStore { // go through all attempts and add them to their apps for(ApplicationAttemptState attemptState : attempts) { ApplicationId appId = attemptState.getAttemptId().getApplicationId(); - ApplicationState appState = state.appState.get(appId); + ApplicationState appState = rmState.appState.get(appId); if(appState != null) { appState.attempts.put(attemptState.getAttemptId(), attemptState); } else { @@ -148,22 +173,49 @@ public class FileSystemRMStateStore extends RMStateStore { // application attempt nodes LOG.info("Application node not found for attempt: " + attemptState.getAttemptId()); - deleteFile(getNodePath(attemptState.getAttemptId().toString())); + deleteFile(getNodePath(rmAppRoot, attemptState.getAttemptId().toString())); } } - - return state; } catch (Exception e) { LOG.error("Failed to load state.", e); throw e; } } + private void loadRMDTSecretManagerState(RMState rmState) throws Exception { + FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot); + + for(FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName); + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + ByteArrayInputStream is = new ByteArrayInputStream(childData); + DataInputStream fsIn = new DataInputStream(is); + if(childNodeName.startsWith(DELEGATION_KEY_PREFIX)){ + DelegationKey key = new DelegationKey(); + key.readFields(fsIn); + rmState.rmSecretManagerState.masterKeyState.add(key); + } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { + RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier(); + identifier.readFields(fsIn); + long renewDate = fsIn.readLong(); + rmState.rmSecretManagerState.delegationTokenState.put(identifier, + renewDate); + } else if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { + rmState.rmSecretManagerState.dtSequenceNumber = + Integer.parseInt(childNodeName.split("_")[1]); + }else { + LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager"); + } + fsIn.close(); + } + } + @Override public synchronized void storeApplicationState(String appId, - ApplicationStateDataPBImpl appStateDataPB) - throws Exception { - Path nodeCreatePath = getNodePath(appId); + ApplicationStateDataPBImpl appStateDataPB) throws Exception { + Path nodeCreatePath = getNodePath(rmAppRoot, appId); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); byte[] appStateData = appStateDataPB.getProto().toByteArray(); @@ -179,9 +231,8 @@ public class FileSystemRMStateStore extends RMStateStore { @Override public synchronized void storeApplicationAttemptState(String attemptId, - ApplicationAttemptStateDataPBImpl attemptStateDataPB) - throws Exception { - Path nodeCreatePath = getNodePath(attemptId); + ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { + Path nodeCreatePath = getNodePath(rmAppRoot, attemptId); LOG.info("Storing info for attempt: " + attemptId + " at: " + nodeCreatePath); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); @@ -197,9 +248,9 @@ public class FileSystemRMStateStore extends RMStateStore { @Override public synchronized void removeApplicationState(ApplicationState appState) - throws Exception { + throws Exception { String appId = appState.getAppId().toString(); - Path nodeRemovePath = getNodePath(appId); + Path nodeRemovePath = getNodePath(rmAppRoot, appId); LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); deleteFile(nodeRemovePath); for(ApplicationAttemptId attemptId : appState.attempts.keySet()) { @@ -208,13 +259,76 @@ public class FileSystemRMStateStore extends RMStateStore { } public synchronized void removeApplicationAttemptState(String attemptId) - throws Exception { - Path nodeRemovePath = getNodePath(attemptId); + throws Exception { + Path nodeRemovePath = getNodePath(rmAppRoot, attemptId); LOG.info("Removing info for attempt: " + attemptId + " at: " + nodeRemovePath); deleteFile(nodeRemovePath); } + @Override + public synchronized void storeRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier identifier, Long renewDate, + int latestSequenceNumber) throws Exception { + Path nodeCreatePath = + getNodePath(rmDTSecretManagerRoot, + DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream fsOut = new DataOutputStream(os); + LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); + identifier.write(fsOut); + fsOut.writeLong(renewDate); + writeFile(nodeCreatePath, os.toByteArray()); + fsOut.close(); + + // store sequence number + Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot, + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber); + LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + + latestSequenceNumber); + if (dtSequenceNumberPath == null) { + if (!createFile(latestSequenceNumberPath)) { + throw new Exception("Failed to create " + latestSequenceNumberPath); + } + } else { + if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) { + throw new Exception("Failed to rename " + dtSequenceNumberPath); + } + } + dtSequenceNumberPath = latestSequenceNumberPath; + } + + @Override + public synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier identifier) throws Exception { + Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, + DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); + LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber()); + deleteFile(nodeCreatePath); + } + + @Override + public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey) + throws Exception { + Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, + DELEGATION_KEY_PREFIX + masterKey.getKeyId()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream fsOut = new DataOutputStream(os); + LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId()); + masterKey.write(fsOut); + writeFile(nodeCreatePath, os.toByteArray()); + fsOut.close(); + } + + @Override + public synchronized void + removeRMDTMasterKeyState(DelegationKey masterKey) throws Exception { + Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, + DELEGATION_KEY_PREFIX + masterKey.getKeyId()); + LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId()); + deleteFile(nodeCreatePath); + } + // FileSystem related code private void deleteFile(Path deletePath) throws Exception { @@ -228,18 +342,25 @@ public class FileSystemRMStateStore extends RMStateStore { // state data will not be that "long" byte[] data = new byte[(int)len]; fsIn.readFully(data); + fsIn.close(); return data; } private void writeFile(Path outputPath, byte[] data) throws Exception { FSDataOutputStream fsOut = fs.create(outputPath, false); fsOut.write(data); - fsOut.flush(); fsOut.close(); } - @VisibleForTesting - Path getNodePath(String nodeName) { - return new Path(fsRootDirPath, nodeName); + private boolean renameFile(Path src, Path dst) throws Exception { + return fs.rename(src, dst); + } + + private boolean createFile(Path newFile) throws Exception { + return fs.createNewFile(newFile); + } + + private Path getNodePath(Path root, String nodeName) { + return new Path(root, nodeName); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index f9b3c2f36d1..0f18b159b9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -19,14 +19,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import java.io.IOException; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -49,6 +53,12 @@ public class MemoryRMStateStore extends RMStateStore { // return a copy of the state to allow for modification of the real state RMState returnState = new RMState(); returnState.appState.putAll(state.appState); + returnState.rmSecretManagerState.getMasterKeyState() + .addAll(state.rmSecretManagerState.getMasterKeyState()); + returnState.rmSecretManagerState.getTokenState().putAll( + state.rmSecretManagerState.getTokenState()); + returnState.rmSecretManagerState.dtSequenceNumber = + state.rmSecretManagerState.dtSequenceNumber; return returnState; } @@ -113,4 +123,53 @@ public class MemoryRMStateStore extends RMStateStore { ApplicationState removed = state.appState.remove(appId); assert removed != null; } + + @Override + public synchronized void storeRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + Map rmDTState = + state.rmSecretManagerState.getTokenState(); + if (rmDTState.containsKey(rmDTIdentifier)) { + IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier + + "is already stored."); + LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e); + throw e; + } + rmDTState.put(rmDTIdentifier, renewDate); + state.rmSecretManagerState.dtSequenceNumber = latestSequenceNumber; + } + + @Override + public synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception{ + Map rmDTState = + state.rmSecretManagerState.getTokenState(); + rmDTState.remove(rmDTIdentifier); + } + + @Override + public synchronized void storeRMDTMasterKeyState(DelegationKey delegationKey) + throws Exception { + Set rmDTMasterKeyState = + state.rmSecretManagerState.getMasterKeyState(); + + if (rmDTMasterKeyState.contains(delegationKey)) { + IOException e = new IOException("RMDTMasterKey with keyID: " + + delegationKey.getKeyId() + " is already stored"); + LOG.info("Error storing info for RMDTMasterKey with keyID: " + + delegationKey.getKeyId(), e); + throw e; + } + state.getRMDTSecretManagerState().getMasterKeyState().add(delegationKey); + LOG.info("rmDTMasterKeyState SIZE: " + rmDTMasterKeyState.size()); + } + + @Override + public synchronized void removeRMDTMasterKeyState(DelegationKey delegationKey) + throws Exception { + Set rmDTMasterKeyState = + state.rmSecretManagerState.getMasterKeyState(); + rmDTMasterKeyState.remove(delegationKey); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 29bdbb03ca4..4abd256731d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -18,10 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; + import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @Unstable public class NullRMStateStore extends RMStateStore { @@ -59,4 +62,26 @@ public class NullRMStateStore extends RMStateStore { // Do nothing } + @Override + public void storeRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + // Do nothing + } + + @Override + public void removeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentifier) + throws Exception { + // Do nothing + } + + @Override + public void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception { + // Do nothing + } + + @Override + public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception { + // Do nothing + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index cb4b5099b0c..066502289fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -42,6 +45,7 @@ import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -57,7 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt */ public abstract class RMStateStore { public static final Log LOG = LogFactory.getLog(RMStateStore.class); - + /** * State of an application attempt */ @@ -121,17 +125,46 @@ public abstract class RMStateStore { return user; } } - + + public static class RMDTSecretManagerState { + // DTIdentifier -> renewDate + Map delegationTokenState = + new HashMap(); + + Set masterKeyState = + new HashSet(); + + int dtSequenceNumber = 0; + + public Map getTokenState() { + return delegationTokenState; + } + + public Set getMasterKeyState() { + return masterKeyState; + } + + public int getDTSequenceNumber() { + return dtSequenceNumber; + } + } + /** * State of the ResourceManager */ public static class RMState { - Map appState = - new HashMap(); - + Map appState = + new HashMap(); + + RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState(); + public Map getApplicationState() { return appState; } + + public RMDTSecretManagerState getRMDTSecretManagerState() { + return rmSecretManagerState; + } } private Dispatcher rmDispatcher; @@ -235,8 +268,76 @@ public abstract class RMStateStore { protected abstract void storeApplicationAttemptState(String attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; - - + + + /** + * RMDTSecretManager call this to store the state of a delegation token + * and sequence number + */ + public synchronized void storeRMDelegationTokenAndSequenceNumber( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate, + latestSequenceNumber); + } + + /** + * Blocking API + * Derived classes must implement this method to store the state of + * RMDelegationToken and sequence number + */ + protected abstract void storeRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception; + + /** + * RMDTSecretManager call this to remove the state of a delegation token + */ + public synchronized void removeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) + throws Exception { + removeRMDelegationTokenState(rmDTIdentifier); + } + + /** + * Blocking API + * Derived classes must implement this method to remove the state of RMDelegationToken + */ + protected abstract void removeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception; + + /** + * RMDTSecretManager call this to store the state of a master key + */ + public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) + throws Exception { + storeRMDTMasterKeyState(delegationKey); + } + + /** + * Blocking API + * Derived classes must implement this method to store the state of + * DelegationToken Master Key + */ + protected abstract void storeRMDTMasterKeyState(DelegationKey delegationKey) + throws Exception; + + /** + * RMDTSecretManager call this to remove the state of a master key + */ + public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) + throws Exception { + removeRMDTMasterKeyState(delegationKey); + } + + /** + * Blocking API + * Derived classes must implement this method to remove the state of + * DelegationToken Master Key + */ + protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey) + throws Exception; + /** * Non-blocking API * ResourceManager services call this to remove an application from the state diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java index f70605cb303..23939defb42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java @@ -18,10 +18,26 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; + +import com.google.common.annotations.VisibleForTesting; /** * A ResourceManager specific delegation token secret manager. @@ -30,8 +46,13 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class RMDelegationTokenSecretManager - extends AbstractDelegationTokenSecretManager { +public class RMDelegationTokenSecretManager extends + AbstractDelegationTokenSecretManager implements + Recoverable { + private static final Log LOG = LogFactory + .getLog(RMDelegationTokenSecretManager.class); + + protected final RMContext rmContext; /** * Create a secret manager @@ -46,13 +67,132 @@ public class RMDelegationTokenSecretManager public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval, - long delegationTokenRemoverScanInterval) { + long delegationTokenRemoverScanInterval, + RMContext rmContext) { super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, delegationTokenRemoverScanInterval); + this.rmContext = rmContext; } @Override public RMDelegationTokenIdentifier createIdentifier() { return new RMDelegationTokenIdentifier(); } + + @Override + protected void storeNewMasterKey(DelegationKey newKey) { + try { + LOG.info("storing master key with keyID " + newKey.getKeyId()); + rmContext.getStateStore().storeRMDTMasterKey(newKey); + } catch (Exception e) { + LOG.error("Error in storing master key with KeyID: " + newKey.getKeyId()); + ExitUtil.terminate(1, e); + } + } + + @Override + protected void removeStoredMasterKey(DelegationKey key) { + try { + LOG.info("removing master key with keyID " + key.getKeyId()); + rmContext.getStateStore().removeRMDTMasterKey(key); + } catch (Exception e) { + LOG.error("Error in removing master key with KeyID: " + key.getKeyId()); + ExitUtil.terminate(1, e); + } + } + + @Override + protected void storeNewToken(RMDelegationTokenIdentifier identifier, + long renewDate) { + try { + LOG.info("storing RMDelegation token with sequence number: " + + identifier.getSequenceNumber()); + rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber( + identifier, renewDate, identifier.getSequenceNumber()); + } catch (Exception e) { + LOG.error("Error in storing RMDelegationToken with sequence number: " + + identifier.getSequenceNumber()); + ExitUtil.terminate(1, e); + } + } + + @Override + protected void updateStoredToken(RMDelegationTokenIdentifier id, + long renewDate) { + try { + LOG.info("updating RMDelegation token with sequence number: " + + id.getSequenceNumber()); + rmContext.getStateStore().removeRMDelegationToken(id, + delegationTokenSequenceNumber); + rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(id, + renewDate, id.getSequenceNumber()); + } catch (Exception e) { + LOG.error("Error in updating persisted RMDelegationToken with sequence number: " + + id.getSequenceNumber()); + ExitUtil.terminate(1, e); + } + } + + @Override + protected void removeStoredToken(RMDelegationTokenIdentifier ident) + throws IOException { + try { + LOG.info("removing RMDelegation token with sequence number: " + + ident.getSequenceNumber()); + rmContext.getStateStore().removeRMDelegationToken(ident, + delegationTokenSequenceNumber); + } catch (Exception e) { + LOG.error("Error in removing RMDelegationToken with sequence number: " + + ident.getSequenceNumber()); + ExitUtil.terminate(1, e); + } + } + + @Private + @VisibleForTesting + public synchronized Set getAllMasterKeys() { + HashSet keySet = new HashSet(); + keySet.addAll(allKeys.values()); + return keySet; + } + + @Private + @VisibleForTesting + public synchronized Map getAllTokens() { + Map allTokens = + new HashMap(); + + for (Map.Entry entry : currentTokens.entrySet()) { + allTokens.put(entry.getKey(), entry.getValue().getRenewDate()); + } + return allTokens; + } + + @Private + @VisibleForTesting + public int getLatestDTSequenceNumber() { + return delegationTokenSequenceNumber; + } + + @Override + public void recover(RMState rmState) throws Exception { + + LOG.info("recovering RMDelegationTokenSecretManager."); + // recover RMDTMasterKeys + for (DelegationKey dtKey : rmState.getRMDTSecretManagerState() + .getMasterKeyState()) { + addKey(dtKey); + } + + // recover RMDelegationTokens + Map rmDelegationTokens = + rmState.getRMDTSecretManagerState().getTokenState(); + this.delegationTokenSequenceNumber = + rmState.getRMDTSecretManagerState().getDTSequenceNumber(); + for (Map.Entry entry : rmDelegationTokens + .entrySet()) { + addPersistedDelegationToken(entry.getKey(), entry.getValue()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 977c7689044..08577c87af3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -291,7 +291,7 @@ public class MockRM extends ResourceManager { @Override protected ClientRMService createClientRMService() { return new ClientRMService(getRMContext(), getResourceScheduler(), - rmAppManager, applicationACLsManager, null) { + rmAppManager, applicationACLsManager, rmDTSecretManager) { @Override public void start() { // override to not start rpc handler diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index e0ee009ae64..93e1125ec6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -90,7 +91,9 @@ public class TestClientRMService { @BeforeClass public static void setupSecretManager() throws IOException { - dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(new NullRMStateStore()); + dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext); dtsm.startThreads(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java index 0d079f89f8d..9ee72a79c14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.net.InetAddress; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -454,10 +456,15 @@ public class TestClientRMTokens { return mockSched; } - private static RMDelegationTokenSecretManager createRMDelegationTokenSecretManager( - long secretKeyInterval, long tokenMaxLifetime, long tokenRenewInterval) { - RMDelegationTokenSecretManager rmDtSecretManager = new RMDelegationTokenSecretManager( - secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 3600000); + private static RMDelegationTokenSecretManager + createRMDelegationTokenSecretManager(long secretKeyInterval, + long tokenMaxLifetime, long tokenRenewInterval) { + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(new NullRMStateStore()); + + RMDelegationTokenSecretManager rmDtSecretManager = + new RMDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, + tokenRenewInterval, 3600000, rmContext); return rmDtSecretManager; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 36b30713b79..a54a277a278 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -25,6 +28,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -33,14 +37,18 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @@ -58,24 +66,32 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.ProtoUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class TestRMRestart { - - @Test - public void testRMRestart() throws Exception { + + private YarnConfiguration conf; + + @Before + public void setup() { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); ExitUtil.disableSystemExit(); - - YarnConfiguration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); + UserGroupInformation.setConfiguration(conf); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); + } + + @Test + public void testRMRestart() throws Exception { Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -331,13 +347,6 @@ public class TestRMRestart { @Test public void testRMRestartOnMaxAppAttempts() throws Exception { - Logger rootLogger = LogManager.getRootLogger(); - rootLogger.setLevel(Level.DEBUG); - ExitUtil.disableSystemExit(); - - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); - conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -411,13 +420,6 @@ public class TestRMRestart { @Test public void testDelegationTokenRestoredInDelegationTokenRenewer() throws Exception { - Logger rootLogger = LogManager.getRootLogger(); - rootLogger.setLevel(Level.DEBUG); - ExitUtil.disableSystemExit(); - - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); - conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); @@ -496,13 +498,6 @@ public class TestRMRestart { @Test public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { - Logger rootLogger = LogManager.getRootLogger(); - rootLogger.setLevel(Level.DEBUG); - ExitUtil.disableSystemExit(); - - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); - conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); @@ -577,7 +572,142 @@ public class TestRMRestart { rm2.stop(); } - class TestSecurityMockRM extends MockRM { + @Test + public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmAppState = + rmState.getApplicationState(); + Map rmDTState = + rmState.getRMDTSecretManagerState().getTokenState(); + Set rmDTMasterKeyState = + rmState.getRMDTSecretManagerState().getMasterKeyState(); + + MockRM rm1 = new TestSecurityMockRM(conf, memStore); + rm1.start(); + + // create an empty credential + Credentials ts = new Credentials(); + + // request a token and add into credential + GetDelegationTokenRequest request1 = mock(GetDelegationTokenRequest.class); + when(request1.getRenewer()).thenReturn("renewer1"); + GetDelegationTokenResponse response1 = + rm1.getClientRMService().getDelegationToken(request1); + DelegationToken delegationToken1 = response1.getRMDelegationToken(); + Token token1 = + ProtoUtils.convertFromProtoFormat(delegationToken1, null); + RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier(); + + HashSet tokenIdentSet = + new HashSet(); + ts.addToken(token1.getService(), token1); + tokenIdentSet.add(dtId1); + + // submit an app with customized credential + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, ts); + + // assert app info is saved + ApplicationState appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + + // assert all master keys are saved + Set allKeysRM1 = rm1.getRMDTSecretManager().getAllMasterKeys(); + Assert.assertEquals(allKeysRM1, rmDTMasterKeyState); + + // assert all tokens are saved + Map allTokensRM1 = + rm1.getRMDTSecretManager().getAllTokens(); + Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet()); + Assert.assertEquals(allTokensRM1, rmDTState); + + // assert sequence number is saved + Assert.assertEquals( + rm1.getRMDTSecretManager().getLatestDTSequenceNumber(), + rmState.getRMDTSecretManagerState().getDTSequenceNumber()); + + // request one more token + GetDelegationTokenRequest request2 = mock(GetDelegationTokenRequest.class); + when(request2.getRenewer()).thenReturn("renewer2"); + GetDelegationTokenResponse response2 = + rm1.getClientRMService().getDelegationToken(request2); + DelegationToken delegationToken2 = response2.getRMDelegationToken(); + Token token2 = + ProtoUtils.convertFromProtoFormat(delegationToken2, null); + RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier(); + + // cancel token2 + try{ + rm1.getRMDTSecretManager().cancelToken(token2, + UserGroupInformation.getCurrentUser().getUserName()); + } catch(Exception e) { + Assert.fail(); + } + + // Assert the token which has the latest delegationTokenSequenceNumber is removed + Assert.assertEquals( + rm1.getRMDTSecretManager().getLatestDTSequenceNumber(), + dtId2.getSequenceNumber()); + Assert.assertFalse(rmDTState.containsKey(dtId2)); + + // start new RM + MockRM rm2 = new TestSecurityMockRM(conf, memStore); + rm2.start(); + + // assert master keys and tokens are populated back to DTSecretManager + Map allTokensRM2 = + rm2.getRMDTSecretManager().getAllTokens(); + Assert.assertEquals(allTokensRM1, allTokensRM2); + // rm2 has its own master keys when it starts, we use containsAll here + Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys() + .containsAll(allKeysRM1)); + + // assert sequenceNumber is properly recovered, + // even though the token which has max sequenceNumber is not stored + Assert.assertEquals(rm1.getRMDTSecretManager().getLatestDTSequenceNumber(), + rm2.getRMDTSecretManager().getLatestDTSequenceNumber()); + + // renewDate before renewing + Long renewDateBeforeRenew = allTokensRM2.get(dtId1); + try{ + // renew recovered token + rm2.getRMDTSecretManager().renewToken(token1, "renewer1"); + } catch(Exception e) { + Assert.fail(); + } + + allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); + Long renewDateAfterRenew = allTokensRM2.get(dtId1); + // assert token is renewed + Assert.assertTrue(renewDateAfterRenew > renewDateBeforeRenew); + + // assert new token is added into state store + Assert.assertTrue(rmDTState.containsValue(renewDateAfterRenew)); + // assert old token is removed from state store + Assert.assertFalse(rmDTState.containsValue(renewDateBeforeRenew)); + + try{ + rm2.getRMDTSecretManager().cancelToken(token1, + UserGroupInformation.getCurrentUser().getUserName()); + } catch(Exception e) { + Assert.fail(); + } + + // assert token is removed from state after its cancelled + allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); + Assert.assertFalse(allTokensRM2.containsKey(dtId1)); + Assert.assertFalse(rmDTState.containsKey(dtId1)); + + // stop the RM + rm1.stop(); + rm2.stop(); + } + + public static class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) { super(conf, store); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java index 67109d65cdb..75b5d9fe788 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java @@ -31,6 +31,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import junit.framework.Assert; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -53,8 +56,10 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -111,7 +116,8 @@ public class TestRMStateStore { MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); try { TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster); - testRMStateStore(fsTester); + testRMAppStateStore(fsTester); + testRMDTSecretManagerStateStore(fsTester); } finally { cluster.shutdown(); } @@ -218,7 +224,7 @@ public class TestRMStateStore { } @SuppressWarnings("unchecked") - void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { + void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { long submitTime = System.currentTimeMillis(); Configuration conf = new YarnConfiguration(); RMStateStore store = stateStoreHelper.getRMStateStore(); @@ -334,6 +340,37 @@ public class TestRMStateStore { store.close(); } + public void testRMDTSecretManagerStateStore( + RMStateStoreHelper stateStoreHelper) throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + TestDispatcher dispatcher = new TestDispatcher(); + store.setDispatcher(dispatcher); + + // store RM delegation token; + RMDelegationTokenIdentifier dtId1 = + new RMDelegationTokenIdentifier(new Text("owner1"), + new Text("renewer1"), new Text("realuser1")); + Long renewDate1 = new Long(System.currentTimeMillis()); + int sequenceNumber = 1111; + store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1, + sequenceNumber); + Map token1 = + new HashMap(); + token1.put(dtId1, renewDate1); + + // store delegation key; + DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes()); + HashSet keySet = new HashSet(); + keySet.add(key); + store.storeRMDTMasterKey(key); + + RMDTSecretManagerState secretManagerState = + store.loadState().getRMDTSecretManagerState(); + Assert.assertEquals(token1, secretManagerState.getTokenState()); + Assert.assertEquals(keySet, secretManagerState.getMasterKeyState()); + Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber()); + } + private List> generateTokens(ApplicationAttemptId attemptId, ApplicationTokenSecretManager appTokenMgr, ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java new file mode 100644 index 00000000000..e72f7f56770 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java @@ -0,0 +1,206 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.security; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.records.DelegationToken; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.util.ProtoUtils; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestRMDelegationTokens { + + private YarnConfiguration conf; + + @Before + public void setup() { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + conf = new YarnConfiguration(); + UserGroupInformation.setConfiguration(conf); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); + } + + @Test(timeout = 15000) + public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmDTState = + rmState.getRMDTSecretManagerState().getTokenState(); + Set rmDTMasterKeyState = + rmState.getRMDTSecretManagerState().getMasterKeyState(); + + MockRM rm1 = new MyMockRM(conf, memStore); + rm1.start(); + // on rm start, two master keys are created. + // One is created at RMDTSecretMgr.startThreads.updateCurrentKey(); + // the other is created on the first run of + // tokenRemoverThread.rollMasterKey() + + RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager(); + // assert all master keys are saved + Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState); + Set expiringKeys = new HashSet(); + expiringKeys.addAll(dtSecretManager.getAllMasterKeys()); + + // record the current key + DelegationKey oldCurrentKey = + ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey(); + + // request to generate a RMDelegationToken + GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class); + when(request.getRenewer()).thenReturn("renewer1"); + GetDelegationTokenResponse response = + rm1.getClientRMService().getDelegationToken(request); + DelegationToken delegationToken = response.getRMDelegationToken(); + Token token1 = + ProtoUtils.convertFromProtoFormat(delegationToken, null); + RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier(); + + // wait for the first rollMasterKey + while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys + .get() < 1){ + Thread.sleep(200); + } + + // assert old-current-key and new-current-key exist + Assert.assertTrue(rmDTMasterKeyState.contains(oldCurrentKey)); + DelegationKey newCurrentKey = + ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey(); + Assert.assertTrue(rmDTMasterKeyState.contains(newCurrentKey)); + + // wait for token to expire + // rollMasterKey is called every 1 second. + while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys + .get() < 6) { + Thread.sleep(200); + } + + Assert.assertFalse(rmDTState.containsKey(dtId1)); + rm1.stop(); + } + + @Test(timeout = 15000) + public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Set rmDTMasterKeyState = + rmState.getRMDTSecretManagerState().getMasterKeyState(); + + MockRM rm1 = new MyMockRM(conf, memStore); + rm1.start(); + RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager(); + + // assert all master keys are saved + Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState); + Set expiringKeys = new HashSet(); + expiringKeys.addAll(dtSecretManager.getAllMasterKeys()); + + // wait for expiringKeys to expire + while (true) { + boolean allExpired = true; + for (DelegationKey key : expiringKeys) { + if (rmDTMasterKeyState.contains(key)) { + allExpired = false; + } + } + if (allExpired) + break; + Thread.sleep(500); + } + } + + class MyMockRM extends TestSecurityMockRM { + + public MyMockRM(Configuration conf, RMStateStore store) { + super(conf, store); + } + + @Override + protected RMDelegationTokenSecretManager + createRMDelegationTokenSecretManager(RMContext rmContext) { + // KeyUpdateInterval-> 1 seconds + // TokenMaxLifetime-> 2 seconds. + return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000, + rmContext); + } + } + + public class TestRMDelegationTokenSecretManager extends + RMDelegationTokenSecretManager { + public AtomicInteger numUpdatedKeys = new AtomicInteger(0); + + public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval, + long delegationTokenMaxLifetime, long delegationTokenRenewInterval, + long delegationTokenRemoverScanInterval, RMContext rmContext) { + super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, + delegationTokenRenewInterval, delegationTokenRemoverScanInterval, + rmContext); + } + + @Override + protected void storeNewMasterKey(DelegationKey newKey) { + super.storeNewMasterKey(newKey); + numUpdatedKeys.incrementAndGet(); + } + + public DelegationKey getCurrentKey() { + for (int keyId : allKeys.keySet()) { + if (keyId == currentId) { + return allKeys.get(keyId); + } + } + return null; + } + } + +}