YARN-638. Modified ResourceManager to restore RMDelegationTokens after restarting. Contributed by Jian He.

svn merge --ignore-ancestry -c 1487720 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1487721 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-30 04:15:05 +00:00
parent dbd1fbb616
commit dfa5f198f0
13 changed files with 909 additions and 74 deletions

View File

@ -204,6 +204,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-714. Added NMTokens to be sent to AMs as part of heart-beat response.
(Omkar Vinit Joshi via vinodkv)
YARN-638. Modified ResourceManager to restore RMDelegationTokens after
restarting. (Jian He via vinodkv)
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -239,7 +239,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
// Register event handler for RMAppManagerEvents
this.rmDispatcher.register(RMAppManagerEventType.class,
this.rmAppManager);
this.rmDTSecretManager = createRMDelegationTokenSecretManager();
this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext);
clientRM = createClientRMService();
addService(clientRM);
@ -666,7 +666,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected RMDelegationTokenSecretManager
createRMDelegationTokenSecretManager() {
createRMDelegationTokenSecretManager(RMContext rmContext) {
long secretKeyInterval =
conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
@ -678,7 +678,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
return new RMDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, 3600000);
tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext);
}
protected ClientRMService createClientRMService() {
@ -745,6 +745,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
public void recover(RMState state) throws Exception {
// recover RMdelegationTokenSecretManager
rmDTSecretManager.recover(state);
// recover applications
rmAppManager.recover(state);
}

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.List;
@ -33,11 +37,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -57,11 +63,19 @@ public class FileSystemRMStateStore extends RMStateStore {
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
private static final String ROOT_DIR_NAME = "FSRMStateRoot";
private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
private static final String RM_APP_ROOT = "RMAppRoot";
private static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
"RMDTSequenceNumber_";
private FileSystem fs;
private Path fsRootDirPath;
private Path rootDirPath;
private Path rmDTSecretManagerRoot;
private Path rmAppRoot;
private Path dtSequenceNumberPath = null;
@VisibleForTesting
Path fsWorkingPath;
@ -70,11 +84,14 @@ public class FileSystemRMStateStore extends RMStateStore {
throws Exception{
fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI));
fsRootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
// create filesystem
fs = fsWorkingPath.getFileSystem(conf);
fs.mkdirs(fsRootDirPath);
fs.mkdirs(rmDTSecretManagerRoot);
fs.mkdirs(rmAppRoot);
}
@Override
@ -84,15 +101,23 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override
public synchronized RMState loadState() throws Exception {
RMState rmState = new RMState();
// recover DelegationTokenSecretManager
loadRMDTSecretManagerState(rmState);
// recover RM applications
loadRMAppState(rmState);
return rmState;
}
private void loadRMAppState(RMState rmState) throws Exception {
try {
RMState state = new RMState();
FileStatus[] childNodes = fs.listStatus(fsRootDirPath);
FileStatus[] childNodes = fs.listStatus(rmAppRoot);
List<ApplicationAttemptState> attempts =
new ArrayList<ApplicationAttemptState>();
for(FileStatus childNodeStatus : childNodes) {
assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName();
Path childNodePath = getNodePath(childNodeName);
Path childNodePath = getNodePath(rmAppRoot, childNodeName);
byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){
// application
@ -107,7 +132,7 @@ public class FileSystemRMStateStore extends RMStateStore {
appStateData.getUser());
// assert child node name is same as actual applicationId
assert appId.equals(appState.context.getApplicationId());
state.appState.put(appId, appState);
rmState.appState.put(appId, appState);
} else if(childNodeName.startsWith(
ApplicationAttemptId.appAttemptIdStrPrefix)) {
// attempt
@ -139,7 +164,7 @@ public class FileSystemRMStateStore extends RMStateStore {
// go through all attempts and add them to their apps
for(ApplicationAttemptState attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
ApplicationState appState = state.appState.get(appId);
ApplicationState appState = rmState.appState.get(appId);
if(appState != null) {
appState.attempts.put(attemptState.getAttemptId(), attemptState);
} else {
@ -148,22 +173,49 @@ public class FileSystemRMStateStore extends RMStateStore {
// application attempt nodes
LOG.info("Application node not found for attempt: "
+ attemptState.getAttemptId());
deleteFile(getNodePath(attemptState.getAttemptId().toString()));
deleteFile(getNodePath(rmAppRoot, attemptState.getAttemptId().toString()));
}
}
return state;
} catch (Exception e) {
LOG.error("Failed to load state.", e);
throw e;
}
}
private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
for(FileStatus childNodeStatus : childNodes) {
assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName();
Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
ByteArrayInputStream is = new ByteArrayInputStream(childData);
DataInputStream fsIn = new DataInputStream(is);
if(childNodeName.startsWith(DELEGATION_KEY_PREFIX)){
DelegationKey key = new DelegationKey();
key.readFields(fsIn);
rmState.rmSecretManagerState.masterKeyState.add(key);
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier();
identifier.readFields(fsIn);
long renewDate = fsIn.readLong();
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
renewDate);
} else if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
rmState.rmSecretManagerState.dtSequenceNumber =
Integer.parseInt(childNodeName.split("_")[1]);
}else {
LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
}
fsIn.close();
}
}
@Override
public synchronized void storeApplicationState(String appId,
ApplicationStateDataPBImpl appStateDataPB)
throws Exception {
Path nodeCreatePath = getNodePath(appId);
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
Path nodeCreatePath = getNodePath(rmAppRoot, appId);
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@ -179,9 +231,8 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override
public synchronized void storeApplicationAttemptState(String attemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception {
Path nodeCreatePath = getNodePath(attemptId);
ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception {
Path nodeCreatePath = getNodePath(rmAppRoot, attemptId);
LOG.info("Storing info for attempt: " + attemptId
+ " at: " + nodeCreatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@ -197,9 +248,9 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override
public synchronized void removeApplicationState(ApplicationState appState)
throws Exception {
throws Exception {
String appId = appState.getAppId().toString();
Path nodeRemovePath = getNodePath(appId);
Path nodeRemovePath = getNodePath(rmAppRoot, appId);
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
deleteFile(nodeRemovePath);
for(ApplicationAttemptId attemptId : appState.attempts.keySet()) {
@ -208,13 +259,76 @@ public class FileSystemRMStateStore extends RMStateStore {
}
public synchronized void removeApplicationAttemptState(String attemptId)
throws Exception {
Path nodeRemovePath = getNodePath(attemptId);
throws Exception {
Path nodeRemovePath = getNodePath(rmAppRoot, attemptId);
LOG.info("Removing info for attempt: " + attemptId
+ " at: " + nodeRemovePath);
deleteFile(nodeRemovePath);
}
@Override
public synchronized void storeRMDelegationTokenAndSequenceNumberState(
RMDelegationTokenIdentifier identifier, Long renewDate,
int latestSequenceNumber) throws Exception {
Path nodeCreatePath =
getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
identifier.write(fsOut);
fsOut.writeLong(renewDate);
writeFile(nodeCreatePath, os.toByteArray());
fsOut.close();
// store sequence number
Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
+ latestSequenceNumber);
if (dtSequenceNumberPath == null) {
if (!createFile(latestSequenceNumberPath)) {
throw new Exception("Failed to create " + latestSequenceNumberPath);
}
} else {
if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) {
throw new Exception("Failed to rename " + dtSequenceNumberPath);
}
}
dtSequenceNumberPath = latestSequenceNumberPath;
}
@Override
public synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier identifier) throws Exception {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
deleteFile(nodeCreatePath);
}
@Override
public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey)
throws Exception {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_KEY_PREFIX + masterKey.getKeyId());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
masterKey.write(fsOut);
writeFile(nodeCreatePath, os.toByteArray());
fsOut.close();
}
@Override
public synchronized void
removeRMDTMasterKeyState(DelegationKey masterKey) throws Exception {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_KEY_PREFIX + masterKey.getKeyId());
LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId());
deleteFile(nodeCreatePath);
}
// FileSystem related code
private void deleteFile(Path deletePath) throws Exception {
@ -228,18 +342,25 @@ public class FileSystemRMStateStore extends RMStateStore {
// state data will not be that "long"
byte[] data = new byte[(int)len];
fsIn.readFully(data);
fsIn.close();
return data;
}
private void writeFile(Path outputPath, byte[] data) throws Exception {
FSDataOutputStream fsOut = fs.create(outputPath, false);
fsOut.write(data);
fsOut.flush();
fsOut.close();
}
@VisibleForTesting
Path getNodePath(String nodeName) {
return new Path(fsRootDirPath, nodeName);
private boolean renameFile(Path src, Path dst) throws Exception {
return fs.rename(src, dst);
}
private boolean createFile(Path newFile) throws Exception {
return fs.createNewFile(newFile);
}
private Path getNodePath(Path root, String nodeName) {
return new Path(root, nodeName);
}
}

View File

@ -19,14 +19,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -49,6 +53,12 @@ public class MemoryRMStateStore extends RMStateStore {
// return a copy of the state to allow for modification of the real state
RMState returnState = new RMState();
returnState.appState.putAll(state.appState);
returnState.rmSecretManagerState.getMasterKeyState()
.addAll(state.rmSecretManagerState.getMasterKeyState());
returnState.rmSecretManagerState.getTokenState().putAll(
state.rmSecretManagerState.getTokenState());
returnState.rmSecretManagerState.dtSequenceNumber =
state.rmSecretManagerState.dtSequenceNumber;
return returnState;
}
@ -113,4 +123,53 @@ public class MemoryRMStateStore extends RMStateStore {
ApplicationState removed = state.appState.remove(appId);
assert removed != null;
}
@Override
public synchronized void storeRMDelegationTokenAndSequenceNumberState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
Map<RMDelegationTokenIdentifier, Long> rmDTState =
state.rmSecretManagerState.getTokenState();
if (rmDTState.containsKey(rmDTIdentifier)) {
IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier
+ "is already stored.");
LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e);
throw e;
}
rmDTState.put(rmDTIdentifier, renewDate);
state.rmSecretManagerState.dtSequenceNumber = latestSequenceNumber;
}
@Override
public synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception{
Map<RMDelegationTokenIdentifier, Long> rmDTState =
state.rmSecretManagerState.getTokenState();
rmDTState.remove(rmDTIdentifier);
}
@Override
public synchronized void storeRMDTMasterKeyState(DelegationKey delegationKey)
throws Exception {
Set<DelegationKey> rmDTMasterKeyState =
state.rmSecretManagerState.getMasterKeyState();
if (rmDTMasterKeyState.contains(delegationKey)) {
IOException e = new IOException("RMDTMasterKey with keyID: "
+ delegationKey.getKeyId() + " is already stored");
LOG.info("Error storing info for RMDTMasterKey with keyID: "
+ delegationKey.getKeyId(), e);
throw e;
}
state.getRMDTSecretManagerState().getMasterKeyState().add(delegationKey);
LOG.info("rmDTMasterKeyState SIZE: " + rmDTMasterKeyState.size());
}
@Override
public synchronized void removeRMDTMasterKeyState(DelegationKey delegationKey)
throws Exception {
Set<DelegationKey> rmDTMasterKeyState =
state.rmSecretManagerState.getMasterKeyState();
rmDTMasterKeyState.remove(delegationKey);
}
}

View File

@ -18,10 +18,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@Unstable
public class NullRMStateStore extends RMStateStore {
@ -59,4 +62,26 @@ public class NullRMStateStore extends RMStateStore {
// Do nothing
}
@Override
public void storeRMDelegationTokenAndSequenceNumberState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
// Do nothing
}
@Override
public void removeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentifier)
throws Exception {
// Do nothing
}
@Override
public void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
// Do nothing
}
@Override
public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
// Do nothing
}
}

View File

@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -30,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -42,6 +45,7 @@ import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -57,7 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
*/
public abstract class RMStateStore {
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
/**
* State of an application attempt
*/
@ -121,17 +125,46 @@ public abstract class RMStateStore {
return user;
}
}
public static class RMDTSecretManagerState {
// DTIdentifier -> renewDate
Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
new HashMap<RMDelegationTokenIdentifier, Long>();
Set<DelegationKey> masterKeyState =
new HashSet<DelegationKey>();
int dtSequenceNumber = 0;
public Map<RMDelegationTokenIdentifier, Long> getTokenState() {
return delegationTokenState;
}
public Set<DelegationKey> getMasterKeyState() {
return masterKeyState;
}
public int getDTSequenceNumber() {
return dtSequenceNumber;
}
}
/**
* State of the ResourceManager
*/
public static class RMState {
Map<ApplicationId, ApplicationState> appState =
new HashMap<ApplicationId, ApplicationState>();
Map<ApplicationId, ApplicationState> appState =
new HashMap<ApplicationId, ApplicationState>();
RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
public Map<ApplicationId, ApplicationState> getApplicationState() {
return appState;
}
public RMDTSecretManagerState getRMDTSecretManagerState() {
return rmSecretManagerState;
}
}
private Dispatcher rmDispatcher;
@ -235,8 +268,76 @@ public abstract class RMStateStore {
protected abstract void storeApplicationAttemptState(String attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception;
/**
* RMDTSecretManager call this to store the state of a delegation token
* and sequence number
*/
public synchronized void storeRMDelegationTokenAndSequenceNumber(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
latestSequenceNumber);
}
/**
* Blocking API
* Derived classes must implement this method to store the state of
* RMDelegationToken and sequence number
*/
protected abstract void storeRMDelegationTokenAndSequenceNumberState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception;
/**
* RMDTSecretManager call this to remove the state of a delegation token
*/
public synchronized void removeRMDelegationToken(
RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber)
throws Exception {
removeRMDelegationTokenState(rmDTIdentifier);
}
/**
* Blocking API
* Derived classes must implement this method to remove the state of RMDelegationToken
*/
protected abstract void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception;
/**
* RMDTSecretManager call this to store the state of a master key
*/
public synchronized void storeRMDTMasterKey(DelegationKey delegationKey)
throws Exception {
storeRMDTMasterKeyState(delegationKey);
}
/**
* Blocking API
* Derived classes must implement this method to store the state of
* DelegationToken Master Key
*/
protected abstract void storeRMDTMasterKeyState(DelegationKey delegationKey)
throws Exception;
/**
* RMDTSecretManager call this to remove the state of a master key
*/
public synchronized void removeRMDTMasterKey(DelegationKey delegationKey)
throws Exception {
removeRMDTMasterKeyState(delegationKey);
}
/**
* Blocking API
* Derived classes must implement this method to remove the state of
* DelegationToken Master Key
*/
protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey)
throws Exception;
/**
* Non-blocking API
* ResourceManager services call this to remove an application from the state

View File

@ -18,10 +18,26 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import com.google.common.annotations.VisibleForTesting;
/**
* A ResourceManager specific delegation token secret manager.
@ -30,8 +46,13 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RMDelegationTokenSecretManager
extends AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> {
public class RMDelegationTokenSecretManager extends
AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> implements
Recoverable {
private static final Log LOG = LogFactory
.getLog(RMDelegationTokenSecretManager.class);
protected final RMContext rmContext;
/**
* Create a secret manager
@ -46,13 +67,132 @@ public class RMDelegationTokenSecretManager
public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime,
long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval) {
long delegationTokenRemoverScanInterval,
RMContext rmContext) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
this.rmContext = rmContext;
}
@Override
public RMDelegationTokenIdentifier createIdentifier() {
return new RMDelegationTokenIdentifier();
}
@Override
protected void storeNewMasterKey(DelegationKey newKey) {
try {
LOG.info("storing master key with keyID " + newKey.getKeyId());
rmContext.getStateStore().storeRMDTMasterKey(newKey);
} catch (Exception e) {
LOG.error("Error in storing master key with KeyID: " + newKey.getKeyId());
ExitUtil.terminate(1, e);
}
}
@Override
protected void removeStoredMasterKey(DelegationKey key) {
try {
LOG.info("removing master key with keyID " + key.getKeyId());
rmContext.getStateStore().removeRMDTMasterKey(key);
} catch (Exception e) {
LOG.error("Error in removing master key with KeyID: " + key.getKeyId());
ExitUtil.terminate(1, e);
}
}
@Override
protected void storeNewToken(RMDelegationTokenIdentifier identifier,
long renewDate) {
try {
LOG.info("storing RMDelegation token with sequence number: "
+ identifier.getSequenceNumber());
rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(
identifier, renewDate, identifier.getSequenceNumber());
} catch (Exception e) {
LOG.error("Error in storing RMDelegationToken with sequence number: "
+ identifier.getSequenceNumber());
ExitUtil.terminate(1, e);
}
}
@Override
protected void updateStoredToken(RMDelegationTokenIdentifier id,
long renewDate) {
try {
LOG.info("updating RMDelegation token with sequence number: "
+ id.getSequenceNumber());
rmContext.getStateStore().removeRMDelegationToken(id,
delegationTokenSequenceNumber);
rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(id,
renewDate, id.getSequenceNumber());
} catch (Exception e) {
LOG.error("Error in updating persisted RMDelegationToken with sequence number: "
+ id.getSequenceNumber());
ExitUtil.terminate(1, e);
}
}
@Override
protected void removeStoredToken(RMDelegationTokenIdentifier ident)
throws IOException {
try {
LOG.info("removing RMDelegation token with sequence number: "
+ ident.getSequenceNumber());
rmContext.getStateStore().removeRMDelegationToken(ident,
delegationTokenSequenceNumber);
} catch (Exception e) {
LOG.error("Error in removing RMDelegationToken with sequence number: "
+ ident.getSequenceNumber());
ExitUtil.terminate(1, e);
}
}
@Private
@VisibleForTesting
public synchronized Set<DelegationKey> getAllMasterKeys() {
HashSet<DelegationKey> keySet = new HashSet<DelegationKey>();
keySet.addAll(allKeys.values());
return keySet;
}
@Private
@VisibleForTesting
public synchronized Map<RMDelegationTokenIdentifier, Long> getAllTokens() {
Map<RMDelegationTokenIdentifier, Long> allTokens =
new HashMap<RMDelegationTokenIdentifier, Long>();
for (Map.Entry<RMDelegationTokenIdentifier,
DelegationTokenInformation> entry : currentTokens.entrySet()) {
allTokens.put(entry.getKey(), entry.getValue().getRenewDate());
}
return allTokens;
}
@Private
@VisibleForTesting
public int getLatestDTSequenceNumber() {
return delegationTokenSequenceNumber;
}
@Override
public void recover(RMState rmState) throws Exception {
LOG.info("recovering RMDelegationTokenSecretManager.");
// recover RMDTMasterKeys
for (DelegationKey dtKey : rmState.getRMDTSecretManagerState()
.getMasterKeyState()) {
addKey(dtKey);
}
// recover RMDelegationTokens
Map<RMDelegationTokenIdentifier, Long> rmDelegationTokens =
rmState.getRMDTSecretManagerState().getTokenState();
this.delegationTokenSequenceNumber =
rmState.getRMDTSecretManagerState().getDTSequenceNumber();
for (Map.Entry<RMDelegationTokenIdentifier, Long> entry : rmDelegationTokens
.entrySet()) {
addPersistedDelegationToken(entry.getKey(), entry.getValue());
}
}
}

View File

@ -291,7 +291,7 @@ public class MockRM extends ResourceManager {
@Override
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(),
rmAppManager, applicationACLsManager, null) {
rmAppManager, applicationACLsManager, rmDTSecretManager) {
@Override
public void start() {
// override to not start rpc handler

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -90,7 +91,9 @@ public class TestClientRMService {
@BeforeClass
public static void setupSecretManager() throws IOException {
dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getStateStore()).thenReturn(new NullRMStateStore());
dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext);
dtsm.startThreads();
}

View File

@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetAddress;
@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@ -454,10 +456,15 @@ public class TestClientRMTokens {
return mockSched;
}
private static RMDelegationTokenSecretManager createRMDelegationTokenSecretManager(
long secretKeyInterval, long tokenMaxLifetime, long tokenRenewInterval) {
RMDelegationTokenSecretManager rmDtSecretManager = new RMDelegationTokenSecretManager(
secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 3600000);
private static RMDelegationTokenSecretManager
createRMDelegationTokenSecretManager(long secretKeyInterval,
long tokenMaxLifetime, long tokenRenewInterval) {
RMContext rmContext = mock(RMContext.class);
when(rmContext.getStateStore()).thenReturn(new NullRMStateStore());
RMDelegationTokenSecretManager rmDtSecretManager =
new RMDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime,
tokenRenewInterval, 3600000, rmContext);
return rmDtSecretManager;
}
}

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -25,6 +28,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -33,14 +37,18 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@ -58,24 +66,32 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestRMRestart {
@Test
public void testRMRestart() throws Exception {
private YarnConfiguration conf;
@Before
public void setup() {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
YarnConfiguration conf = new YarnConfiguration();
conf = new YarnConfiguration();
UserGroupInformation.setConfiguration(conf);
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
}
@Test
public void testRMRestart() throws Exception {
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -331,13 +347,6 @@ public class TestRMRestart {
@Test
public void testRMRestartOnMaxAppAttempts() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -411,13 +420,6 @@ public class TestRMRestart {
@Test
public void testDelegationTokenRestoredInDelegationTokenRenewer()
throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
@ -496,13 +498,6 @@ public class TestRMRestart {
@Test
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
@ -577,7 +572,142 @@ public class TestRMRestart {
rm2.stop();
}
class TestSecurityMockRM extends MockRM {
@Test
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
Map<RMDelegationTokenIdentifier, Long> rmDTState =
rmState.getRMDTSecretManagerState().getTokenState();
Set<DelegationKey> rmDTMasterKeyState =
rmState.getRMDTSecretManagerState().getMasterKeyState();
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
rm1.start();
// create an empty credential
Credentials ts = new Credentials();
// request a token and add into credential
GetDelegationTokenRequest request1 = mock(GetDelegationTokenRequest.class);
when(request1.getRenewer()).thenReturn("renewer1");
GetDelegationTokenResponse response1 =
rm1.getClientRMService().getDelegationToken(request1);
DelegationToken delegationToken1 = response1.getRMDelegationToken();
Token<RMDelegationTokenIdentifier> token1 =
ProtoUtils.convertFromProtoFormat(delegationToken1, null);
RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
HashSet<RMDelegationTokenIdentifier> tokenIdentSet =
new HashSet<RMDelegationTokenIdentifier>();
ts.addToken(token1.getService(), token1);
tokenIdentSet.add(dtId1);
// submit an app with customized credential
RMApp app = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
// assert app info is saved
ApplicationState appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
// assert all master keys are saved
Set<DelegationKey> allKeysRM1 = rm1.getRMDTSecretManager().getAllMasterKeys();
Assert.assertEquals(allKeysRM1, rmDTMasterKeyState);
// assert all tokens are saved
Map<RMDelegationTokenIdentifier, Long> allTokensRM1 =
rm1.getRMDTSecretManager().getAllTokens();
Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet());
Assert.assertEquals(allTokensRM1, rmDTState);
// assert sequence number is saved
Assert.assertEquals(
rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
rmState.getRMDTSecretManagerState().getDTSequenceNumber());
// request one more token
GetDelegationTokenRequest request2 = mock(GetDelegationTokenRequest.class);
when(request2.getRenewer()).thenReturn("renewer2");
GetDelegationTokenResponse response2 =
rm1.getClientRMService().getDelegationToken(request2);
DelegationToken delegationToken2 = response2.getRMDelegationToken();
Token<RMDelegationTokenIdentifier> token2 =
ProtoUtils.convertFromProtoFormat(delegationToken2, null);
RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier();
// cancel token2
try{
rm1.getRMDTSecretManager().cancelToken(token2,
UserGroupInformation.getCurrentUser().getUserName());
} catch(Exception e) {
Assert.fail();
}
// Assert the token which has the latest delegationTokenSequenceNumber is removed
Assert.assertEquals(
rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
dtId2.getSequenceNumber());
Assert.assertFalse(rmDTState.containsKey(dtId2));
// start new RM
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
rm2.start();
// assert master keys and tokens are populated back to DTSecretManager
Map<RMDelegationTokenIdentifier, Long> allTokensRM2 =
rm2.getRMDTSecretManager().getAllTokens();
Assert.assertEquals(allTokensRM1, allTokensRM2);
// rm2 has its own master keys when it starts, we use containsAll here
Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys()
.containsAll(allKeysRM1));
// assert sequenceNumber is properly recovered,
// even though the token which has max sequenceNumber is not stored
Assert.assertEquals(rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
rm2.getRMDTSecretManager().getLatestDTSequenceNumber());
// renewDate before renewing
Long renewDateBeforeRenew = allTokensRM2.get(dtId1);
try{
// renew recovered token
rm2.getRMDTSecretManager().renewToken(token1, "renewer1");
} catch(Exception e) {
Assert.fail();
}
allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens();
Long renewDateAfterRenew = allTokensRM2.get(dtId1);
// assert token is renewed
Assert.assertTrue(renewDateAfterRenew > renewDateBeforeRenew);
// assert new token is added into state store
Assert.assertTrue(rmDTState.containsValue(renewDateAfterRenew));
// assert old token is removed from state store
Assert.assertFalse(rmDTState.containsValue(renewDateBeforeRenew));
try{
rm2.getRMDTSecretManager().cancelToken(token1,
UserGroupInformation.getCurrentUser().getUserName());
} catch(Exception e) {
Assert.fail();
}
// assert token is removed from state after its cancelled
allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens();
Assert.assertFalse(allTokensRM2.containsKey(dtId1));
Assert.assertFalse(rmDTState.containsKey(dtId1));
// stop the RM
rm1.stop();
rm2.stop();
}
public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {
super(conf, store);

View File

@ -31,6 +31,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -53,8 +56,10 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -111,7 +116,8 @@ public class TestRMStateStore {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
testRMStateStore(fsTester);
testRMAppStateStore(fsTester);
testRMDTSecretManagerStateStore(fsTester);
} finally {
cluster.shutdown();
}
@ -218,7 +224,7 @@ public class TestRMStateStore {
}
@SuppressWarnings("unchecked")
void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
long submitTime = System.currentTimeMillis();
Configuration conf = new YarnConfiguration();
RMStateStore store = stateStoreHelper.getRMStateStore();
@ -334,6 +340,37 @@ public class TestRMStateStore {
store.close();
}
public void testRMDTSecretManagerStateStore(
RMStateStoreHelper stateStoreHelper) throws Exception {
RMStateStore store = stateStoreHelper.getRMStateStore();
TestDispatcher dispatcher = new TestDispatcher();
store.setDispatcher(dispatcher);
// store RM delegation token;
RMDelegationTokenIdentifier dtId1 =
new RMDelegationTokenIdentifier(new Text("owner1"),
new Text("renewer1"), new Text("realuser1"));
Long renewDate1 = new Long(System.currentTimeMillis());
int sequenceNumber = 1111;
store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
sequenceNumber);
Map<RMDelegationTokenIdentifier, Long> token1 =
new HashMap<RMDelegationTokenIdentifier, Long>();
token1.put(dtId1, renewDate1);
// store delegation key;
DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes());
HashSet<DelegationKey> keySet = new HashSet<DelegationKey>();
keySet.add(key);
store.storeRMDTMasterKey(key);
RMDTSecretManagerState secretManagerState =
store.loadState().getRMDTSecretManagerState();
Assert.assertEquals(token1, secretManagerState.getTokenState());
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber());
}
private List<Token<?>> generateTokens(ApplicationAttemptId attemptId,
ApplicationTokenSecretManager appTokenMgr,
ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) {

View File

@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.security;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestRMDelegationTokens {
private YarnConfiguration conf;
@Before
public void setup() {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
conf = new YarnConfiguration();
UserGroupInformation.setConfiguration(conf);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
}
@Test(timeout = 15000)
public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<RMDelegationTokenIdentifier, Long> rmDTState =
rmState.getRMDTSecretManagerState().getTokenState();
Set<DelegationKey> rmDTMasterKeyState =
rmState.getRMDTSecretManagerState().getMasterKeyState();
MockRM rm1 = new MyMockRM(conf, memStore);
rm1.start();
// on rm start, two master keys are created.
// One is created at RMDTSecretMgr.startThreads.updateCurrentKey();
// the other is created on the first run of
// tokenRemoverThread.rollMasterKey()
RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager();
// assert all master keys are saved
Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState);
Set<DelegationKey> expiringKeys = new HashSet<DelegationKey>();
expiringKeys.addAll(dtSecretManager.getAllMasterKeys());
// record the current key
DelegationKey oldCurrentKey =
((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey();
// request to generate a RMDelegationToken
GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
when(request.getRenewer()).thenReturn("renewer1");
GetDelegationTokenResponse response =
rm1.getClientRMService().getDelegationToken(request);
DelegationToken delegationToken = response.getRMDelegationToken();
Token<RMDelegationTokenIdentifier> token1 =
ProtoUtils.convertFromProtoFormat(delegationToken, null);
RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
// wait for the first rollMasterKey
while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys
.get() < 1){
Thread.sleep(200);
}
// assert old-current-key and new-current-key exist
Assert.assertTrue(rmDTMasterKeyState.contains(oldCurrentKey));
DelegationKey newCurrentKey =
((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey();
Assert.assertTrue(rmDTMasterKeyState.contains(newCurrentKey));
// wait for token to expire
// rollMasterKey is called every 1 second.
while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys
.get() < 6) {
Thread.sleep(200);
}
Assert.assertFalse(rmDTState.containsKey(dtId1));
rm1.stop();
}
@Test(timeout = 15000)
public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Set<DelegationKey> rmDTMasterKeyState =
rmState.getRMDTSecretManagerState().getMasterKeyState();
MockRM rm1 = new MyMockRM(conf, memStore);
rm1.start();
RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager();
// assert all master keys are saved
Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState);
Set<DelegationKey> expiringKeys = new HashSet<DelegationKey>();
expiringKeys.addAll(dtSecretManager.getAllMasterKeys());
// wait for expiringKeys to expire
while (true) {
boolean allExpired = true;
for (DelegationKey key : expiringKeys) {
if (rmDTMasterKeyState.contains(key)) {
allExpired = false;
}
}
if (allExpired)
break;
Thread.sleep(500);
}
}
class MyMockRM extends TestSecurityMockRM {
public MyMockRM(Configuration conf, RMStateStore store) {
super(conf, store);
}
@Override
protected RMDelegationTokenSecretManager
createRMDelegationTokenSecretManager(RMContext rmContext) {
// KeyUpdateInterval-> 1 seconds
// TokenMaxLifetime-> 2 seconds.
return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000,
rmContext);
}
}
public class TestRMDelegationTokenSecretManager extends
RMDelegationTokenSecretManager {
public AtomicInteger numUpdatedKeys = new AtomicInteger(0);
public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval, RMContext rmContext) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval,
rmContext);
}
@Override
protected void storeNewMasterKey(DelegationKey newKey) {
super.storeNewMasterKey(newKey);
numUpdatedKeys.incrementAndGet();
}
public DelegationKey getCurrentKey() {
for (int keyId : allKeys.keySet()) {
if (keyId == currentId) {
return allKeys.get(keyId);
}
}
return null;
}
}
}