YARN-5136. Error in handling event type APP_ATTEMPT_REMOVED to the scheduler
(Contributed by Wilfred Spiegelenburg via Daniel Templeton)
This commit is contained in:
parent
ab923a53fc
commit
9f5d2c4fff
|
@ -633,8 +633,7 @@ public class FairScheduler extends
|
||||||
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
|
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
LOG.info(
|
LOG.info("Application " + applicationAttemptId + " is done. finalState="
|
||||||
"Application " + applicationAttemptId + " is done." + " finalState="
|
|
||||||
+ rmAppAttemptFinalState);
|
+ rmAppAttemptFinalState);
|
||||||
FSAppAttempt attempt = getApplicationAttempt(applicationAttemptId);
|
FSAppAttempt attempt = getApplicationAttempt(applicationAttemptId);
|
||||||
|
|
||||||
|
@ -644,6 +643,13 @@ public class FairScheduler extends
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if the attempt is already stopped and don't stop it twice.
|
||||||
|
if (attempt.isStopped()) {
|
||||||
|
LOG.info("Application " + applicationAttemptId + " has already been "
|
||||||
|
+ "stopped!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Release all the running containers
|
// Release all the running containers
|
||||||
for (RMContainer rmContainer : attempt.getLiveContainers()) {
|
for (RMContainer rmContainer : attempt.getLiveContainers()) {
|
||||||
if (keepContainers && rmContainer.getState().equals(
|
if (keepContainers && rmContainer.getState().equals(
|
||||||
|
@ -1521,6 +1527,13 @@ public class FairScheduler extends
|
||||||
try {
|
try {
|
||||||
attempt.getWriteLock().lock();
|
attempt.getWriteLock().lock();
|
||||||
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
|
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
|
||||||
|
// Check if the attempt is already stopped: don't move stopped app
|
||||||
|
// attempt. The attempt has already been removed from all queues.
|
||||||
|
if (attempt.isStopped()) {
|
||||||
|
LOG.info("Application " + appId + " is stopped and can't be moved!");
|
||||||
|
throw new YarnException("Application " + appId
|
||||||
|
+ " is stopped and can't be moved!");
|
||||||
|
}
|
||||||
String destQueueName = handleMoveToPlanQueue(queueName);
|
String destQueueName = handleMoveToPlanQueue(queueName);
|
||||||
FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
|
FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
|
||||||
if (targetQueue == null) {
|
if (targetQueue == null) {
|
||||||
|
@ -1617,16 +1630,23 @@ public class FairScheduler extends
|
||||||
* operations will be atomic.
|
* operations will be atomic.
|
||||||
*/
|
*/
|
||||||
private void executeMove(SchedulerApplication<FSAppAttempt> app,
|
private void executeMove(SchedulerApplication<FSAppAttempt> app,
|
||||||
FSAppAttempt attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) {
|
FSAppAttempt attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue)
|
||||||
boolean wasRunnable = oldQueue.removeApp(attempt);
|
throws YarnException {
|
||||||
|
// Check current runs state. Do not remove the attempt from the queue until
|
||||||
|
// after the check has been performed otherwise it could remove the app
|
||||||
|
// from a queue without moving it to a new queue.
|
||||||
|
boolean wasRunnable = oldQueue.isRunnableApp(attempt);
|
||||||
// if app was not runnable before, it may be runnable now
|
// if app was not runnable before, it may be runnable now
|
||||||
boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
|
boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
|
||||||
attempt);
|
attempt);
|
||||||
if (wasRunnable && !nowRunnable) {
|
if (wasRunnable && !nowRunnable) {
|
||||||
throw new IllegalStateException("Should have already verified that app "
|
throw new YarnException("Should have already verified that app "
|
||||||
+ attempt.getApplicationId() + " would be runnable in new queue");
|
+ attempt.getApplicationId() + " would be runnable in new queue");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Now it is safe to remove from the queue.
|
||||||
|
oldQueue.removeApp(attempt);
|
||||||
|
|
||||||
if (wasRunnable) {
|
if (wasRunnable) {
|
||||||
maxRunningEnforcer.untrackRunnableApp(attempt);
|
maxRunningEnforcer.untrackRunnableApp(attempt);
|
||||||
} else if (nowRunnable) {
|
} else if (nowRunnable) {
|
||||||
|
|
|
@ -4551,6 +4551,95 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
assertEquals(ancestorQueue, queue1);
|
assertEquals(ancestorQueue, queue1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDoubleRemoval() throws Exception {
|
||||||
|
String testUser = "user1"; // convenience var
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
ApplicationAttemptId attemptId = createAppAttemptId(1, 1);
|
||||||
|
// The placement rule will add the app to the user based queue but the
|
||||||
|
// passed in queue must exist.
|
||||||
|
AppAddedSchedulerEvent appAddedEvent =
|
||||||
|
new AppAddedSchedulerEvent(attemptId.getApplicationId(), testUser,
|
||||||
|
testUser);
|
||||||
|
scheduler.handle(appAddedEvent);
|
||||||
|
AppAttemptAddedSchedulerEvent attemptAddedEvent =
|
||||||
|
new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false);
|
||||||
|
scheduler.handle(attemptAddedEvent);
|
||||||
|
|
||||||
|
// Get a handle on the attempt.
|
||||||
|
FSAppAttempt attempt = scheduler.getSchedulerApp(attemptId);
|
||||||
|
|
||||||
|
AppAttemptRemovedSchedulerEvent attemptRemovedEvent =
|
||||||
|
new AppAttemptRemovedSchedulerEvent(createAppAttemptId(1, 1),
|
||||||
|
RMAppAttemptState.FINISHED, false);
|
||||||
|
|
||||||
|
// Make sure the app attempt is in the queue.
|
||||||
|
List<ApplicationAttemptId> attemptList =
|
||||||
|
scheduler.getAppsInQueue(testUser);
|
||||||
|
assertNotNull("Queue missing", attemptList);
|
||||||
|
assertTrue("Attempt should be in the queue",
|
||||||
|
attemptList.contains(attemptId));
|
||||||
|
assertFalse("Attempt is stopped", attempt.isStopped());
|
||||||
|
|
||||||
|
// Now remove the app attempt
|
||||||
|
scheduler.handle(attemptRemovedEvent);
|
||||||
|
// The attempt is not in the queue, and stopped
|
||||||
|
attemptList = scheduler.getAppsInQueue(testUser);
|
||||||
|
assertFalse("Attempt should not be in the queue",
|
||||||
|
attemptList.contains(attemptId));
|
||||||
|
assertTrue("Attempt should have been stopped", attempt.isStopped());
|
||||||
|
|
||||||
|
// Now remove the app attempt again, since it is stopped nothing happens.
|
||||||
|
scheduler.handle(attemptRemovedEvent);
|
||||||
|
// The attempt should still show the original queue info.
|
||||||
|
assertTrue("Attempt queue has changed",
|
||||||
|
attempt.getQueue().getName().endsWith(testUser));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (expected = YarnException.class)
|
||||||
|
public void testMoveAfterRemoval() throws Exception {
|
||||||
|
String testUser = "user1"; // convenience var
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
ApplicationAttemptId attemptId = createAppAttemptId(1, 1);
|
||||||
|
AppAddedSchedulerEvent appAddedEvent =
|
||||||
|
new AppAddedSchedulerEvent(attemptId.getApplicationId(), testUser,
|
||||||
|
testUser);
|
||||||
|
scheduler.handle(appAddedEvent);
|
||||||
|
AppAttemptAddedSchedulerEvent attemptAddedEvent =
|
||||||
|
new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false);
|
||||||
|
scheduler.handle(attemptAddedEvent);
|
||||||
|
|
||||||
|
// Get a handle on the attempt.
|
||||||
|
FSAppAttempt attempt = scheduler.getSchedulerApp(attemptId);
|
||||||
|
|
||||||
|
AppAttemptRemovedSchedulerEvent attemptRemovedEvent =
|
||||||
|
new AppAttemptRemovedSchedulerEvent(createAppAttemptId(1, 1),
|
||||||
|
RMAppAttemptState.FINISHED, false);
|
||||||
|
|
||||||
|
// Remove the app attempt
|
||||||
|
scheduler.handle(attemptRemovedEvent);
|
||||||
|
// Make sure the app attempt is not in the queue and stopped.
|
||||||
|
List<ApplicationAttemptId> attemptList =
|
||||||
|
scheduler.getAppsInQueue(testUser);
|
||||||
|
assertNotNull("Queue missing", attemptList);
|
||||||
|
assertFalse("Attempt should not be in the queue",
|
||||||
|
attemptList.contains(attemptId));
|
||||||
|
assertTrue("Attempt should have been stopped", attempt.isStopped());
|
||||||
|
// The attempt should still show the original queue info.
|
||||||
|
assertTrue("Attempt queue has changed",
|
||||||
|
attempt.getQueue().getName().endsWith(testUser));
|
||||||
|
|
||||||
|
// Now move the app: not using an event since there is none
|
||||||
|
// in the scheduler. This should throw.
|
||||||
|
scheduler.moveApplication(attemptId.getApplicationId(), "default");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPerfMetricsInited() {
|
public void testPerfMetricsInited() {
|
||||||
scheduler.init(conf);
|
scheduler.init(conf);
|
||||||
|
|
Loading…
Reference in New Issue