YARN-638. Modified ResourceManager to restore RMDelegationTokens after restarting. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1487720 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-30 04:14:26 +00:00
parent 1858c7e52e
commit 29902cd53c
13 changed files with 909 additions and 74 deletions

View File

@ -224,6 +224,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-714. Added NMTokens to be sent to AMs as part of heart-beat response. YARN-714. Added NMTokens to be sent to AMs as part of heart-beat response.
(Omkar Vinit Joshi via vinodkv) (Omkar Vinit Joshi via vinodkv)
YARN-638. Modified ResourceManager to restore RMDelegationTokens after
restarting. (Jian He via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -239,7 +239,7 @@ public synchronized void init(Configuration conf) {
// Register event handler for RMAppManagerEvents // Register event handler for RMAppManagerEvents
this.rmDispatcher.register(RMAppManagerEventType.class, this.rmDispatcher.register(RMAppManagerEventType.class,
this.rmAppManager); this.rmAppManager);
this.rmDTSecretManager = createRMDelegationTokenSecretManager(); this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext);
clientRM = createClientRMService(); clientRM = createClientRMService();
addService(clientRM); addService(clientRM);
@ -666,7 +666,7 @@ protected ResourceTrackerService createResourceTrackerService() {
} }
protected RMDelegationTokenSecretManager protected RMDelegationTokenSecretManager
createRMDelegationTokenSecretManager() { createRMDelegationTokenSecretManager(RMContext rmContext) {
long secretKeyInterval = long secretKeyInterval =
conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY, conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
@ -678,7 +678,7 @@ protected ResourceTrackerService createResourceTrackerService() {
YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
return new RMDelegationTokenSecretManager(secretKeyInterval, return new RMDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, 3600000); tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext);
} }
protected ClientRMService createClientRMService() { protected ClientRMService createClientRMService() {
@ -745,6 +745,9 @@ public ApplicationTokenSecretManager getApplicationTokenSecretManager(){
@Override @Override
public void recover(RMState state) throws Exception { public void recover(RMState state) throws Exception {
// recover RMdelegationTokenSecretManager
rmDTSecretManager.recover(state);
// recover applications // recover applications
rmAppManager.recover(state); rmAppManager.recover(state);
} }

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery; package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -33,11 +37,13 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
@ -57,11 +63,19 @@ public class FileSystemRMStateStore extends RMStateStore {
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
private static final String ROOT_DIR_NAME = "FSRMStateRoot"; private static final String ROOT_DIR_NAME = "FSRMStateRoot";
private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
private static final String RM_APP_ROOT = "RMAppRoot";
private static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
"RMDTSequenceNumber_";
private FileSystem fs; private FileSystem fs;
private Path fsRootDirPath; private Path rootDirPath;
private Path rmDTSecretManagerRoot;
private Path rmAppRoot;
private Path dtSequenceNumberPath = null;
@VisibleForTesting @VisibleForTesting
Path fsWorkingPath; Path fsWorkingPath;
@ -70,11 +84,14 @@ public synchronized void initInternal(Configuration conf)
throws Exception{ throws Exception{
fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI)); fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI));
fsRootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
// create filesystem // create filesystem
fs = fsWorkingPath.getFileSystem(conf); fs = fsWorkingPath.getFileSystem(conf);
fs.mkdirs(fsRootDirPath); fs.mkdirs(rmDTSecretManagerRoot);
fs.mkdirs(rmAppRoot);
} }
@Override @Override
@ -84,15 +101,23 @@ protected synchronized void closeInternal() throws Exception {
@Override @Override
public synchronized RMState loadState() throws Exception { public synchronized RMState loadState() throws Exception {
RMState rmState = new RMState();
// recover DelegationTokenSecretManager
loadRMDTSecretManagerState(rmState);
// recover RM applications
loadRMAppState(rmState);
return rmState;
}
private void loadRMAppState(RMState rmState) throws Exception {
try { try {
RMState state = new RMState(); FileStatus[] childNodes = fs.listStatus(rmAppRoot);
FileStatus[] childNodes = fs.listStatus(fsRootDirPath);
List<ApplicationAttemptState> attempts = List<ApplicationAttemptState> attempts =
new ArrayList<ApplicationAttemptState>(); new ArrayList<ApplicationAttemptState>();
for(FileStatus childNodeStatus : childNodes) { for(FileStatus childNodeStatus : childNodes) {
assert childNodeStatus.isFile(); assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName(); String childNodeName = childNodeStatus.getPath().getName();
Path childNodePath = getNodePath(childNodeName); Path childNodePath = getNodePath(rmAppRoot, childNodeName);
byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){ if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){
// application // application
@ -107,7 +132,7 @@ public synchronized RMState loadState() throws Exception {
appStateData.getUser()); appStateData.getUser());
// assert child node name is same as actual applicationId // assert child node name is same as actual applicationId
assert appId.equals(appState.context.getApplicationId()); assert appId.equals(appState.context.getApplicationId());
state.appState.put(appId, appState); rmState.appState.put(appId, appState);
} else if(childNodeName.startsWith( } else if(childNodeName.startsWith(
ApplicationAttemptId.appAttemptIdStrPrefix)) { ApplicationAttemptId.appAttemptIdStrPrefix)) {
// attempt // attempt
@ -139,7 +164,7 @@ public synchronized RMState loadState() throws Exception {
// go through all attempts and add them to their apps // go through all attempts and add them to their apps
for(ApplicationAttemptState attemptState : attempts) { for(ApplicationAttemptState attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId(); ApplicationId appId = attemptState.getAttemptId().getApplicationId();
ApplicationState appState = state.appState.get(appId); ApplicationState appState = rmState.appState.get(appId);
if(appState != null) { if(appState != null) {
appState.attempts.put(attemptState.getAttemptId(), attemptState); appState.attempts.put(attemptState.getAttemptId(), attemptState);
} else { } else {
@ -148,22 +173,49 @@ public synchronized RMState loadState() throws Exception {
// application attempt nodes // application attempt nodes
LOG.info("Application node not found for attempt: " LOG.info("Application node not found for attempt: "
+ attemptState.getAttemptId()); + attemptState.getAttemptId());
deleteFile(getNodePath(attemptState.getAttemptId().toString())); deleteFile(getNodePath(rmAppRoot, attemptState.getAttemptId().toString()));
} }
} }
return state;
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to load state.", e); LOG.error("Failed to load state.", e);
throw e; throw e;
} }
} }
private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
for(FileStatus childNodeStatus : childNodes) {
assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName();
Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
ByteArrayInputStream is = new ByteArrayInputStream(childData);
DataInputStream fsIn = new DataInputStream(is);
if(childNodeName.startsWith(DELEGATION_KEY_PREFIX)){
DelegationKey key = new DelegationKey();
key.readFields(fsIn);
rmState.rmSecretManagerState.masterKeyState.add(key);
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier();
identifier.readFields(fsIn);
long renewDate = fsIn.readLong();
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
renewDate);
} else if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
rmState.rmSecretManagerState.dtSequenceNumber =
Integer.parseInt(childNodeName.split("_")[1]);
}else {
LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
}
fsIn.close();
}
}
@Override @Override
public synchronized void storeApplicationState(String appId, public synchronized void storeApplicationState(String appId,
ApplicationStateDataPBImpl appStateDataPB) ApplicationStateDataPBImpl appStateDataPB) throws Exception {
throws Exception { Path nodeCreatePath = getNodePath(rmAppRoot, appId);
Path nodeCreatePath = getNodePath(appId);
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray(); byte[] appStateData = appStateDataPB.getProto().toByteArray();
@ -179,9 +231,8 @@ public synchronized void storeApplicationState(String appId,
@Override @Override
public synchronized void storeApplicationAttemptState(String attemptId, public synchronized void storeApplicationAttemptState(String attemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB) ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception {
throws Exception { Path nodeCreatePath = getNodePath(rmAppRoot, attemptId);
Path nodeCreatePath = getNodePath(attemptId);
LOG.info("Storing info for attempt: " + attemptId LOG.info("Storing info for attempt: " + attemptId
+ " at: " + nodeCreatePath); + " at: " + nodeCreatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@ -197,9 +248,9 @@ public synchronized void storeApplicationAttemptState(String attemptId,
@Override @Override
public synchronized void removeApplicationState(ApplicationState appState) public synchronized void removeApplicationState(ApplicationState appState)
throws Exception { throws Exception {
String appId = appState.getAppId().toString(); String appId = appState.getAppId().toString();
Path nodeRemovePath = getNodePath(appId); Path nodeRemovePath = getNodePath(rmAppRoot, appId);
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
deleteFile(nodeRemovePath); deleteFile(nodeRemovePath);
for(ApplicationAttemptId attemptId : appState.attempts.keySet()) { for(ApplicationAttemptId attemptId : appState.attempts.keySet()) {
@ -208,13 +259,76 @@ public synchronized void removeApplicationState(ApplicationState appState)
} }
public synchronized void removeApplicationAttemptState(String attemptId) public synchronized void removeApplicationAttemptState(String attemptId)
throws Exception { throws Exception {
Path nodeRemovePath = getNodePath(attemptId); Path nodeRemovePath = getNodePath(rmAppRoot, attemptId);
LOG.info("Removing info for attempt: " + attemptId LOG.info("Removing info for attempt: " + attemptId
+ " at: " + nodeRemovePath); + " at: " + nodeRemovePath);
deleteFile(nodeRemovePath); deleteFile(nodeRemovePath);
} }
@Override
public synchronized void storeRMDelegationTokenAndSequenceNumberState(
RMDelegationTokenIdentifier identifier, Long renewDate,
int latestSequenceNumber) throws Exception {
Path nodeCreatePath =
getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
identifier.write(fsOut);
fsOut.writeLong(renewDate);
writeFile(nodeCreatePath, os.toByteArray());
fsOut.close();
// store sequence number
Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
+ latestSequenceNumber);
if (dtSequenceNumberPath == null) {
if (!createFile(latestSequenceNumberPath)) {
throw new Exception("Failed to create " + latestSequenceNumberPath);
}
} else {
if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) {
throw new Exception("Failed to rename " + dtSequenceNumberPath);
}
}
dtSequenceNumberPath = latestSequenceNumberPath;
}
@Override
public synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier identifier) throws Exception {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
deleteFile(nodeCreatePath);
}
@Override
public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey)
throws Exception {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_KEY_PREFIX + masterKey.getKeyId());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
masterKey.write(fsOut);
writeFile(nodeCreatePath, os.toByteArray());
fsOut.close();
}
@Override
public synchronized void
removeRMDTMasterKeyState(DelegationKey masterKey) throws Exception {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_KEY_PREFIX + masterKey.getKeyId());
LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId());
deleteFile(nodeCreatePath);
}
// FileSystem related code // FileSystem related code
private void deleteFile(Path deletePath) throws Exception { private void deleteFile(Path deletePath) throws Exception {
@ -228,18 +342,25 @@ private byte[] readFile(Path inputPath, long len) throws Exception {
// state data will not be that "long" // state data will not be that "long"
byte[] data = new byte[(int)len]; byte[] data = new byte[(int)len];
fsIn.readFully(data); fsIn.readFully(data);
fsIn.close();
return data; return data;
} }
private void writeFile(Path outputPath, byte[] data) throws Exception { private void writeFile(Path outputPath, byte[] data) throws Exception {
FSDataOutputStream fsOut = fs.create(outputPath, false); FSDataOutputStream fsOut = fs.create(outputPath, false);
fsOut.write(data); fsOut.write(data);
fsOut.flush();
fsOut.close(); fsOut.close();
} }
@VisibleForTesting private boolean renameFile(Path src, Path dst) throws Exception {
Path getNodePath(String nodeName) { return fs.rename(src, dst);
return new Path(fsRootDirPath, nodeName); }
private boolean createFile(Path newFile) throws Exception {
return fs.createNewFile(newFile);
}
private Path getNodePath(Path root, String nodeName) {
return new Path(root, nodeName);
} }
} }

View File

@ -19,14 +19,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery; package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
@ -49,6 +53,12 @@ public synchronized RMState loadState() throws Exception {
// return a copy of the state to allow for modification of the real state // return a copy of the state to allow for modification of the real state
RMState returnState = new RMState(); RMState returnState = new RMState();
returnState.appState.putAll(state.appState); returnState.appState.putAll(state.appState);
returnState.rmSecretManagerState.getMasterKeyState()
.addAll(state.rmSecretManagerState.getMasterKeyState());
returnState.rmSecretManagerState.getTokenState().putAll(
state.rmSecretManagerState.getTokenState());
returnState.rmSecretManagerState.dtSequenceNumber =
state.rmSecretManagerState.dtSequenceNumber;
return returnState; return returnState;
} }
@ -113,4 +123,53 @@ public synchronized void removeApplicationState(ApplicationState appState)
ApplicationState removed = state.appState.remove(appId); ApplicationState removed = state.appState.remove(appId);
assert removed != null; assert removed != null;
} }
@Override
public synchronized void storeRMDelegationTokenAndSequenceNumberState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
Map<RMDelegationTokenIdentifier, Long> rmDTState =
state.rmSecretManagerState.getTokenState();
if (rmDTState.containsKey(rmDTIdentifier)) {
IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier
+ "is already stored.");
LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e);
throw e;
}
rmDTState.put(rmDTIdentifier, renewDate);
state.rmSecretManagerState.dtSequenceNumber = latestSequenceNumber;
}
@Override
public synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception{
Map<RMDelegationTokenIdentifier, Long> rmDTState =
state.rmSecretManagerState.getTokenState();
rmDTState.remove(rmDTIdentifier);
}
@Override
public synchronized void storeRMDTMasterKeyState(DelegationKey delegationKey)
throws Exception {
Set<DelegationKey> rmDTMasterKeyState =
state.rmSecretManagerState.getMasterKeyState();
if (rmDTMasterKeyState.contains(delegationKey)) {
IOException e = new IOException("RMDTMasterKey with keyID: "
+ delegationKey.getKeyId() + " is already stored");
LOG.info("Error storing info for RMDTMasterKey with keyID: "
+ delegationKey.getKeyId(), e);
throw e;
}
state.getRMDTSecretManagerState().getMasterKeyState().add(delegationKey);
LOG.info("rmDTMasterKeyState SIZE: " + rmDTMasterKeyState.size());
}
@Override
public synchronized void removeRMDTMasterKeyState(DelegationKey delegationKey)
throws Exception {
Set<DelegationKey> rmDTMasterKeyState =
state.rmSecretManagerState.getMasterKeyState();
rmDTMasterKeyState.remove(delegationKey);
}
} }

View File

@ -18,10 +18,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery; package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@Unstable @Unstable
public class NullRMStateStore extends RMStateStore { public class NullRMStateStore extends RMStateStore {
@ -59,4 +62,26 @@ protected void removeApplicationState(ApplicationState appState)
// Do nothing // Do nothing
} }
@Override
public void storeRMDelegationTokenAndSequenceNumberState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
// Do nothing
}
@Override
public void removeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentifier)
throws Exception {
// Do nothing
}
@Override
public void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
// Do nothing
}
@Override
public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
// Do nothing
}
} }

View File

@ -20,7 +20,9 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -30,6 +32,7 @@
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -42,6 +45,7 @@
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -57,7 +61,7 @@
*/ */
public abstract class RMStateStore { public abstract class RMStateStore {
public static final Log LOG = LogFactory.getLog(RMStateStore.class); public static final Log LOG = LogFactory.getLog(RMStateStore.class);
/** /**
* State of an application attempt * State of an application attempt
*/ */
@ -121,17 +125,46 @@ public String getUser() {
return user; return user;
} }
} }
public static class RMDTSecretManagerState {
// DTIdentifier -> renewDate
Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
new HashMap<RMDelegationTokenIdentifier, Long>();
Set<DelegationKey> masterKeyState =
new HashSet<DelegationKey>();
int dtSequenceNumber = 0;
public Map<RMDelegationTokenIdentifier, Long> getTokenState() {
return delegationTokenState;
}
public Set<DelegationKey> getMasterKeyState() {
return masterKeyState;
}
public int getDTSequenceNumber() {
return dtSequenceNumber;
}
}
/** /**
* State of the ResourceManager * State of the ResourceManager
*/ */
public static class RMState { public static class RMState {
Map<ApplicationId, ApplicationState> appState = Map<ApplicationId, ApplicationState> appState =
new HashMap<ApplicationId, ApplicationState>(); new HashMap<ApplicationId, ApplicationState>();
RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
public Map<ApplicationId, ApplicationState> getApplicationState() { public Map<ApplicationId, ApplicationState> getApplicationState() {
return appState; return appState;
} }
public RMDTSecretManagerState getRMDTSecretManagerState() {
return rmSecretManagerState;
}
} }
private Dispatcher rmDispatcher; private Dispatcher rmDispatcher;
@ -235,8 +268,76 @@ public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) {
protected abstract void storeApplicationAttemptState(String attemptId, protected abstract void storeApplicationAttemptState(String attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception; throws Exception;
/**
* RMDTSecretManager call this to store the state of a delegation token
* and sequence number
*/
public synchronized void storeRMDelegationTokenAndSequenceNumber(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
latestSequenceNumber);
}
/**
* Blocking API
* Derived classes must implement this method to store the state of
* RMDelegationToken and sequence number
*/
protected abstract void storeRMDelegationTokenAndSequenceNumberState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception;
/**
* RMDTSecretManager call this to remove the state of a delegation token
*/
public synchronized void removeRMDelegationToken(
RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber)
throws Exception {
removeRMDelegationTokenState(rmDTIdentifier);
}
/**
* Blocking API
* Derived classes must implement this method to remove the state of RMDelegationToken
*/
protected abstract void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception;
/**
* RMDTSecretManager call this to store the state of a master key
*/
public synchronized void storeRMDTMasterKey(DelegationKey delegationKey)
throws Exception {
storeRMDTMasterKeyState(delegationKey);
}
/**
* Blocking API
* Derived classes must implement this method to store the state of
* DelegationToken Master Key
*/
protected abstract void storeRMDTMasterKeyState(DelegationKey delegationKey)
throws Exception;
/**
* RMDTSecretManager call this to remove the state of a master key
*/
public synchronized void removeRMDTMasterKey(DelegationKey delegationKey)
throws Exception {
removeRMDTMasterKeyState(delegationKey);
}
/**
* Blocking API
* Derived classes must implement this method to remove the state of
* DelegationToken Master Key
*/
protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey)
throws Exception;
/** /**
* Non-blocking API * Non-blocking API
* ResourceManager services call this to remove an application from the state * ResourceManager services call this to remove an application from the state

View File

@ -18,10 +18,26 @@
package org.apache.hadoop.yarn.server.resourcemanager.security; package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import com.google.common.annotations.VisibleForTesting;
/** /**
* A ResourceManager specific delegation token secret manager. * A ResourceManager specific delegation token secret manager.
@ -30,8 +46,13 @@
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class RMDelegationTokenSecretManager public class RMDelegationTokenSecretManager extends
extends AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> { AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> implements
Recoverable {
private static final Log LOG = LogFactory
.getLog(RMDelegationTokenSecretManager.class);
protected final RMContext rmContext;
/** /**
* Create a secret manager * Create a secret manager
@ -46,13 +67,132 @@ public class RMDelegationTokenSecretManager
public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval, public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenMaxLifetime,
long delegationTokenRenewInterval, long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval) { long delegationTokenRemoverScanInterval,
RMContext rmContext) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval); delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
this.rmContext = rmContext;
} }
@Override @Override
public RMDelegationTokenIdentifier createIdentifier() { public RMDelegationTokenIdentifier createIdentifier() {
return new RMDelegationTokenIdentifier(); return new RMDelegationTokenIdentifier();
} }
@Override
protected void storeNewMasterKey(DelegationKey newKey) {
try {
LOG.info("storing master key with keyID " + newKey.getKeyId());
rmContext.getStateStore().storeRMDTMasterKey(newKey);
} catch (Exception e) {
LOG.error("Error in storing master key with KeyID: " + newKey.getKeyId());
ExitUtil.terminate(1, e);
}
}
@Override
protected void removeStoredMasterKey(DelegationKey key) {
try {
LOG.info("removing master key with keyID " + key.getKeyId());
rmContext.getStateStore().removeRMDTMasterKey(key);
} catch (Exception e) {
LOG.error("Error in removing master key with KeyID: " + key.getKeyId());
ExitUtil.terminate(1, e);
}
}
@Override
protected void storeNewToken(RMDelegationTokenIdentifier identifier,
long renewDate) {
try {
LOG.info("storing RMDelegation token with sequence number: "
+ identifier.getSequenceNumber());
rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(
identifier, renewDate, identifier.getSequenceNumber());
} catch (Exception e) {
LOG.error("Error in storing RMDelegationToken with sequence number: "
+ identifier.getSequenceNumber());
ExitUtil.terminate(1, e);
}
}
@Override
protected void updateStoredToken(RMDelegationTokenIdentifier id,
long renewDate) {
try {
LOG.info("updating RMDelegation token with sequence number: "
+ id.getSequenceNumber());
rmContext.getStateStore().removeRMDelegationToken(id,
delegationTokenSequenceNumber);
rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(id,
renewDate, id.getSequenceNumber());
} catch (Exception e) {
LOG.error("Error in updating persisted RMDelegationToken with sequence number: "
+ id.getSequenceNumber());
ExitUtil.terminate(1, e);
}
}
@Override
protected void removeStoredToken(RMDelegationTokenIdentifier ident)
throws IOException {
try {
LOG.info("removing RMDelegation token with sequence number: "
+ ident.getSequenceNumber());
rmContext.getStateStore().removeRMDelegationToken(ident,
delegationTokenSequenceNumber);
} catch (Exception e) {
LOG.error("Error in removing RMDelegationToken with sequence number: "
+ ident.getSequenceNumber());
ExitUtil.terminate(1, e);
}
}
@Private
@VisibleForTesting
public synchronized Set<DelegationKey> getAllMasterKeys() {
HashSet<DelegationKey> keySet = new HashSet<DelegationKey>();
keySet.addAll(allKeys.values());
return keySet;
}
@Private
@VisibleForTesting
public synchronized Map<RMDelegationTokenIdentifier, Long> getAllTokens() {
Map<RMDelegationTokenIdentifier, Long> allTokens =
new HashMap<RMDelegationTokenIdentifier, Long>();
for (Map.Entry<RMDelegationTokenIdentifier,
DelegationTokenInformation> entry : currentTokens.entrySet()) {
allTokens.put(entry.getKey(), entry.getValue().getRenewDate());
}
return allTokens;
}
@Private
@VisibleForTesting
public int getLatestDTSequenceNumber() {
return delegationTokenSequenceNumber;
}
@Override
public void recover(RMState rmState) throws Exception {
LOG.info("recovering RMDelegationTokenSecretManager.");
// recover RMDTMasterKeys
for (DelegationKey dtKey : rmState.getRMDTSecretManagerState()
.getMasterKeyState()) {
addKey(dtKey);
}
// recover RMDelegationTokens
Map<RMDelegationTokenIdentifier, Long> rmDelegationTokens =
rmState.getRMDTSecretManagerState().getTokenState();
this.delegationTokenSequenceNumber =
rmState.getRMDTSecretManagerState().getDTSequenceNumber();
for (Map.Entry<RMDelegationTokenIdentifier, Long> entry : rmDelegationTokens
.entrySet()) {
addPersistedDelegationToken(entry.getKey(), entry.getValue());
}
}
} }

View File

@ -291,7 +291,7 @@ public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId)
@Override @Override
protected ClientRMService createClientRMService() { protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(), return new ClientRMService(getRMContext(), getResourceScheduler(),
rmAppManager, applicationACLsManager, null) { rmAppManager, applicationACLsManager, rmDTSecretManager) {
@Override @Override
public void start() { public void start() {
// override to not start rpc handler // override to not start rpc handler

View File

@ -65,6 +65,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -90,7 +91,9 @@ public class TestClientRMService {
@BeforeClass @BeforeClass
public static void setupSecretManager() throws IOException { public static void setupSecretManager() throws IOException {
dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000); RMContext rmContext = mock(RMContext.class);
when(rmContext.getStateStore()).thenReturn(new NullRMStateStore());
dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext);
dtsm.startThreads(); dtsm.startThreads();
} }

View File

@ -28,6 +28,7 @@
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
@ -61,6 +62,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@ -454,10 +456,15 @@ private static ResourceScheduler createMockScheduler(Configuration conf) {
return mockSched; return mockSched;
} }
private static RMDelegationTokenSecretManager createRMDelegationTokenSecretManager( private static RMDelegationTokenSecretManager
long secretKeyInterval, long tokenMaxLifetime, long tokenRenewInterval) { createRMDelegationTokenSecretManager(long secretKeyInterval,
RMDelegationTokenSecretManager rmDtSecretManager = new RMDelegationTokenSecretManager( long tokenMaxLifetime, long tokenRenewInterval) {
secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 3600000); RMContext rmContext = mock(RMContext.class);
when(rmContext.getStateStore()).thenReturn(new NullRMStateStore());
RMDelegationTokenSecretManager rmDtSecretManager =
new RMDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime,
tokenRenewInterval, 3600000, rmContext);
return rmDtSecretManager; return rmDtSecretManager;
} }
} }

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
@ -25,6 +28,7 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -33,14 +37,18 @@
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@ -58,24 +66,32 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestRMRestart { public class TestRMRestart {
@Test private YarnConfiguration conf;
public void testRMRestart() throws Exception {
@Before
public void setup() {
Logger rootLogger = LogManager.getRootLogger(); Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG); rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit(); ExitUtil.disableSystemExit();
conf = new YarnConfiguration();
YarnConfiguration conf = new YarnConfiguration(); UserGroupInformation.setConfiguration(conf);
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
}
@Test
public void testRMRestart() throws Exception {
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -331,13 +347,6 @@ public void testRMRestart() throws Exception {
@Test @Test
public void testRMRestartOnMaxAppAttempts() throws Exception { public void testRMRestartOnMaxAppAttempts() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -411,13 +420,6 @@ public void testRMRestartOnMaxAppAttempts() throws Exception {
@Test @Test
public void testDelegationTokenRestoredInDelegationTokenRenewer() public void testDelegationTokenRestoredInDelegationTokenRenewer()
throws Exception { throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos"); "kerberos");
@ -496,13 +498,6 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer()
@Test @Test
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos"); "kerberos");
@ -577,7 +572,142 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
rm2.stop(); rm2.stop();
} }
class TestSecurityMockRM extends MockRM { @Test
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
Map<RMDelegationTokenIdentifier, Long> rmDTState =
rmState.getRMDTSecretManagerState().getTokenState();
Set<DelegationKey> rmDTMasterKeyState =
rmState.getRMDTSecretManagerState().getMasterKeyState();
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
rm1.start();
// create an empty credential
Credentials ts = new Credentials();
// request a token and add into credential
GetDelegationTokenRequest request1 = mock(GetDelegationTokenRequest.class);
when(request1.getRenewer()).thenReturn("renewer1");
GetDelegationTokenResponse response1 =
rm1.getClientRMService().getDelegationToken(request1);
DelegationToken delegationToken1 = response1.getRMDelegationToken();
Token<RMDelegationTokenIdentifier> token1 =
ProtoUtils.convertFromProtoFormat(delegationToken1, null);
RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
HashSet<RMDelegationTokenIdentifier> tokenIdentSet =
new HashSet<RMDelegationTokenIdentifier>();
ts.addToken(token1.getService(), token1);
tokenIdentSet.add(dtId1);
// submit an app with customized credential
RMApp app = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
// assert app info is saved
ApplicationState appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
// assert all master keys are saved
Set<DelegationKey> allKeysRM1 = rm1.getRMDTSecretManager().getAllMasterKeys();
Assert.assertEquals(allKeysRM1, rmDTMasterKeyState);
// assert all tokens are saved
Map<RMDelegationTokenIdentifier, Long> allTokensRM1 =
rm1.getRMDTSecretManager().getAllTokens();
Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet());
Assert.assertEquals(allTokensRM1, rmDTState);
// assert sequence number is saved
Assert.assertEquals(
rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
rmState.getRMDTSecretManagerState().getDTSequenceNumber());
// request one more token
GetDelegationTokenRequest request2 = mock(GetDelegationTokenRequest.class);
when(request2.getRenewer()).thenReturn("renewer2");
GetDelegationTokenResponse response2 =
rm1.getClientRMService().getDelegationToken(request2);
DelegationToken delegationToken2 = response2.getRMDelegationToken();
Token<RMDelegationTokenIdentifier> token2 =
ProtoUtils.convertFromProtoFormat(delegationToken2, null);
RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier();
// cancel token2
try{
rm1.getRMDTSecretManager().cancelToken(token2,
UserGroupInformation.getCurrentUser().getUserName());
} catch(Exception e) {
Assert.fail();
}
// Assert the token which has the latest delegationTokenSequenceNumber is removed
Assert.assertEquals(
rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
dtId2.getSequenceNumber());
Assert.assertFalse(rmDTState.containsKey(dtId2));
// start new RM
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
rm2.start();
// assert master keys and tokens are populated back to DTSecretManager
Map<RMDelegationTokenIdentifier, Long> allTokensRM2 =
rm2.getRMDTSecretManager().getAllTokens();
Assert.assertEquals(allTokensRM1, allTokensRM2);
// rm2 has its own master keys when it starts, we use containsAll here
Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys()
.containsAll(allKeysRM1));
// assert sequenceNumber is properly recovered,
// even though the token which has max sequenceNumber is not stored
Assert.assertEquals(rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
rm2.getRMDTSecretManager().getLatestDTSequenceNumber());
// renewDate before renewing
Long renewDateBeforeRenew = allTokensRM2.get(dtId1);
try{
// renew recovered token
rm2.getRMDTSecretManager().renewToken(token1, "renewer1");
} catch(Exception e) {
Assert.fail();
}
allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens();
Long renewDateAfterRenew = allTokensRM2.get(dtId1);
// assert token is renewed
Assert.assertTrue(renewDateAfterRenew > renewDateBeforeRenew);
// assert new token is added into state store
Assert.assertTrue(rmDTState.containsValue(renewDateAfterRenew));
// assert old token is removed from state store
Assert.assertFalse(rmDTState.containsValue(renewDateBeforeRenew));
try{
rm2.getRMDTSecretManager().cancelToken(token1,
UserGroupInformation.getCurrentUser().getUserName());
} catch(Exception e) {
Assert.fail();
}
// assert token is removed from state after its cancelled
allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens();
Assert.assertFalse(allTokensRM2.containsKey(dtId1));
Assert.assertFalse(rmDTState.containsKey(dtId1));
// stop the RM
rm1.stop();
rm2.stop();
}
public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) { public TestSecurityMockRM(Configuration conf, RMStateStore store) {
super(conf, store); super(conf, store);

View File

@ -31,6 +31,8 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import junit.framework.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -41,6 +43,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -53,8 +56,10 @@
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -111,7 +116,8 @@ public void testFSRMStateStore() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try { try {
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster); TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
testRMStateStore(fsTester); testRMAppStateStore(fsTester);
testRMDTSecretManagerStateStore(fsTester);
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }
@ -218,7 +224,7 @@ ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
long submitTime = System.currentTimeMillis(); long submitTime = System.currentTimeMillis();
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
RMStateStore store = stateStoreHelper.getRMStateStore(); RMStateStore store = stateStoreHelper.getRMStateStore();
@ -334,6 +340,37 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
store.close(); store.close();
} }
public void testRMDTSecretManagerStateStore(
RMStateStoreHelper stateStoreHelper) throws Exception {
RMStateStore store = stateStoreHelper.getRMStateStore();
TestDispatcher dispatcher = new TestDispatcher();
store.setDispatcher(dispatcher);
// store RM delegation token;
RMDelegationTokenIdentifier dtId1 =
new RMDelegationTokenIdentifier(new Text("owner1"),
new Text("renewer1"), new Text("realuser1"));
Long renewDate1 = new Long(System.currentTimeMillis());
int sequenceNumber = 1111;
store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
sequenceNumber);
Map<RMDelegationTokenIdentifier, Long> token1 =
new HashMap<RMDelegationTokenIdentifier, Long>();
token1.put(dtId1, renewDate1);
// store delegation key;
DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes());
HashSet<DelegationKey> keySet = new HashSet<DelegationKey>();
keySet.add(key);
store.storeRMDTMasterKey(key);
RMDTSecretManagerState secretManagerState =
store.loadState().getRMDTSecretManagerState();
Assert.assertEquals(token1, secretManagerState.getTokenState());
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber());
}
private List<Token<?>> generateTokens(ApplicationAttemptId attemptId, private List<Token<?>> generateTokens(ApplicationAttemptId attemptId,
ApplicationTokenSecretManager appTokenMgr, ApplicationTokenSecretManager appTokenMgr,
ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) { ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) {

View File

@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.security;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestRMDelegationTokens {
private YarnConfiguration conf;
@Before
public void setup() {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
conf = new YarnConfiguration();
UserGroupInformation.setConfiguration(conf);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
}
@Test(timeout = 15000)
public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<RMDelegationTokenIdentifier, Long> rmDTState =
rmState.getRMDTSecretManagerState().getTokenState();
Set<DelegationKey> rmDTMasterKeyState =
rmState.getRMDTSecretManagerState().getMasterKeyState();
MockRM rm1 = new MyMockRM(conf, memStore);
rm1.start();
// on rm start, two master keys are created.
// One is created at RMDTSecretMgr.startThreads.updateCurrentKey();
// the other is created on the first run of
// tokenRemoverThread.rollMasterKey()
RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager();
// assert all master keys are saved
Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState);
Set<DelegationKey> expiringKeys = new HashSet<DelegationKey>();
expiringKeys.addAll(dtSecretManager.getAllMasterKeys());
// record the current key
DelegationKey oldCurrentKey =
((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey();
// request to generate a RMDelegationToken
GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
when(request.getRenewer()).thenReturn("renewer1");
GetDelegationTokenResponse response =
rm1.getClientRMService().getDelegationToken(request);
DelegationToken delegationToken = response.getRMDelegationToken();
Token<RMDelegationTokenIdentifier> token1 =
ProtoUtils.convertFromProtoFormat(delegationToken, null);
RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
// wait for the first rollMasterKey
while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys
.get() < 1){
Thread.sleep(200);
}
// assert old-current-key and new-current-key exist
Assert.assertTrue(rmDTMasterKeyState.contains(oldCurrentKey));
DelegationKey newCurrentKey =
((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey();
Assert.assertTrue(rmDTMasterKeyState.contains(newCurrentKey));
// wait for token to expire
// rollMasterKey is called every 1 second.
while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys
.get() < 6) {
Thread.sleep(200);
}
Assert.assertFalse(rmDTState.containsKey(dtId1));
rm1.stop();
}
@Test(timeout = 15000)
public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Set<DelegationKey> rmDTMasterKeyState =
rmState.getRMDTSecretManagerState().getMasterKeyState();
MockRM rm1 = new MyMockRM(conf, memStore);
rm1.start();
RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager();
// assert all master keys are saved
Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState);
Set<DelegationKey> expiringKeys = new HashSet<DelegationKey>();
expiringKeys.addAll(dtSecretManager.getAllMasterKeys());
// wait for expiringKeys to expire
while (true) {
boolean allExpired = true;
for (DelegationKey key : expiringKeys) {
if (rmDTMasterKeyState.contains(key)) {
allExpired = false;
}
}
if (allExpired)
break;
Thread.sleep(500);
}
}
class MyMockRM extends TestSecurityMockRM {
public MyMockRM(Configuration conf, RMStateStore store) {
super(conf, store);
}
@Override
protected RMDelegationTokenSecretManager
createRMDelegationTokenSecretManager(RMContext rmContext) {
// KeyUpdateInterval-> 1 seconds
// TokenMaxLifetime-> 2 seconds.
return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000,
rmContext);
}
}
public class TestRMDelegationTokenSecretManager extends
RMDelegationTokenSecretManager {
public AtomicInteger numUpdatedKeys = new AtomicInteger(0);
public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval, RMContext rmContext) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval,
rmContext);
}
@Override
protected void storeNewMasterKey(DelegationKey newKey) {
super.storeNewMasterKey(newKey);
numUpdatedKeys.incrementAndGet();
}
public DelegationKey getCurrentKey() {
for (int keyId : allKeys.keySet()) {
if (keyId == currentId) {
return allKeys.get(keyId);
}
}
return null;
}
}
}