YARN-2219. Changed ResourceManager to avoid AMs and NMs getting exceptions after RM recovery but before scheduler learns about apps and app-attempts. Contributed by Jian He.
svn merge --ignore-ancestry -c 1611222 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1611223 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
59c91b22d7
commit
63e374060e
@ -44,6 +44,10 @@ Release 2.6.0 - UNRELEASED
|
|||||||
YARN-2264. Fixed a race condition in DrainDispatcher which may cause random
|
YARN-2264. Fixed a race condition in DrainDispatcher which may cause random
|
||||||
test failures. (Li Lu via jianhe)
|
test failures. (Li Lu via jianhe)
|
||||||
|
|
||||||
|
YARN-2219. Changed ResourceManager to avoid AMs and NMs getting exceptions
|
||||||
|
after RM recovery but before scheduler learns about apps and app-attempts.
|
||||||
|
(Jian He via vinodkv)
|
||||||
|
|
||||||
Release 2.5.0 - UNRELEASED
|
Release 2.5.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -205,12 +205,6 @@ RMAppEventType.KILL, new KillAttemptTransition())
|
|||||||
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
||||||
RMAppEventType.APP_RUNNING_ON_NODE,
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
||||||
new AppRunningOnNodeTransition())
|
new AppRunningOnNodeTransition())
|
||||||
// ACCEPTED state can once again receive APP_ACCEPTED event, because on
|
|
||||||
// recovery the app returns ACCEPTED state and the app once again go
|
|
||||||
// through the scheduler and triggers one more APP_ACCEPTED event at
|
|
||||||
// ACCEPTED state.
|
|
||||||
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
|
||||||
RMAppEventType.APP_ACCEPTED)
|
|
||||||
|
|
||||||
// Transitions from RUNNING state
|
// Transitions from RUNNING state
|
||||||
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
||||||
@ -789,8 +783,18 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
return app.recoveredFinalState;
|
return app.recoveredFinalState;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify scheduler about the app on recovery
|
// No existent attempts means the attempt associated with this app was not
|
||||||
new AddApplicationToSchedulerTransition().transition(app, event);
|
// started or started but not yet saved.
|
||||||
|
if (app.attempts.isEmpty()) {
|
||||||
|
app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
|
||||||
|
app.submissionContext.getQueue(), app.user));
|
||||||
|
return RMAppState.SUBMITTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add application to scheduler synchronously to guarantee scheduler
|
||||||
|
// knows applications before AM or NM re-registers.
|
||||||
|
app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
|
||||||
|
app.submissionContext.getQueue(), app.user, true));
|
||||||
|
|
||||||
// recover attempts
|
// recover attempts
|
||||||
app.recoverAppAttempts();
|
app.recoverAppAttempts();
|
||||||
@ -805,12 +809,6 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
return RMAppState.ACCEPTED;
|
return RMAppState.ACCEPTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
// No existent attempts means the attempt associated with this app was not
|
|
||||||
// started or started but not yet saved.
|
|
||||||
if (app.attempts.isEmpty()) {
|
|
||||||
return RMAppState.SUBMITTED;
|
|
||||||
}
|
|
||||||
|
|
||||||
// YARN-1507 is saving the application state after the application is
|
// YARN-1507 is saving the application state after the application is
|
||||||
// accepted. So after YARN-1507, an app is saved meaning it is accepted.
|
// accepted. So after YARN-1507, an app is saved meaning it is accepted.
|
||||||
// Thus we return ACCEPTED state on recovery.
|
// Thus we return ACCEPTED state on recovery.
|
||||||
|
@ -926,8 +926,10 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
|||||||
appAttempt.masterService
|
appAttempt.masterService
|
||||||
.registerAppAttempt(appAttempt.applicationAttemptId);
|
.registerAppAttempt(appAttempt.applicationAttemptId);
|
||||||
|
|
||||||
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
|
// Add attempt to scheduler synchronously to guarantee scheduler
|
||||||
appAttempt.getAppAttemptId(), false, false));
|
// knows attempts before AM or NM re-registers.
|
||||||
|
appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
|
||||||
|
appAttempt.getAppAttemptId(), false, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -521,7 +521,7 @@ synchronized CSQueue getQueue(String queueName) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void addApplication(ApplicationId applicationId,
|
private synchronized void addApplication(ApplicationId applicationId,
|
||||||
String queueName, String user) {
|
String queueName, String user, boolean isAppRecovering) {
|
||||||
// sanity checks.
|
// sanity checks.
|
||||||
CSQueue queue = getQueue(queueName);
|
CSQueue queue = getQueue(queueName);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
@ -553,14 +553,20 @@ private synchronized void addApplication(ApplicationId applicationId,
|
|||||||
applications.put(applicationId, application);
|
applications.put(applicationId, application);
|
||||||
LOG.info("Accepted application " + applicationId + " from user: " + user
|
LOG.info("Accepted application " + applicationId + " from user: " + user
|
||||||
+ ", in queue: " + queueName);
|
+ ", in queue: " + queueName);
|
||||||
rmContext.getDispatcher().getEventHandler()
|
if (isAppRecovering) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
rmContext.getDispatcher().getEventHandler()
|
||||||
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
|
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void addApplicationAttempt(
|
private synchronized void addApplicationAttempt(
|
||||||
ApplicationAttemptId applicationAttemptId,
|
ApplicationAttemptId applicationAttemptId,
|
||||||
boolean transferStateFromPreviousAttempt,
|
boolean transferStateFromPreviousAttempt,
|
||||||
boolean shouldNotifyAttemptAdded) {
|
boolean isAttemptRecovering) {
|
||||||
SchedulerApplication<FiCaSchedulerApp> application =
|
SchedulerApplication<FiCaSchedulerApp> application =
|
||||||
applications.get(applicationAttemptId.getApplicationId());
|
applications.get(applicationAttemptId.getApplicationId());
|
||||||
CSQueue queue = (CSQueue) application.getQueue();
|
CSQueue queue = (CSQueue) application.getQueue();
|
||||||
@ -578,14 +584,15 @@ private synchronized void addApplicationAttempt(
|
|||||||
LOG.info("Added Application Attempt " + applicationAttemptId
|
LOG.info("Added Application Attempt " + applicationAttemptId
|
||||||
+ " to scheduler from user " + application.getUser() + " in queue "
|
+ " to scheduler from user " + application.getUser() + " in queue "
|
||||||
+ queue.getQueueName());
|
+ queue.getQueueName());
|
||||||
if (shouldNotifyAttemptAdded) {
|
if (isAttemptRecovering) {
|
||||||
rmContext.getDispatcher().getEventHandler().handle(
|
|
||||||
new RMAppAttemptEvent(applicationAttemptId,
|
|
||||||
RMAppAttemptEventType.ATTEMPT_ADDED));
|
|
||||||
} else {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Skipping notifying ATTEMPT_ADDED");
|
LOG.debug(applicationAttemptId
|
||||||
|
+ " is recovering. Skipping notifying ATTEMPT_ADDED");
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
new RMAppAttemptEvent(applicationAttemptId,
|
||||||
|
RMAppAttemptEventType.ATTEMPT_ADDED));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -905,7 +912,8 @@ public void handle(SchedulerEvent event) {
|
|||||||
{
|
{
|
||||||
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
||||||
addApplication(appAddedEvent.getApplicationId(),
|
addApplication(appAddedEvent.getApplicationId(),
|
||||||
appAddedEvent.getQueue(), appAddedEvent.getUser());
|
appAddedEvent.getQueue(), appAddedEvent.getUser(),
|
||||||
|
appAddedEvent.getIsAppRecovering());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case APP_REMOVED:
|
case APP_REMOVED:
|
||||||
@ -921,7 +929,7 @@ public void handle(SchedulerEvent event) {
|
|||||||
(AppAttemptAddedSchedulerEvent) event;
|
(AppAttemptAddedSchedulerEvent) event;
|
||||||
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
||||||
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
|
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
|
||||||
appAttemptAddedEvent.getShouldNotifyAttemptAdded());
|
appAttemptAddedEvent.getIsAttemptRecovering());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case APP_ATTEMPT_REMOVED:
|
case APP_ATTEMPT_REMOVED:
|
||||||
|
@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
|
|||||||
private final ApplicationId applicationId;
|
private final ApplicationId applicationId;
|
||||||
private final String queue;
|
private final String queue;
|
||||||
private final String user;
|
private final String user;
|
||||||
|
private final boolean isAppRecovering;
|
||||||
|
|
||||||
public AppAddedSchedulerEvent(
|
public AppAddedSchedulerEvent(
|
||||||
ApplicationId applicationId, String queue, String user) {
|
ApplicationId applicationId, String queue, String user) {
|
||||||
|
this(applicationId, queue, user, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
|
||||||
|
String user, boolean isAppRecovering) {
|
||||||
super(SchedulerEventType.APP_ADDED);
|
super(SchedulerEventType.APP_ADDED);
|
||||||
this.applicationId = applicationId;
|
this.applicationId = applicationId;
|
||||||
this.queue = queue;
|
this.queue = queue;
|
||||||
this.user = user;
|
this.user = user;
|
||||||
|
this.isAppRecovering = isAppRecovering;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ApplicationId getApplicationId() {
|
public ApplicationId getApplicationId() {
|
||||||
@ -46,4 +53,7 @@ public String getUser() {
|
|||||||
return user;
|
return user;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean getIsAppRecovering() {
|
||||||
|
return isAppRecovering;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,22 +24,22 @@ public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
|
|||||||
|
|
||||||
private final ApplicationAttemptId applicationAttemptId;
|
private final ApplicationAttemptId applicationAttemptId;
|
||||||
private final boolean transferStateFromPreviousAttempt;
|
private final boolean transferStateFromPreviousAttempt;
|
||||||
private final boolean shouldNotifyAttemptAdded;
|
private final boolean isAttemptRecovering;
|
||||||
|
|
||||||
public AppAttemptAddedSchedulerEvent(
|
public AppAttemptAddedSchedulerEvent(
|
||||||
ApplicationAttemptId applicationAttemptId,
|
ApplicationAttemptId applicationAttemptId,
|
||||||
boolean transferStateFromPreviousAttempt) {
|
boolean transferStateFromPreviousAttempt) {
|
||||||
this(applicationAttemptId, transferStateFromPreviousAttempt, true);
|
this(applicationAttemptId, transferStateFromPreviousAttempt, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public AppAttemptAddedSchedulerEvent(
|
public AppAttemptAddedSchedulerEvent(
|
||||||
ApplicationAttemptId applicationAttemptId,
|
ApplicationAttemptId applicationAttemptId,
|
||||||
boolean transferStateFromPreviousAttempt,
|
boolean transferStateFromPreviousAttempt,
|
||||||
boolean shouldNotifyAttemptAdded) {
|
boolean isAttemptRecovering) {
|
||||||
super(SchedulerEventType.APP_ATTEMPT_ADDED);
|
super(SchedulerEventType.APP_ATTEMPT_ADDED);
|
||||||
this.applicationAttemptId = applicationAttemptId;
|
this.applicationAttemptId = applicationAttemptId;
|
||||||
this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
|
this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
|
||||||
this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded;
|
this.isAttemptRecovering = isAttemptRecovering;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ApplicationAttemptId getApplicationAttemptId() {
|
public ApplicationAttemptId getApplicationAttemptId() {
|
||||||
@ -50,7 +50,7 @@ public boolean getTransferStateFromPreviousAttempt() {
|
|||||||
return transferStateFromPreviousAttempt;
|
return transferStateFromPreviousAttempt;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean getShouldNotifyAttemptAdded() {
|
public boolean getIsAttemptRecovering() {
|
||||||
return shouldNotifyAttemptAdded;
|
return isAttemptRecovering;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -566,7 +566,7 @@ public FairSchedulerEventLog getEventLog() {
|
|||||||
* configured limits, but the app will not be marked as runnable.
|
* configured limits, but the app will not be marked as runnable.
|
||||||
*/
|
*/
|
||||||
protected synchronized void addApplication(ApplicationId applicationId,
|
protected synchronized void addApplication(ApplicationId applicationId,
|
||||||
String queueName, String user) {
|
String queueName, String user, boolean isAppRecovering) {
|
||||||
if (queueName == null || queueName.isEmpty()) {
|
if (queueName == null || queueName.isEmpty()) {
|
||||||
String message = "Reject application " + applicationId +
|
String message = "Reject application " + applicationId +
|
||||||
" submitted by user " + user + " with an empty queue name.";
|
" submitted by user " + user + " with an empty queue name.";
|
||||||
@ -603,8 +603,14 @@ protected synchronized void addApplication(ApplicationId applicationId,
|
|||||||
LOG.info("Accepted application " + applicationId + " from user: " + user
|
LOG.info("Accepted application " + applicationId + " from user: " + user
|
||||||
+ ", in queue: " + queueName + ", currently num of applications: "
|
+ ", in queue: " + queueName + ", currently num of applications: "
|
||||||
+ applications.size());
|
+ applications.size());
|
||||||
rmContext.getDispatcher().getEventHandler()
|
if (isAppRecovering) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
rmContext.getDispatcher().getEventHandler()
|
||||||
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
|
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -613,7 +619,7 @@ protected synchronized void addApplication(ApplicationId applicationId,
|
|||||||
protected synchronized void addApplicationAttempt(
|
protected synchronized void addApplicationAttempt(
|
||||||
ApplicationAttemptId applicationAttemptId,
|
ApplicationAttemptId applicationAttemptId,
|
||||||
boolean transferStateFromPreviousAttempt,
|
boolean transferStateFromPreviousAttempt,
|
||||||
boolean shouldNotifyAttemptAdded) {
|
boolean isAttemptRecovering) {
|
||||||
SchedulerApplication<FSSchedulerApp> application =
|
SchedulerApplication<FSSchedulerApp> application =
|
||||||
applications.get(applicationAttemptId.getApplicationId());
|
applications.get(applicationAttemptId.getApplicationId());
|
||||||
String user = application.getUser();
|
String user = application.getUser();
|
||||||
@ -642,14 +648,15 @@ queue, new ActiveUsersManager(getRootQueueMetrics()),
|
|||||||
LOG.info("Added Application Attempt " + applicationAttemptId
|
LOG.info("Added Application Attempt " + applicationAttemptId
|
||||||
+ " to scheduler from user: " + user);
|
+ " to scheduler from user: " + user);
|
||||||
|
|
||||||
if (shouldNotifyAttemptAdded) {
|
if (isAttemptRecovering) {
|
||||||
rmContext.getDispatcher().getEventHandler().handle(
|
|
||||||
new RMAppAttemptEvent(applicationAttemptId,
|
|
||||||
RMAppAttemptEventType.ATTEMPT_ADDED));
|
|
||||||
} else {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Skipping notifying ATTEMPT_ADDED");
|
LOG.debug(applicationAttemptId
|
||||||
|
+ " is recovering. Skipping notifying ATTEMPT_ADDED");
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
new RMAppAttemptEvent(applicationAttemptId,
|
||||||
|
RMAppAttemptEventType.ATTEMPT_ADDED));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1136,7 +1143,8 @@ public void handle(SchedulerEvent event) {
|
|||||||
}
|
}
|
||||||
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
||||||
addApplication(appAddedEvent.getApplicationId(),
|
addApplication(appAddedEvent.getApplicationId(),
|
||||||
appAddedEvent.getQueue(), appAddedEvent.getUser());
|
appAddedEvent.getQueue(), appAddedEvent.getUser(),
|
||||||
|
appAddedEvent.getIsAppRecovering());
|
||||||
break;
|
break;
|
||||||
case APP_REMOVED:
|
case APP_REMOVED:
|
||||||
if (!(event instanceof AppRemovedSchedulerEvent)) {
|
if (!(event instanceof AppRemovedSchedulerEvent)) {
|
||||||
@ -1154,7 +1162,7 @@ public void handle(SchedulerEvent event) {
|
|||||||
(AppAttemptAddedSchedulerEvent) event;
|
(AppAttemptAddedSchedulerEvent) event;
|
||||||
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
||||||
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
|
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
|
||||||
appAttemptAddedEvent.getShouldNotifyAttemptAdded());
|
appAttemptAddedEvent.getIsAttemptRecovering());
|
||||||
break;
|
break;
|
||||||
case APP_ATTEMPT_REMOVED:
|
case APP_ATTEMPT_REMOVED:
|
||||||
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
|
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
|
||||||
|
@ -356,22 +356,28 @@ private FiCaSchedulerNode getNode(NodeId nodeId) {
|
|||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public synchronized void addApplication(ApplicationId applicationId,
|
public synchronized void addApplication(ApplicationId applicationId,
|
||||||
String queue, String user) {
|
String queue, String user, boolean isAppRecovering) {
|
||||||
SchedulerApplication<FiCaSchedulerApp> application =
|
SchedulerApplication<FiCaSchedulerApp> application =
|
||||||
new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
|
new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
|
||||||
applications.put(applicationId, application);
|
applications.put(applicationId, application);
|
||||||
metrics.submitApp(user);
|
metrics.submitApp(user);
|
||||||
LOG.info("Accepted application " + applicationId + " from user: " + user
|
LOG.info("Accepted application " + applicationId + " from user: " + user
|
||||||
+ ", currently num of applications: " + applications.size());
|
+ ", currently num of applications: " + applications.size());
|
||||||
rmContext.getDispatcher().getEventHandler()
|
if (isAppRecovering) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
rmContext.getDispatcher().getEventHandler()
|
||||||
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
|
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public synchronized void
|
public synchronized void
|
||||||
addApplicationAttempt(ApplicationAttemptId appAttemptId,
|
addApplicationAttempt(ApplicationAttemptId appAttemptId,
|
||||||
boolean transferStateFromPreviousAttempt,
|
boolean transferStateFromPreviousAttempt,
|
||||||
boolean shouldNotifyAttemptAdded) {
|
boolean isAttemptRecovering) {
|
||||||
SchedulerApplication<FiCaSchedulerApp> application =
|
SchedulerApplication<FiCaSchedulerApp> application =
|
||||||
applications.get(appAttemptId.getApplicationId());
|
applications.get(appAttemptId.getApplicationId());
|
||||||
String user = application.getUser();
|
String user = application.getUser();
|
||||||
@ -389,14 +395,15 @@ public synchronized void addApplication(ApplicationId applicationId,
|
|||||||
metrics.submitAppAttempt(user);
|
metrics.submitAppAttempt(user);
|
||||||
LOG.info("Added Application Attempt " + appAttemptId
|
LOG.info("Added Application Attempt " + appAttemptId
|
||||||
+ " to scheduler from user " + application.getUser());
|
+ " to scheduler from user " + application.getUser());
|
||||||
if (shouldNotifyAttemptAdded) {
|
if (isAttemptRecovering) {
|
||||||
rmContext.getDispatcher().getEventHandler().handle(
|
|
||||||
new RMAppAttemptEvent(appAttemptId,
|
|
||||||
RMAppAttemptEventType.ATTEMPT_ADDED));
|
|
||||||
} else {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Skipping notifying ATTEMPT_ADDED");
|
LOG.debug(appAttemptId
|
||||||
|
+ " is recovering. Skipping notifying ATTEMPT_ADDED");
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
new RMAppAttemptEvent(appAttemptId,
|
||||||
|
RMAppAttemptEventType.ATTEMPT_ADDED));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -772,7 +779,8 @@ public void handle(SchedulerEvent event) {
|
|||||||
{
|
{
|
||||||
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
||||||
addApplication(appAddedEvent.getApplicationId(),
|
addApplication(appAddedEvent.getApplicationId(),
|
||||||
appAddedEvent.getQueue(), appAddedEvent.getUser());
|
appAddedEvent.getQueue(), appAddedEvent.getUser(),
|
||||||
|
appAddedEvent.getIsAppRecovering());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case APP_REMOVED:
|
case APP_REMOVED:
|
||||||
@ -788,7 +796,7 @@ public void handle(SchedulerEvent event) {
|
|||||||
(AppAttemptAddedSchedulerEvent) event;
|
(AppAttemptAddedSchedulerEvent) event;
|
||||||
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
||||||
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
|
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
|
||||||
appAttemptAddedEvent.getShouldNotifyAttemptAdded());
|
appAttemptAddedEvent.getIsAttemptRecovering());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case APP_ATTEMPT_REMOVED:
|
case APP_ATTEMPT_REMOVED:
|
||||||
|
@ -228,7 +228,7 @@ public void testNodeUpdateBeforeAppAttemptInit() throws Exception {
|
|||||||
scheduler.handle(new NodeAddedSchedulerEvent(node));
|
scheduler.handle(new NodeAddedSchedulerEvent(node));
|
||||||
|
|
||||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
scheduler.addApplication(appId, "queue1", "user1");
|
scheduler.addApplication(appId, "queue1", "user1", true);
|
||||||
|
|
||||||
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
|
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
|
||||||
try {
|
try {
|
||||||
|
@ -610,6 +610,36 @@ public void testAMContainerStatusWithRMRestart() throws Exception {
|
|||||||
attempt0.getMasterContainer().getId()).isAMContainer());
|
attempt0.getMasterContainer().getId()).isAMContainer());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 20000)
|
||||||
|
public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception {
|
||||||
|
// start RM
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
rm1 = new MockRM(conf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
|
||||||
|
// create app and launch the AM
|
||||||
|
RMApp app0 = rm1.submitApp(200);
|
||||||
|
MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
|
||||||
|
|
||||||
|
rm2 = new MockRM(conf, memStore);
|
||||||
|
rm2.start();
|
||||||
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
|
// scheduler app/attempt is immediately available after RM is re-started.
|
||||||
|
Assert.assertNotNull(rm2.getResourceScheduler().getSchedulerAppInfo(
|
||||||
|
am0.getApplicationAttemptId()));
|
||||||
|
|
||||||
|
// getTransferredContainers should not throw NPE.
|
||||||
|
((AbstractYarnScheduler) rm2.getResourceScheduler())
|
||||||
|
.getTransferredContainers(am0.getApplicationAttemptId());
|
||||||
|
|
||||||
|
List<NMContainerStatus> containers = createNMContainerStatusForApp(am0);
|
||||||
|
nm1.registerNode(containers, null);
|
||||||
|
waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
|
||||||
|
}
|
||||||
|
|
||||||
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
|
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
|
||||||
int appsPending, int appsRunning, int appsCompleted,
|
int appsPending, int appsRunning, int appsCompleted,
|
||||||
|
@ -147,7 +147,7 @@ protected ApplicationAttemptId createSchedulingRequest(
|
|||||||
int memory, int vcores, String queueId, String userId, int numContainers,
|
int memory, int vcores, String queueId, String userId, int numContainers,
|
||||||
int priority) {
|
int priority) {
|
||||||
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||||
scheduler.addApplication(id.getApplicationId(), queueId, userId);
|
scheduler.addApplication(id.getApplicationId(), queueId, userId, true);
|
||||||
// This conditional is for testAclSubmitApplication where app is rejected
|
// This conditional is for testAclSubmitApplication where app is rejected
|
||||||
// and no app is added.
|
// and no app is added.
|
||||||
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
|
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
|
||||||
|
@ -793,13 +793,13 @@ public void testQueueDemandCalculation() throws Exception {
|
|||||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
|
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
|
||||||
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
|
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", true);
|
||||||
scheduler.addApplicationAttempt(id11, false, true);
|
scheduler.addApplicationAttempt(id11, false, true);
|
||||||
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
|
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
|
||||||
scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
|
scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", true);
|
||||||
scheduler.addApplicationAttempt(id21, false, true);
|
scheduler.addApplicationAttempt(id21, false, true);
|
||||||
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
|
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
|
||||||
scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
|
scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", true);
|
||||||
scheduler.addApplicationAttempt(id22, false, true);
|
scheduler.addApplicationAttempt(id22, false, true);
|
||||||
|
|
||||||
int minReqSize =
|
int minReqSize =
|
||||||
@ -1561,7 +1561,7 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
|
|||||||
scheduler.handle(nodeEvent2);
|
scheduler.handle(nodeEvent2);
|
||||||
|
|
||||||
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||||
scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
|
scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", true);
|
||||||
scheduler.addApplicationAttempt(appId, false, true);
|
scheduler.addApplicationAttempt(appId, false, true);
|
||||||
|
|
||||||
// 1 request with 2 nodes on the same rack. another request with 1 node on
|
// 1 request with 2 nodes on the same rack. another request with 1 node on
|
||||||
@ -1843,7 +1843,7 @@ public void testNotAllowSubmitApplication() throws Exception {
|
|||||||
|
|
||||||
ApplicationAttemptId attId =
|
ApplicationAttemptId attId =
|
||||||
ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
|
ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
|
||||||
scheduler.addApplication(attId.getApplicationId(), queue, user);
|
scheduler.addApplication(attId.getApplicationId(), queue, user, true);
|
||||||
|
|
||||||
numTries = 0;
|
numTries = 0;
|
||||||
while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
|
while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
|
||||||
@ -2720,7 +2720,7 @@ public void testContinuousScheduling() throws Exception {
|
|||||||
// send application request
|
// send application request
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||||
fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
|
fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", true);
|
||||||
fs.addApplicationAttempt(appAttemptId, false, true);
|
fs.addApplicationAttempt(appAttemptId, false, true);
|
||||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||||
ResourceRequest request =
|
ResourceRequest request =
|
||||||
|
Loading…
x
Reference in New Issue
Block a user