Merge r1580154 from trunk. YARN-1776. Fixed DelegationToken renewal to survive RM failover. Contributed by Zhijie Shen

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1580156 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-03-22 04:14:42 +00:00
parent 1bb24f2c18
commit 21b5ec140a
10 changed files with 248 additions and 56 deletions

View File

@ -540,6 +540,9 @@ Release 2.4.0 - UNRELEASED
YARN-1854. Fixed test failure in TestRMHA#testStartAndTransitions. (Rohith
Sharma KS via vinodkv)
YARN-1776. Fixed DelegationToken renewal to survive RM failover. (Zhijie
Shen via jianhe)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -73,7 +74,9 @@ public class FileSystemRMStateStore extends RMStateStore {
protected FileSystem fs;
private Path rootDirPath;
private Path rmDTSecretManagerRoot;
@Private
@VisibleForTesting
Path rmDTSecretManagerRoot;
private Path rmAppRoot;
private Path dtSequenceNumberPath = null;
@ -157,6 +160,7 @@ public class FileSystemRMStateStore extends RMStateStore {
new ArrayList<ApplicationAttemptState>();
for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
checkAndResumeUpdateOperation(appDir.getPath());
for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName();
@ -250,7 +254,29 @@ public class FileSystemRMStateStore extends RMStateStore {
return false;
}
private void checkAndResumeUpdateOperation(Path path) throws Exception {
// Before loading the state information, check whether .new file exists.
// If it does, the prior updateFile is failed on half way. We need to
// complete replacing the old file first.
FileStatus[] newChildNodes =
fs.listStatus(path, new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().endsWith(".new");
}
});
for(FileStatus newChildNodeStatus : newChildNodes) {
assert newChildNodeStatus.isFile();
String newChildNodeName = newChildNodeStatus.getPath().getName();
String childNodeName = newChildNodeName.substring(
0, newChildNodeName.length() - ".new".length());
Path childNodePath =
new Path(newChildNodeStatus.getPath().getParent(), childNodeName);
replaceFile(newChildNodeStatus.getPath(), childNodePath);
}
}
private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
checkAndResumeUpdateOperation(rmDTSecretManagerRoot);
FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
for(FileStatus childNodeStatus : childNodes) {
@ -380,15 +406,44 @@ public class FileSystemRMStateStore extends RMStateStore {
public synchronized void storeRMDelegationTokenAndSequenceNumberState(
RMDelegationTokenIdentifier identifier, Long renewDate,
int latestSequenceNumber) throws Exception {
storeOrUpdateRMDelegationTokenAndSequenceNumberState(
identifier, renewDate,latestSequenceNumber, false);
}
@Override
public synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier identifier) throws Exception {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
deleteFile(nodeCreatePath);
}
@Override
protected void updateRMDelegationTokenAndSequenceNumberInternal(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
storeOrUpdateRMDelegationTokenAndSequenceNumberState(
rmDTIdentifier, renewDate,latestSequenceNumber, true);
}
private void storeOrUpdateRMDelegationTokenAndSequenceNumberState(
RMDelegationTokenIdentifier identifier, Long renewDate,
int latestSequenceNumber, boolean isUpdate) throws Exception {
Path nodeCreatePath =
getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
identifier.write(fsOut);
fsOut.writeLong(renewDate);
writeFile(nodeCreatePath, os.toByteArray());
if (isUpdate) {
LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber());
updateFile(nodeCreatePath, os.toByteArray());
} else {
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
writeFile(nodeCreatePath, os.toByteArray());
}
fsOut.close();
// store sequence number
@ -408,15 +463,6 @@ public class FileSystemRMStateStore extends RMStateStore {
dtSequenceNumberPath = latestSequenceNumberPath;
}
@Override
public synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier identifier) throws Exception {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
deleteFile(nodeCreatePath);
}
@Override
public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey)
throws Exception {
@ -477,14 +523,28 @@ public class FileSystemRMStateStore extends RMStateStore {
fs.rename(tempPath, outputPath);
}
/*
* In order to make this update atomic as a part of write we will first write
* data to .new file and then rename it. Here we are assuming that rename is
* atomic for underlying file system.
*/
protected void updateFile(Path outputPath, byte[] data) throws Exception {
if (fs.exists(outputPath)) {
deleteFile(outputPath);
}
writeFile(outputPath, data);
Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new");
// use writeFile to make sure .new file is created atomically
writeFile(newPath, data);
replaceFile(newPath, outputPath);
}
private boolean renameFile(Path src, Path dst) throws Exception {
protected void replaceFile(Path srcPath, Path dstPath) throws Exception {
if (fs.exists(dstPath)) {
deleteFile(dstPath);
}
fs.rename(srcPath, dstPath);
}
@Private
@VisibleForTesting
boolean renameFile(Path src, Path dst) throws Exception {
return fs.rename(src, dst);
}
@ -492,7 +552,10 @@ public class FileSystemRMStateStore extends RMStateStore {
return fs.createNewFile(newFile);
}
private Path getNodePath(Path root, String nodeName) {
@Private
@VisibleForTesting
Path getNodePath(Path root, String nodeName) {
return new Path(root, nodeName);
}
}

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@ -201,6 +200,15 @@ public class MemoryRMStateStore extends RMStateStore {
rmDTState.remove(rmDTIdentifier);
}
@Override
protected void updateRMDelegationTokenAndSequenceNumberInternal(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
removeRMDelegationTokenState(rmDTIdentifier);
storeRMDelegationTokenAndSequenceNumberState(
rmDTIdentifier, renewDate, latestSequenceNumber);
}
@Override
public synchronized void storeRMDTMasterKeyState(DelegationKey delegationKey)
throws Exception {
@ -239,4 +247,5 @@ public class MemoryRMStateStore extends RMStateStore {
protected RMStateVersion getCurrentVersion() {
return null;
}
}

View File

@ -83,6 +83,13 @@ public class NullRMStateStore extends RMStateStore {
// Do nothing
}
@Override
protected void updateRMDelegationTokenAndSequenceNumberInternal(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
// Do nothing
}
@Override
public void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
// Do nothing
@ -125,4 +132,5 @@ public class NullRMStateStore extends RMStateStore {
// Do nothing
return null;
}
}

View File

@ -477,6 +477,30 @@ public abstract class RMStateStore extends AbstractService {
protected abstract void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception;
/**
* RMDTSecretManager call this to update the state of a delegation token
* and sequence number
*/
public synchronized void updateRMDelegationTokenAndSequenceNumber(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) {
try {
updateRMDelegationTokenAndSequenceNumberInternal(rmDTIdentifier, renewDate,
latestSequenceNumber);
} catch (Exception e) {
notifyStoreOperationFailed(e);
}
}
/**
* Blocking API
* Derived classes must implement this method to update the state of
* RMDelegationToken and sequence number
*/
protected abstract void updateRMDelegationTokenAndSequenceNumberInternal(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception;
/**
* RMDTSecretManager call this to store the state of a master key
*/

View File

@ -629,6 +629,54 @@ public class ZKRMStateStore extends RMStateStore {
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
ArrayList<Op> opList = new ArrayList<Op>();
addStoreOrUpdateOps(
opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
doMultiWithRetries(opList);
}
@Override
protected synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
ArrayList<Op> opList = new ArrayList<Op>();
String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationToken_"
+ rmDTIdentifier.getSequenceNumber());
}
if (zkClient.exists(nodeRemovePath, true) != null) {
opList.add(Op.delete(nodeRemovePath, -1));
} else {
LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
}
doMultiWithRetries(opList);
}
@Override
protected void updateRMDelegationTokenAndSequenceNumberInternal(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
ArrayList<Op> opList = new ArrayList<Op>();
String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
if (zkClient.exists(nodeRemovePath, true) == null) {
// in case znode doesn't exist
addStoreOrUpdateOps(
opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
LOG.info("Attempted to update a non-existing znode " + nodeRemovePath);
} else {
// in case znode exists
addStoreOrUpdateOps(
opList, rmDTIdentifier, renewDate, latestSequenceNumber, true);
}
doMultiWithRetries(opList);
}
private void addStoreOrUpdateOps(ArrayList<Op> opList,
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber, boolean isUpdate) throws Exception {
// store RM delegation token
String nodeCreatePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
@ -642,17 +690,21 @@ public class ZKRMStateStore extends RMStateStore {
rmDTIdentifier.write(tokenOut);
tokenOut.writeLong(renewDate);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing RMDelegationToken_" +
LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_" +
rmDTIdentifier.getSequenceNumber());
}
opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl,
CreateMode.PERSISTENT));
if (isUpdate) {
opList.add(Op.setData(nodeCreatePath, tokenOs.toByteArray(), -1));
} else {
opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl,
CreateMode.PERSISTENT));
}
seqOut.writeInt(latestSequenceNumber);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing " + dtSequenceNumberPath +
LOG.debug((isUpdate ? "Storing " : "Updating ") + dtSequenceNumberPath +
". SequenceNumber: " + latestSequenceNumber);
}
@ -661,21 +713,6 @@ public class ZKRMStateStore extends RMStateStore {
tokenOs.close();
seqOs.close();
}
doMultiWithRetries(opList);
}
@Override
protected synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationToken_"
+ rmDTIdentifier.getSequenceNumber());
}
deleteWithRetries(nodeRemovePath, -1);
}
@Override
@ -707,7 +744,11 @@ public class ZKRMStateStore extends RMStateStore {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
}
deleteWithRetries(nodeRemovePath, -1);
if (zkClient.exists(nodeRemovePath, true) != null) {
doMultiWithRetries(Op.delete(nodeRemovePath, -1));
} else {
LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
}
}
// ZK related code
@ -813,18 +854,6 @@ public class ZKRMStateStore extends RMStateStore {
doMultiWithRetries(Op.create(path, data, acl, mode));
}
private void deleteWithRetries(final String path, final int version)
throws Exception {
try {
doMultiWithRetries(Op.delete(path, version));
} catch (KeeperException.NoNodeException nne) {
// We tried to delete a node that doesn't exist
if (LOG.isDebugEnabled()) {
LOG.debug("Attempted to delete a non-existing znode " + path);
}
}
}
@VisibleForTesting
@Private
@Unstable

View File

@ -122,9 +122,7 @@ public class RMDelegationTokenSecretManager extends
try {
LOG.info("updating RMDelegation token with sequence number: "
+ id.getSequenceNumber());
rmContext.getStateStore().removeRMDelegationToken(id,
delegationTokenSequenceNumber);
rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(id,
rmContext.getStateStore().updateRMDelegationTokenAndSequenceNumber(id,
renewDate, id.getSequenceNumber());
} catch (Exception e) {
LOG.error("Error in updating persisted RMDelegationToken with sequence number: "

View File

@ -244,6 +244,9 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
Thread.sleep(1000);
store.close();
// give tester a chance to modify app state in the store
modifyAppState();
// load state
store = stateStoreHelper.getRMStateStore();
store.setRMDispatcher(dispatcher);
@ -363,6 +366,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
int sequenceNumber = 1111;
store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
sequenceNumber);
modifyRMDelegationTokenState();
Map<RMDelegationTokenIdentifier, Long> token1 =
new HashMap<RMDelegationTokenIdentifier, Long>();
token1.put(dtId1, renewDate1);
@ -380,6 +384,20 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
Assert.assertEquals(sequenceNumber,
secretManagerState.getDTSequenceNumber());
// update RM delegation token;
renewDate1 = new Long(System.currentTimeMillis());
++sequenceNumber;
store.updateRMDelegationTokenAndSequenceNumber(
dtId1, renewDate1, sequenceNumber);
token1.put(dtId1, renewDate1);
RMDTSecretManagerState updateSecretManagerState =
store.loadState().getRMDTSecretManagerState();
Assert.assertEquals(token1, updateSecretManagerState.getTokenState());
Assert.assertEquals(keySet, updateSecretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber,
updateSecretManagerState.getDTSequenceNumber());
// check to delete delegationKey
store.removeRMDTMasterKey(key);
keySet.clear();
@ -487,4 +505,13 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
}
}
}
protected void modifyAppState() throws Exception {
}
protected void modifyRMDelegationTokenState() throws Exception {
}
}

View File

@ -49,6 +49,8 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(TestFSRMStateStore.class);
private TestFSRMStateStoreTester fsTester;
class TestFSRMStateStoreTester implements RMStateStoreHelper {
Path workingDirPathURI;
@ -134,7 +136,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
fsTester = new TestFSRMStateStoreTester(cluster);
// If the state store is FileSystemRMStateStore then add corrupted entry.
// It should discard the entry and remove it from file system.
FSDataOutputStream fsOut = null;
@ -162,6 +164,36 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
}
}
@Override
protected void modifyAppState() throws Exception {
// imitate appAttemptFile1 is still .new, but old one is deleted
String appAttemptIdStr1 = "appattempt_1352994193343_0001_000001";
ApplicationAttemptId attemptId1 =
ConverterUtils.toApplicationAttemptId(appAttemptIdStr1);
Path appDir =
fsTester.store.getAppDir(attemptId1.getApplicationId().toString());
Path appAttemptFile1 =
new Path(appDir, attemptId1.toString() + ".new");
FileSystemRMStateStore fileSystemRMStateStore =
(FileSystemRMStateStore) fsTester.getRMStateStore();
fileSystemRMStateStore.renameFile(appAttemptFile1,
new Path(appAttemptFile1.getParent(),
appAttemptFile1.getName() + ".new"));
}
@Override
protected void modifyRMDelegationTokenState() throws Exception {
// imitate dt file is still .new, but old one is deleted
Path nodeCreatePath =
fsTester.store.getNodePath(fsTester.store.rmDTSecretManagerRoot,
FileSystemRMStateStore.DELEGATION_TOKEN_PREFIX + 0);
FileSystemRMStateStore fileSystemRMStateStore =
(FileSystemRMStateStore) fsTester.getRMStateStore();
fileSystemRMStateStore.renameFile(nodeCreatePath,
new Path(nodeCreatePath.getParent(),
nodeCreatePath.getName() + ".new"));
}
@Test (timeout = 30000)
public void testFSRMStateStoreClientRetry() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.List;