YARN-2996. Improved synchronization and I/O operations of FS- and Mem- RMStateStore. Contributed by Yi Liu.
(cherry picked from commit dc2eaa26b2
)
This commit is contained in:
parent
132fd6ba58
commit
d2fbba790a
|
@ -137,6 +137,9 @@ Release 2.7.0 - UNRELEASED
|
|||
YARN-2880. Added a test to make sure node labels will be recovered
|
||||
if RM restart is enabled. (Rohith Sharmaks via jianhe)
|
||||
|
||||
YARN-2996. Improved synchronization and I/O operations of FS- and Mem-
|
||||
RMStateStore. (Yi Liu via zjshen)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -139,8 +139,8 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||
@Override
|
||||
protected synchronized Version loadVersion() throws Exception {
|
||||
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
|
||||
if (fs.exists(versionNodePath)) {
|
||||
FileStatus status = fs.getFileStatus(versionNodePath);
|
||||
FileStatus status = getFileStatus(versionNodePath);
|
||||
if (status != null) {
|
||||
byte[] data = readFile(versionNodePath, status.getLen());
|
||||
Version version =
|
||||
new VersionPBImpl(VersionProto.parseFrom(data));
|
||||
|
@ -165,9 +165,9 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||
public synchronized long getAndIncrementEpoch() throws Exception {
|
||||
Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
|
||||
long currentEpoch = 0;
|
||||
if (fs.exists(epochNodePath)) {
|
||||
FileStatus status = getFileStatus(epochNodePath);
|
||||
if (status != null) {
|
||||
// load current epoch
|
||||
FileStatus status = fs.getFileStatus(epochNodePath);
|
||||
byte[] data = readFile(epochNodePath, status.getLen());
|
||||
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
|
||||
currentEpoch = epoch.getEpoch();
|
||||
|
@ -201,13 +201,11 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||
checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
|
||||
Path amrmTokenSecretManagerStateDataDir =
|
||||
new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
|
||||
FileStatus status;
|
||||
try {
|
||||
status = fs.getFileStatus(amrmTokenSecretManagerStateDataDir);
|
||||
assert status.isFile();
|
||||
} catch (FileNotFoundException ex) {
|
||||
FileStatus status = getFileStatus(amrmTokenSecretManagerStateDataDir);
|
||||
if (status == null) {
|
||||
return;
|
||||
}
|
||||
assert status.isFile();
|
||||
byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen());
|
||||
AMRMTokenSecretManagerStatePBImpl stateData =
|
||||
new AMRMTokenSecretManagerStatePBImpl(
|
||||
|
@ -466,7 +464,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void updateRMDelegationTokenState(
|
||||
protected synchronized void updateRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
||||
throws Exception {
|
||||
storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate, true);
|
||||
|
@ -560,6 +558,14 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||
}
|
||||
}
|
||||
|
||||
private FileStatus getFileStatus(Path path) throws Exception {
|
||||
try {
|
||||
return fs.getFileStatus(path);
|
||||
} catch (FileNotFoundException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* In order to make this write atomic as a part of write we will first write
|
||||
* data to .tmp file and then rename it. Here we are assuming that rename is
|
||||
|
|
|
@ -91,15 +91,16 @@ public class MemoryRMStateStore extends RMStateStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void storeApplicationStateInternal(
|
||||
public synchronized void storeApplicationStateInternal(
|
||||
ApplicationId appId, ApplicationStateData appState)
|
||||
throws Exception {
|
||||
state.appState.put(appId, appState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateApplicationStateInternal(ApplicationId appId,
|
||||
ApplicationStateData appState) throws Exception {
|
||||
public synchronized void updateApplicationStateInternal(
|
||||
ApplicationId appId, ApplicationStateData appState)
|
||||
throws Exception {
|
||||
LOG.info("Updating final state " + appState.getState() + " for app: "
|
||||
+ appId);
|
||||
if (state.appState.get(appId) != null) {
|
||||
|
@ -186,7 +187,7 @@ public class MemoryRMStateStore extends RMStateStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void updateRMDelegationTokenState(
|
||||
protected synchronized void updateRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
||||
throws Exception {
|
||||
removeRMDelegationTokenState(rmDTIdentifier);
|
||||
|
@ -237,7 +238,7 @@ public class MemoryRMStateStore extends RMStateStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void storeOrUpdateAMRMTokenSecretManagerState(
|
||||
public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
|
||||
AMRMTokenSecretManagerState amrmTokenSecretManagerState,
|
||||
boolean isUpdate) {
|
||||
if (amrmTokenSecretManagerState != null) {
|
||||
|
|
Loading…
Reference in New Issue