YARN-5416. TestRMRestart#testRMRestartWaitForPreviousAMToFinish failed intermittently due to not wait SchedulerApplicationAttempt to be stopped. Contributed by Junping Du
This commit is contained in:
parent
a5ec1e31a3
commit
357eab9566
|
@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
@ -510,10 +511,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
// start RM
|
// start RM
|
||||||
final MockRM rm1 = createMockRM(conf, memStore);
|
final MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
AbstractYarnScheduler ys =
|
||||||
|
(AbstractYarnScheduler)rm1.getResourceScheduler();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
|
|
||||||
// submitting app
|
// submitting app
|
||||||
RMApp app1 = rm1.submitApp(200);
|
RMApp app1 = rm1.submitApp(200);
|
||||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
@ -521,10 +524,11 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
// Fail first AM.
|
// Fail first AM.
|
||||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(ys,
|
||||||
|
am1.getApplicationAttemptId());
|
||||||
// launch another AM.
|
// launch another AM.
|
||||||
MockAM am2 = launchAM(app1, rm1, nm1);
|
MockAM am2 = launchAM(app1, rm1, nm1);
|
||||||
|
|
||||||
Assert.assertEquals(1, rmAppState.size());
|
Assert.assertEquals(1, rmAppState.size());
|
||||||
Assert.assertEquals(app1.getState(), RMAppState.RUNNING);
|
Assert.assertEquals(app1.getState(), RMAppState.RUNNING);
|
||||||
Assert.assertEquals(app1.getAppAttempts()
|
Assert.assertEquals(app1.getAppAttempts()
|
||||||
|
@ -562,6 +566,10 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
nm1.registerNode(Arrays.asList(status), null);
|
nm1.registerNode(Arrays.asList(status), null);
|
||||||
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
|
ys = (AbstractYarnScheduler) rm2.getResourceScheduler();
|
||||||
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(ys,
|
||||||
|
am2.getApplicationAttemptId());
|
||||||
|
|
||||||
launchAM(rmApp, rm2, nm1);
|
launchAM(rmApp, rm2, nm1);
|
||||||
Assert.assertEquals(3, rmApp.getAppAttempts().size());
|
Assert.assertEquals(3, rmApp.getAppAttempts().size());
|
||||||
rm2.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
|
rm2.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
|
|
@ -75,8 +75,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
|
@ -777,9 +775,11 @@ public class TestSchedulerUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void waitSchedulerApplicationAttemptStopped(CapacityScheduler cs,
|
public static void waitSchedulerApplicationAttemptStopped(
|
||||||
|
AbstractYarnScheduler ys,
|
||||||
ApplicationAttemptId attemptId) throws InterruptedException {
|
ApplicationAttemptId attemptId) throws InterruptedException {
|
||||||
FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(attemptId);
|
SchedulerApplicationAttempt schedulerApp =
|
||||||
|
ys.getApplicationAttempt(attemptId);
|
||||||
if (null == schedulerApp) {
|
if (null == schedulerApp) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue