YARN-4968. A couple of AM retry unit tests need to wait SchedulerApplicationAttempt stopped. (Wangda Tan via gtcarrera9)
This commit is contained in:
parent
a749ba0cea
commit
7c6339f66a
|
@ -104,6 +104,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
|
@ -2328,6 +2329,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
// start RM
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
rm1.start();
|
||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
|
@ -2338,10 +2341,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1,
|
||||
ContainerState.COMPLETE);
|
||||
am0.waitForState(RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(cs,
|
||||
am0.getApplicationAttemptId());
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
am0 = MockRM.launchAM(app0, rm1, nm1);
|
||||
am0.registerAppAttempt();
|
||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||
// get scheduler app
|
||||
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
|
||||
.get(app0.getApplicationId()).getCurrentAppAttempt();
|
||||
|
@ -2349,6 +2354,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(
|
||||
app0.getCurrentAppAttempt().getMasterContainer().getId()));
|
||||
am0.waitForState(RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(cs,
|
||||
am0.getApplicationAttemptId());
|
||||
}
|
||||
am0 = MockRM.launchAM(app0, rm1, nm1);
|
||||
am0.registerAppAttempt();
|
||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
|
@ -568,6 +569,9 @@ public class TestAMRestart {
|
|||
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
|
||||
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||
am1.getApplicationAttemptId());
|
||||
|
||||
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
ApplicationStateData appState =
|
||||
|
@ -584,6 +588,9 @@ public class TestAMRestart {
|
|||
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2));
|
||||
|
||||
am2.waitForState(RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||
am2.getApplicationAttemptId());
|
||||
|
||||
Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
MockAM am3 =
|
||||
|
@ -604,6 +611,9 @@ public class TestAMRestart {
|
|||
nm1.nodeHeartbeat(conts, true);
|
||||
|
||||
am3.waitForState(RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||
am3.getApplicationAttemptId());
|
||||
|
||||
Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry());
|
||||
Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
|
||||
appState.getAttempt(am3.getApplicationAttemptId())
|
||||
|
@ -623,6 +633,9 @@ public class TestAMRestart {
|
|||
// This will mimic ContainerExitStatus.ABORT
|
||||
nm1.nodeHeartbeat(false);
|
||||
am4.waitForState(RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||
am4.getApplicationAttemptId());
|
||||
|
||||
Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry());
|
||||
Assert.assertEquals(ContainerExitStatus.ABORTED,
|
||||
appState.getAttempt(am4.getApplicationAttemptId())
|
||||
|
@ -637,6 +650,9 @@ public class TestAMRestart {
|
|||
nm2
|
||||
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am5.waitForState(RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||
am5.getApplicationAttemptId());
|
||||
|
||||
Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry());
|
||||
|
||||
// AM should not be restarted.
|
||||
|
|
|
@ -75,6 +75,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
|
@ -775,6 +777,27 @@ public class TestSchedulerUtils {
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Waits for the scheduler-side application attempt identified by
 * {@code attemptId} to report that it has stopped.
 *
 * The attempt is looked up once through
 * {@code cs.getApplicationAttempt(attemptId)}. If the lookup returns
 * {@code null} the method returns immediately. Otherwise
 * {@code isStopped()} is polled up to 100 times with a 50 ms sleep after
 * each unsuccessful check (about 5 seconds in total). If the attempt has
 * still not stopped, a message is printed to {@code System.err} and the
 * method returns normally; no exception is thrown for the timeout.
 *
 * @param cs the capacity scheduler that is asked for the attempt
 * @param attemptId id of the application attempt to wait for
 * @throws InterruptedException if the calling thread is interrupted while
 *           sleeping between polls
 */
public static void waitSchedulerApplicationAttemptStopped(CapacityScheduler cs,
|
||||
ApplicationAttemptId attemptId) throws InterruptedException {
|
||||
FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(attemptId);
|
||||
// Nothing to wait for when the scheduler returns no attempt for this id.
if (null == schedulerApp) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Poll up to 100 times with a 50 ms sleep after each check (at most about
// 5 secs) until the SchedulerApplicationAttempt reports it has stopped.
|
||||
int tick = 0;
|
||||
while (tick < 100) {
|
||||
if (schedulerApp.isStopped()) {
|
||||
return;
|
||||
}
|
||||
tick++;
|
||||
Thread.sleep(50);
|
||||
}
|
||||
|
||||
// Timing out is only reported on stderr; no exception is thrown, so the
// caller carries on after this method returns.
|
||||
System.err.println("Failed to wait scheduler application attempt stopped.");
|
||||
}
|
||||
|
||||
public static SchedulerApplication<SchedulerApplicationAttempt>
|
||||
verifyAppAddedAndRemovedFromScheduler(
|
||||
Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
|
||||
|
|
Loading…
Reference in New Issue