YARN-2820. Retry in FileSystemRMStateStore when FS's operations fail due to IOException. Contributed by Zhihai Xu.

This commit is contained in:
Tsuyoshi Ozawa 2015-02-28 00:56:44 +09:00
parent a979f3b58f
commit 01a1621930
5 changed files with 264 additions and 69 deletions

View File

@ -333,6 +333,9 @@ Release 2.7.0 - UNRELEASED
YARN-3255. RM, NM, JobHistoryServer, and WebAppProxyServer's main() YARN-3255. RM, NM, JobHistoryServer, and WebAppProxyServer's main()
should support generic options. (shv) should support generic options. (shv)
YARN-2820. Retry in FileSystemRMStateStore when FS's operations fail
due to IOException. (Zhihai Xu via ozawa)
OPTIMIZATIONS OPTIMIZATIONS
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and YARN-2990. FairScheduler's delay-scheduling always waits for node-local and

View File

@ -508,6 +508,15 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC = public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
"2000, 500"; "2000, 500";
public static final String FS_RM_STATE_STORE_NUM_RETRIES =
RM_PREFIX + "fs.state-store.num-retries";
public static final int DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES = 0;
public static final String FS_RM_STATE_STORE_RETRY_INTERVAL_MS =
RM_PREFIX + "fs.state-store.retry-interval-ms";
public static final long DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS =
1000L;
public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX
+ "leveldb-state-store.path"; + "leveldb-state-store.path";

View File

@ -419,6 +419,21 @@
<value>2000, 500</value> <value>2000, 500</value>
</property> </property>
<property>
<description>the number of retries to recover from IOException in
FileSystemRMStateStore.
</description>
<name>yarn.resourcemanager.fs.state-store.num-retries</name>
<value>0</value>
</property>
<property>
<description>Retry interval in milliseconds in FileSystemRMStateStore.
</description>
<name>yarn.resourcemanager.fs.state-store.retry-interval-ms</name>
<value>1000</value>
</property>
<property> <property>
<description>Local path where the RM state will be stored when using <description>Local path where the RM state will be stored when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore

View File

@ -92,6 +92,8 @@ public class FileSystemRMStateStore extends RMStateStore {
Path rmDTSecretManagerRoot; Path rmDTSecretManagerRoot;
private Path rmAppRoot; private Path rmAppRoot;
private Path dtSequenceNumberPath = null; private Path dtSequenceNumberPath = null;
private int fsNumRetries;
private long fsRetryInterval;
@VisibleForTesting @VisibleForTesting
Path fsWorkingPath; Path fsWorkingPath;
@ -106,6 +108,12 @@ public class FileSystemRMStateStore extends RMStateStore {
rmAppRoot = new Path(rootDirPath, RM_APP_ROOT); rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
amrmTokenSecretManagerRoot = amrmTokenSecretManagerRoot =
new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT); new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
fsNumRetries =
conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES,
YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES);
fsRetryInterval =
conf.getLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS);
} }
@Override @Override
@ -121,14 +129,14 @@ public class FileSystemRMStateStore extends RMStateStore {
conf.set("dfs.client.retry.policy.spec", retryPolicy); conf.set("dfs.client.retry.policy.spec", retryPolicy);
fs = fsWorkingPath.getFileSystem(conf); fs = fsWorkingPath.getFileSystem(conf);
fs.mkdirs(rmDTSecretManagerRoot); mkdirsWithRetries(rmDTSecretManagerRoot);
fs.mkdirs(rmAppRoot); mkdirsWithRetries(rmAppRoot);
fs.mkdirs(amrmTokenSecretManagerRoot); mkdirsWithRetries(amrmTokenSecretManagerRoot);
} }
@Override @Override
protected synchronized void closeInternal() throws Exception { protected synchronized void closeInternal() throws Exception {
fs.close(); closeWithRetries();
} }
@Override @Override
@ -139,9 +147,9 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override @Override
protected synchronized Version loadVersion() throws Exception { protected synchronized Version loadVersion() throws Exception {
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
FileStatus status = getFileStatus(versionNodePath); FileStatus status = getFileStatusWithRetries(versionNodePath);
if (status != null) { if (status != null) {
byte[] data = readFile(versionNodePath, status.getLen()); byte[] data = readFileWithRetries(versionNodePath, status.getLen());
Version version = Version version =
new VersionPBImpl(VersionProto.parseFrom(data)); new VersionPBImpl(VersionProto.parseFrom(data));
return version; return version;
@ -154,10 +162,10 @@ public class FileSystemRMStateStore extends RMStateStore {
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
byte[] data = byte[] data =
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (fs.exists(versionNodePath)) { if (existsWithRetries(versionNodePath)) {
updateFile(versionNodePath, data); updateFile(versionNodePath, data);
} else { } else {
writeFile(versionNodePath, data); writeFileWithRetries(versionNodePath, data);
} }
} }
@ -165,10 +173,10 @@ public class FileSystemRMStateStore extends RMStateStore {
public synchronized long getAndIncrementEpoch() throws Exception { public synchronized long getAndIncrementEpoch() throws Exception {
Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE); Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
long currentEpoch = 0; long currentEpoch = 0;
FileStatus status = getFileStatus(epochNodePath); FileStatus status = getFileStatusWithRetries(epochNodePath);
if (status != null) { if (status != null) {
// load current epoch // load current epoch
byte[] data = readFile(epochNodePath, status.getLen()); byte[] data = readFileWithRetries(epochNodePath, status.getLen());
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch(); currentEpoch = epoch.getEpoch();
// increment epoch and store it // increment epoch and store it
@ -179,7 +187,7 @@ public class FileSystemRMStateStore extends RMStateStore {
// initialize epoch file with 1 for the next time. // initialize epoch file with 1 for the next time.
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
.toByteArray(); .toByteArray();
writeFile(epochNodePath, storeData); writeFileWithRetries(epochNodePath, storeData);
} }
return currentEpoch; return currentEpoch;
} }
@ -201,12 +209,14 @@ public class FileSystemRMStateStore extends RMStateStore {
checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot); checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
Path amrmTokenSecretManagerStateDataDir = Path amrmTokenSecretManagerStateDataDir =
new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE); new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
FileStatus status = getFileStatus(amrmTokenSecretManagerStateDataDir); FileStatus status = getFileStatusWithRetries(
amrmTokenSecretManagerStateDataDir);
if (status == null) { if (status == null) {
return; return;
} }
assert status.isFile(); assert status.isFile();
byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen()); byte[] data = readFileWithRetries(amrmTokenSecretManagerStateDataDir,
status.getLen());
AMRMTokenSecretManagerStatePBImpl stateData = AMRMTokenSecretManagerStatePBImpl stateData =
new AMRMTokenSecretManagerStatePBImpl( new AMRMTokenSecretManagerStatePBImpl(
AMRMTokenSecretManagerStateProto.parseFrom(data)); AMRMTokenSecretManagerStateProto.parseFrom(data));
@ -220,16 +230,18 @@ public class FileSystemRMStateStore extends RMStateStore {
List<ApplicationAttemptStateData> attempts = List<ApplicationAttemptStateData> attempts =
new ArrayList<ApplicationAttemptStateData>(); new ArrayList<ApplicationAttemptStateData>();
for (FileStatus appDir : fs.listStatus(rmAppRoot)) { for (FileStatus appDir : listStatusWithRetries(rmAppRoot)) {
checkAndResumeUpdateOperation(appDir.getPath()); checkAndResumeUpdateOperation(appDir.getPath());
for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) { for (FileStatus childNodeStatus :
listStatusWithRetries(appDir.getPath())) {
assert childNodeStatus.isFile(); assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName(); String childNodeName = childNodeStatus.getPath().getName();
if (checkAndRemovePartialRecord(childNodeStatus.getPath())) { if (checkAndRemovePartialRecordWithRetries(
childNodeStatus.getPath())) {
continue; continue;
} }
byte[] childData = byte[] childData = readFileWithRetries(childNodeStatus.getPath(),
readFile(childNodeStatus.getPath(), childNodeStatus.getLen()); childNodeStatus.getLen());
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application // application
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -292,7 +304,7 @@ public class FileSystemRMStateStore extends RMStateStore {
// If it does, the prior updateFile is failed on half way. We need to // If it does, the prior updateFile is failed on half way. We need to
// complete replacing the old file first. // complete replacing the old file first.
FileStatus[] newChildNodes = FileStatus[] newChildNodes =
fs.listStatus(path, new PathFilter() { listStatusWithRetries(path, new PathFilter() {
@Override @Override
public boolean accept(Path path) { public boolean accept(Path path) {
return path.getName().endsWith(".new"); return path.getName().endsWith(".new");
@ -310,12 +322,12 @@ public class FileSystemRMStateStore extends RMStateStore {
} }
private void loadRMDTSecretManagerState(RMState rmState) throws Exception { private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
checkAndResumeUpdateOperation(rmDTSecretManagerRoot); checkAndResumeUpdateOperation(rmDTSecretManagerRoot);
FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot); FileStatus[] childNodes = listStatusWithRetries(rmDTSecretManagerRoot);
for(FileStatus childNodeStatus : childNodes) { for(FileStatus childNodeStatus : childNodes) {
assert childNodeStatus.isFile(); assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName(); String childNodeName = childNodeStatus.getPath().getName();
if (checkAndRemovePartialRecord(childNodeStatus.getPath())) { if (checkAndRemovePartialRecordWithRetries(childNodeStatus.getPath())) {
continue; continue;
} }
if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
@ -325,35 +337,36 @@ public class FileSystemRMStateStore extends RMStateStore {
} }
Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName); Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); byte[] childData = readFileWithRetries(childNodePath,
childNodeStatus.getLen());
ByteArrayInputStream is = new ByteArrayInputStream(childData); ByteArrayInputStream is = new ByteArrayInputStream(childData);
DataInputStream fsIn = new DataInputStream(is); try (DataInputStream fsIn = new DataInputStream(is)) {
if(childNodeName.startsWith(DELEGATION_KEY_PREFIX)){ if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
DelegationKey key = new DelegationKey(); DelegationKey key = new DelegationKey();
key.readFields(fsIn); key.readFields(fsIn);
rmState.rmSecretManagerState.masterKeyState.add(key); rmState.rmSecretManagerState.masterKeyState.add(key);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Loaded delegation key: keyId=" + key.getKeyId() LOG.debug("Loaded delegation key: keyId=" + key.getKeyId()
+ ", expirationDate=" + key.getExpiryDate()); + ", expirationDate=" + key.getExpiryDate());
} }
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
RMDelegationTokenIdentifierData identifierData = RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData(); new RMDelegationTokenIdentifierData();
identifierData.readFields(fsIn); identifierData.readFields(fsIn);
RMDelegationTokenIdentifier identifier = RMDelegationTokenIdentifier identifier =
identifierData.getTokenIdentifier(); identifierData.getTokenIdentifier();
long renewDate = identifierData.getRenewDate(); long renewDate = identifierData.getRenewDate();
rmState.rmSecretManagerState.delegationTokenState.put(identifier, rmState.rmSecretManagerState.delegationTokenState.put(identifier,
renewDate); renewDate);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
+ " renewDate=" + renewDate); + " renewDate=" + renewDate);
}
} else {
LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
} }
} else {
LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
} }
fsIn.close();
} }
} }
@ -361,7 +374,7 @@ public class FileSystemRMStateStore extends RMStateStore {
public synchronized void storeApplicationStateInternal(ApplicationId appId, public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception { ApplicationStateData appStateDataPB) throws Exception {
Path appDirPath = getAppDir(rmAppRoot, appId); Path appDirPath = getAppDir(rmAppRoot, appId);
fs.mkdirs(appDirPath); mkdirsWithRetries(appDirPath);
Path nodeCreatePath = getNodePath(appDirPath, appId.toString()); Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
@ -369,7 +382,7 @@ public class FileSystemRMStateStore extends RMStateStore {
try { try {
// currently throw all exceptions. May need to respond differently for HA // currently throw all exceptions. May need to respond differently for HA
// based on whether we have lost the right to write to FS // based on whether we have lost the right to write to FS
writeFile(nodeCreatePath, appStateData); writeFileWithRetries(nodeCreatePath, appStateData);
} catch (Exception e) { } catch (Exception e) {
LOG.info("Error storing info for app: " + appId, e); LOG.info("Error storing info for app: " + appId, e);
throw e; throw e;
@ -408,7 +421,7 @@ public class FileSystemRMStateStore extends RMStateStore {
try { try {
// currently throw all exceptions. May need to respond differently for HA // currently throw all exceptions. May need to respond differently for HA
// based on whether we have lost the right to write to FS // based on whether we have lost the right to write to FS
writeFile(nodeCreatePath, attemptStateData); writeFileWithRetries(nodeCreatePath, attemptStateData);
} catch (Exception e) { } catch (Exception e) {
LOG.info("Error storing info for attempt: " + appAttemptId, e); LOG.info("Error storing info for attempt: " + appAttemptId, e);
throw e; throw e;
@ -444,7 +457,7 @@ public class FileSystemRMStateStore extends RMStateStore {
appState.getApplicationSubmissionContext().getApplicationId(); appState.getApplicationSubmissionContext().getApplicationId();
Path nodeRemovePath = getAppDir(rmAppRoot, appId); Path nodeRemovePath = getAppDir(rmAppRoot, appId);
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
deleteFile(nodeRemovePath); deleteFileWithRetries(nodeRemovePath);
} }
@Override @Override
@ -460,7 +473,7 @@ public class FileSystemRMStateStore extends RMStateStore {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber()); LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
deleteFile(nodeCreatePath); deleteFileWithRetries(nodeCreatePath);
} }
@Override @Override
@ -483,7 +496,7 @@ public class FileSystemRMStateStore extends RMStateStore {
updateFile(nodeCreatePath, identifierData.toByteArray()); updateFile(nodeCreatePath, identifierData.toByteArray());
} else { } else {
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
writeFile(nodeCreatePath, identifierData.toByteArray()); writeFileWithRetries(nodeCreatePath, identifierData.toByteArray());
// store sequence number // store sequence number
Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot, Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
@ -492,11 +505,12 @@ public class FileSystemRMStateStore extends RMStateStore {
LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
+ identifier.getSequenceNumber()); + identifier.getSequenceNumber());
if (dtSequenceNumberPath == null) { if (dtSequenceNumberPath == null) {
if (!createFile(latestSequenceNumberPath)) { if (!createFileWithRetries(latestSequenceNumberPath)) {
throw new Exception("Failed to create " + latestSequenceNumberPath); throw new Exception("Failed to create " + latestSequenceNumberPath);
} }
} else { } else {
if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) { if (!renameFileWithRetries(dtSequenceNumberPath,
latestSequenceNumberPath)) {
throw new Exception("Failed to rename " + dtSequenceNumberPath); throw new Exception("Failed to rename " + dtSequenceNumberPath);
} }
} }
@ -510,11 +524,11 @@ public class FileSystemRMStateStore extends RMStateStore {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_KEY_PREFIX + masterKey.getKeyId()); DELEGATION_KEY_PREFIX + masterKey.getKeyId());
ByteArrayOutputStream os = new ByteArrayOutputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os); try (DataOutputStream fsOut = new DataOutputStream(os)) {
LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId()); LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
masterKey.write(fsOut); masterKey.write(fsOut);
writeFile(nodeCreatePath, os.toByteArray()); writeFileWithRetries(nodeCreatePath, os.toByteArray());
fsOut.close(); }
} }
@Override @Override
@ -523,13 +537,13 @@ public class FileSystemRMStateStore extends RMStateStore {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_KEY_PREFIX + masterKey.getKeyId()); DELEGATION_KEY_PREFIX + masterKey.getKeyId());
LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId()); LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId());
deleteFile(nodeCreatePath); deleteFileWithRetries(nodeCreatePath);
} }
@Override @Override
public synchronized void deleteStore() throws IOException { public synchronized void deleteStore() throws Exception {
if (fs.exists(rootDirPath)) { if (existsWithRetries(rootDirPath)) {
fs.delete(rootDirPath, true); deleteFileWithRetries(rootDirPath);
} }
} }
@ -539,6 +553,146 @@ public class FileSystemRMStateStore extends RMStateStore {
// FileSystem related code // FileSystem related code
private boolean checkAndRemovePartialRecordWithRetries(final Path record)
throws Exception {
return new FSAction<Boolean>() {
@Override
public Boolean run() throws Exception {
return checkAndRemovePartialRecord(record);
}
}.runWithRetries();
}
private void mkdirsWithRetries(final Path appDirPath) throws Exception {
new FSAction<Void>() {
@Override
public Void run() throws Exception {
fs.mkdirs(appDirPath);
return null;
}
}.runWithRetries();
}
private void writeFileWithRetries(final Path outputPath,final byte[] data)
throws Exception {
new FSAction<Void>() {
@Override
public Void run() throws Exception {
writeFile(outputPath, data);
return null;
}
}.runWithRetries();
}
private void deleteFileWithRetries(final Path deletePath) throws Exception {
new FSAction<Void>() {
@Override
public Void run() throws Exception {
deleteFile(deletePath);
return null;
}
}.runWithRetries();
}
private boolean renameFileWithRetries(final Path src, final Path dst)
throws Exception {
return new FSAction<Boolean>() {
@Override
public Boolean run() throws Exception {
return renameFile(src, dst);
}
}.runWithRetries();
}
private boolean createFileWithRetries(final Path newFile) throws Exception {
return new FSAction<Boolean>() {
@Override
public Boolean run() throws Exception {
return createFile(newFile);
}
}.runWithRetries();
}
private FileStatus getFileStatusWithRetries(final Path path)
throws Exception {
return new FSAction<FileStatus>() {
@Override
public FileStatus run() throws Exception {
return getFileStatus(path);
}
}.runWithRetries();
}
private boolean existsWithRetries(final Path path) throws Exception {
return new FSAction<Boolean>() {
@Override
public Boolean run() throws Exception {
return fs.exists(path);
}
}.runWithRetries();
}
private byte[] readFileWithRetries(final Path inputPath, final long len)
throws Exception {
return new FSAction<byte[]>() {
@Override
public byte[] run() throws Exception {
return readFile(inputPath, len);
}
}.runWithRetries();
}
private FileStatus[] listStatusWithRetries(final Path path)
throws Exception {
return new FSAction<FileStatus[]>() {
@Override
public FileStatus[] run() throws Exception {
return fs.listStatus(path);
}
}.runWithRetries();
}
private FileStatus[] listStatusWithRetries(final Path path,
final PathFilter filter) throws Exception {
return new FSAction<FileStatus[]>() {
@Override
public FileStatus[] run() throws Exception {
return fs.listStatus(path, filter);
}
}.runWithRetries();
}
private void closeWithRetries() throws Exception {
new FSAction<Void>() {
@Override
public Void run() throws Exception {
fs.close();
return null;
}
}.runWithRetries();
}
private abstract class FSAction<T> {
abstract T run() throws Exception;
T runWithRetries() throws Exception {
int retry = 0;
while (true) {
try {
return run();
} catch (IOException e) {
LOG.info("Exception while executing a FS operation.", e);
if (++retry > fsNumRetries) {
LOG.info("Maxed out FS retries. Giving up!");
throw e;
}
LOG.info("Retrying operation on FS. Retry no. " + retry);
Thread.sleep(fsRetryInterval);
}
}
}
}
private void deleteFile(Path deletePath) throws Exception { private void deleteFile(Path deletePath) throws Exception {
if(!fs.delete(deletePath, true)) { if(!fs.delete(deletePath, true)) {
throw new Exception("Failed to delete " + deletePath); throw new Exception("Failed to delete " + deletePath);
@ -595,18 +749,18 @@ public class FileSystemRMStateStore extends RMStateStore {
*/ */
protected void updateFile(Path outputPath, byte[] data) throws Exception { protected void updateFile(Path outputPath, byte[] data) throws Exception {
Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new"); Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new");
// use writeFile to make sure .new file is created atomically // use writeFileWithRetries to make sure .new file is created atomically
writeFile(newPath, data); writeFileWithRetries(newPath, data);
replaceFile(newPath, outputPath); replaceFile(newPath, outputPath);
} }
protected void replaceFile(Path srcPath, Path dstPath) throws Exception { protected void replaceFile(Path srcPath, Path dstPath) throws Exception {
if (fs.exists(dstPath)) { if (existsWithRetries(dstPath)) {
deleteFile(dstPath); deleteFileWithRetries(dstPath);
} else { } else {
LOG.info("File doesn't exist. Skip deleting the file " + dstPath); LOG.info("File doesn't exist. Skip deleting the file " + dstPath);
} }
fs.rename(srcPath, dstPath); renameFileWithRetries(srcPath, dstPath);
} }
@Private @Private
@ -637,8 +791,17 @@ public class FileSystemRMStateStore extends RMStateStore {
if (isUpdate) { if (isUpdate) {
updateFile(nodeCreatePath, stateData); updateFile(nodeCreatePath, stateData);
} else { } else {
writeFile(nodeCreatePath, stateData); writeFileWithRetries(nodeCreatePath, stateData);
} }
} }
@VisibleForTesting
public int getNumRetries() {
return fsNumRetries;
}
@VisibleForTesting
public long getRetryInterval() {
return fsRetryInterval;
}
} }

View File

@ -100,7 +100,12 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
workingDirPathURI.toString()); workingDirPathURI.toString());
conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC, conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
"100,6000"); "100,6000");
conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 5);
conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
900L);
this.store = new TestFileSystemRMStore(conf); this.store = new TestFileSystemRMStore(conf);
Assert.assertEquals(store.getNumRetries(), 5);
Assert.assertEquals(store.getRetryInterval(), 900L);
return store; return store;
} }