diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 801192a046d..b016cbb1d8b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -294,6 +294,9 @@ Release 2.7.0 - UNRELEASED
YARN-3255. RM, NM, JobHistoryServer, and WebAppProxyServer's main()
should support generic options. (shv)
+ YARN-2820. Retry in FileSystemRMStateStore when FS's operations fail
+ due to IOException. (Zhihai Xu via ozawa)
+
OPTIMIZATIONS
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 544ae1b3e64..8cc7ad70a94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -508,6 +508,15 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
"2000, 500";
+  /**
+   * Key for the number of times a failed file-system operation of the
+   * file-system based RM state store is retried after an IOException.
+   */
+  public static final String FS_RM_STATE_STORE_NUM_RETRIES = RM_PREFIX
+      + "fs.state-store.num-retries";
+  /** Default number of retries: 0. */
+  public static final int DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES = 0;
+
+  /**
+   * Key for the pause, in milliseconds, between two retries of a failed
+   * file-system operation of the file-system based RM state store.
+   */
+  public static final String FS_RM_STATE_STORE_RETRY_INTERVAL_MS = RM_PREFIX
+      + "fs.state-store.retry-interval-ms";
+  /** Default retry interval: 1000 milliseconds. */
+  public static final long DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS = 1000L;
+
public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX
+ "leveldb-state-store.path";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0a1d3db3b7f..f311f16128d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -419,6 +419,21 @@
2000, 500
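+  <property>
+    <description>The number of retries to recover from IOException in
+    FileSystemRMStateStore.
+    </description>
+    <name>yarn.resourcemanager.fs.state-store.num-retries</name>
+    <value>0</value>
+  </property>
+
+  <property>
+    <description>Retry interval in milliseconds in FileSystemRMStateStore.
+    </description>
+    <name>yarn.resourcemanager.fs.state-store.retry-interval-ms</name>
+    <value>1000</value>
+  </property>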
+
Local path where the RM state will be stored when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 6e830a028ab..8147597dcad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -92,6 +92,8 @@ public class FileSystemRMStateStore extends RMStateStore {
Path rmDTSecretManagerRoot;
private Path rmAppRoot;
private Path dtSequenceNumberPath = null;
+ private int fsNumRetries;
+ private long fsRetryInterval;
@VisibleForTesting
Path fsWorkingPath;
@@ -106,6 +108,12 @@ public class FileSystemRMStateStore extends RMStateStore {
rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
amrmTokenSecretManagerRoot =
new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
+ fsNumRetries =
+ conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES,
+ YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES);
+ fsRetryInterval =
+ conf.getLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS);
}
@Override
@@ -121,14 +129,14 @@ public class FileSystemRMStateStore extends RMStateStore {
conf.set("dfs.client.retry.policy.spec", retryPolicy);
fs = fsWorkingPath.getFileSystem(conf);
- fs.mkdirs(rmDTSecretManagerRoot);
- fs.mkdirs(rmAppRoot);
- fs.mkdirs(amrmTokenSecretManagerRoot);
+ mkdirsWithRetries(rmDTSecretManagerRoot);
+ mkdirsWithRetries(rmAppRoot);
+ mkdirsWithRetries(amrmTokenSecretManagerRoot);
}
@Override
protected synchronized void closeInternal() throws Exception {
- fs.close();
+ closeWithRetries();
}
@Override
@@ -139,9 +147,9 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override
protected synchronized Version loadVersion() throws Exception {
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
- FileStatus status = getFileStatus(versionNodePath);
+ FileStatus status = getFileStatusWithRetries(versionNodePath);
if (status != null) {
- byte[] data = readFile(versionNodePath, status.getLen());
+ byte[] data = readFileWithRetries(versionNodePath, status.getLen());
Version version =
new VersionPBImpl(VersionProto.parseFrom(data));
return version;
@@ -154,10 +162,10 @@ public class FileSystemRMStateStore extends RMStateStore {
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
byte[] data =
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
- if (fs.exists(versionNodePath)) {
+ if (existsWithRetries(versionNodePath)) {
updateFile(versionNodePath, data);
} else {
- writeFile(versionNodePath, data);
+ writeFileWithRetries(versionNodePath, data);
}
}
@@ -165,10 +173,10 @@ public class FileSystemRMStateStore extends RMStateStore {
public synchronized long getAndIncrementEpoch() throws Exception {
Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
long currentEpoch = 0;
- FileStatus status = getFileStatus(epochNodePath);
+ FileStatus status = getFileStatusWithRetries(epochNodePath);
if (status != null) {
// load current epoch
- byte[] data = readFile(epochNodePath, status.getLen());
+ byte[] data = readFileWithRetries(epochNodePath, status.getLen());
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch();
// increment epoch and store it
@@ -179,7 +187,7 @@ public class FileSystemRMStateStore extends RMStateStore {
// initialize epoch file with 1 for the next time.
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
.toByteArray();
- writeFile(epochNodePath, storeData);
+ writeFileWithRetries(epochNodePath, storeData);
}
return currentEpoch;
}
@@ -201,12 +209,14 @@ public class FileSystemRMStateStore extends RMStateStore {
checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
Path amrmTokenSecretManagerStateDataDir =
new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
- FileStatus status = getFileStatus(amrmTokenSecretManagerStateDataDir);
+ FileStatus status = getFileStatusWithRetries(
+ amrmTokenSecretManagerStateDataDir);
if (status == null) {
return;
}
assert status.isFile();
- byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen());
+ byte[] data = readFileWithRetries(amrmTokenSecretManagerStateDataDir,
+ status.getLen());
AMRMTokenSecretManagerStatePBImpl stateData =
new AMRMTokenSecretManagerStatePBImpl(
AMRMTokenSecretManagerStateProto.parseFrom(data));
@@ -220,16 +230,18 @@ public class FileSystemRMStateStore extends RMStateStore {
List attempts =
new ArrayList();
- for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
+ for (FileStatus appDir : listStatusWithRetries(rmAppRoot)) {
checkAndResumeUpdateOperation(appDir.getPath());
- for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
+ for (FileStatus childNodeStatus :
+ listStatusWithRetries(appDir.getPath())) {
assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName();
- if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+ if (checkAndRemovePartialRecordWithRetries(
+ childNodeStatus.getPath())) {
continue;
}
- byte[] childData =
- readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
+ byte[] childData = readFileWithRetries(childNodeStatus.getPath(),
+ childNodeStatus.getLen());
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
if (LOG.isDebugEnabled()) {
@@ -292,7 +304,7 @@ public class FileSystemRMStateStore extends RMStateStore {
// If it does, the prior updateFile is failed on half way. We need to
// complete replacing the old file first.
FileStatus[] newChildNodes =
- fs.listStatus(path, new PathFilter() {
+ listStatusWithRetries(path, new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().endsWith(".new");
@@ -310,12 +322,12 @@ public class FileSystemRMStateStore extends RMStateStore {
}
private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
checkAndResumeUpdateOperation(rmDTSecretManagerRoot);
- FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
+ FileStatus[] childNodes = listStatusWithRetries(rmDTSecretManagerRoot);
for(FileStatus childNodeStatus : childNodes) {
assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName();
- if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+ if (checkAndRemovePartialRecordWithRetries(childNodeStatus.getPath())) {
continue;
}
if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
@@ -325,35 +337,36 @@ public class FileSystemRMStateStore extends RMStateStore {
}
Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
- byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ byte[] childData = readFileWithRetries(childNodePath,
+ childNodeStatus.getLen());
ByteArrayInputStream is = new ByteArrayInputStream(childData);
- DataInputStream fsIn = new DataInputStream(is);
- if(childNodeName.startsWith(DELEGATION_KEY_PREFIX)){
- DelegationKey key = new DelegationKey();
- key.readFields(fsIn);
- rmState.rmSecretManagerState.masterKeyState.add(key);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loaded delegation key: keyId=" + key.getKeyId()
- + ", expirationDate=" + key.getExpiryDate());
- }
- } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
- RMDelegationTokenIdentifierData identifierData =
- new RMDelegationTokenIdentifierData();
- identifierData.readFields(fsIn);
- RMDelegationTokenIdentifier identifier =
- identifierData.getTokenIdentifier();
- long renewDate = identifierData.getRenewDate();
+ try (DataInputStream fsIn = new DataInputStream(is)) {
+ if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
+ DelegationKey key = new DelegationKey();
+ key.readFields(fsIn);
+ rmState.rmSecretManagerState.masterKeyState.add(key);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loaded delegation key: keyId=" + key.getKeyId()
+ + ", expirationDate=" + key.getExpiryDate());
+ }
+ } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
+ RMDelegationTokenIdentifierData identifierData =
+ new RMDelegationTokenIdentifierData();
+ identifierData.readFields(fsIn);
+ RMDelegationTokenIdentifier identifier =
+ identifierData.getTokenIdentifier();
+ long renewDate = identifierData.getRenewDate();
- rmState.rmSecretManagerState.delegationTokenState.put(identifier,
- renewDate);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
- + " renewDate=" + renewDate);
+ rmState.rmSecretManagerState.delegationTokenState.put(identifier,
+ renewDate);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
+ + " renewDate=" + renewDate);
+ }
+ } else {
+ LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
}
- } else {
- LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
}
- fsIn.close();
}
}
@@ -361,7 +374,7 @@ public class FileSystemRMStateStore extends RMStateStore {
public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
Path appDirPath = getAppDir(rmAppRoot, appId);
- fs.mkdirs(appDirPath);
+ mkdirsWithRetries(appDirPath);
Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
@@ -369,7 +382,7 @@ public class FileSystemRMStateStore extends RMStateStore {
try {
// currently throw all exceptions. May need to respond differently for HA
// based on whether we have lost the right to write to FS
- writeFile(nodeCreatePath, appStateData);
+ writeFileWithRetries(nodeCreatePath, appStateData);
} catch (Exception e) {
LOG.info("Error storing info for app: " + appId, e);
throw e;
@@ -408,7 +421,7 @@ public class FileSystemRMStateStore extends RMStateStore {
try {
// currently throw all exceptions. May need to respond differently for HA
// based on whether we have lost the right to write to FS
- writeFile(nodeCreatePath, attemptStateData);
+ writeFileWithRetries(nodeCreatePath, attemptStateData);
} catch (Exception e) {
LOG.info("Error storing info for attempt: " + appAttemptId, e);
throw e;
@@ -444,7 +457,7 @@ public class FileSystemRMStateStore extends RMStateStore {
appState.getApplicationSubmissionContext().getApplicationId();
Path nodeRemovePath = getAppDir(rmAppRoot, appId);
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
- deleteFile(nodeRemovePath);
+ deleteFileWithRetries(nodeRemovePath);
}
@Override
@@ -460,7 +473,7 @@ public class FileSystemRMStateStore extends RMStateStore {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
- deleteFile(nodeCreatePath);
+ deleteFileWithRetries(nodeCreatePath);
}
@Override
@@ -483,7 +496,7 @@ public class FileSystemRMStateStore extends RMStateStore {
updateFile(nodeCreatePath, identifierData.toByteArray());
} else {
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
- writeFile(nodeCreatePath, identifierData.toByteArray());
+ writeFileWithRetries(nodeCreatePath, identifierData.toByteArray());
// store sequence number
Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
@@ -492,11 +505,12 @@ public class FileSystemRMStateStore extends RMStateStore {
LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
+ identifier.getSequenceNumber());
if (dtSequenceNumberPath == null) {
- if (!createFile(latestSequenceNumberPath)) {
+ if (!createFileWithRetries(latestSequenceNumberPath)) {
throw new Exception("Failed to create " + latestSequenceNumberPath);
}
} else {
- if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) {
+ if (!renameFileWithRetries(dtSequenceNumberPath,
+ latestSequenceNumberPath)) {
throw new Exception("Failed to rename " + dtSequenceNumberPath);
}
}
@@ -510,11 +524,11 @@ public class FileSystemRMStateStore extends RMStateStore {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_KEY_PREFIX + masterKey.getKeyId());
ByteArrayOutputStream os = new ByteArrayOutputStream();
- DataOutputStream fsOut = new DataOutputStream(os);
- LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
- masterKey.write(fsOut);
- writeFile(nodeCreatePath, os.toByteArray());
- fsOut.close();
+ try (DataOutputStream fsOut = new DataOutputStream(os)) {
+ LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
+ masterKey.write(fsOut);
+ writeFileWithRetries(nodeCreatePath, os.toByteArray());
+ }
}
@Override
@@ -523,13 +537,13 @@ public class FileSystemRMStateStore extends RMStateStore {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_KEY_PREFIX + masterKey.getKeyId());
LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId());
- deleteFile(nodeCreatePath);
+ deleteFileWithRetries(nodeCreatePath);
}
@Override
- public synchronized void deleteStore() throws IOException {
- if (fs.exists(rootDirPath)) {
- fs.delete(rootDirPath, true);
+ public synchronized void deleteStore() throws Exception {
+ if (existsWithRetries(rootDirPath)) {
+ deleteFileWithRetries(rootDirPath);
}
}
@@ -539,6 +553,146 @@ public class FileSystemRMStateStore extends RMStateStore {
// FileSystem related code
+  // Each *WithRetries helper below wraps exactly one file-system call in an
+  // FSAction, so that an IOException raised by that call is retried according
+  // to fsNumRetries / fsRetryInterval (see FSAction#runWithRetries).
+
+  /** Retrying variant of {@code checkAndRemovePartialRecord(record)}. */
+  private boolean checkAndRemovePartialRecordWithRetries(final Path record)
+      throws Exception {
+    return new FSAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        return checkAndRemovePartialRecord(record);
+      }
+    }.runWithRetries();
+  }
+
+  /**
+   * Retrying variant of {@code fs.mkdirs(appDirPath)}. The boolean result of
+   * {@code mkdirs} is discarded.
+   */
+  private void mkdirsWithRetries(final Path appDirPath) throws Exception {
+    new FSAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        fs.mkdirs(appDirPath);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  /** Retrying variant of {@code writeFile(outputPath, data)}. */
+  private void writeFileWithRetries(final Path outputPath, final byte[] data)
+      throws Exception {
+    new FSAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        writeFile(outputPath, data);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  /** Retrying variant of {@code deleteFile(deletePath)}. */
+  private void deleteFileWithRetries(final Path deletePath) throws Exception {
+    new FSAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        deleteFile(deletePath);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  /** Retrying variant of {@code renameFile(src, dst)}. */
+  private boolean renameFileWithRetries(final Path src, final Path dst)
+      throws Exception {
+    return new FSAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        return renameFile(src, dst);
+      }
+    }.runWithRetries();
+  }
+
+  /** Retrying variant of {@code createFile(newFile)}. */
+  private boolean createFileWithRetries(final Path newFile) throws Exception {
+    return new FSAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        return createFile(newFile);
+      }
+    }.runWithRetries();
+  }
+
+  /** Retrying variant of {@code getFileStatus(path)}. */
+  private FileStatus getFileStatusWithRetries(final Path path)
+      throws Exception {
+    return new FSAction<FileStatus>() {
+      @Override
+      public FileStatus run() throws Exception {
+        return getFileStatus(path);
+      }
+    }.runWithRetries();
+  }
+
+  /** Retrying variant of {@code fs.exists(path)}. */
+  private boolean existsWithRetries(final Path path) throws Exception {
+    return new FSAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        return fs.exists(path);
+      }
+    }.runWithRetries();
+  }
+
+  /** Retrying variant of {@code readFile(inputPath, len)}. */
+  private byte[] readFileWithRetries(final Path inputPath, final long len)
+      throws Exception {
+    return new FSAction<byte[]>() {
+      @Override
+      public byte[] run() throws Exception {
+        return readFile(inputPath, len);
+      }
+    }.runWithRetries();
+  }
+
+  /** Retrying variant of {@code fs.listStatus(path)}. */
+  private FileStatus[] listStatusWithRetries(final Path path)
+      throws Exception {
+    return new FSAction<FileStatus[]>() {
+      @Override
+      public FileStatus[] run() throws Exception {
+        return fs.listStatus(path);
+      }
+    }.runWithRetries();
+  }
+
+  /** Retrying variant of {@code fs.listStatus(path, filter)}. */
+  private FileStatus[] listStatusWithRetries(final Path path,
+      final PathFilter filter) throws Exception {
+    return new FSAction<FileStatus[]>() {
+      @Override
+      public FileStatus[] run() throws Exception {
+        return fs.listStatus(path, filter);
+      }
+    }.runWithRetries();
+  }
+
+  /** Retrying variant of {@code fs.close()}. */
+  private void closeWithRetries() throws Exception {
+    new FSAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        fs.close();
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  /**
+   * A single file-system operation that can be executed again when it fails
+   * with an IOException.
+   *
+   * @param <T> type of the value produced by the operation; {@code Void} for
+   *            operations that produce nothing
+   */
+  private abstract class FSAction<T> {
+    /** Performs the operation once. */
+    abstract T run() throws Exception;
+
+    /**
+     * Calls {@link #run()} until it succeeds or the retry budget is used up.
+     * An IOException is retried at most {@code fsNumRetries} times, i.e. the
+     * operation is attempted at most {@code fsNumRetries + 1} times, with a
+     * sleep of {@code fsRetryInterval} milliseconds before each retry.
+     *
+     * @return the value returned by the first successful {@link #run()}
+     * @throws Exception the last IOException once retries are exhausted, or
+     *         immediately any other exception thrown by {@link #run()} or by
+     *         the sleep between attempts
+     */
+    T runWithRetries() throws Exception {
+      int retry = 0;
+      while (true) {
+        try {
+          return run();
+        } catch (IOException e) {
+          // Only IOException is treated as retryable; everything else
+          // leaves the loop through the method's throws clause.
+          LOG.info("Exception while executing a FS operation.", e);
+          if (++retry > fsNumRetries) {
+            LOG.info("Maxed out FS retries. Giving up!");
+            throw e;
+          }
+          LOG.info("Retrying operation on FS. Retry no. " + retry);
+          Thread.sleep(fsRetryInterval);
+        }
+      }
+    }
+  }
+
private void deleteFile(Path deletePath) throws Exception {
if(!fs.delete(deletePath, true)) {
throw new Exception("Failed to delete " + deletePath);
@@ -595,18 +749,18 @@ public class FileSystemRMStateStore extends RMStateStore {
*/
protected void updateFile(Path outputPath, byte[] data) throws Exception {
Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new");
- // use writeFile to make sure .new file is created atomically
- writeFile(newPath, data);
+ // use writeFileWithRetries to make sure .new file is created atomically
+ writeFileWithRetries(newPath, data);
replaceFile(newPath, outputPath);
}
protected void replaceFile(Path srcPath, Path dstPath) throws Exception {
- if (fs.exists(dstPath)) {
- deleteFile(dstPath);
+ if (existsWithRetries(dstPath)) {
+ deleteFileWithRetries(dstPath);
} else {
LOG.info("File doesn't exist. Skip deleting the file " + dstPath);
}
- fs.rename(srcPath, dstPath);
+ renameFileWithRetries(srcPath, dstPath);
}
@Private
@@ -637,8 +791,17 @@ public class FileSystemRMStateStore extends RMStateStore {
if (isUpdate) {
updateFile(nodeCreatePath, stateData);
} else {
- writeFile(nodeCreatePath, stateData);
+ writeFileWithRetries(nodeCreatePath, stateData);
}
}
+  /**
+   * Exposes the retry count held by this store so tests can check it.
+   *
+   * @return the current value of {@code fsNumRetries}
+   */
+  @VisibleForTesting
+  public int getNumRetries() {
+    return this.fsNumRetries;
+  }
+
+  /**
+   * Exposes the retry interval held by this store so tests can check it.
+   *
+   * @return the current value of {@code fsRetryInterval}
+   */
+  @VisibleForTesting
+  public long getRetryInterval() {
+    return this.fsRetryInterval;
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index d0d19e31031..675d73c1fbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -100,7 +100,12 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
workingDirPathURI.toString());
conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
"100,6000");
+ conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 5);
+ conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
+ 900L);
this.store = new TestFileSystemRMStore(conf);
+      // Expected value goes first so a failure message reports the
+      // configured values (5 retries, 900 ms) as "expected".
+      Assert.assertEquals(5, store.getNumRetries());
+      Assert.assertEquals(900L, store.getRetryInterval());
return store;
}