YARN-1378. Implemented a cleaner of old finished applications from the RM state-store. Contributed by Jian He.
svn merge --ignore-ancestry -c 1548990 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1548991 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6026982bcb
commit
82f1335e36
@ -130,6 +130,9 @@ Release 2.4.0 - UNRELEASED
|
|||||||
YARN-807. When querying apps by queue, iterating over all apps is
|
YARN-807. When querying apps by queue, iterating over all apps is
|
||||||
inefficient and limiting (Sandy Ryza)
|
inefficient and limiting (Sandy Ryza)
|
||||||
|
|
||||||
|
YARN-1378. Implemented a cleaner of old finished applications from the RM
|
||||||
|
state-store. (Jian He via vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -343,6 +343,15 @@ public class YarnConfiguration extends Configuration {
|
|||||||
RM_PREFIX + "max-completed-applications";
|
RM_PREFIX + "max-completed-applications";
|
||||||
public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000;
|
public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum number of completed applications RM state store keeps, by
|
||||||
|
* default equal to DEFAULT_RM_MAX_COMPLETED_APPLICATIONS
|
||||||
|
*/
|
||||||
|
public static final String RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS =
|
||||||
|
RM_PREFIX + "state-store.max-completed-applications";
|
||||||
|
public static final int DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS =
|
||||||
|
DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
|
||||||
|
|
||||||
/** Default application name */
|
/** Default application name */
|
||||||
public static final String DEFAULT_APPLICATION_NAME = "N/A";
|
public static final String DEFAULT_APPLICATION_NAME = "N/A";
|
||||||
|
|
||||||
|
@ -275,6 +275,21 @@
|
|||||||
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
|
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>The maximum number of completed applications RM state
|
||||||
|
store keeps, less than or equal to ${yarn.resourcemanager.max-completed-applications}.
|
||||||
|
By default, it equals ${yarn.resourcemanager.max-completed-applications}.
|
||||||
|
This ensures that the applications kept in the state store are consistent with
|
||||||
|
the applications remembered in RM memory.
|
||||||
|
Any values larger than ${yarn.resourcemanager.max-completed-applications} will
|
||||||
|
be reset to ${yarn.resourcemanager.max-completed-applications}.
|
||||||
|
Note that this value impacts the RM recovery performance. Typically,
|
||||||
|
a smaller value indicates better performance on RM recovery.
|
||||||
|
</description>
|
||||||
|
<name>yarn.resourcemanager.state-store.max-completed-applications</name>
|
||||||
|
<value>${yarn.resourcemanager.max-completed-applications}</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Host:Port of the ZooKeeper server where RM state will
|
<description>Host:Port of the ZooKeeper server where RM state will
|
||||||
be stored. This must be supplied when using
|
be stored. This must be supplied when using
|
||||||
|
@ -65,7 +65,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
|
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
|
||||||
|
|
||||||
private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
|
private int maxCompletedAppsInMemory;
|
||||||
|
private int maxCompletedAppsInStateStore;
|
||||||
|
protected int completedAppsInStateStore = 0;
|
||||||
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
|
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
|
||||||
|
|
||||||
private final RMContext rmContext;
|
private final RMContext rmContext;
|
||||||
@ -82,9 +84,16 @@ public RMAppManager(RMContext context,
|
|||||||
this.masterService = masterService;
|
this.masterService = masterService;
|
||||||
this.applicationACLsManager = applicationACLsManager;
|
this.applicationACLsManager = applicationACLsManager;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
setCompletedAppsMax(conf.getInt(
|
this.maxCompletedAppsInMemory = conf.getInt(
|
||||||
YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
|
YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
|
||||||
YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS));
|
YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
|
||||||
|
this.maxCompletedAppsInStateStore =
|
||||||
|
conf.getInt(
|
||||||
|
YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS);
|
||||||
|
if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) {
|
||||||
|
this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -173,10 +182,6 @@ public void logApplicationSummary(ApplicationId appId) {
|
|||||||
ApplicationSummary.logAppSummary(rmContext.getRMApps().get(appId));
|
ApplicationSummary.logAppSummary(rmContext.getRMApps().get(appId));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized void setCompletedAppsMax(int max) {
|
|
||||||
this.completedAppsMax = max;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected synchronized int getCompletedAppsListSize() {
|
protected synchronized int getCompletedAppsListSize() {
|
||||||
return this.completedApps.size();
|
return this.completedApps.size();
|
||||||
}
|
}
|
||||||
@ -191,6 +196,7 @@ protected synchronized void finishApplication(ApplicationId applicationId) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
completedApps.add(applicationId);
|
completedApps.add(applicationId);
|
||||||
|
completedAppsInStateStore++;
|
||||||
writeAuditLog(applicationId);
|
writeAuditLog(applicationId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -229,10 +235,26 @@ protected void writeAuditLog(ApplicationId appId) {
|
|||||||
* check to see if hit the limit for max # completed apps kept
|
* check to see if hit the limit for max # completed apps kept
|
||||||
*/
|
*/
|
||||||
protected synchronized void checkAppNumCompletedLimit() {
|
protected synchronized void checkAppNumCompletedLimit() {
|
||||||
while (completedApps.size() > this.completedAppsMax) {
|
// check apps kept in state store.
|
||||||
|
while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
|
||||||
|
ApplicationId removeId =
|
||||||
|
completedApps.get(completedApps.size() - completedAppsInStateStore);
|
||||||
|
RMApp removeApp = rmContext.getRMApps().get(removeId);
|
||||||
|
LOG.info("Max number of completed apps kept in state store met:"
|
||||||
|
+ " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
|
||||||
|
+ ", removing app " + removeApp.getApplicationId()
|
||||||
|
+ " from state store.");
|
||||||
|
rmContext.getStateStore().removeApplication(removeApp);
|
||||||
|
completedAppsInStateStore--;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check apps kept in memory.
|
||||||
|
while (completedApps.size() > this.maxCompletedAppsInMemory) {
|
||||||
ApplicationId removeId = completedApps.remove();
|
ApplicationId removeId = completedApps.remove();
|
||||||
LOG.info("Application should be expired, max # apps"
|
LOG.info("Application should be expired, max number of completed apps"
|
||||||
+ " met. Removing app: " + removeId);
|
+ " kept in memory met: maxCompletedAppsInMemory = "
|
||||||
|
+ this.maxCompletedAppsInMemory + ", removing app " + removeId
|
||||||
|
+ " from memory: ");
|
||||||
rmContext.getRMApps().remove(removeId);
|
rmContext.getRMApps().remove(removeId);
|
||||||
this.applicationACLsManager.removeApplication(removeId);
|
this.applicationACLsManager.removeApplication(removeId);
|
||||||
}
|
}
|
||||||
@ -380,8 +402,6 @@ public void recover(RMState state) throws Exception {
|
|||||||
Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
|
Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
|
||||||
LOG.info("Recovering " + appStates.size() + " applications");
|
LOG.info("Recovering " + appStates.size() + " applications");
|
||||||
for (ApplicationState appState : appStates.values()) {
|
for (ApplicationState appState : appStates.values()) {
|
||||||
LOG.info("Recovering application " + appState.getAppId());
|
|
||||||
|
|
||||||
submitApplication(appState.getApplicationSubmissionContext(),
|
submitApplication(appState.getApplicationSubmissionContext(),
|
||||||
appState.getSubmitTime(), appState.getUser(), true, state);
|
appState.getSubmitTime(), appState.getUser(), true, state);
|
||||||
}
|
}
|
||||||
|
@ -167,7 +167,9 @@ private void loadRMAppState(RMState rmState) throws Exception {
|
|||||||
readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
|
readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
|
||||||
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
||||||
// application
|
// application
|
||||||
LOG.info("Loading application from node: " + childNodeName);
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Loading application from node: " + childNodeName);
|
||||||
|
}
|
||||||
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
|
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
|
||||||
ApplicationStateDataPBImpl appStateData =
|
ApplicationStateDataPBImpl appStateData =
|
||||||
new ApplicationStateDataPBImpl(
|
new ApplicationStateDataPBImpl(
|
||||||
@ -185,7 +187,10 @@ private void loadRMAppState(RMState rmState) throws Exception {
|
|||||||
} else if (childNodeName
|
} else if (childNodeName
|
||||||
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
||||||
// attempt
|
// attempt
|
||||||
LOG.info("Loading application attempt from node: " + childNodeName);
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Loading application attempt from node: "
|
||||||
|
+ childNodeName);
|
||||||
|
}
|
||||||
ApplicationAttemptId attemptId =
|
ApplicationAttemptId attemptId =
|
||||||
ConverterUtils.toApplicationAttemptId(childNodeName);
|
ConverterUtils.toApplicationAttemptId(childNodeName);
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateData =
|
ApplicationAttemptStateDataPBImpl attemptStateData =
|
||||||
@ -225,6 +230,7 @@ private void loadRMAppState(RMState rmState) throws Exception {
|
|||||||
assert appState != null;
|
assert appState != null;
|
||||||
appState.attempts.put(attemptState.getAttemptId(), attemptState);
|
appState.attempts.put(attemptState.getAttemptId(), attemptState);
|
||||||
}
|
}
|
||||||
|
LOG.info("Done Loading applications from FS state store");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Failed to load state.", e);
|
LOG.error("Failed to load state.", e);
|
||||||
throw e;
|
throw e;
|
||||||
@ -362,7 +368,7 @@ public synchronized void updateApplicationAttemptStateInternal(
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void removeApplicationState(ApplicationState appState)
|
public synchronized void removeApplicationStateInternal(ApplicationState appState)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
String appId = appState.getAppId().toString();
|
String appId = appState.getAppId().toString();
|
||||||
Path nodeRemovePath = getAppDir(rmAppRoot, appId);
|
Path nodeRemovePath = getAppDir(rmAppRoot, appId);
|
||||||
|
@ -171,8 +171,8 @@ public synchronized void updateApplicationAttemptStateInternal(
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void removeApplicationState(ApplicationState appState)
|
public synchronized void removeApplicationStateInternal(
|
||||||
throws Exception {
|
ApplicationState appState) throws Exception {
|
||||||
ApplicationId appId = appState.getAppId();
|
ApplicationId appId = appState.getAppId();
|
||||||
ApplicationState removed = state.appState.remove(appId);
|
ApplicationState removed = state.appState.remove(appId);
|
||||||
if (removed == null) {
|
if (removed == null) {
|
||||||
|
@ -63,7 +63,7 @@ protected void storeApplicationAttemptStateInternal(String attemptId,
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void removeApplicationState(ApplicationState appState)
|
protected void removeApplicationStateInternal(ApplicationState appState)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,6 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
@ -519,6 +518,7 @@ protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey)
|
|||||||
* This does not block the dispatcher threads
|
* This does not block the dispatcher threads
|
||||||
* There is no notification of completion for this operation.
|
* There is no notification of completion for this operation.
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public synchronized void removeApplication(RMApp app) {
|
public synchronized void removeApplication(RMApp app) {
|
||||||
ApplicationState appState = new ApplicationState(
|
ApplicationState appState = new ApplicationState(
|
||||||
app.getSubmitTime(), app.getStartTime(),
|
app.getSubmitTime(), app.getStartTime(),
|
||||||
@ -532,14 +532,6 @@ public synchronized void removeApplication(RMApp app) {
|
|||||||
appState.attempts.put(attemptState.getAttemptId(), attemptState);
|
appState.attempts.put(attemptState.getAttemptId(), attemptState);
|
||||||
}
|
}
|
||||||
|
|
||||||
removeApplication(appState);
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
/**
|
|
||||||
* Non-Blocking API
|
|
||||||
*/
|
|
||||||
public synchronized void removeApplication(ApplicationState appState) {
|
|
||||||
dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
|
dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -548,8 +540,8 @@ public synchronized void removeApplication(ApplicationState appState) {
|
|||||||
* Derived classes must implement this method to remove the state of an
|
* Derived classes must implement this method to remove the state of an
|
||||||
* application and its attempts
|
* application and its attempts
|
||||||
*/
|
*/
|
||||||
protected abstract void removeApplicationState(ApplicationState appState)
|
protected abstract void removeApplicationStateInternal(
|
||||||
throws Exception;
|
ApplicationState appState) throws Exception;
|
||||||
|
|
||||||
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
|
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
|
||||||
// YARN-986
|
// YARN-986
|
||||||
@ -666,11 +658,9 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
|
|||||||
ApplicationState appState =
|
ApplicationState appState =
|
||||||
((RMStateStoreRemoveAppEvent) event).getAppState();
|
((RMStateStoreRemoveAppEvent) event).getAppState();
|
||||||
ApplicationId appId = appState.getAppId();
|
ApplicationId appId = appState.getAppId();
|
||||||
Exception removedException = null;
|
|
||||||
LOG.info("Removing info for app: " + appId);
|
LOG.info("Removing info for app: " + appId);
|
||||||
try {
|
try {
|
||||||
removeApplicationState(appState);
|
removeApplicationStateInternal(appState);
|
||||||
notifyDoneRemovingApplcation(appId, removedException);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error removing app: " + appId, e);
|
LOG.error("Error removing app: " + appId, e);
|
||||||
notifyStoreOperationFailed(e);
|
notifyStoreOperationFailed(e);
|
||||||
@ -738,17 +728,6 @@ private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId
|
|||||||
new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
|
new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
/**
|
|
||||||
* This is to notify RMApp that this application has been removed from
|
|
||||||
* RMStateStore
|
|
||||||
*/
|
|
||||||
private void notifyDoneRemovingApplcation(ApplicationId appId,
|
|
||||||
Exception removedException) {
|
|
||||||
rmDispatcher.getEventHandler().handle(
|
|
||||||
new RMAppRemovedEvent(appId, removedException));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* EventHandler implementation which forward events to the FSRMStateStore
|
* EventHandler implementation which forward events to the FSRMStateStore
|
||||||
* This hides the EventHandle methods of the store from its public interface
|
* This hides the EventHandle methods of the store from its public interface
|
||||||
|
@ -392,7 +392,9 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
|
|||||||
byte[] childData = getDataWithRetries(childNodePath, true);
|
byte[] childData = getDataWithRetries(childNodePath, true);
|
||||||
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
||||||
// application
|
// application
|
||||||
LOG.info("Loading application from znode: " + childNodeName);
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Loading application from znode: " + childNodeName);
|
||||||
|
}
|
||||||
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
|
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
|
||||||
ApplicationStateDataPBImpl appStateData =
|
ApplicationStateDataPBImpl appStateData =
|
||||||
new ApplicationStateDataPBImpl(
|
new ApplicationStateDataPBImpl(
|
||||||
@ -412,7 +414,9 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
|
|||||||
} else if (childNodeName
|
} else if (childNodeName
|
||||||
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
||||||
// attempt
|
// attempt
|
||||||
LOG.info("Loading application attempt from znode: " + childNodeName);
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Loading application attempt from znode: " + childNodeName);
|
||||||
|
}
|
||||||
ApplicationAttemptId attemptId =
|
ApplicationAttemptId attemptId =
|
||||||
ConverterUtils.toApplicationAttemptId(childNodeName);
|
ConverterUtils.toApplicationAttemptId(childNodeName);
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateData =
|
ApplicationAttemptStateDataPBImpl attemptStateData =
|
||||||
@ -456,10 +460,10 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
|
|||||||
LOG.info("Application node not found for attempt: "
|
LOG.info("Application node not found for attempt: "
|
||||||
+ attemptState.getAttemptId());
|
+ attemptState.getAttemptId());
|
||||||
deleteWithRetries(
|
deleteWithRetries(
|
||||||
getNodePath(rmAppRoot, attemptState.getAttemptId().toString()),
|
getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), -1);
|
||||||
0);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG.info("Done Loading applications from ZK state store");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -517,16 +521,16 @@ public synchronized void updateApplicationAttemptStateInternal(
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void removeApplicationState(ApplicationState appState)
|
public synchronized void removeApplicationStateInternal(ApplicationState appState)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
String appId = appState.getAppId().toString();
|
String appId = appState.getAppId().toString();
|
||||||
String nodeRemovePath = getNodePath(rmAppRoot, appId);
|
String nodeRemovePath = getNodePath(rmAppRoot, appId);
|
||||||
ArrayList<Op> opList = new ArrayList<Op>();
|
ArrayList<Op> opList = new ArrayList<Op>();
|
||||||
opList.add(Op.delete(nodeRemovePath, 0));
|
opList.add(Op.delete(nodeRemovePath, -1));
|
||||||
|
|
||||||
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
|
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
|
||||||
String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
|
String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
|
||||||
opList.add(Op.delete(attemptRemovePath, 0));
|
opList.add(Op.delete(attemptRemovePath, -1));
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
|
LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
|
||||||
@ -569,7 +573,7 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (dtSequenceNumberPath != null) {
|
if (dtSequenceNumberPath != null) {
|
||||||
opList.add(Op.delete(dtSequenceNumberPath, 0));
|
opList.add(Op.delete(dtSequenceNumberPath, -1));
|
||||||
}
|
}
|
||||||
opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
|
opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
|
||||||
CreateMode.PERSISTENT));
|
CreateMode.PERSISTENT));
|
||||||
@ -587,7 +591,7 @@ protected synchronized void removeRMDelegationTokenState(
|
|||||||
LOG.debug("Removing RMDelegationToken_"
|
LOG.debug("Removing RMDelegationToken_"
|
||||||
+ rmDTIdentifier.getSequenceNumber());
|
+ rmDTIdentifier.getSequenceNumber());
|
||||||
}
|
}
|
||||||
deleteWithRetries(nodeRemovePath, 0);
|
deleteWithRetries(nodeRemovePath, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -619,7 +623,7 @@ protected synchronized void removeRMDTMasterKeyState(
|
|||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
|
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
|
||||||
}
|
}
|
||||||
deleteWithRetries(nodeRemovePath, 0);
|
deleteWithRetries(nodeRemovePath, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ZK related code
|
// ZK related code
|
||||||
|
@ -663,15 +663,9 @@ private static final class RMAppRecoveredTransition implements
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||||
|
|
||||||
if (app.attempts.isEmpty()) {
|
|
||||||
// Saved application was not running any attempts.
|
|
||||||
app.createNewAttempt(true);
|
|
||||||
return RMAppState.SUBMITTED;
|
|
||||||
} else {
|
|
||||||
/*
|
/*
|
||||||
* If last attempt recovered final state is null .. it means attempt
|
* If last attempt recovered final state is null .. it means attempt was
|
||||||
* was started but AM container may or may not have started / finished.
|
* started but AM container may or may not have started / finished.
|
||||||
* Therefore we should wait for it to finish.
|
* Therefore we should wait for it to finish.
|
||||||
*/
|
*/
|
||||||
for (RMAppAttempt attempt : app.getAppAttempts().values()) {
|
for (RMAppAttempt attempt : app.getAppAttempts().values()) {
|
||||||
@ -679,15 +673,23 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
new RMAppAttemptEvent(attempt.getAppAttemptId(),
|
new RMAppAttemptEvent(attempt.getAppAttemptId(),
|
||||||
RMAppAttemptEventType.RECOVER));
|
RMAppAttemptEventType.RECOVER));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The app has completed.
|
||||||
if (app.recoveredFinalState != null) {
|
if (app.recoveredFinalState != null) {
|
||||||
FINAL_TRANSITION.transition(app, event);
|
FINAL_TRANSITION.transition(app, event);
|
||||||
return app.recoveredFinalState;
|
return app.recoveredFinalState;
|
||||||
} else {
|
}
|
||||||
|
|
||||||
|
// No existent attempts means the attempt associated with this app was not
|
||||||
|
// started or started but not yet saved.
|
||||||
|
if (app.attempts.isEmpty()) {
|
||||||
|
app.createNewAttempt(true);
|
||||||
|
return RMAppState.SUBMITTED;
|
||||||
|
}
|
||||||
|
|
||||||
return RMAppState.RUNNING;
|
return RMAppState.RUNNING;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final class StartAppAttemptTransition extends RMAppTransition {
|
private static final class StartAppAttemptTransition extends RMAppTransition {
|
||||||
@Override
|
@Override
|
||||||
|
@ -1,36 +0,0 @@
|
|||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
|
|
||||||
public class RMAppRemovedEvent extends RMAppEvent {
|
|
||||||
|
|
||||||
private final Exception removedException;
|
|
||||||
|
|
||||||
public RMAppRemovedEvent(ApplicationId appId, Exception removedException) {
|
|
||||||
super(appId, RMAppEventType.APP_REMOVED);
|
|
||||||
this.removedException = removedException;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Exception getRemovedException() {
|
|
||||||
return removedException;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -76,14 +76,13 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
@ -675,9 +674,8 @@ public void recover(RMState state) throws Exception {
|
|||||||
ApplicationAttemptState attemptState =
|
ApplicationAttemptState attemptState =
|
||||||
appState.getAttempt(getAppAttemptId());
|
appState.getAttempt(getAppAttemptId());
|
||||||
assert attemptState != null;
|
assert attemptState != null;
|
||||||
LOG.info("Recovered attempt: AppId: "
|
LOG.info("Recovering attempt: " + getAppAttemptId() + " with final state: "
|
||||||
+ getAppAttemptId().getApplicationId() + " AttemptId: "
|
+ attemptState.getState());
|
||||||
+ getAppAttemptId() + " MasterContainer: " + masterContainer);
|
|
||||||
diagnostics.append("Attempt recovered after RM restart");
|
diagnostics.append("Attempt recovered after RM restart");
|
||||||
diagnostics.append(attemptState.getDiagnostics());
|
diagnostics.append(attemptState.getDiagnostics());
|
||||||
setMasterContainer(attemptState.getMasterContainer());
|
setMasterContainer(attemptState.getMasterContainer());
|
||||||
@ -856,8 +854,6 @@ private static class AttemptRecoveredTransition
|
|||||||
@Override
|
@Override
|
||||||
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
||||||
RMAppAttemptEvent event) {
|
RMAppAttemptEvent event) {
|
||||||
LOG.info("Recovering attempt : recoverdFinalState :"
|
|
||||||
+ appAttempt.recoveredFinalState);
|
|
||||||
if (appAttempt.recoveredFinalState != null) {
|
if (appAttempt.recoveredFinalState != null) {
|
||||||
appAttempt.progress = 1.0f;
|
appAttempt.progress = 1.0f;
|
||||||
RMApp rmApp =appAttempt.rmContext.getRMApps().get(
|
RMApp rmApp =appAttempt.rmContext.getRMApps().get(
|
||||||
|
@ -19,8 +19,12 @@
|
|||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -43,6 +47,7 @@
|
|||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
@ -99,7 +104,7 @@ public static RMContext mockRMContext(int n, long time) {
|
|||||||
rmDispatcher);
|
rmDispatcher);
|
||||||
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
|
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
|
||||||
rmDispatcher);
|
rmDispatcher);
|
||||||
return new RMContextImpl(rmDispatcher,
|
RMContext context = new RMContextImpl(rmDispatcher,
|
||||||
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||||
null, null, null, null, null) {
|
null, null, null, null, null) {
|
||||||
@Override
|
@Override
|
||||||
@ -107,6 +112,8 @@ public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
|||||||
return map;
|
return map;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
((RMContextImpl)context).setStateStore(mock(RMStateStore.class));
|
||||||
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
public class TestAppManagerDispatcher implements
|
public class TestAppManagerDispatcher implements
|
||||||
@ -142,7 +149,6 @@ public class TestRMAppManager extends RMAppManager {
|
|||||||
|
|
||||||
public TestRMAppManager(RMContext context, Configuration conf) {
|
public TestRMAppManager(RMContext context, Configuration conf) {
|
||||||
super(context, null, null, new ApplicationACLsManager(conf), conf);
|
super(context, null, null, new ApplicationACLsManager(conf), conf);
|
||||||
setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestRMAppManager(RMContext context,
|
public TestRMAppManager(RMContext context,
|
||||||
@ -150,7 +156,6 @@ public TestRMAppManager(RMContext context,
|
|||||||
YarnScheduler scheduler, ApplicationMasterService masterService,
|
YarnScheduler scheduler, ApplicationMasterService masterService,
|
||||||
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
||||||
super(context, scheduler, masterService, applicationACLsManager, conf);
|
super(context, scheduler, masterService, applicationACLsManager, conf);
|
||||||
setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void checkAppNumCompletedLimit() {
|
public void checkAppNumCompletedLimit() {
|
||||||
@ -164,9 +169,8 @@ public void finishApplication(ApplicationId appId) {
|
|||||||
public int getCompletedAppsListSize() {
|
public int getCompletedAppsListSize() {
|
||||||
return super.getCompletedAppsListSize();
|
return super.getCompletedAppsListSize();
|
||||||
}
|
}
|
||||||
|
public int getCompletedAppsInStateStore() {
|
||||||
public void setCompletedAppsMax(int max) {
|
return this.completedAppsInStateStore;
|
||||||
super.setCompletedAppsMax(max);
|
|
||||||
}
|
}
|
||||||
public void submitApplication(
|
public void submitApplication(
|
||||||
ApplicationSubmissionContext submissionContext, String user)
|
ApplicationSubmissionContext submissionContext, String user)
|
||||||
@ -227,9 +231,9 @@ public void testRMAppRetireNone() throws Exception {
|
|||||||
// Create such that none of the applications will retire since
|
// Create such that none of the applications will retire since
|
||||||
// haven't hit max #
|
// haven't hit max #
|
||||||
RMContext rmContext = mockRMContext(10, now - 10);
|
RMContext rmContext = mockRMContext(10, now - 10);
|
||||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 10);
|
||||||
appMonitor.setCompletedAppsMax(10);
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,conf);
|
||||||
|
|
||||||
Assert.assertEquals("Number of apps incorrect before checkAppTimeLimit",
|
Assert.assertEquals("Number of apps incorrect before checkAppTimeLimit",
|
||||||
10, rmContext.getRMApps().size());
|
10, rmContext.getRMApps().size());
|
||||||
@ -243,6 +247,8 @@ public void testRMAppRetireNone() throws Exception {
|
|||||||
rmContext.getRMApps().size());
|
rmContext.getRMApps().size());
|
||||||
Assert.assertEquals("Number of completed apps incorrect after check", 10,
|
Assert.assertEquals("Number of completed apps incorrect after check", 10,
|
||||||
appMonitor.getCompletedAppsListSize());
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
verify(rmContext.getStateStore(), never()).removeApplication(
|
||||||
|
isA(RMApp.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -250,9 +256,10 @@ public void testRMAppRetireSome() throws Exception {
|
|||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
|
|
||||||
RMContext rmContext = mockRMContext(10, now - 20000);
|
RMContext rmContext = mockRMContext(10, now - 20000);
|
||||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 3);
|
||||||
appMonitor.setCompletedAppsMax(3);
|
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 3);
|
||||||
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
|
||||||
|
|
||||||
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
|
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
|
||||||
.getRMApps().size());
|
.getRMApps().size());
|
||||||
@ -266,6 +273,8 @@ public void testRMAppRetireSome() throws Exception {
|
|||||||
rmContext.getRMApps().size());
|
rmContext.getRMApps().size());
|
||||||
Assert.assertEquals("Number of completed apps incorrect after check", 3,
|
Assert.assertEquals("Number of completed apps incorrect after check", 3,
|
||||||
appMonitor.getCompletedAppsListSize());
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
verify(rmContext.getStateStore(), times(7)).removeApplication(
|
||||||
|
isA(RMApp.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -274,14 +283,17 @@ public void testRMAppRetireSomeDifferentStates() throws Exception {
|
|||||||
|
|
||||||
// these parameters don't matter, override applications below
|
// these parameters don't matter, override applications below
|
||||||
RMContext rmContext = mockRMContext(10, now - 20000);
|
RMContext rmContext = mockRMContext(10, now - 20000);
|
||||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 2);
|
||||||
|
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
|
||||||
|
|
||||||
appMonitor.setCompletedAppsMax(2);
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
|
||||||
|
|
||||||
// clear out applications map
|
// clear out applications map
|
||||||
rmContext.getRMApps().clear();
|
rmContext.getRMApps().clear();
|
||||||
Assert.assertEquals("map isn't empty", 0, rmContext.getRMApps().size());
|
Assert.assertEquals("map isn't empty", 0, rmContext.getRMApps().size());
|
||||||
|
|
||||||
|
// 6 applications are in final state, 4 are not in final state.
|
||||||
// / set with various finished states
|
// / set with various finished states
|
||||||
RMApp app = new MockRMApp(0, now - 20000, RMAppState.KILLED);
|
RMApp app = new MockRMApp(0, now - 20000, RMAppState.KILLED);
|
||||||
rmContext.getRMApps().put(app.getApplicationId(), app);
|
rmContext.getRMApps().put(app.getApplicationId(), app);
|
||||||
@ -318,7 +330,9 @@ public void testRMAppRetireSomeDifferentStates() throws Exception {
|
|||||||
rmContext.getRMApps().size());
|
rmContext.getRMApps().size());
|
||||||
Assert.assertEquals("Number of completed apps incorrect after check", 2,
|
Assert.assertEquals("Number of completed apps incorrect after check", 2,
|
||||||
appMonitor.getCompletedAppsListSize());
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
// 6 applications in final state, 4 of them are removed
|
||||||
|
verify(rmContext.getStateStore(), times(4)).removeApplication(
|
||||||
|
isA(RMApp.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -342,14 +356,13 @@ public void testRMAppRetireZeroSetting() throws Exception {
|
|||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
|
|
||||||
RMContext rmContext = mockRMContext(10, now - 20000);
|
RMContext rmContext = mockRMContext(10, now - 20000);
|
||||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 0);
|
||||||
|
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 0);
|
||||||
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
|
||||||
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
|
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
|
||||||
.getRMApps().size());
|
.getRMApps().size());
|
||||||
|
|
||||||
// test with 0
|
|
||||||
appMonitor.setCompletedAppsMax(0);
|
|
||||||
|
|
||||||
addToCompletedApps(appMonitor, rmContext);
|
addToCompletedApps(appMonitor, rmContext);
|
||||||
Assert.assertEquals("Number of completed apps incorrect", 10,
|
Assert.assertEquals("Number of completed apps incorrect", 10,
|
||||||
appMonitor.getCompletedAppsListSize());
|
appMonitor.getCompletedAppsListSize());
|
||||||
@ -360,6 +373,64 @@ public void testRMAppRetireZeroSetting() throws Exception {
|
|||||||
rmContext.getRMApps().size());
|
rmContext.getRMApps().size());
|
||||||
Assert.assertEquals("Number of completed apps incorrect after check", 0,
|
Assert.assertEquals("Number of completed apps incorrect after check", 0,
|
||||||
appMonitor.getCompletedAppsListSize());
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
verify(rmContext.getStateStore(), times(10)).removeApplication(
|
||||||
|
isA(RMApp.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStateStoreAppLimitLessThanMemoryAppLimit() {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
RMContext rmContext = mockRMContext(10, now - 20000);
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
int maxAppsInMemory = 8;
|
||||||
|
int maxAppsInStateStore = 4;
|
||||||
|
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory);
|
||||||
|
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
|
||||||
|
maxAppsInStateStore);
|
||||||
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
|
||||||
|
|
||||||
|
addToCompletedApps(appMonitor, rmContext);
|
||||||
|
Assert.assertEquals("Number of completed apps incorrect", 10,
|
||||||
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
appMonitor.checkAppNumCompletedLimit();
|
||||||
|
|
||||||
|
Assert.assertEquals("Number of apps incorrect after # completed check",
|
||||||
|
maxAppsInMemory, rmContext.getRMApps().size());
|
||||||
|
Assert.assertEquals("Number of completed apps incorrect after check",
|
||||||
|
maxAppsInMemory, appMonitor.getCompletedAppsListSize());
|
||||||
|
|
||||||
|
int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore;
|
||||||
|
verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore))
|
||||||
|
.removeApplication(isA(RMApp.class));
|
||||||
|
Assert.assertEquals(maxAppsInStateStore,
|
||||||
|
appMonitor.getCompletedAppsInStateStore());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStateStoreAppLimitLargerThanMemoryAppLimit() {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
RMContext rmContext = mockRMContext(10, now - 20000);
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
int maxAppsInMemory = 8;
|
||||||
|
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory);
|
||||||
|
// larger than maxCompletedAppsInMemory, reset to RM_MAX_COMPLETED_APPLICATIONS.
|
||||||
|
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1000);
|
||||||
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
|
||||||
|
|
||||||
|
addToCompletedApps(appMonitor, rmContext);
|
||||||
|
Assert.assertEquals("Number of completed apps incorrect", 10,
|
||||||
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
appMonitor.checkAppNumCompletedLimit();
|
||||||
|
|
||||||
|
int numRemoveApps = 10 - maxAppsInMemory;
|
||||||
|
Assert.assertEquals("Number of apps incorrect after # completed check",
|
||||||
|
maxAppsInMemory, rmContext.getRMApps().size());
|
||||||
|
Assert.assertEquals("Number of completed apps incorrect after check",
|
||||||
|
maxAppsInMemory, appMonitor.getCompletedAppsListSize());
|
||||||
|
verify(rmContext.getStateStore(), times(numRemoveApps)).removeApplication(
|
||||||
|
isA(RMApp.class));
|
||||||
|
Assert.assertEquals(maxAppsInMemory,
|
||||||
|
appMonitor.getCompletedAppsInStateStore());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setupDispatcher(RMContext rmContext, Configuration conf) {
|
protected void setupDispatcher(RMContext rmContext, Configuration conf) {
|
||||||
|
@ -80,6 +80,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
@ -423,6 +424,8 @@ public void testRMRestartAppRunningAMFailed() throws Exception {
|
|||||||
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
||||||
Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp
|
Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp
|
||||||
.getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
|
.getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -629,6 +632,8 @@ public void testRMRestartFailedApp() throws Exception {
|
|||||||
.contains("Failing the application."));
|
.contains("Failing the application."));
|
||||||
// failed diagnostics from attempt is lost because the diagnostics from
|
// failed diagnostics from attempt is lost because the diagnostics from
|
||||||
// attempt is not yet available by the time app is saving the app state.
|
// attempt is not yet available by the time app is saving the app state.
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -675,6 +680,48 @@ public void testRMRestartKilledApp() throws Exception{
|
|||||||
ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2);
|
ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2);
|
||||||
Assert.assertEquals(app0.getDiagnostics().toString(),
|
Assert.assertEquals(app0.getDiagnostics().toString(),
|
||||||
appReport.getDiagnostics());
|
appReport.getDiagnostics());
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMRestartKilledAppWithNoAttempts() throws Exception {
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||||
|
@Override
|
||||||
|
public synchronized void storeApplicationAttemptStateInternal(
|
||||||
|
String attemptIdStr,
|
||||||
|
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
|
||||||
|
// ignore attempt saving request.
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void updateApplicationAttemptStateInternal(
|
||||||
|
String attemptIdStr,
|
||||||
|
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
|
||||||
|
// ignore attempt saving request.
|
||||||
|
}
|
||||||
|
};
|
||||||
|
memStore.init(conf);
|
||||||
|
|
||||||
|
// start RM
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
// create app
|
||||||
|
RMApp app0 =
|
||||||
|
rm1.submitApp(200, "name", "user",
|
||||||
|
new HashMap<ApplicationAccessType, String>(), false, "default", -1,
|
||||||
|
null, "MAPREDUCE", false);
|
||||||
|
// kill the app.
|
||||||
|
rm1.killApp(app0.getApplicationId());
|
||||||
|
rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED);
|
||||||
|
|
||||||
|
// restart rm
|
||||||
|
MockRM rm2 = new MockRM(conf, memStore);
|
||||||
|
rm2.start();
|
||||||
|
RMApp loadedApp0 =
|
||||||
|
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
||||||
|
rm2.waitForState(loadedApp0.getApplicationId(), RMAppState.KILLED);
|
||||||
|
Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -724,6 +771,9 @@ public void testRMRestartSucceededApp() throws Exception {
|
|||||||
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
|
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
|
||||||
appReport.getFinalApplicationStatus());
|
appReport.getFinalApplicationStatus());
|
||||||
Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl());
|
Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl());
|
||||||
|
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -817,6 +867,9 @@ protected RMAppManager createRMAppManager() {
|
|||||||
// check application summary is logged for the completed apps after RM restart.
|
// check application summary is logged for the completed apps after RM restart.
|
||||||
verify(rm2.getRMAppManager(), times(3)).logApplicationSummary(
|
verify(rm2.getRMAppManager(), times(3)).logApplicationSummary(
|
||||||
isA(ApplicationId.class));
|
isA(ApplicationId.class));
|
||||||
|
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
||||||
@ -1378,6 +1431,52 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
|
|||||||
Assert.assertTrue(rmAppState.size() == NUM_APPS);
|
Assert.assertTrue(rmAppState.size() == NUM_APPS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFinishedAppRemovalAfterRMRestart() throws Exception {
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
|
||||||
|
memStore.init(conf);
|
||||||
|
RMState rmState = memStore.getState();
|
||||||
|
|
||||||
|
// start RM
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
|
||||||
|
// create an app and finish the app.
|
||||||
|
RMApp app0 = rm1.submitApp(200);
|
||||||
|
MockAM am0 = launchAM(app0, rm1, nm1);
|
||||||
|
finishApplicationMaster(app0, rm1, nm1, am0);
|
||||||
|
|
||||||
|
MockRM rm2 = new MockRM(conf, memStore);
|
||||||
|
rm2.start();
|
||||||
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
|
nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
|
||||||
|
|
||||||
|
Map<ApplicationId, ApplicationState> rmAppState =
|
||||||
|
rmState.getApplicationState();
|
||||||
|
|
||||||
|
// app0 exits in both state store and rmContext
|
||||||
|
Assert.assertEquals(RMAppState.FINISHED,
|
||||||
|
rmAppState.get(app0.getApplicationId()).getState());
|
||||||
|
rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
|
||||||
|
|
||||||
|
// create one more app and finish the app.
|
||||||
|
RMApp app1 = rm2.submitApp(200);
|
||||||
|
MockAM am1 = launchAM(app1, rm2, nm1);
|
||||||
|
finishApplicationMaster(app1, rm2, nm1, am1);
|
||||||
|
|
||||||
|
// the first app0 get kicked out from both rmContext and state store
|
||||||
|
Assert.assertNull(rm2.getRMContext().getRMApps()
|
||||||
|
.get(app0.getApplicationId()));
|
||||||
|
Assert.assertNull(rmAppState.get(app0.getApplicationId()));
|
||||||
|
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
|
||||||
public static class TestSecurityMockRM extends MockRM {
|
public static class TestSecurityMockRM extends MockRM {
|
||||||
|
|
||||||
public TestSecurityMockRM(Configuration conf, RMStateStore store) {
|
public TestSecurityMockRM(Configuration conf, RMStateStore store) {
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -109,6 +110,7 @@ interface RMStateStoreHelper {
|
|||||||
boolean isFinalStateValid() throws Exception;
|
boolean isFinalStateValid() throws Exception;
|
||||||
void writeVersion(RMStateVersion version) throws Exception;
|
void writeVersion(RMStateVersion version) throws Exception;
|
||||||
RMStateVersion getCurrentVersion() throws Exception;
|
RMStateVersion getCurrentVersion() throws Exception;
|
||||||
|
boolean appExists(RMApp app) throws Exception;
|
||||||
}
|
}
|
||||||
|
|
||||||
void waitNotify(TestDispatcher dispatcher) {
|
void waitNotify(TestDispatcher dispatcher) {
|
||||||
@ -128,7 +130,7 @@ void waitNotify(TestDispatcher dispatcher) {
|
|||||||
dispatcher.notified = false;
|
dispatcher.notified = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void storeApp(RMStateStore store, ApplicationId appId, long submitTime,
|
RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime,
|
||||||
long startTime) throws Exception {
|
long startTime) throws Exception {
|
||||||
ApplicationSubmissionContext context =
|
ApplicationSubmissionContext context =
|
||||||
new ApplicationSubmissionContextPBImpl();
|
new ApplicationSubmissionContextPBImpl();
|
||||||
@ -141,6 +143,7 @@ void storeApp(RMStateStore store, ApplicationId appId, long submitTime,
|
|||||||
when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
|
when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
|
||||||
when(mockApp.getUser()).thenReturn("test");
|
when(mockApp.getUser()).thenReturn("test");
|
||||||
store.storeNewApplication(mockApp);
|
store.storeNewApplication(mockApp);
|
||||||
|
return mockApp;
|
||||||
}
|
}
|
||||||
|
|
||||||
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
|
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
|
||||||
@ -370,6 +373,7 @@ public void testRMDTSecretManagerStateStore(
|
|||||||
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
|
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
|
||||||
Assert.assertEquals(sequenceNumber,
|
Assert.assertEquals(sequenceNumber,
|
||||||
secretManagerState.getDTSequenceNumber());
|
secretManagerState.getDTSequenceNumber());
|
||||||
|
store.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Token<AMRMTokenIdentifier> generateAMRMToken(
|
private Token<AMRMTokenIdentifier> generateAMRMToken(
|
||||||
@ -415,4 +419,43 @@ public void testCheckVersion(RMStateStoreHelper stateStoreHelper)
|
|||||||
Assert.assertTrue(t instanceof RMStateVersionIncompatibleException);
|
Assert.assertTrue(t instanceof RMStateVersionIncompatibleException);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
|
||||||
|
throws Exception {
|
||||||
|
RMStateStore store = stateStoreHelper.getRMStateStore();
|
||||||
|
store.setRMDispatcher(new TestDispatcher());
|
||||||
|
// create and store apps
|
||||||
|
ArrayList<RMApp> appList = new ArrayList<RMApp>();
|
||||||
|
int NUM_APPS = 5;
|
||||||
|
for (int i = 0; i < NUM_APPS; i++) {
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(1383183338, i);
|
||||||
|
RMApp app = storeApp(store, appId, 123456789, 987654321);
|
||||||
|
appList.add(app);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(NUM_APPS, appList.size());
|
||||||
|
for (RMApp app : appList) {
|
||||||
|
// wait for app to be stored.
|
||||||
|
while (true) {
|
||||||
|
if (stateStoreHelper.appExists(app)) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (RMApp app : appList) {
|
||||||
|
// remove the app
|
||||||
|
store.removeApplication(app);
|
||||||
|
// wait for app to be removed.
|
||||||
|
while (true) {
|
||||||
|
if (!stateStoreHelper.appExists(app)) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
@ -38,6 +39,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -69,6 +71,13 @@ public Path getVersionNode() {
|
|||||||
public RMStateVersion getCurrentVersion() {
|
public RMStateVersion getCurrentVersion() {
|
||||||
return CURRENT_VERSION_INFO;
|
return CURRENT_VERSION_INFO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Path getAppDir(String appId) {
|
||||||
|
Path rootDir = new Path(workingDirPathURI, ROOT_DIR_NAME);
|
||||||
|
Path appRootDir = new Path(rootDir, RM_APP_ROOT);
|
||||||
|
Path appDir = new Path(appRootDir, appId);
|
||||||
|
return appDir;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
|
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
|
||||||
@ -109,9 +118,16 @@ public void writeVersion(RMStateVersion version) throws Exception {
|
|||||||
public RMStateVersion getCurrentVersion() throws Exception {
|
public RMStateVersion getCurrentVersion() throws Exception {
|
||||||
return store.getCurrentVersion();
|
return store.getCurrentVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean appExists(RMApp app) throws IOException {
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
Path nodePath =
|
||||||
|
store.getAppDir(app.getApplicationId().toString());
|
||||||
|
return fs.exists(nodePath);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout = 60000)
|
||||||
public void testFSRMStateStore() throws Exception {
|
public void testFSRMStateStore() throws Exception {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
@ -126,11 +142,8 @@ public void testFSRMStateStore() throws Exception {
|
|||||||
String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
|
String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
|
||||||
ApplicationAttemptId attemptId3 =
|
ApplicationAttemptId attemptId3 =
|
||||||
ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
|
ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
|
||||||
Path rootDir =
|
|
||||||
new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot");
|
|
||||||
Path appRootDir = new Path(rootDir, "RMAppRoot");
|
|
||||||
Path appDir =
|
Path appDir =
|
||||||
new Path(appRootDir, attemptId3.getApplicationId().toString());
|
fsTester.store.getAppDir(attemptId3.getApplicationId().toString());
|
||||||
Path tempAppAttemptFile =
|
Path tempAppAttemptFile =
|
||||||
new Path(appDir, attemptId3.toString() + ".tmp");
|
new Path(appDir, attemptId3.toString() + ".tmp");
|
||||||
fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
|
fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
|
||||||
@ -138,10 +151,11 @@ public void testFSRMStateStore() throws Exception {
|
|||||||
fsOut.close();
|
fsOut.close();
|
||||||
|
|
||||||
testRMAppStateStore(fsTester);
|
testRMAppStateStore(fsTester);
|
||||||
Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
|
Assert.assertFalse(fsTester.workingDirPathURI
|
||||||
.getFileSystem(conf).exists(tempAppAttemptFile));
|
.getFileSystem(conf).exists(tempAppAttemptFile));
|
||||||
testRMDTSecretManagerStateStore(fsTester);
|
testRMDTSecretManagerStateStore(fsTester);
|
||||||
testCheckVersion(fsTester);
|
testCheckVersion(fsTester);
|
||||||
|
testAppDeletion(fsTester);
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -46,7 +46,9 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.zookeeper.ZooKeeper;
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
|
import org.apache.zookeeper.data.Stat;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestZKRMStateStore extends RMStateStoreTestBase {
|
public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
@ -57,6 +59,7 @@ class TestZKRMStateStoreTester implements RMStateStoreHelper {
|
|||||||
|
|
||||||
ZooKeeper client;
|
ZooKeeper client;
|
||||||
TestZKRMStateStoreInternal store;
|
TestZKRMStateStoreInternal store;
|
||||||
|
String workingZnode;
|
||||||
|
|
||||||
class TestZKRMStateStoreInternal extends ZKRMStateStore {
|
class TestZKRMStateStoreInternal extends ZKRMStateStore {
|
||||||
|
|
||||||
@ -79,11 +82,16 @@ public String getVersionNode() {
|
|||||||
public RMStateVersion getCurrentVersion() {
|
public RMStateVersion getCurrentVersion() {
|
||||||
return CURRENT_VERSION_INFO;
|
return CURRENT_VERSION_INFO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getAppNode(String appId) {
|
||||||
|
return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/"
|
||||||
|
+ appId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMStateStore getRMStateStore() throws Exception {
|
public RMStateStore getRMStateStore() throws Exception {
|
||||||
String workingZnode = "/Test";
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
Configuration conf = new YarnConfiguration();
|
workingZnode = "/Test";
|
||||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
|
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
|
||||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||||
this.client = createClient();
|
this.client = createClient();
|
||||||
@ -107,14 +115,22 @@ public void writeVersion(RMStateVersion version) throws Exception {
|
|||||||
public RMStateVersion getCurrentVersion() throws Exception {
|
public RMStateVersion getCurrentVersion() throws Exception {
|
||||||
return store.getCurrentVersion();
|
return store.getCurrentVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean appExists(RMApp app) throws Exception {
|
||||||
|
Stat node =
|
||||||
|
client.exists(store.getAppNode(app.getApplicationId().toString()),
|
||||||
|
false);
|
||||||
|
return node !=null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testZKRMStateStoreRealZK() throws Exception {
|
public void testZKRMStateStoreRealZK() throws Exception {
|
||||||
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
||||||
testRMAppStateStore(zkTester);
|
testRMAppStateStore(zkTester);
|
||||||
testRMDTSecretManagerStateStore(zkTester);
|
testRMDTSecretManagerStateStore(zkTester);
|
||||||
testCheckVersion(zkTester);
|
testCheckVersion(zkTester);
|
||||||
|
testAppDeletion(zkTester);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Configuration createHARMConf(
|
private Configuration createHARMConf(
|
||||||
|
@ -120,7 +120,7 @@ public void testZKClientRetry() throws Exception {
|
|||||||
TestZKClient zkClientTester = new TestZKClient();
|
TestZKClient zkClientTester = new TestZKClient();
|
||||||
final String path = "/test";
|
final String path = "/test";
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
|
conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 1000);
|
||||||
conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100);
|
conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100);
|
||||||
final ZKRMStateStore store =
|
final ZKRMStateStore store =
|
||||||
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
|
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user