YARN-4968. A couple of AM retry unit tests need to wait SchedulerApplicationAttempt stopped. (Wangda Tan via gtcarrera9)
(cherry picked from commit 7c6339f66a
)
This commit is contained in:
parent
f98f4151ac
commit
054fa104c5
|
@ -104,6 +104,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
@ -2328,6 +2329,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = new MockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
|
@ -2338,10 +2341,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1,
|
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1,
|
||||||
ContainerState.COMPLETE);
|
ContainerState.COMPLETE);
|
||||||
am0.waitForState(RMAppAttemptState.FAILED);
|
am0.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(cs,
|
||||||
|
am0.getApplicationAttemptId());
|
||||||
|
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < 4; i++) {
|
||||||
am0 = MockRM.launchAM(app0, rm1, nm1);
|
am0 = MockRM.launchAM(app0, rm1, nm1);
|
||||||
am0.registerAppAttempt();
|
am0.registerAppAttempt();
|
||||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
|
||||||
// get scheduler app
|
// get scheduler app
|
||||||
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
|
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
|
||||||
.get(app0.getApplicationId()).getCurrentAppAttempt();
|
.get(app0.getApplicationId()).getCurrentAppAttempt();
|
||||||
|
@ -2349,6 +2354,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(
|
cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(
|
||||||
app0.getCurrentAppAttempt().getMasterContainer().getId()));
|
app0.getCurrentAppAttempt().getMasterContainer().getId()));
|
||||||
am0.waitForState(RMAppAttemptState.FAILED);
|
am0.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(cs,
|
||||||
|
am0.getApplicationAttemptId());
|
||||||
}
|
}
|
||||||
am0 = MockRM.launchAM(app0, rm1, nm1);
|
am0 = MockRM.launchAM(app0, rm1, nm1);
|
||||||
am0.registerAppAttempt();
|
am0.registerAppAttempt();
|
||||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
@ -568,6 +569,9 @@ public class TestAMRestart {
|
||||||
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
|
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
|
||||||
|
|
||||||
am1.waitForState(RMAppAttemptState.FAILED);
|
am1.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||||
|
am1.getApplicationAttemptId());
|
||||||
|
|
||||||
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
|
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
|
||||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
ApplicationStateData appState =
|
ApplicationStateData appState =
|
||||||
|
@ -584,6 +588,9 @@ public class TestAMRestart {
|
||||||
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2));
|
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2));
|
||||||
|
|
||||||
am2.waitForState(RMAppAttemptState.FAILED);
|
am2.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||||
|
am2.getApplicationAttemptId());
|
||||||
|
|
||||||
Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
|
Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
|
||||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
MockAM am3 =
|
MockAM am3 =
|
||||||
|
@ -604,6 +611,9 @@ public class TestAMRestart {
|
||||||
nm1.nodeHeartbeat(conts, true);
|
nm1.nodeHeartbeat(conts, true);
|
||||||
|
|
||||||
am3.waitForState(RMAppAttemptState.FAILED);
|
am3.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||||
|
am3.getApplicationAttemptId());
|
||||||
|
|
||||||
Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry());
|
Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry());
|
||||||
Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
|
Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
|
||||||
appState.getAttempt(am3.getApplicationAttemptId())
|
appState.getAttempt(am3.getApplicationAttemptId())
|
||||||
|
@ -623,6 +633,9 @@ public class TestAMRestart {
|
||||||
// This will mimic ContainerExitStatus.ABORT
|
// This will mimic ContainerExitStatus.ABORT
|
||||||
nm1.nodeHeartbeat(false);
|
nm1.nodeHeartbeat(false);
|
||||||
am4.waitForState(RMAppAttemptState.FAILED);
|
am4.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||||
|
am4.getApplicationAttemptId());
|
||||||
|
|
||||||
Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry());
|
Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry());
|
||||||
Assert.assertEquals(ContainerExitStatus.ABORTED,
|
Assert.assertEquals(ContainerExitStatus.ABORTED,
|
||||||
appState.getAttempt(am4.getApplicationAttemptId())
|
appState.getAttempt(am4.getApplicationAttemptId())
|
||||||
|
@ -637,6 +650,9 @@ public class TestAMRestart {
|
||||||
nm2
|
nm2
|
||||||
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
am5.waitForState(RMAppAttemptState.FAILED);
|
am5.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||||
|
am5.getApplicationAttemptId());
|
||||||
|
|
||||||
Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry());
|
Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry());
|
||||||
|
|
||||||
// AM should not be restarted.
|
// AM should not be restarted.
|
||||||
|
|
|
@ -75,6 +75,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
|
@ -775,6 +777,27 @@ public class TestSchedulerUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void waitSchedulerApplicationAttemptStopped(CapacityScheduler cs,
|
||||||
|
ApplicationAttemptId attemptId) throws InterruptedException {
|
||||||
|
FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(attemptId);
|
||||||
|
if (null == schedulerApp) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait at most 5 secs to make sure SchedulerApplicationAttempt stopped
|
||||||
|
int tick = 0;
|
||||||
|
while (tick < 100) {
|
||||||
|
if (schedulerApp.isStopped()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
tick++;
|
||||||
|
Thread.sleep(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only print, don't throw exception
|
||||||
|
System.err.println("Failed to wait scheduler application attempt stopped.");
|
||||||
|
}
|
||||||
|
|
||||||
public static SchedulerApplication<SchedulerApplicationAttempt>
|
public static SchedulerApplication<SchedulerApplicationAttempt>
|
||||||
verifyAppAddedAndRemovedFromScheduler(
|
verifyAppAddedAndRemovedFromScheduler(
|
||||||
Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
|
Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
|
||||||
|
|
Loading…
Reference in New Issue