From 79f73f461362d6d574e248f65d1e0dc6e895524a Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ozawa Date: Sat, 28 Feb 2015 00:56:44 +0900 Subject: [PATCH] YARN-2820. Retry in FileSystemRMStateStore when FS's operations fail due to IOException. Contributed by Zhihai Xu. (cherry picked from commit 01a1621930df17a745dd37892996c68fca3447d1) --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 9 + .../src/main/resources/yarn-default.xml | 15 + .../recovery/FileSystemRMStateStore.java | 301 ++++++++++++++---- .../recovery/TestFSRMStateStore.java | 5 + 5 files changed, 264 insertions(+), 69 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 801192a046d..b016cbb1d8b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -294,6 +294,9 @@ Release 2.7.0 - UNRELEASED YARN-3255. RM, NM, JobHistoryServer, and WebAppProxyServer's main() should support generic options. (shv) + YARN-2820. Retry in FileSystemRMStateStore when FS's operations fail + due to IOException. (Zhihai Xu via ozawa) + OPTIMIZATIONS YARN-2990. FairScheduler's delay-scheduling always waits for node-local and diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 544ae1b3e64..8cc7ad70a94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -508,6 +508,15 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC = "2000, 500"; + public static final String FS_RM_STATE_STORE_NUM_RETRIES = + RM_PREFIX + "fs.state-store.num-retries"; + public static final int DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES = 0; + + public static final String FS_RM_STATE_STORE_RETRY_INTERVAL_MS = + RM_PREFIX + "fs.state-store.retry-interval-ms"; + public static final long DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS = + 1000L; + public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX + "leveldb-state-store.path"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 0a1d3db3b7f..f311f16128d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -419,6 +419,21 @@ 2000, 500 + + the number of retries to recover from IOException in + FileSystemRMStateStore. + + yarn.resourcemanager.fs.state-store.num-retries + 0 + + + + Retry interval in milliseconds in FileSystemRMStateStore. + + yarn.resourcemanager.fs.state-store.retry-interval-ms + 1000 + + Local path where the RM state will be stored when using org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 6e830a028ab..8147597dcad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -92,6 +92,8 @@ public class FileSystemRMStateStore extends RMStateStore { Path rmDTSecretManagerRoot; private Path rmAppRoot; private Path dtSequenceNumberPath = null; + private int fsNumRetries; + private long fsRetryInterval; @VisibleForTesting Path fsWorkingPath; @@ -106,6 +108,12 @@ public class FileSystemRMStateStore extends RMStateStore { rmAppRoot = new Path(rootDirPath, RM_APP_ROOT); amrmTokenSecretManagerRoot = new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT); + fsNumRetries = + conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, + YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES); + fsRetryInterval = + conf.getLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS); } @Override @@ -121,14 +129,14 @@ public class FileSystemRMStateStore extends RMStateStore { conf.set("dfs.client.retry.policy.spec", retryPolicy); fs = fsWorkingPath.getFileSystem(conf); - fs.mkdirs(rmDTSecretManagerRoot); - fs.mkdirs(rmAppRoot); - fs.mkdirs(amrmTokenSecretManagerRoot); + mkdirsWithRetries(rmDTSecretManagerRoot); + mkdirsWithRetries(rmAppRoot); + mkdirsWithRetries(amrmTokenSecretManagerRoot); } @Override protected synchronized void closeInternal() throws Exception { - fs.close(); + closeWithRetries(); } @Override @@ -139,9 +147,9 @@ public class FileSystemRMStateStore extends RMStateStore { @Override protected synchronized Version loadVersion() throws Exception { Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); - FileStatus status = getFileStatus(versionNodePath); + FileStatus status = getFileStatusWithRetries(versionNodePath); if (status != null) { - byte[] data = readFile(versionNodePath, status.getLen()); + byte[] data = readFileWithRetries(versionNodePath, status.getLen()); Version version = new VersionPBImpl(VersionProto.parseFrom(data)); return version; @@ -154,10 +162,10 @@ public class FileSystemRMStateStore extends RMStateStore { Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); - if (fs.exists(versionNodePath)) { + if (existsWithRetries(versionNodePath)) { updateFile(versionNodePath, data); } else { - writeFile(versionNodePath, data); + writeFileWithRetries(versionNodePath, data); } } @@ -165,10 +173,10 @@ public class FileSystemRMStateStore extends RMStateStore { public synchronized long getAndIncrementEpoch() throws Exception { Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE); long currentEpoch = 0; - FileStatus status = getFileStatus(epochNodePath); + FileStatus status = getFileStatusWithRetries(epochNodePath); if (status != null) { // load current epoch - byte[] data = readFile(epochNodePath, status.getLen()); + byte[] data = readFileWithRetries(epochNodePath, status.getLen()); Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); currentEpoch = epoch.getEpoch(); // increment epoch and store it @@ -179,7 +187,7 @@ public class FileSystemRMStateStore extends RMStateStore { // initialize epoch file with 1 for the next time. byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() .toByteArray(); - writeFile(epochNodePath, storeData); + writeFileWithRetries(epochNodePath, storeData); } return currentEpoch; } @@ -201,12 +209,14 @@ public class FileSystemRMStateStore extends RMStateStore { checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot); Path amrmTokenSecretManagerStateDataDir = new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE); - FileStatus status = getFileStatus(amrmTokenSecretManagerStateDataDir); + FileStatus status = getFileStatusWithRetries( + amrmTokenSecretManagerStateDataDir); if (status == null) { return; } assert status.isFile(); - byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen()); + byte[] data = readFileWithRetries(amrmTokenSecretManagerStateDataDir, + status.getLen()); AMRMTokenSecretManagerStatePBImpl stateData = new AMRMTokenSecretManagerStatePBImpl( AMRMTokenSecretManagerStateProto.parseFrom(data)); @@ -220,16 +230,18 @@ public class FileSystemRMStateStore extends RMStateStore { List attempts = new ArrayList(); - for (FileStatus appDir : fs.listStatus(rmAppRoot)) { + for (FileStatus appDir : listStatusWithRetries(rmAppRoot)) { checkAndResumeUpdateOperation(appDir.getPath()); - for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) { + for (FileStatus childNodeStatus : + listStatusWithRetries(appDir.getPath())) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); - if (checkAndRemovePartialRecord(childNodeStatus.getPath())) { + if (checkAndRemovePartialRecordWithRetries( + childNodeStatus.getPath())) { continue; } - byte[] childData = - readFile(childNodeStatus.getPath(), childNodeStatus.getLen()); + byte[] childData = readFileWithRetries(childNodeStatus.getPath(), + childNodeStatus.getLen()); if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { // application if (LOG.isDebugEnabled()) { @@ -292,7 +304,7 @@ public class FileSystemRMStateStore extends RMStateStore { // If it does, the prior updateFile is failed on half way. We need to // complete replacing the old file first. FileStatus[] newChildNodes = - fs.listStatus(path, new PathFilter() { + listStatusWithRetries(path, new PathFilter() { @Override public boolean accept(Path path) { return path.getName().endsWith(".new"); @@ -310,12 +322,12 @@ public class FileSystemRMStateStore extends RMStateStore { } private void loadRMDTSecretManagerState(RMState rmState) throws Exception { checkAndResumeUpdateOperation(rmDTSecretManagerRoot); - FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot); + FileStatus[] childNodes = listStatusWithRetries(rmDTSecretManagerRoot); for(FileStatus childNodeStatus : childNodes) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); - if (checkAndRemovePartialRecord(childNodeStatus.getPath())) { + if (checkAndRemovePartialRecordWithRetries(childNodeStatus.getPath())) { continue; } if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { @@ -325,35 +337,36 @@ public class FileSystemRMStateStore extends RMStateStore { } Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName); - byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + byte[] childData = readFileWithRetries(childNodePath, + childNodeStatus.getLen()); ByteArrayInputStream is = new ByteArrayInputStream(childData); - DataInputStream fsIn = new DataInputStream(is); - if(childNodeName.startsWith(DELEGATION_KEY_PREFIX)){ - DelegationKey key = new DelegationKey(); - key.readFields(fsIn); - rmState.rmSecretManagerState.masterKeyState.add(key); - if (LOG.isDebugEnabled()) { - LOG.debug("Loaded delegation key: keyId=" + key.getKeyId() - + ", expirationDate=" + key.getExpiryDate()); - } - } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { - RMDelegationTokenIdentifierData identifierData = - new RMDelegationTokenIdentifierData(); - identifierData.readFields(fsIn); - RMDelegationTokenIdentifier identifier = - identifierData.getTokenIdentifier(); - long renewDate = identifierData.getRenewDate(); + try (DataInputStream fsIn = new DataInputStream(is)) { + if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) { + DelegationKey key = new DelegationKey(); + key.readFields(fsIn); + rmState.rmSecretManagerState.masterKeyState.add(key); + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded delegation key: keyId=" + key.getKeyId() + + ", expirationDate=" + key.getExpiryDate()); + } + } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { + RMDelegationTokenIdentifierData identifierData = + new RMDelegationTokenIdentifierData(); + identifierData.readFields(fsIn); + RMDelegationTokenIdentifier identifier = + identifierData.getTokenIdentifier(); + long renewDate = identifierData.getRenewDate(); - rmState.rmSecretManagerState.delegationTokenState.put(identifier, - renewDate); - if (LOG.isDebugEnabled()) { - LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier - + " renewDate=" + renewDate); + rmState.rmSecretManagerState.delegationTokenState.put(identifier, + renewDate); + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier + + " renewDate=" + renewDate); + } + } else { + LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager"); } - } else { - LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager"); } - fsIn.close(); } } @@ -361,7 +374,7 @@ public class FileSystemRMStateStore extends RMStateStore { public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { Path appDirPath = getAppDir(rmAppRoot, appId); - fs.mkdirs(appDirPath); + mkdirsWithRetries(appDirPath); Path nodeCreatePath = getNodePath(appDirPath, appId.toString()); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); @@ -369,7 +382,7 @@ public class FileSystemRMStateStore extends RMStateStore { try { // currently throw all exceptions. May need to respond differently for HA // based on whether we have lost the right to write to FS - writeFile(nodeCreatePath, appStateData); + writeFileWithRetries(nodeCreatePath, appStateData); } catch (Exception e) { LOG.info("Error storing info for app: " + appId, e); throw e; @@ -408,7 +421,7 @@ public class FileSystemRMStateStore extends RMStateStore { try { // currently throw all exceptions. May need to respond differently for HA // based on whether we have lost the right to write to FS - writeFile(nodeCreatePath, attemptStateData); + writeFileWithRetries(nodeCreatePath, attemptStateData); } catch (Exception e) { LOG.info("Error storing info for attempt: " + appAttemptId, e); throw e; @@ -444,7 +457,7 @@ public class FileSystemRMStateStore extends RMStateStore { appState.getApplicationSubmissionContext().getApplicationId(); Path nodeRemovePath = getAppDir(rmAppRoot, appId); LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); - deleteFile(nodeRemovePath); + deleteFileWithRetries(nodeRemovePath); } @Override @@ -460,7 +473,7 @@ public class FileSystemRMStateStore extends RMStateStore { Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber()); - deleteFile(nodeCreatePath); + deleteFileWithRetries(nodeCreatePath); } @Override @@ -483,7 +496,7 @@ public class FileSystemRMStateStore extends RMStateStore { updateFile(nodeCreatePath, identifierData.toByteArray()); } else { LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); - writeFile(nodeCreatePath, identifierData.toByteArray()); + writeFileWithRetries(nodeCreatePath, identifierData.toByteArray()); // store sequence number Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot, @@ -492,11 +505,12 @@ public class FileSystemRMStateStore extends RMStateStore { LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + identifier.getSequenceNumber()); if (dtSequenceNumberPath == null) { - if (!createFile(latestSequenceNumberPath)) { + if (!createFileWithRetries(latestSequenceNumberPath)) { throw new Exception("Failed to create " + latestSequenceNumberPath); } } else { - if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) { + if (!renameFileWithRetries(dtSequenceNumberPath, + latestSequenceNumberPath)) { throw new Exception("Failed to rename " + dtSequenceNumberPath); } } @@ -510,11 +524,11 @@ public class FileSystemRMStateStore extends RMStateStore { Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX + masterKey.getKeyId()); ByteArrayOutputStream os = new ByteArrayOutputStream(); - DataOutputStream fsOut = new DataOutputStream(os); - LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId()); - masterKey.write(fsOut); - writeFile(nodeCreatePath, os.toByteArray()); - fsOut.close(); + try (DataOutputStream fsOut = new DataOutputStream(os)) { + LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId()); + masterKey.write(fsOut); + writeFileWithRetries(nodeCreatePath, os.toByteArray()); + } } @Override @@ -523,13 +537,13 @@ public class FileSystemRMStateStore extends RMStateStore { Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX + masterKey.getKeyId()); LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId()); - deleteFile(nodeCreatePath); + deleteFileWithRetries(nodeCreatePath); } @Override - public synchronized void deleteStore() throws IOException { - if (fs.exists(rootDirPath)) { - fs.delete(rootDirPath, true); + public synchronized void deleteStore() throws Exception { + if (existsWithRetries(rootDirPath)) { + deleteFileWithRetries(rootDirPath); } } @@ -539,6 +553,146 @@ public class FileSystemRMStateStore extends RMStateStore { // FileSystem related code + private boolean checkAndRemovePartialRecordWithRetries(final Path record) + throws Exception { + return new FSAction() { + @Override + public Boolean run() throws Exception { + return checkAndRemovePartialRecord(record); + } + }.runWithRetries(); + } + + private void mkdirsWithRetries(final Path appDirPath) throws Exception { + new FSAction() { + @Override + public Void run() throws Exception { + fs.mkdirs(appDirPath); + return null; + } + }.runWithRetries(); + } + + private void writeFileWithRetries(final Path outputPath,final byte[] data) + throws Exception { + new FSAction() { + @Override + public Void run() throws Exception { + writeFile(outputPath, data); + return null; + } + }.runWithRetries(); + } + + private void deleteFileWithRetries(final Path deletePath) throws Exception { + new FSAction() { + @Override + public Void run() throws Exception { + deleteFile(deletePath); + return null; + } + }.runWithRetries(); + } + + private boolean renameFileWithRetries(final Path src, final Path dst) + throws Exception { + return new FSAction() { + @Override + public Boolean run() throws Exception { + return renameFile(src, dst); + } + }.runWithRetries(); + } + + private boolean createFileWithRetries(final Path newFile) throws Exception { + return new FSAction() { + @Override + public Boolean run() throws Exception { + return createFile(newFile); + } + }.runWithRetries(); + } + + private FileStatus getFileStatusWithRetries(final Path path) + throws Exception { + return new FSAction() { + @Override + public FileStatus run() throws Exception { + return getFileStatus(path); + } + }.runWithRetries(); + } + + private boolean existsWithRetries(final Path path) throws Exception { + return new FSAction() { + @Override + public Boolean run() throws Exception { + return fs.exists(path); + } + }.runWithRetries(); + } + + private byte[] readFileWithRetries(final Path inputPath, final long len) + throws Exception { + return new FSAction() { + @Override + public byte[] run() throws Exception { + return readFile(inputPath, len); + } + }.runWithRetries(); + } + + private FileStatus[] listStatusWithRetries(final Path path) + throws Exception { + return new FSAction() { + @Override + public FileStatus[] run() throws Exception { + return fs.listStatus(path); + } + }.runWithRetries(); + } + + private FileStatus[] listStatusWithRetries(final Path path, + final PathFilter filter) throws Exception { + return new FSAction() { + @Override + public FileStatus[] run() throws Exception { + return fs.listStatus(path, filter); + } + }.runWithRetries(); + } + + private void closeWithRetries() throws Exception { + new FSAction() { + @Override + public Void run() throws Exception { + fs.close(); + return null; + } + }.runWithRetries(); + } + + private abstract class FSAction { + abstract T run() throws Exception; + + T runWithRetries() throws Exception { + int retry = 0; + while (true) { + try { + return run(); + } catch (IOException e) { + LOG.info("Exception while executing a FS operation.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out FS retries. Giving up!"); + throw e; + } + LOG.info("Retrying operation on FS. Retry no. " + retry); + Thread.sleep(fsRetryInterval); + } + } + } + } + private void deleteFile(Path deletePath) throws Exception { if(!fs.delete(deletePath, true)) { throw new Exception("Failed to delete " + deletePath); @@ -595,18 +749,18 @@ public class FileSystemRMStateStore extends RMStateStore { */ protected void updateFile(Path outputPath, byte[] data) throws Exception { Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new"); - // use writeFile to make sure .new file is created atomically - writeFile(newPath, data); + // use writeFileWithRetries to make sure .new file is created atomically + writeFileWithRetries(newPath, data); replaceFile(newPath, outputPath); } protected void replaceFile(Path srcPath, Path dstPath) throws Exception { - if (fs.exists(dstPath)) { - deleteFile(dstPath); + if (existsWithRetries(dstPath)) { + deleteFileWithRetries(dstPath); } else { LOG.info("File doesn't exist. Skip deleting the file " + dstPath); } - fs.rename(srcPath, dstPath); + renameFileWithRetries(srcPath, dstPath); } @Private @@ -637,8 +791,17 @@ public class FileSystemRMStateStore extends RMStateStore { if (isUpdate) { updateFile(nodeCreatePath, stateData); } else { - writeFile(nodeCreatePath, stateData); + writeFileWithRetries(nodeCreatePath, stateData); } } + @VisibleForTesting + public int getNumRetries() { + return fsNumRetries; + } + + @VisibleForTesting + public long getRetryInterval() { + return fsRetryInterval; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index d0d19e31031..675d73c1fbe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -100,7 +100,12 @@ public class TestFSRMStateStore extends RMStateStoreTestBase { workingDirPathURI.toString()); conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC, "100,6000"); + conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 5); + conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS, + 900L); this.store = new TestFileSystemRMStore(conf); + Assert.assertEquals(store.getNumRetries(), 5); + Assert.assertEquals(store.getRetryInterval(), 900L); return store; }