YARN-534. Change RM restart recovery to also account for AM max-attempts configuration after the restart. Contributed by Jian He.

svn merge --ignore-ancestry -c 1466208 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1466209 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-09 20:16:56 +00:00
parent 76d72db5eb
commit fd5224e62c
4 changed files with 122 additions and 9 deletions

View File

@ -153,6 +153,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-112. Fixed a race condition during localization that fails containers.
(Omkar Vinit Joshi via vinodkv)
YARN-534. Change RM restart recovery to also account for AM max-attempts
configuration after the restart. (Jian He via vinodkv)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -57,6 +57,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
private int globalMaxAppAttempts;
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
private final RMContext rmContext;
@ -76,6 +77,8 @@ public RMAppManager(RMContext context,
setCompletedAppsMax(conf.getInt(
YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS));
globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
}
/**
@ -308,6 +311,7 @@ public void recover(RMState state) throws Exception {
Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications");
for(ApplicationState appState : appStates.values()) {
boolean shouldRecover = true;
// re-submit the application
// this is going to send an app start event but since the async dispatcher
// has not started that event will be queued until we have completed re
@ -318,8 +322,28 @@ public void recover(RMState state) throws Exception {
// This will need to be changed in work preserving recovery in which
// RM will re-connect with the running AM's instead of restarting them
LOG.info("Not recovering unmanaged application " + appState.getAppId());
store.removeApplication(appState);
shouldRecover = false;
}
int individualMaxAppAttempts = appState.getApplicationSubmissionContext()
.getMaxAppAttempts();
int maxAppAttempts;
if (individualMaxAppAttempts <= 0 ||
individualMaxAppAttempts > globalMaxAppAttempts) {
maxAppAttempts = globalMaxAppAttempts;
LOG.warn("The specific max attempts: " + individualMaxAppAttempts
+ " for application: " + appState.getAppId()
+ " is invalid, because it is out of the range [1, "
+ globalMaxAppAttempts + "]. Use the global max attempts instead.");
} else {
maxAppAttempts = individualMaxAppAttempts;
}
if(appState.getAttemptCount() >= maxAppAttempts) {
LOG.info("Not recovering application " + appState.getAppId() +
" due to recovering attempt is beyond maxAppAttempt limit");
shouldRecover = false;
}
if(shouldRecover) {
LOG.info("Recovering application " + appState.getAppId());
submitApplication(appState.getApplicationSubmissionContext(),
appState.getSubmitTime());
@ -328,6 +352,9 @@ public void recover(RMState state) throws Exception {
appState.getAppId());
appImpl.recover(state);
}
else {
store.removeApplication(appState);
}
}
}

View File

@ -128,21 +128,28 @@ public RMApp submitApp(int masterMemory) throws Exception {
// client
public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
return submitApp(masterMemory, name, user, null, false, null);
return submitApp(masterMemory, name, user, null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls) throws Exception {
return submitApp(masterMemory, name, user, acls, false, null);
return submitApp(masterMemory, name, user, acls, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, String queue) throws Exception {
return submitApp(masterMemory, name, user, acls, false, queue);
return submitApp(masterMemory, name, user, acls, false, queue,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue) throws Exception {
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts) throws Exception {
ClientRMProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
@ -155,6 +162,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setUser(user);
sub.setMaxAppAttempts(maxAppAttempts);
if(unmanaged) {
sub.setUnmanagedAM(true);
}

View File

@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -62,6 +64,7 @@ public void testRMRestart() throws Exception {
"org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
conf.set(YarnConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
@ -152,7 +155,9 @@ public void testRMRestart() throws Exception {
.getApplicationId());
// create unmanaged app
RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true, null);
RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true,
null, conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
ApplicationAttemptId unmanagedAttemptId =
appUnmanaged.getCurrentAppAttempt().getAppAttemptId();
// assert appUnmanaged info is saved
@ -306,4 +311,74 @@ public void testRMRestart() throws Exception {
Assert.assertEquals(0, rmAppState.size());
}
@Test
public void testRMRestartOnMaxAppAttempts() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE,
"org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// submit an app with maxAppAttempts equals to 1
RMApp app1 = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", 1);
// submit an app with maxAppAttempts equals to -1
RMApp app2 = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", -1);
// assert app1 info is saved
ApplicationState appState = rmAppState.get(app1.getApplicationId());
Assert.assertNotNull(appState);
Assert.assertEquals(0, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
.getApplicationId(), app1.getApplicationSubmissionContext()
.getApplicationId());
// Allocate the AM
nm1.nodeHeartbeat(true);
RMAppAttempt attempt = app1.getCurrentAppAttempt();
ApplicationAttemptId attemptId1 = attempt.getAppAttemptId();
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
Assert.assertEquals(1, appState.getAttemptCount());
ApplicationAttemptState attemptState =
appState.getAttempt(attemptId1);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
rm1.stop();
// start new RM
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
// verify that maxAppAttempts is set to global value
Assert.assertEquals(2,
rm2.getRMContext().getRMApps().get(app2.getApplicationId())
.getMaxAppAttempts());
// verify that app2 exists app1 is removed
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
Assert.assertNotNull(rm2.getRMContext().getRMApps()
.get(app2.getApplicationId()));
Assert.assertNull(rm2.getRMContext().getRMApps()
.get(app1.getApplicationId()));
// stop the RM
rm2.stop();
}
}