YARN-2219. Changed ResourceManager to avoid AMs and NMs getting exceptions after RM recovery but before scheduler learns about apps and app-attempts. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611222 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-07-17 00:14:56 +00:00
parent c477a166e1
commit bda23181bf
12 changed files with 131 additions and 63 deletions

View File

@ -62,6 +62,10 @@ Release 2.6.0 - UNRELEASED
YARN-2264. Fixed a race condition in DrainDispatcher which may cause random
test failures. (Li Lu via jianhe)
YARN-2219. Changed ResourceManager to avoid AMs and NMs getting exceptions
after RM recovery but before scheduler learns about apps and app-attempts.
(Jian He via vinodkv)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -205,12 +205,6 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
// ACCEPTED state can once again receive an APP_ACCEPTED event, because on
// recovery the app returns to ACCEPTED state, goes through the scheduler
// once more, and so triggers one more APP_ACCEPTED event while already in
// ACCEPTED state.
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.APP_ACCEPTED)
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@ -789,8 +783,18 @@ public class RMAppImpl implements RMApp, Recoverable {
return app.recoveredFinalState;
}
// Notify scheduler about the app on recovery
new AddApplicationToSchedulerTransition().transition(app, event);
// No existing attempts means the attempt associated with this app was
// either not started, or started but not yet saved.
if (app.attempts.isEmpty()) {
app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
app.submissionContext.getQueue(), app.user));
return RMAppState.SUBMITTED;
}
// Add application to scheduler synchronously to guarantee scheduler
// knows applications before AM or NM re-registers.
app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
app.submissionContext.getQueue(), app.user, true));
// recover attempts
app.recoverAppAttempts();
@ -805,12 +809,6 @@ public class RMAppImpl implements RMApp, Recoverable {
return RMAppState.ACCEPTED;
}
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved.
if (app.attempts.isEmpty()) {
return RMAppState.SUBMITTED;
}
// YARN-1507 is saving the application state after the application is
// accepted. So after YARN-1507, an app is saved meaning it is accepted.
// Thus we return ACCEPTED state on recovery.

View File

@ -926,8 +926,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.masterService
.registerAppAttempt(appAttempt.applicationAttemptId);
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
appAttempt.getAppAttemptId(), false, false));
// Add attempt to scheduler synchronously to guarantee scheduler
// knows attempts before AM or NM re-registers.
appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
appAttempt.getAppAttemptId(), false, true));
}
/*

View File

@ -521,7 +521,7 @@ public class CapacityScheduler extends
}
private synchronized void addApplication(ApplicationId applicationId,
String queueName, String user) {
String queueName, String user, boolean isAppRecovering) {
// sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {
@ -553,14 +553,20 @@ public class CapacityScheduler extends
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName);
rmContext.getDispatcher().getEventHandler()
if (isAppRecovering) {
if (LOG.isDebugEnabled()) {
LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
}
} else {
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
}
private synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
boolean shouldNotifyAttemptAdded) {
boolean isAttemptRecovering) {
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
CSQueue queue = (CSQueue) application.getQueue();
@ -578,14 +584,15 @@ public class CapacityScheduler extends
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user " + application.getUser() + " in queue "
+ queue.getQueueName());
if (shouldNotifyAttemptAdded) {
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
} else {
if (isAttemptRecovering) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping notifying ATTEMPT_ADDED");
LOG.debug(applicationAttemptId
+ " is recovering. Skipping notifying ATTEMPT_ADDED");
}
} else {
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
}
}
@ -905,7 +912,8 @@ public class CapacityScheduler extends
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
appAddedEvent.getQueue(), appAddedEvent.getUser());
appAddedEvent.getQueue(), appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering());
}
break;
case APP_REMOVED:
@ -921,7 +929,7 @@ public class CapacityScheduler extends
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
appAttemptAddedEvent.getShouldNotifyAttemptAdded());
appAttemptAddedEvent.getIsAttemptRecovering());
}
break;
case APP_ATTEMPT_REMOVED:

View File

@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
private final ApplicationId applicationId;
private final String queue;
private final String user;
private final boolean isAppRecovering;
/**
 * Creates an APP_ADDED scheduler event for an application that is not being
 * recovered. Delegates to the four-argument constructor with
 * {@code isAppRecovering} set to {@code false}.
 *
 * @param applicationId id of the application being added
 * @param queue name of the queue the application is submitted to
 * @param user user who submitted the application
 */
public AppAddedSchedulerEvent(
ApplicationId applicationId, String queue, String user) {
this(applicationId, queue, user, false);
}
/**
 * Creates an APP_ADDED scheduler event, recording whether the application is
 * being added as part of recovery.
 *
 * @param applicationId id of the application being added
 * @param queue name of the queue the application is submitted to
 * @param user user who submitted the application
 * @param isAppRecovering {@code true} when the application is being
 *        recovered; the value is exposed via {@link #getIsAppRecovering()}
 */
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user, boolean isAppRecovering) {
super(SchedulerEventType.APP_ADDED);
this.applicationId = applicationId;
this.queue = queue;
this.user = user;
this.isAppRecovering = isAppRecovering;
}
public ApplicationId getApplicationId() {
@ -46,4 +53,7 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
return user;
}
/**
 * Returns the recovery flag supplied at construction time.
 *
 * @return {@code true} if this event was created for a recovering application
 */
public boolean getIsAppRecovering() {
return isAppRecovering;
}
}

View File

@ -24,22 +24,22 @@ public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
private final ApplicationAttemptId applicationAttemptId;
private final boolean transferStateFromPreviousAttempt;
private final boolean shouldNotifyAttemptAdded;
private final boolean isAttemptRecovering;
public AppAttemptAddedSchedulerEvent(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt) {
this(applicationAttemptId, transferStateFromPreviousAttempt, true);
this(applicationAttemptId, transferStateFromPreviousAttempt, false);
}
public AppAttemptAddedSchedulerEvent(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
boolean shouldNotifyAttemptAdded) {
boolean isAttemptRecovering) {
super(SchedulerEventType.APP_ATTEMPT_ADDED);
this.applicationAttemptId = applicationAttemptId;
this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded;
this.isAttemptRecovering = isAttemptRecovering;
}
public ApplicationAttemptId getApplicationAttemptId() {
@ -50,7 +50,7 @@ public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
return transferStateFromPreviousAttempt;
}
public boolean getShouldNotifyAttemptAdded() {
return shouldNotifyAttemptAdded;
public boolean getIsAttemptRecovering() {
return isAttemptRecovering;
}
}

View File

@ -566,7 +566,7 @@ public class FairScheduler extends
* configured limits, but the app will not be marked as runnable.
*/
protected synchronized void addApplication(ApplicationId applicationId,
String queueName, String user) {
String queueName, String user, boolean isAppRecovering) {
if (queueName == null || queueName.isEmpty()) {
String message = "Reject application " + applicationId +
" submitted by user " + user + " with an empty queue name.";
@ -603,8 +603,14 @@ public class FairScheduler extends
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName + ", currently num of applications: "
+ applications.size());
rmContext.getDispatcher().getEventHandler()
if (isAppRecovering) {
if (LOG.isDebugEnabled()) {
LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
}
} else {
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
}
/**
@ -613,7 +619,7 @@ public class FairScheduler extends
protected synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
boolean shouldNotifyAttemptAdded) {
boolean isAttemptRecovering) {
SchedulerApplication<FSSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
String user = application.getUser();
@ -642,14 +648,15 @@ public class FairScheduler extends
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user: " + user);
if (shouldNotifyAttemptAdded) {
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
} else {
if (isAttemptRecovering) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping notifying ATTEMPT_ADDED");
LOG.debug(applicationAttemptId
+ " is recovering. Skipping notifying ATTEMPT_ADDED");
}
} else {
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
}
}
@ -1136,7 +1143,8 @@ public class FairScheduler extends
}
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
appAddedEvent.getQueue(), appAddedEvent.getUser());
appAddedEvent.getQueue(), appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering());
break;
case APP_REMOVED:
if (!(event instanceof AppRemovedSchedulerEvent)) {
@ -1154,7 +1162,7 @@ public class FairScheduler extends
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
appAttemptAddedEvent.getShouldNotifyAttemptAdded());
appAttemptAddedEvent.getIsAttemptRecovering());
break;
case APP_ATTEMPT_REMOVED:
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {

View File

@ -356,22 +356,28 @@ public class FifoScheduler extends
@VisibleForTesting
public synchronized void addApplication(ApplicationId applicationId,
String queue, String user) {
String queue, String user, boolean isAppRecovering) {
SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
applications.put(applicationId, application);
metrics.submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", currently num of applications: " + applications.size());
rmContext.getDispatcher().getEventHandler()
if (isAppRecovering) {
if (LOG.isDebugEnabled()) {
LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
}
} else {
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
}
@VisibleForTesting
public synchronized void
addApplicationAttempt(ApplicationAttemptId appAttemptId,
boolean transferStateFromPreviousAttempt,
boolean shouldNotifyAttemptAdded) {
boolean isAttemptRecovering) {
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(appAttemptId.getApplicationId());
String user = application.getUser();
@ -389,14 +395,15 @@ public class FifoScheduler extends
metrics.submitAppAttempt(user);
LOG.info("Added Application Attempt " + appAttemptId
+ " to scheduler from user " + application.getUser());
if (shouldNotifyAttemptAdded) {
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
} else {
if (isAttemptRecovering) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping notifying ATTEMPT_ADDED");
LOG.debug(appAttemptId
+ " is recovering. Skipping notifying ATTEMPT_ADDED");
}
} else {
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
}
}
@ -772,7 +779,8 @@ public class FifoScheduler extends
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
appAddedEvent.getQueue(), appAddedEvent.getUser());
appAddedEvent.getQueue(), appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering());
}
break;
case APP_REMOVED:
@ -788,7 +796,7 @@ public class FifoScheduler extends
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
appAttemptAddedEvent.getShouldNotifyAttemptAdded());
appAttemptAddedEvent.getIsAttemptRecovering());
}
break;
case APP_ATTEMPT_REMOVED:

View File

@ -228,7 +228,7 @@ public class TestFifoScheduler {
scheduler.handle(new NodeAddedSchedulerEvent(node));
ApplicationId appId = ApplicationId.newInstance(0, 1);
scheduler.addApplication(appId, "queue1", "user1");
scheduler.addApplication(appId, "queue1", "user1", true);
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
try {

View File

@ -610,6 +610,36 @@ public class TestWorkPreservingRMRestart {
attempt0.getMasterContainer().getId()).isAMContainer());
}
/**
 * Starts a first RM, runs an app with a registered AM, then starts a second RM
 * on the same state store and checks that the scheduler already has the
 * app attempt right after {@code rm2.start()} returns, before the NM
 * re-registers.
 */
@Test (timeout = 20000)
public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception {
// start RM
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// create app and launch the AM
RMApp app0 = rm1.submitApp(200);
MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
// start a second RM that shares the state store used by rm1
rm2 = new MockRM(conf, memStore);
rm2.start();
// point the existing NM at the new RM's resource tracker
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// scheduler app/attempt is immediately available after RM is re-started.
Assert.assertNotNull(rm2.getResourceScheduler().getSchedulerAppInfo(
am0.getApplicationAttemptId()));
// getTransferredContainers should not throw NPE.
((AbstractYarnScheduler) rm2.getResourceScheduler())
.getTransferredContainers(am0.getApplicationAttemptId());
// re-register the NM with rm2, reporting the container statuses built for am0
List<NMContainerStatus> containers = createNMContainerStatusForApp(am0);
nm1.registerNode(containers, null);
// wait on the helper (defined elsewhere) with an expected count of 2 for this attempt
waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
}
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
int appsPending, int appsRunning, int appsCompleted,

View File

@ -147,7 +147,7 @@ public class FairSchedulerTestBase {
int memory, int vcores, String queueId, String userId, int numContainers,
int priority) {
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(id.getApplicationId(), queueId, userId);
scheduler.addApplication(id.getApplicationId(), queueId, userId, true);
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {

View File

@ -793,13 +793,13 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", true);
scheduler.addApplicationAttempt(id11, false, true);
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", true);
scheduler.addApplicationAttempt(id21, false, true);
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", true);
scheduler.addApplicationAttempt(id22, false, true);
int minReqSize =
@ -1561,7 +1561,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.handle(nodeEvent2);
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", true);
scheduler.addApplicationAttempt(appId, false, true);
// 1 request with 2 nodes on the same rack. another request with 1 node on
@ -1843,7 +1843,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
scheduler.addApplication(attId.getApplicationId(), queue, user);
scheduler.addApplication(attId.getApplicationId(), queue, user, true);
numTries = 0;
while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
@ -2720,7 +2720,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// send application request
ApplicationAttemptId appAttemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", true);
fs.addApplicationAttempt(appAttemptId, false, true);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request =