YARN-2820. Retry in FileSystemRMStateStore when FS's operations fail due to IOException. Contributed by Zhihai Xu.
(cherry picked from commit 01a1621930
)
This commit is contained in:
parent
657b027bb2
commit
79f73f4613
|
@ -294,6 +294,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
YARN-3255. RM, NM, JobHistoryServer, and WebAppProxyServer's main()
|
YARN-3255. RM, NM, JobHistoryServer, and WebAppProxyServer's main()
|
||||||
should support generic options. (shv)
|
should support generic options. (shv)
|
||||||
|
|
||||||
|
YARN-2820. Retry in FileSystemRMStateStore when FS's operations fail
|
||||||
|
due to IOException. (Zhihai Xu via ozawa)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and
|
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and
|
||||||
|
|
|
@ -508,6 +508,15 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
|
public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
|
||||||
"2000, 500";
|
"2000, 500";
|
||||||
|
|
||||||
|
public static final String FS_RM_STATE_STORE_NUM_RETRIES =
|
||||||
|
RM_PREFIX + "fs.state-store.num-retries";
|
||||||
|
public static final int DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES = 0;
|
||||||
|
|
||||||
|
public static final String FS_RM_STATE_STORE_RETRY_INTERVAL_MS =
|
||||||
|
RM_PREFIX + "fs.state-store.retry-interval-ms";
|
||||||
|
public static final long DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS =
|
||||||
|
1000L;
|
||||||
|
|
||||||
public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX
|
public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX
|
||||||
+ "leveldb-state-store.path";
|
+ "leveldb-state-store.path";
|
||||||
|
|
||||||
|
|
|
@ -419,6 +419,21 @@
|
||||||
<value>2000, 500</value>
|
<value>2000, 500</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>the number of retries to recover from IOException in
|
||||||
|
FileSystemRMStateStore.
|
||||||
|
</description>
|
||||||
|
<name>yarn.resourcemanager.fs.state-store.num-retries</name>
|
||||||
|
<value>0</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Retry interval in milliseconds in FileSystemRMStateStore.
|
||||||
|
</description>
|
||||||
|
<name>yarn.resourcemanager.fs.state-store.retry-interval-ms</name>
|
||||||
|
<value>1000</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Local path where the RM state will be stored when using
|
<description>Local path where the RM state will be stored when using
|
||||||
org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore
|
org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore
|
||||||
|
|
|
@ -92,6 +92,8 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
Path rmDTSecretManagerRoot;
|
Path rmDTSecretManagerRoot;
|
||||||
private Path rmAppRoot;
|
private Path rmAppRoot;
|
||||||
private Path dtSequenceNumberPath = null;
|
private Path dtSequenceNumberPath = null;
|
||||||
|
private int fsNumRetries;
|
||||||
|
private long fsRetryInterval;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
Path fsWorkingPath;
|
Path fsWorkingPath;
|
||||||
|
@ -106,6 +108,12 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
|
rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
|
||||||
amrmTokenSecretManagerRoot =
|
amrmTokenSecretManagerRoot =
|
||||||
new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
|
new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
|
||||||
|
fsNumRetries =
|
||||||
|
conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES,
|
||||||
|
YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES);
|
||||||
|
fsRetryInterval =
|
||||||
|
conf.getLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
|
||||||
|
YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -121,14 +129,14 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
conf.set("dfs.client.retry.policy.spec", retryPolicy);
|
conf.set("dfs.client.retry.policy.spec", retryPolicy);
|
||||||
|
|
||||||
fs = fsWorkingPath.getFileSystem(conf);
|
fs = fsWorkingPath.getFileSystem(conf);
|
||||||
fs.mkdirs(rmDTSecretManagerRoot);
|
mkdirsWithRetries(rmDTSecretManagerRoot);
|
||||||
fs.mkdirs(rmAppRoot);
|
mkdirsWithRetries(rmAppRoot);
|
||||||
fs.mkdirs(amrmTokenSecretManagerRoot);
|
mkdirsWithRetries(amrmTokenSecretManagerRoot);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void closeInternal() throws Exception {
|
protected synchronized void closeInternal() throws Exception {
|
||||||
fs.close();
|
closeWithRetries();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -139,9 +147,9 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
@Override
|
@Override
|
||||||
protected synchronized Version loadVersion() throws Exception {
|
protected synchronized Version loadVersion() throws Exception {
|
||||||
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
|
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
|
||||||
FileStatus status = getFileStatus(versionNodePath);
|
FileStatus status = getFileStatusWithRetries(versionNodePath);
|
||||||
if (status != null) {
|
if (status != null) {
|
||||||
byte[] data = readFile(versionNodePath, status.getLen());
|
byte[] data = readFileWithRetries(versionNodePath, status.getLen());
|
||||||
Version version =
|
Version version =
|
||||||
new VersionPBImpl(VersionProto.parseFrom(data));
|
new VersionPBImpl(VersionProto.parseFrom(data));
|
||||||
return version;
|
return version;
|
||||||
|
@ -154,10 +162,10 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
|
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
|
||||||
byte[] data =
|
byte[] data =
|
||||||
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
||||||
if (fs.exists(versionNodePath)) {
|
if (existsWithRetries(versionNodePath)) {
|
||||||
updateFile(versionNodePath, data);
|
updateFile(versionNodePath, data);
|
||||||
} else {
|
} else {
|
||||||
writeFile(versionNodePath, data);
|
writeFileWithRetries(versionNodePath, data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,10 +173,10 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
public synchronized long getAndIncrementEpoch() throws Exception {
|
public synchronized long getAndIncrementEpoch() throws Exception {
|
||||||
Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
|
Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
|
||||||
long currentEpoch = 0;
|
long currentEpoch = 0;
|
||||||
FileStatus status = getFileStatus(epochNodePath);
|
FileStatus status = getFileStatusWithRetries(epochNodePath);
|
||||||
if (status != null) {
|
if (status != null) {
|
||||||
// load current epoch
|
// load current epoch
|
||||||
byte[] data = readFile(epochNodePath, status.getLen());
|
byte[] data = readFileWithRetries(epochNodePath, status.getLen());
|
||||||
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
|
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
|
||||||
currentEpoch = epoch.getEpoch();
|
currentEpoch = epoch.getEpoch();
|
||||||
// increment epoch and store it
|
// increment epoch and store it
|
||||||
|
@ -179,7 +187,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
// initialize epoch file with 1 for the next time.
|
// initialize epoch file with 1 for the next time.
|
||||||
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
|
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
|
||||||
.toByteArray();
|
.toByteArray();
|
||||||
writeFile(epochNodePath, storeData);
|
writeFileWithRetries(epochNodePath, storeData);
|
||||||
}
|
}
|
||||||
return currentEpoch;
|
return currentEpoch;
|
||||||
}
|
}
|
||||||
|
@ -201,12 +209,14 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
|
checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
|
||||||
Path amrmTokenSecretManagerStateDataDir =
|
Path amrmTokenSecretManagerStateDataDir =
|
||||||
new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
|
new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
|
||||||
FileStatus status = getFileStatus(amrmTokenSecretManagerStateDataDir);
|
FileStatus status = getFileStatusWithRetries(
|
||||||
|
amrmTokenSecretManagerStateDataDir);
|
||||||
if (status == null) {
|
if (status == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
assert status.isFile();
|
assert status.isFile();
|
||||||
byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen());
|
byte[] data = readFileWithRetries(amrmTokenSecretManagerStateDataDir,
|
||||||
|
status.getLen());
|
||||||
AMRMTokenSecretManagerStatePBImpl stateData =
|
AMRMTokenSecretManagerStatePBImpl stateData =
|
||||||
new AMRMTokenSecretManagerStatePBImpl(
|
new AMRMTokenSecretManagerStatePBImpl(
|
||||||
AMRMTokenSecretManagerStateProto.parseFrom(data));
|
AMRMTokenSecretManagerStateProto.parseFrom(data));
|
||||||
|
@ -220,16 +230,18 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
List<ApplicationAttemptStateData> attempts =
|
List<ApplicationAttemptStateData> attempts =
|
||||||
new ArrayList<ApplicationAttemptStateData>();
|
new ArrayList<ApplicationAttemptStateData>();
|
||||||
|
|
||||||
for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
|
for (FileStatus appDir : listStatusWithRetries(rmAppRoot)) {
|
||||||
checkAndResumeUpdateOperation(appDir.getPath());
|
checkAndResumeUpdateOperation(appDir.getPath());
|
||||||
for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
|
for (FileStatus childNodeStatus :
|
||||||
|
listStatusWithRetries(appDir.getPath())) {
|
||||||
assert childNodeStatus.isFile();
|
assert childNodeStatus.isFile();
|
||||||
String childNodeName = childNodeStatus.getPath().getName();
|
String childNodeName = childNodeStatus.getPath().getName();
|
||||||
if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
|
if (checkAndRemovePartialRecordWithRetries(
|
||||||
|
childNodeStatus.getPath())) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
byte[] childData =
|
byte[] childData = readFileWithRetries(childNodeStatus.getPath(),
|
||||||
readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
|
childNodeStatus.getLen());
|
||||||
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
||||||
// application
|
// application
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -292,7 +304,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
// If it does, the prior updateFile is failed on half way. We need to
|
// If it does, the prior updateFile is failed on half way. We need to
|
||||||
// complete replacing the old file first.
|
// complete replacing the old file first.
|
||||||
FileStatus[] newChildNodes =
|
FileStatus[] newChildNodes =
|
||||||
fs.listStatus(path, new PathFilter() {
|
listStatusWithRetries(path, new PathFilter() {
|
||||||
@Override
|
@Override
|
||||||
public boolean accept(Path path) {
|
public boolean accept(Path path) {
|
||||||
return path.getName().endsWith(".new");
|
return path.getName().endsWith(".new");
|
||||||
|
@ -310,12 +322,12 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
|
private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
|
||||||
checkAndResumeUpdateOperation(rmDTSecretManagerRoot);
|
checkAndResumeUpdateOperation(rmDTSecretManagerRoot);
|
||||||
FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
|
FileStatus[] childNodes = listStatusWithRetries(rmDTSecretManagerRoot);
|
||||||
|
|
||||||
for(FileStatus childNodeStatus : childNodes) {
|
for(FileStatus childNodeStatus : childNodes) {
|
||||||
assert childNodeStatus.isFile();
|
assert childNodeStatus.isFile();
|
||||||
String childNodeName = childNodeStatus.getPath().getName();
|
String childNodeName = childNodeStatus.getPath().getName();
|
||||||
if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
|
if (checkAndRemovePartialRecordWithRetries(childNodeStatus.getPath())) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
|
if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
|
||||||
|
@ -325,35 +337,36 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
|
Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
|
||||||
byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
|
byte[] childData = readFileWithRetries(childNodePath,
|
||||||
|
childNodeStatus.getLen());
|
||||||
ByteArrayInputStream is = new ByteArrayInputStream(childData);
|
ByteArrayInputStream is = new ByteArrayInputStream(childData);
|
||||||
DataInputStream fsIn = new DataInputStream(is);
|
try (DataInputStream fsIn = new DataInputStream(is)) {
|
||||||
if(childNodeName.startsWith(DELEGATION_KEY_PREFIX)){
|
if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
|
||||||
DelegationKey key = new DelegationKey();
|
DelegationKey key = new DelegationKey();
|
||||||
key.readFields(fsIn);
|
key.readFields(fsIn);
|
||||||
rmState.rmSecretManagerState.masterKeyState.add(key);
|
rmState.rmSecretManagerState.masterKeyState.add(key);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Loaded delegation key: keyId=" + key.getKeyId()
|
LOG.debug("Loaded delegation key: keyId=" + key.getKeyId()
|
||||||
+ ", expirationDate=" + key.getExpiryDate());
|
+ ", expirationDate=" + key.getExpiryDate());
|
||||||
}
|
}
|
||||||
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
|
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
|
||||||
RMDelegationTokenIdentifierData identifierData =
|
RMDelegationTokenIdentifierData identifierData =
|
||||||
new RMDelegationTokenIdentifierData();
|
new RMDelegationTokenIdentifierData();
|
||||||
identifierData.readFields(fsIn);
|
identifierData.readFields(fsIn);
|
||||||
RMDelegationTokenIdentifier identifier =
|
RMDelegationTokenIdentifier identifier =
|
||||||
identifierData.getTokenIdentifier();
|
identifierData.getTokenIdentifier();
|
||||||
long renewDate = identifierData.getRenewDate();
|
long renewDate = identifierData.getRenewDate();
|
||||||
|
|
||||||
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
|
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
|
||||||
renewDate);
|
renewDate);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
|
LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
|
||||||
+ " renewDate=" + renewDate);
|
+ " renewDate=" + renewDate);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
|
|
||||||
}
|
}
|
||||||
fsIn.close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,7 +374,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
public synchronized void storeApplicationStateInternal(ApplicationId appId,
|
public synchronized void storeApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateData appStateDataPB) throws Exception {
|
ApplicationStateData appStateDataPB) throws Exception {
|
||||||
Path appDirPath = getAppDir(rmAppRoot, appId);
|
Path appDirPath = getAppDir(rmAppRoot, appId);
|
||||||
fs.mkdirs(appDirPath);
|
mkdirsWithRetries(appDirPath);
|
||||||
Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
|
Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
|
||||||
|
|
||||||
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
|
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
|
||||||
|
@ -369,7 +382,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
try {
|
try {
|
||||||
// currently throw all exceptions. May need to respond differently for HA
|
// currently throw all exceptions. May need to respond differently for HA
|
||||||
// based on whether we have lost the right to write to FS
|
// based on whether we have lost the right to write to FS
|
||||||
writeFile(nodeCreatePath, appStateData);
|
writeFileWithRetries(nodeCreatePath, appStateData);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.info("Error storing info for app: " + appId, e);
|
LOG.info("Error storing info for app: " + appId, e);
|
||||||
throw e;
|
throw e;
|
||||||
|
@ -408,7 +421,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
try {
|
try {
|
||||||
// currently throw all exceptions. May need to respond differently for HA
|
// currently throw all exceptions. May need to respond differently for HA
|
||||||
// based on whether we have lost the right to write to FS
|
// based on whether we have lost the right to write to FS
|
||||||
writeFile(nodeCreatePath, attemptStateData);
|
writeFileWithRetries(nodeCreatePath, attemptStateData);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.info("Error storing info for attempt: " + appAttemptId, e);
|
LOG.info("Error storing info for attempt: " + appAttemptId, e);
|
||||||
throw e;
|
throw e;
|
||||||
|
@ -444,7 +457,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
appState.getApplicationSubmissionContext().getApplicationId();
|
appState.getApplicationSubmissionContext().getApplicationId();
|
||||||
Path nodeRemovePath = getAppDir(rmAppRoot, appId);
|
Path nodeRemovePath = getAppDir(rmAppRoot, appId);
|
||||||
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
|
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
|
||||||
deleteFile(nodeRemovePath);
|
deleteFileWithRetries(nodeRemovePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -460,7 +473,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
|
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
|
||||||
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
|
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
|
||||||
LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
|
LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
|
||||||
deleteFile(nodeCreatePath);
|
deleteFileWithRetries(nodeCreatePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -483,7 +496,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
updateFile(nodeCreatePath, identifierData.toByteArray());
|
updateFile(nodeCreatePath, identifierData.toByteArray());
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
|
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
|
||||||
writeFile(nodeCreatePath, identifierData.toByteArray());
|
writeFileWithRetries(nodeCreatePath, identifierData.toByteArray());
|
||||||
|
|
||||||
// store sequence number
|
// store sequence number
|
||||||
Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
|
Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
|
||||||
|
@ -492,11 +505,12 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
|
LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
|
||||||
+ identifier.getSequenceNumber());
|
+ identifier.getSequenceNumber());
|
||||||
if (dtSequenceNumberPath == null) {
|
if (dtSequenceNumberPath == null) {
|
||||||
if (!createFile(latestSequenceNumberPath)) {
|
if (!createFileWithRetries(latestSequenceNumberPath)) {
|
||||||
throw new Exception("Failed to create " + latestSequenceNumberPath);
|
throw new Exception("Failed to create " + latestSequenceNumberPath);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) {
|
if (!renameFileWithRetries(dtSequenceNumberPath,
|
||||||
|
latestSequenceNumberPath)) {
|
||||||
throw new Exception("Failed to rename " + dtSequenceNumberPath);
|
throw new Exception("Failed to rename " + dtSequenceNumberPath);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -510,11 +524,11 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
|
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
|
||||||
DELEGATION_KEY_PREFIX + masterKey.getKeyId());
|
DELEGATION_KEY_PREFIX + masterKey.getKeyId());
|
||||||
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
||||||
DataOutputStream fsOut = new DataOutputStream(os);
|
try (DataOutputStream fsOut = new DataOutputStream(os)) {
|
||||||
LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
|
LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
|
||||||
masterKey.write(fsOut);
|
masterKey.write(fsOut);
|
||||||
writeFile(nodeCreatePath, os.toByteArray());
|
writeFileWithRetries(nodeCreatePath, os.toByteArray());
|
||||||
fsOut.close();
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -523,13 +537,13 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
|
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
|
||||||
DELEGATION_KEY_PREFIX + masterKey.getKeyId());
|
DELEGATION_KEY_PREFIX + masterKey.getKeyId());
|
||||||
LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId());
|
LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId());
|
||||||
deleteFile(nodeCreatePath);
|
deleteFileWithRetries(nodeCreatePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void deleteStore() throws IOException {
|
public synchronized void deleteStore() throws Exception {
|
||||||
if (fs.exists(rootDirPath)) {
|
if (existsWithRetries(rootDirPath)) {
|
||||||
fs.delete(rootDirPath, true);
|
deleteFileWithRetries(rootDirPath);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -539,6 +553,146 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
// FileSystem related code
|
// FileSystem related code
|
||||||
|
|
||||||
|
private boolean checkAndRemovePartialRecordWithRetries(final Path record)
|
||||||
|
throws Exception {
|
||||||
|
return new FSAction<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean run() throws Exception {
|
||||||
|
return checkAndRemovePartialRecord(record);
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mkdirsWithRetries(final Path appDirPath) throws Exception {
|
||||||
|
new FSAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
fs.mkdirs(appDirPath);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeFileWithRetries(final Path outputPath,final byte[] data)
|
||||||
|
throws Exception {
|
||||||
|
new FSAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
writeFile(outputPath, data);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteFileWithRetries(final Path deletePath) throws Exception {
|
||||||
|
new FSAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
deleteFile(deletePath);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean renameFileWithRetries(final Path src, final Path dst)
|
||||||
|
throws Exception {
|
||||||
|
return new FSAction<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean run() throws Exception {
|
||||||
|
return renameFile(src, dst);
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean createFileWithRetries(final Path newFile) throws Exception {
|
||||||
|
return new FSAction<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean run() throws Exception {
|
||||||
|
return createFile(newFile);
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private FileStatus getFileStatusWithRetries(final Path path)
|
||||||
|
throws Exception {
|
||||||
|
return new FSAction<FileStatus>() {
|
||||||
|
@Override
|
||||||
|
public FileStatus run() throws Exception {
|
||||||
|
return getFileStatus(path);
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean existsWithRetries(final Path path) throws Exception {
|
||||||
|
return new FSAction<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean run() throws Exception {
|
||||||
|
return fs.exists(path);
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] readFileWithRetries(final Path inputPath, final long len)
|
||||||
|
throws Exception {
|
||||||
|
return new FSAction<byte[]>() {
|
||||||
|
@Override
|
||||||
|
public byte[] run() throws Exception {
|
||||||
|
return readFile(inputPath, len);
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private FileStatus[] listStatusWithRetries(final Path path)
|
||||||
|
throws Exception {
|
||||||
|
return new FSAction<FileStatus[]>() {
|
||||||
|
@Override
|
||||||
|
public FileStatus[] run() throws Exception {
|
||||||
|
return fs.listStatus(path);
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private FileStatus[] listStatusWithRetries(final Path path,
|
||||||
|
final PathFilter filter) throws Exception {
|
||||||
|
return new FSAction<FileStatus[]>() {
|
||||||
|
@Override
|
||||||
|
public FileStatus[] run() throws Exception {
|
||||||
|
return fs.listStatus(path, filter);
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void closeWithRetries() throws Exception {
|
||||||
|
new FSAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
fs.close();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private abstract class FSAction<T> {
|
||||||
|
abstract T run() throws Exception;
|
||||||
|
|
||||||
|
T runWithRetries() throws Exception {
|
||||||
|
int retry = 0;
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
return run();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.info("Exception while executing a FS operation.", e);
|
||||||
|
if (++retry > fsNumRetries) {
|
||||||
|
LOG.info("Maxed out FS retries. Giving up!");
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
LOG.info("Retrying operation on FS. Retry no. " + retry);
|
||||||
|
Thread.sleep(fsRetryInterval);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void deleteFile(Path deletePath) throws Exception {
|
private void deleteFile(Path deletePath) throws Exception {
|
||||||
if(!fs.delete(deletePath, true)) {
|
if(!fs.delete(deletePath, true)) {
|
||||||
throw new Exception("Failed to delete " + deletePath);
|
throw new Exception("Failed to delete " + deletePath);
|
||||||
|
@ -595,18 +749,18 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
*/
|
*/
|
||||||
protected void updateFile(Path outputPath, byte[] data) throws Exception {
|
protected void updateFile(Path outputPath, byte[] data) throws Exception {
|
||||||
Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new");
|
Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new");
|
||||||
// use writeFile to make sure .new file is created atomically
|
// use writeFileWithRetries to make sure .new file is created atomically
|
||||||
writeFile(newPath, data);
|
writeFileWithRetries(newPath, data);
|
||||||
replaceFile(newPath, outputPath);
|
replaceFile(newPath, outputPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void replaceFile(Path srcPath, Path dstPath) throws Exception {
|
protected void replaceFile(Path srcPath, Path dstPath) throws Exception {
|
||||||
if (fs.exists(dstPath)) {
|
if (existsWithRetries(dstPath)) {
|
||||||
deleteFile(dstPath);
|
deleteFileWithRetries(dstPath);
|
||||||
} else {
|
} else {
|
||||||
LOG.info("File doesn't exist. Skip deleting the file " + dstPath);
|
LOG.info("File doesn't exist. Skip deleting the file " + dstPath);
|
||||||
}
|
}
|
||||||
fs.rename(srcPath, dstPath);
|
renameFileWithRetries(srcPath, dstPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
|
@ -637,8 +791,17 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
if (isUpdate) {
|
if (isUpdate) {
|
||||||
updateFile(nodeCreatePath, stateData);
|
updateFile(nodeCreatePath, stateData);
|
||||||
} else {
|
} else {
|
||||||
writeFile(nodeCreatePath, stateData);
|
writeFileWithRetries(nodeCreatePath, stateData);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public int getNumRetries() {
|
||||||
|
return fsNumRetries;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getRetryInterval() {
|
||||||
|
return fsRetryInterval;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,7 +100,12 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
workingDirPathURI.toString());
|
workingDirPathURI.toString());
|
||||||
conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
|
conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
|
||||||
"100,6000");
|
"100,6000");
|
||||||
|
conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 5);
|
||||||
|
conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
|
||||||
|
900L);
|
||||||
this.store = new TestFileSystemRMStore(conf);
|
this.store = new TestFileSystemRMStore(conf);
|
||||||
|
Assert.assertEquals(store.getNumRetries(), 5);
|
||||||
|
Assert.assertEquals(store.getRetryInterval(), 900L);
|
||||||
return store;
|
return store;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue