diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d8f923c8df9..b0fd0401a1f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -540,6 +540,9 @@ Release 2.4.0 - UNRELEASED YARN-1854. Fixed test failure in TestRMHA#testStartAndTransitions. (Rohith Sharma KS via vinodkv) + YARN-1776. Fixed DelegationToken renewal to survive RM failover. (Zhijie + Shen via jianhe) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index d60e8ada086..cc25be77d43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -73,7 +74,9 @@ public class FileSystemRMStateStore extends RMStateStore { protected FileSystem fs; private Path rootDirPath; - private Path rmDTSecretManagerRoot; + @Private + @VisibleForTesting + Path rmDTSecretManagerRoot; private Path rmAppRoot; private Path dtSequenceNumberPath = null; @@ -157,6 +160,7 @@ public class FileSystemRMStateStore extends RMStateStore { new ArrayList(); for (FileStatus appDir : fs.listStatus(rmAppRoot)) { + checkAndResumeUpdateOperation(appDir.getPath()); for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); @@ -250,7 +254,29 @@ public class FileSystemRMStateStore extends RMStateStore { return false; } + private void checkAndResumeUpdateOperation(Path path) throws Exception { + // Before loading the state information, check whether .new file exists. + // If it does, the prior updateFile is failed on half way. We need to + // complete replacing the old file first. + FileStatus[] newChildNodes = + fs.listStatus(path, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().endsWith(".new"); + } + }); + for(FileStatus newChildNodeStatus : newChildNodes) { + assert newChildNodeStatus.isFile(); + String newChildNodeName = newChildNodeStatus.getPath().getName(); + String childNodeName = newChildNodeName.substring( + 0, newChildNodeName.length() - ".new".length()); + Path childNodePath = + new Path(newChildNodeStatus.getPath().getParent(), childNodeName); + replaceFile(newChildNodeStatus.getPath(), childNodePath); + } + } private void loadRMDTSecretManagerState(RMState rmState) throws Exception { + checkAndResumeUpdateOperation(rmDTSecretManagerRoot); FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot); for(FileStatus childNodeStatus : childNodes) { @@ -380,15 +406,44 @@ public class FileSystemRMStateStore extends RMStateStore { public synchronized void storeRMDelegationTokenAndSequenceNumberState( RMDelegationTokenIdentifier identifier, Long renewDate, int latestSequenceNumber) throws Exception { + storeOrUpdateRMDelegationTokenAndSequenceNumberState( + identifier, renewDate,latestSequenceNumber, false); + } + + @Override + public synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier identifier) throws Exception { + Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, + DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); + LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber()); + deleteFile(nodeCreatePath); + } + + @Override + protected void updateRMDelegationTokenAndSequenceNumberInternal( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + storeOrUpdateRMDelegationTokenAndSequenceNumberState( + rmDTIdentifier, renewDate,latestSequenceNumber, true); + } + + private void storeOrUpdateRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier identifier, Long renewDate, + int latestSequenceNumber, boolean isUpdate) throws Exception { Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); ByteArrayOutputStream os = new ByteArrayOutputStream(); DataOutputStream fsOut = new DataOutputStream(os); - LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); identifier.write(fsOut); fsOut.writeLong(renewDate); - writeFile(nodeCreatePath, os.toByteArray()); + if (isUpdate) { + LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber()); + updateFile(nodeCreatePath, os.toByteArray()); + } else { + LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); + writeFile(nodeCreatePath, os.toByteArray()); + } fsOut.close(); // store sequence number @@ -408,15 +463,6 @@ public class FileSystemRMStateStore extends RMStateStore { dtSequenceNumberPath = latestSequenceNumberPath; } - @Override - public synchronized void removeRMDelegationTokenState( - RMDelegationTokenIdentifier identifier) throws Exception { - Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, - DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); - LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber()); - deleteFile(nodeCreatePath); - } - @Override public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey) throws Exception { @@ -477,14 +523,28 @@ public class FileSystemRMStateStore extends RMStateStore { fs.rename(tempPath, outputPath); } + /* + * In order to make this update atomic as a part of write we will first write + * data to .new file and then rename it. Here we are assuming that rename is + * atomic for underlying file system. + */ protected void updateFile(Path outputPath, byte[] data) throws Exception { - if (fs.exists(outputPath)) { - deleteFile(outputPath); - } - writeFile(outputPath, data); + Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new"); + // use writeFile to make sure .new file is created atomically + writeFile(newPath, data); + replaceFile(newPath, outputPath); } - private boolean renameFile(Path src, Path dst) throws Exception { + protected void replaceFile(Path srcPath, Path dstPath) throws Exception { + if (fs.exists(dstPath)) { + deleteFile(dstPath); + } + fs.rename(srcPath, dstPath); + } + + @Private + @VisibleForTesting + boolean renameFile(Path src, Path dst) throws Exception { return fs.rename(src, dst); } @@ -492,7 +552,10 @@ public class FileSystemRMStateStore extends RMStateStore { return fs.createNewFile(newFile); } - private Path getNodePath(Path root, String nodeName) { + @Private + @VisibleForTesting + Path getNodePath(Path root, String nodeName) { return new Path(root, nodeName); } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 5a20ff28b95..0eb5a3df4fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; -import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -201,6 +200,15 @@ public class MemoryRMStateStore extends RMStateStore { rmDTState.remove(rmDTIdentifier); } + @Override + protected void updateRMDelegationTokenAndSequenceNumberInternal( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + removeRMDelegationTokenState(rmDTIdentifier); + storeRMDelegationTokenAndSequenceNumberState( + rmDTIdentifier, renewDate, latestSequenceNumber); + } + @Override public synchronized void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception { @@ -239,4 +247,5 @@ public class MemoryRMStateStore extends RMStateStore { protected RMStateVersion getCurrentVersion() { return null; } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index af28a0152ef..a12099f46f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -83,6 +83,13 @@ public class NullRMStateStore extends RMStateStore { // Do nothing } + @Override + protected void updateRMDelegationTokenAndSequenceNumberInternal( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + // Do nothing + } + @Override public void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception { // Do nothing @@ -125,4 +132,5 @@ public class NullRMStateStore extends RMStateStore { // Do nothing return null; } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 55f0b6591b4..32a06c49537 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -477,6 +477,30 @@ public abstract class RMStateStore extends AbstractService { protected abstract void removeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier) throws Exception; + /** + * RMDTSecretManager call this to update the state of a delegation token + * and sequence number + */ + public synchronized void updateRMDelegationTokenAndSequenceNumber( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) { + try { + updateRMDelegationTokenAndSequenceNumberInternal(rmDTIdentifier, renewDate, + latestSequenceNumber); + } catch (Exception e) { + notifyStoreOperationFailed(e); + } + } + + /** + * Blocking API + * Derived classes must implement this method to update the state of + * RMDelegationToken and sequence number + */ + protected abstract void updateRMDelegationTokenAndSequenceNumberInternal( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception; + /** * RMDTSecretManager call this to store the state of a master key */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index abc1e4c6d75..fdfd6cd7b84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -629,6 +629,54 @@ public class ZKRMStateStore extends RMStateStore { RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, int latestSequenceNumber) throws Exception { ArrayList opList = new ArrayList(); + addStoreOrUpdateOps( + opList, rmDTIdentifier, renewDate, latestSequenceNumber, false); + doMultiWithRetries(opList); + } + + @Override + protected synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { + ArrayList opList = new ArrayList(); + String nodeRemovePath = + getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + + rmDTIdentifier.getSequenceNumber()); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing RMDelegationToken_" + + rmDTIdentifier.getSequenceNumber()); + } + if (zkClient.exists(nodeRemovePath, true) != null) { + opList.add(Op.delete(nodeRemovePath, -1)); + } else { + LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath); + } + doMultiWithRetries(opList); + } + + @Override + protected void updateRMDelegationTokenAndSequenceNumberInternal( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + ArrayList opList = new ArrayList(); + String nodeRemovePath = + getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + + rmDTIdentifier.getSequenceNumber()); + if (zkClient.exists(nodeRemovePath, true) == null) { + // in case znode doesn't exist + addStoreOrUpdateOps( + opList, rmDTIdentifier, renewDate, latestSequenceNumber, false); + LOG.info("Attempted to update a non-existing znode " + nodeRemovePath); + } else { + // in case znode exists + addStoreOrUpdateOps( + opList, rmDTIdentifier, renewDate, latestSequenceNumber, true); + } + doMultiWithRetries(opList); + } + + private void addStoreOrUpdateOps(ArrayList opList, + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber, boolean isUpdate) throws Exception { // store RM delegation token String nodeCreatePath = getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX @@ -642,17 +690,21 @@ public class ZKRMStateStore extends RMStateStore { rmDTIdentifier.write(tokenOut); tokenOut.writeLong(renewDate); if (LOG.isDebugEnabled()) { - LOG.debug("Storing RMDelegationToken_" + + LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_" + rmDTIdentifier.getSequenceNumber()); } - opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl, - CreateMode.PERSISTENT)); + if (isUpdate) { + opList.add(Op.setData(nodeCreatePath, tokenOs.toByteArray(), -1)); + } else { + opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl, + CreateMode.PERSISTENT)); + } seqOut.writeInt(latestSequenceNumber); if (LOG.isDebugEnabled()) { - LOG.debug("Storing " + dtSequenceNumberPath + + LOG.debug((isUpdate ? "Storing " : "Updating ") + dtSequenceNumberPath + ". SequenceNumber: " + latestSequenceNumber); } @@ -661,21 +713,6 @@ public class ZKRMStateStore extends RMStateStore { tokenOs.close(); seqOs.close(); } - - doMultiWithRetries(opList); - } - - @Override - protected synchronized void removeRMDelegationTokenState( - RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { - String nodeRemovePath = - getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX - + rmDTIdentifier.getSequenceNumber()); - if (LOG.isDebugEnabled()) { - LOG.debug("Removing RMDelegationToken_" - + rmDTIdentifier.getSequenceNumber()); - } - deleteWithRetries(nodeRemovePath, -1); } @Override @@ -707,7 +744,11 @@ public class ZKRMStateStore extends RMStateStore { if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); } - deleteWithRetries(nodeRemovePath, -1); + if (zkClient.exists(nodeRemovePath, true) != null) { + doMultiWithRetries(Op.delete(nodeRemovePath, -1)); + } else { + LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath); + } } // ZK related code @@ -813,18 +854,6 @@ public class ZKRMStateStore extends RMStateStore { doMultiWithRetries(Op.create(path, data, acl, mode)); } - private void deleteWithRetries(final String path, final int version) - throws Exception { - try { - doMultiWithRetries(Op.delete(path, version)); - } catch (KeeperException.NoNodeException nne) { - // We tried to delete a node that doesn't exist - if (LOG.isDebugEnabled()) { - LOG.debug("Attempted to delete a non-existing znode " + path); - } - } - } - @VisibleForTesting @Private @Unstable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java index 23939defb42..ae786d75d09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java @@ -122,9 +122,7 @@ public class RMDelegationTokenSecretManager extends try { LOG.info("updating RMDelegation token with sequence number: " + id.getSequenceNumber()); - rmContext.getStateStore().removeRMDelegationToken(id, - delegationTokenSequenceNumber); - rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(id, + rmContext.getStateStore().updateRMDelegationTokenAndSequenceNumber(id, renewDate, id.getSequenceNumber()); } catch (Exception e) { LOG.error("Error in updating persisted RMDelegationToken with sequence number: " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index dc8b043521b..009e96e7838 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -244,6 +244,9 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ Thread.sleep(1000); store.close(); + // give tester a chance to modify app state in the store + modifyAppState(); + // load state store = stateStoreHelper.getRMStateStore(); store.setRMDispatcher(dispatcher); @@ -363,6 +366,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ int sequenceNumber = 1111; store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1, sequenceNumber); + modifyRMDelegationTokenState(); Map token1 = new HashMap(); token1.put(dtId1, renewDate1); @@ -380,6 +384,20 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber()); + // update RM delegation token; + renewDate1 = new Long(System.currentTimeMillis()); + ++sequenceNumber; + store.updateRMDelegationTokenAndSequenceNumber( + dtId1, renewDate1, sequenceNumber); + token1.put(dtId1, renewDate1); + + RMDTSecretManagerState updateSecretManagerState = + store.loadState().getRMDTSecretManagerState(); + Assert.assertEquals(token1, updateSecretManagerState.getTokenState()); + Assert.assertEquals(keySet, updateSecretManagerState.getMasterKeyState()); + Assert.assertEquals(sequenceNumber, + updateSecretManagerState.getDTSequenceNumber()); + // check to delete delegationKey store.removeRMDTMasterKey(key); keySet.clear(); @@ -487,4 +505,13 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ } } } + + protected void modifyAppState() throws Exception { + + } + + protected void modifyRMDelegationTokenState() throws Exception { + + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 0c372acd332..792b73e5a66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -49,6 +49,8 @@ public class TestFSRMStateStore extends RMStateStoreTestBase { public static final Log LOG = LogFactory.getLog(TestFSRMStateStore.class); + private TestFSRMStateStoreTester fsTester; + class TestFSRMStateStoreTester implements RMStateStoreHelper { Path workingDirPathURI; @@ -134,7 +136,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase { MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); try { - TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster); + fsTester = new TestFSRMStateStoreTester(cluster); // If the state store is FileSystemRMStateStore then add corrupted entry. // It should discard the entry and remove it from file system. FSDataOutputStream fsOut = null; @@ -162,6 +164,36 @@ public class TestFSRMStateStore extends RMStateStoreTestBase { } } + @Override + protected void modifyAppState() throws Exception { + // imitate appAttemptFile1 is still .new, but old one is deleted + String appAttemptIdStr1 = "appattempt_1352994193343_0001_000001"; + ApplicationAttemptId attemptId1 = + ConverterUtils.toApplicationAttemptId(appAttemptIdStr1); + Path appDir = + fsTester.store.getAppDir(attemptId1.getApplicationId().toString()); + Path appAttemptFile1 = + new Path(appDir, attemptId1.toString() + ".new"); + FileSystemRMStateStore fileSystemRMStateStore = + (FileSystemRMStateStore) fsTester.getRMStateStore(); + fileSystemRMStateStore.renameFile(appAttemptFile1, + new Path(appAttemptFile1.getParent(), + appAttemptFile1.getName() + ".new")); + } + + @Override + protected void modifyRMDelegationTokenState() throws Exception { + // imitate dt file is still .new, but old one is deleted + Path nodeCreatePath = + fsTester.store.getNodePath(fsTester.store.rmDTSecretManagerRoot, + FileSystemRMStateStore.DELEGATION_TOKEN_PREFIX + 0); + FileSystemRMStateStore fileSystemRMStateStore = + (FileSystemRMStateStore) fsTester.getRMStateStore(); + fileSystemRMStateStore.renameFile(nodeCreatePath, + new Path(nodeCreatePath.getParent(), + nodeCreatePath.getName() + ".new")); + } + @Test (timeout = 30000) public void testFSRMStateStoreClientRetry() throws Exception { HdfsConfiguration conf = new HdfsConfiguration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 2174ae531c4..284794b3734 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; import java.io.IOException; import java.util.List;