YARN-1776. Fixed DelegationToken renewal to survive RM failover. Contributed by Zhijie Shen
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1580154 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cbb3914c89
commit
ce56616037
|
@ -555,6 +555,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
YARN-1854. Fixed test failure in TestRMHA#testStartAndTransitions. (Rohith
|
YARN-1854. Fixed test failure in TestRMHA#testStartAndTransitions. (Rohith
|
||||||
Sharma KS via vinodkv)
|
Sharma KS via vinodkv)
|
||||||
|
|
||||||
|
YARN-1776. Fixed DelegationToken renewal to survive RM failover. (Zhijie
|
||||||
|
Shen via jianhe)
|
||||||
|
|
||||||
Release 2.3.1 - UNRELEASED
|
Release 2.3.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
|
@ -73,7 +74,9 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
protected FileSystem fs;
|
protected FileSystem fs;
|
||||||
|
|
||||||
private Path rootDirPath;
|
private Path rootDirPath;
|
||||||
private Path rmDTSecretManagerRoot;
|
@Private
|
||||||
|
@VisibleForTesting
|
||||||
|
Path rmDTSecretManagerRoot;
|
||||||
private Path rmAppRoot;
|
private Path rmAppRoot;
|
||||||
private Path dtSequenceNumberPath = null;
|
private Path dtSequenceNumberPath = null;
|
||||||
|
|
||||||
|
@ -157,6 +160,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
new ArrayList<ApplicationAttemptState>();
|
new ArrayList<ApplicationAttemptState>();
|
||||||
|
|
||||||
for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
|
for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
|
||||||
|
checkAndResumeUpdateOperation(appDir.getPath());
|
||||||
for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
|
for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
|
||||||
assert childNodeStatus.isFile();
|
assert childNodeStatus.isFile();
|
||||||
String childNodeName = childNodeStatus.getPath().getName();
|
String childNodeName = childNodeStatus.getPath().getName();
|
||||||
|
@ -250,7 +254,29 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkAndResumeUpdateOperation(Path path) throws Exception {
|
||||||
|
// Before loading the state information, check whether .new file exists.
|
||||||
|
// If it does, the prior updateFile is failed on half way. We need to
|
||||||
|
// complete replacing the old file first.
|
||||||
|
FileStatus[] newChildNodes =
|
||||||
|
fs.listStatus(path, new PathFilter() {
|
||||||
|
@Override
|
||||||
|
public boolean accept(Path path) {
|
||||||
|
return path.getName().endsWith(".new");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
for(FileStatus newChildNodeStatus : newChildNodes) {
|
||||||
|
assert newChildNodeStatus.isFile();
|
||||||
|
String newChildNodeName = newChildNodeStatus.getPath().getName();
|
||||||
|
String childNodeName = newChildNodeName.substring(
|
||||||
|
0, newChildNodeName.length() - ".new".length());
|
||||||
|
Path childNodePath =
|
||||||
|
new Path(newChildNodeStatus.getPath().getParent(), childNodeName);
|
||||||
|
replaceFile(newChildNodeStatus.getPath(), childNodePath);
|
||||||
|
}
|
||||||
|
}
|
||||||
private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
|
private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
|
||||||
|
checkAndResumeUpdateOperation(rmDTSecretManagerRoot);
|
||||||
FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
|
FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
|
||||||
|
|
||||||
for(FileStatus childNodeStatus : childNodes) {
|
for(FileStatus childNodeStatus : childNodes) {
|
||||||
|
@ -380,15 +406,44 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
public synchronized void storeRMDelegationTokenAndSequenceNumberState(
|
public synchronized void storeRMDelegationTokenAndSequenceNumberState(
|
||||||
RMDelegationTokenIdentifier identifier, Long renewDate,
|
RMDelegationTokenIdentifier identifier, Long renewDate,
|
||||||
int latestSequenceNumber) throws Exception {
|
int latestSequenceNumber) throws Exception {
|
||||||
|
storeOrUpdateRMDelegationTokenAndSequenceNumberState(
|
||||||
|
identifier, renewDate,latestSequenceNumber, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void removeRMDelegationTokenState(
|
||||||
|
RMDelegationTokenIdentifier identifier) throws Exception {
|
||||||
|
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
|
||||||
|
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
|
||||||
|
LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
|
||||||
|
deleteFile(nodeCreatePath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||||
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||||
|
int latestSequenceNumber) throws Exception {
|
||||||
|
storeOrUpdateRMDelegationTokenAndSequenceNumberState(
|
||||||
|
rmDTIdentifier, renewDate,latestSequenceNumber, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void storeOrUpdateRMDelegationTokenAndSequenceNumberState(
|
||||||
|
RMDelegationTokenIdentifier identifier, Long renewDate,
|
||||||
|
int latestSequenceNumber, boolean isUpdate) throws Exception {
|
||||||
Path nodeCreatePath =
|
Path nodeCreatePath =
|
||||||
getNodePath(rmDTSecretManagerRoot,
|
getNodePath(rmDTSecretManagerRoot,
|
||||||
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
|
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
|
||||||
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
||||||
DataOutputStream fsOut = new DataOutputStream(os);
|
DataOutputStream fsOut = new DataOutputStream(os);
|
||||||
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
|
|
||||||
identifier.write(fsOut);
|
identifier.write(fsOut);
|
||||||
fsOut.writeLong(renewDate);
|
fsOut.writeLong(renewDate);
|
||||||
writeFile(nodeCreatePath, os.toByteArray());
|
if (isUpdate) {
|
||||||
|
LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber());
|
||||||
|
updateFile(nodeCreatePath, os.toByteArray());
|
||||||
|
} else {
|
||||||
|
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
|
||||||
|
writeFile(nodeCreatePath, os.toByteArray());
|
||||||
|
}
|
||||||
fsOut.close();
|
fsOut.close();
|
||||||
|
|
||||||
// store sequence number
|
// store sequence number
|
||||||
|
@ -408,15 +463,6 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
dtSequenceNumberPath = latestSequenceNumberPath;
|
dtSequenceNumberPath = latestSequenceNumberPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void removeRMDelegationTokenState(
|
|
||||||
RMDelegationTokenIdentifier identifier) throws Exception {
|
|
||||||
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
|
|
||||||
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
|
|
||||||
LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
|
|
||||||
deleteFile(nodeCreatePath);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey)
|
public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
@ -477,14 +523,28 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
fs.rename(tempPath, outputPath);
|
fs.rename(tempPath, outputPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* In order to make this update atomic as a part of write we will first write
|
||||||
|
* data to .new file and then rename it. Here we are assuming that rename is
|
||||||
|
* atomic for underlying file system.
|
||||||
|
*/
|
||||||
protected void updateFile(Path outputPath, byte[] data) throws Exception {
|
protected void updateFile(Path outputPath, byte[] data) throws Exception {
|
||||||
if (fs.exists(outputPath)) {
|
Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new");
|
||||||
deleteFile(outputPath);
|
// use writeFile to make sure .new file is created atomically
|
||||||
}
|
writeFile(newPath, data);
|
||||||
writeFile(outputPath, data);
|
replaceFile(newPath, outputPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean renameFile(Path src, Path dst) throws Exception {
|
protected void replaceFile(Path srcPath, Path dstPath) throws Exception {
|
||||||
|
if (fs.exists(dstPath)) {
|
||||||
|
deleteFile(dstPath);
|
||||||
|
}
|
||||||
|
fs.rename(srcPath, dstPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean renameFile(Path src, Path dst) throws Exception {
|
||||||
return fs.rename(src, dst);
|
return fs.rename(src, dst);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -492,7 +552,10 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
return fs.createNewFile(newFile);
|
return fs.createNewFile(newFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Path getNodePath(Path root, String nodeName) {
|
@Private
|
||||||
|
@VisibleForTesting
|
||||||
|
Path getNodePath(Path root, String nodeName) {
|
||||||
return new Path(root, nodeName);
|
return new Path(root, nodeName);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
@ -201,6 +200,15 @@ public class MemoryRMStateStore extends RMStateStore {
|
||||||
rmDTState.remove(rmDTIdentifier);
|
rmDTState.remove(rmDTIdentifier);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||||
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||||
|
int latestSequenceNumber) throws Exception {
|
||||||
|
removeRMDelegationTokenState(rmDTIdentifier);
|
||||||
|
storeRMDelegationTokenAndSequenceNumberState(
|
||||||
|
rmDTIdentifier, renewDate, latestSequenceNumber);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void storeRMDTMasterKeyState(DelegationKey delegationKey)
|
public synchronized void storeRMDTMasterKeyState(DelegationKey delegationKey)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
@ -239,4 +247,5 @@ public class MemoryRMStateStore extends RMStateStore {
|
||||||
protected RMStateVersion getCurrentVersion() {
|
protected RMStateVersion getCurrentVersion() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,6 +83,13 @@ public class NullRMStateStore extends RMStateStore {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||||
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||||
|
int latestSequenceNumber) throws Exception {
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
|
public void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
|
@ -125,4 +132,5 @@ public class NullRMStateStore extends RMStateStore {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -477,6 +477,30 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
protected abstract void removeRMDelegationTokenState(
|
protected abstract void removeRMDelegationTokenState(
|
||||||
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception;
|
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RMDTSecretManager call this to update the state of a delegation token
|
||||||
|
* and sequence number
|
||||||
|
*/
|
||||||
|
public synchronized void updateRMDelegationTokenAndSequenceNumber(
|
||||||
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||||
|
int latestSequenceNumber) {
|
||||||
|
try {
|
||||||
|
updateRMDelegationTokenAndSequenceNumberInternal(rmDTIdentifier, renewDate,
|
||||||
|
latestSequenceNumber);
|
||||||
|
} catch (Exception e) {
|
||||||
|
notifyStoreOperationFailed(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Blocking API
|
||||||
|
* Derived classes must implement this method to update the state of
|
||||||
|
* RMDelegationToken and sequence number
|
||||||
|
*/
|
||||||
|
protected abstract void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||||
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||||
|
int latestSequenceNumber) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RMDTSecretManager call this to store the state of a master key
|
* RMDTSecretManager call this to store the state of a master key
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -629,6 +629,54 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||||
int latestSequenceNumber) throws Exception {
|
int latestSequenceNumber) throws Exception {
|
||||||
ArrayList<Op> opList = new ArrayList<Op>();
|
ArrayList<Op> opList = new ArrayList<Op>();
|
||||||
|
addStoreOrUpdateOps(
|
||||||
|
opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
|
||||||
|
doMultiWithRetries(opList);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected synchronized void removeRMDelegationTokenState(
|
||||||
|
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
|
||||||
|
ArrayList<Op> opList = new ArrayList<Op>();
|
||||||
|
String nodeRemovePath =
|
||||||
|
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
||||||
|
+ rmDTIdentifier.getSequenceNumber());
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Removing RMDelegationToken_"
|
||||||
|
+ rmDTIdentifier.getSequenceNumber());
|
||||||
|
}
|
||||||
|
if (zkClient.exists(nodeRemovePath, true) != null) {
|
||||||
|
opList.add(Op.delete(nodeRemovePath, -1));
|
||||||
|
} else {
|
||||||
|
LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
|
||||||
|
}
|
||||||
|
doMultiWithRetries(opList);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||||
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||||
|
int latestSequenceNumber) throws Exception {
|
||||||
|
ArrayList<Op> opList = new ArrayList<Op>();
|
||||||
|
String nodeRemovePath =
|
||||||
|
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
||||||
|
+ rmDTIdentifier.getSequenceNumber());
|
||||||
|
if (zkClient.exists(nodeRemovePath, true) == null) {
|
||||||
|
// in case znode doesn't exist
|
||||||
|
addStoreOrUpdateOps(
|
||||||
|
opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
|
||||||
|
LOG.info("Attempted to update a non-existing znode " + nodeRemovePath);
|
||||||
|
} else {
|
||||||
|
// in case znode exists
|
||||||
|
addStoreOrUpdateOps(
|
||||||
|
opList, rmDTIdentifier, renewDate, latestSequenceNumber, true);
|
||||||
|
}
|
||||||
|
doMultiWithRetries(opList);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addStoreOrUpdateOps(ArrayList<Op> opList,
|
||||||
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||||
|
int latestSequenceNumber, boolean isUpdate) throws Exception {
|
||||||
// store RM delegation token
|
// store RM delegation token
|
||||||
String nodeCreatePath =
|
String nodeCreatePath =
|
||||||
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
||||||
|
@ -642,17 +690,21 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
rmDTIdentifier.write(tokenOut);
|
rmDTIdentifier.write(tokenOut);
|
||||||
tokenOut.writeLong(renewDate);
|
tokenOut.writeLong(renewDate);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Storing RMDelegationToken_" +
|
LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_" +
|
||||||
rmDTIdentifier.getSequenceNumber());
|
rmDTIdentifier.getSequenceNumber());
|
||||||
}
|
}
|
||||||
|
|
||||||
opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl,
|
if (isUpdate) {
|
||||||
CreateMode.PERSISTENT));
|
opList.add(Op.setData(nodeCreatePath, tokenOs.toByteArray(), -1));
|
||||||
|
} else {
|
||||||
|
opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl,
|
||||||
|
CreateMode.PERSISTENT));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
seqOut.writeInt(latestSequenceNumber);
|
seqOut.writeInt(latestSequenceNumber);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Storing " + dtSequenceNumberPath +
|
LOG.debug((isUpdate ? "Storing " : "Updating ") + dtSequenceNumberPath +
|
||||||
". SequenceNumber: " + latestSequenceNumber);
|
". SequenceNumber: " + latestSequenceNumber);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -661,21 +713,6 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
tokenOs.close();
|
tokenOs.close();
|
||||||
seqOs.close();
|
seqOs.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
doMultiWithRetries(opList);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected synchronized void removeRMDelegationTokenState(
|
|
||||||
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
|
|
||||||
String nodeRemovePath =
|
|
||||||
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
|
||||||
+ rmDTIdentifier.getSequenceNumber());
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Removing RMDelegationToken_"
|
|
||||||
+ rmDTIdentifier.getSequenceNumber());
|
|
||||||
}
|
|
||||||
deleteWithRetries(nodeRemovePath, -1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -707,7 +744,11 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
|
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
|
||||||
}
|
}
|
||||||
deleteWithRetries(nodeRemovePath, -1);
|
if (zkClient.exists(nodeRemovePath, true) != null) {
|
||||||
|
doMultiWithRetries(Op.delete(nodeRemovePath, -1));
|
||||||
|
} else {
|
||||||
|
LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ZK related code
|
// ZK related code
|
||||||
|
@ -813,18 +854,6 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
doMultiWithRetries(Op.create(path, data, acl, mode));
|
doMultiWithRetries(Op.create(path, data, acl, mode));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteWithRetries(final String path, final int version)
|
|
||||||
throws Exception {
|
|
||||||
try {
|
|
||||||
doMultiWithRetries(Op.delete(path, version));
|
|
||||||
} catch (KeeperException.NoNodeException nne) {
|
|
||||||
// We tried to delete a node that doesn't exist
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Attempted to delete a non-existing znode " + path);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
|
|
|
@ -122,9 +122,7 @@ public class RMDelegationTokenSecretManager extends
|
||||||
try {
|
try {
|
||||||
LOG.info("updating RMDelegation token with sequence number: "
|
LOG.info("updating RMDelegation token with sequence number: "
|
||||||
+ id.getSequenceNumber());
|
+ id.getSequenceNumber());
|
||||||
rmContext.getStateStore().removeRMDelegationToken(id,
|
rmContext.getStateStore().updateRMDelegationTokenAndSequenceNumber(id,
|
||||||
delegationTokenSequenceNumber);
|
|
||||||
rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(id,
|
|
||||||
renewDate, id.getSequenceNumber());
|
renewDate, id.getSequenceNumber());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error in updating persisted RMDelegationToken with sequence number: "
|
LOG.error("Error in updating persisted RMDelegationToken with sequence number: "
|
||||||
|
|
|
@ -244,6 +244,9 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
store.close();
|
store.close();
|
||||||
|
|
||||||
|
// give tester a chance to modify app state in the store
|
||||||
|
modifyAppState();
|
||||||
|
|
||||||
// load state
|
// load state
|
||||||
store = stateStoreHelper.getRMStateStore();
|
store = stateStoreHelper.getRMStateStore();
|
||||||
store.setRMDispatcher(dispatcher);
|
store.setRMDispatcher(dispatcher);
|
||||||
|
@ -363,6 +366,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
int sequenceNumber = 1111;
|
int sequenceNumber = 1111;
|
||||||
store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
|
store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
|
||||||
sequenceNumber);
|
sequenceNumber);
|
||||||
|
modifyRMDelegationTokenState();
|
||||||
Map<RMDelegationTokenIdentifier, Long> token1 =
|
Map<RMDelegationTokenIdentifier, Long> token1 =
|
||||||
new HashMap<RMDelegationTokenIdentifier, Long>();
|
new HashMap<RMDelegationTokenIdentifier, Long>();
|
||||||
token1.put(dtId1, renewDate1);
|
token1.put(dtId1, renewDate1);
|
||||||
|
@ -380,6 +384,20 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
Assert.assertEquals(sequenceNumber,
|
Assert.assertEquals(sequenceNumber,
|
||||||
secretManagerState.getDTSequenceNumber());
|
secretManagerState.getDTSequenceNumber());
|
||||||
|
|
||||||
|
// update RM delegation token;
|
||||||
|
renewDate1 = new Long(System.currentTimeMillis());
|
||||||
|
++sequenceNumber;
|
||||||
|
store.updateRMDelegationTokenAndSequenceNumber(
|
||||||
|
dtId1, renewDate1, sequenceNumber);
|
||||||
|
token1.put(dtId1, renewDate1);
|
||||||
|
|
||||||
|
RMDTSecretManagerState updateSecretManagerState =
|
||||||
|
store.loadState().getRMDTSecretManagerState();
|
||||||
|
Assert.assertEquals(token1, updateSecretManagerState.getTokenState());
|
||||||
|
Assert.assertEquals(keySet, updateSecretManagerState.getMasterKeyState());
|
||||||
|
Assert.assertEquals(sequenceNumber,
|
||||||
|
updateSecretManagerState.getDTSequenceNumber());
|
||||||
|
|
||||||
// check to delete delegationKey
|
// check to delete delegationKey
|
||||||
store.removeRMDTMasterKey(key);
|
store.removeRMDTMasterKey(key);
|
||||||
keySet.clear();
|
keySet.clear();
|
||||||
|
@ -487,4 +505,13 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void modifyAppState() throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void modifyRMDelegationTokenState() throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,6 +49,8 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(TestFSRMStateStore.class);
|
public static final Log LOG = LogFactory.getLog(TestFSRMStateStore.class);
|
||||||
|
|
||||||
|
private TestFSRMStateStoreTester fsTester;
|
||||||
|
|
||||||
class TestFSRMStateStoreTester implements RMStateStoreHelper {
|
class TestFSRMStateStoreTester implements RMStateStoreHelper {
|
||||||
|
|
||||||
Path workingDirPathURI;
|
Path workingDirPathURI;
|
||||||
|
@ -134,7 +136,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
try {
|
try {
|
||||||
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
|
fsTester = new TestFSRMStateStoreTester(cluster);
|
||||||
// If the state store is FileSystemRMStateStore then add corrupted entry.
|
// If the state store is FileSystemRMStateStore then add corrupted entry.
|
||||||
// It should discard the entry and remove it from file system.
|
// It should discard the entry and remove it from file system.
|
||||||
FSDataOutputStream fsOut = null;
|
FSDataOutputStream fsOut = null;
|
||||||
|
@ -162,6 +164,36 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void modifyAppState() throws Exception {
|
||||||
|
// imitate appAttemptFile1 is still .new, but old one is deleted
|
||||||
|
String appAttemptIdStr1 = "appattempt_1352994193343_0001_000001";
|
||||||
|
ApplicationAttemptId attemptId1 =
|
||||||
|
ConverterUtils.toApplicationAttemptId(appAttemptIdStr1);
|
||||||
|
Path appDir =
|
||||||
|
fsTester.store.getAppDir(attemptId1.getApplicationId().toString());
|
||||||
|
Path appAttemptFile1 =
|
||||||
|
new Path(appDir, attemptId1.toString() + ".new");
|
||||||
|
FileSystemRMStateStore fileSystemRMStateStore =
|
||||||
|
(FileSystemRMStateStore) fsTester.getRMStateStore();
|
||||||
|
fileSystemRMStateStore.renameFile(appAttemptFile1,
|
||||||
|
new Path(appAttemptFile1.getParent(),
|
||||||
|
appAttemptFile1.getName() + ".new"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void modifyRMDelegationTokenState() throws Exception {
|
||||||
|
// imitate dt file is still .new, but old one is deleted
|
||||||
|
Path nodeCreatePath =
|
||||||
|
fsTester.store.getNodePath(fsTester.store.rmDTSecretManagerRoot,
|
||||||
|
FileSystemRMStateStore.DELEGATION_TOKEN_PREFIX + 0);
|
||||||
|
FileSystemRMStateStore fileSystemRMStateStore =
|
||||||
|
(FileSystemRMStateStore) fsTester.getRMStateStore();
|
||||||
|
fileSystemRMStateStore.renameFile(nodeCreatePath,
|
||||||
|
new Path(nodeCreatePath.getParent(),
|
||||||
|
nodeCreatePath.getName() + ".new"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
public void testFSRMStateStoreClientRetry() throws Exception {
|
public void testFSRMStateStoreClientRetry() throws Exception {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
Loading…
Reference in New Issue