YARN-4907. Make all MockRM#waitForState consistent. (Contributed by Yufei Gu via Daniel Templeton)

(cherry picked from commit 155f06e0c1a5a15365fb620f3802dca8aa46c287)
This commit is contained in:
Daniel Templeton 2016-10-31 13:18:33 -07:00
parent 56092e351b
commit adceebc400
1 changed file with 19 additions and 25 deletions

View File

@ -272,9 +272,10 @@ public static void waitForState(RMAppAttempt attempt,
public void waitForContainerToComplete(RMAppAttempt attempt,
NMContainerStatus completedContainer) throws InterruptedException {
while (true) {
int timeWaiting = 0;
while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) {
List<ContainerStatus> containers = attempt.getJustFinishedContainers();
System.out.println("Received completed containers " + containers);
LOG.info("Received completed containers " + containers);
for (ContainerStatus container : containers) {
if (container.getContainerId().equals(
completedContainer.getContainerId())) {
@ -282,6 +283,7 @@ public void waitForContainerToComplete(RMAppAttempt attempt,
}
}
Thread.sleep(WAIT_MS_PER_LOOP);
timeWaiting += WAIT_MS_PER_LOOP;
}
}
@ -289,11 +291,16 @@ public MockAM waitForNewAMToLaunchAndRegister(ApplicationId appId, int attemptSi
MockNM nm) throws Exception {
RMApp app = getRMContext().getRMApps().get(appId);
Assert.assertNotNull(app);
int timeWaiting = 0;
while (app.getAppAttempts().size() != attemptSize) {
System.out.println("Application " + appId
if (timeWaiting >= TIMEOUT_MS_FOR_ATTEMPT) {
break;
}
LOG.info("Application " + appId
+ " is waiting for AM to restart. Current has "
+ app.getAppAttempts().size() + " attempts.");
Thread.sleep(WAIT_MS_PER_LOOP);
timeWaiting += WAIT_MS_PER_LOOP;
}
return launchAndRegisterAM(app, this, nm);
}
@ -375,7 +382,7 @@ public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
nm.nodeHeartbeat(true);
}
container = getResourceScheduler().getRMContainer(containerId);
System.out.println("Waiting for container " + containerId + " to be "
LOG.info("Waiting for container " + containerId + " to be "
+ containerState + ", container is null right now.");
Thread.sleep(WAIT_MS_PER_LOOP);
timeWaiting += WAIT_MS_PER_LOOP;
@ -386,7 +393,7 @@ public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
return false;
}
System.out.println("Container : " + containerId + " State is : "
LOG.info("Container : " + containerId + " State is : "
+ container.getState() + " Waiting for state : " + containerState);
for (MockNM nm : nms) {
nm.nodeHeartbeat(true);
@ -395,7 +402,7 @@ public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
timeWaiting += WAIT_MS_PER_LOOP;
}
System.out.println("Container State is : " + container.getState());
LOG.info("Container State is : " + container.getState());
return true;
}
@ -724,13 +731,13 @@ public void waitForState(NodeId nodeId, NodeState finalState)
break;
}
System.out.println("Node State is : " + node.getState()
LOG.info("Node State is : " + node.getState()
+ " Waiting for state : " + finalState);
Thread.sleep(WAIT_MS_PER_LOOP);
timeWaiting += WAIT_MS_PER_LOOP;
}
System.out.println("Node " + nodeId + " State is : " + node.getState());
LOG.info("Node " + nodeId + " State is : " + node.getState());
Assert.assertEquals("Node state is not correct (timedout)", finalState,
node.getState());
}
@ -949,7 +956,7 @@ private static void waitForSchedulerAppAttemptAdded(
.getApplicationAttempt(attemptId) && tick < 50) {
Thread.sleep(100);
if (tick % 10 == 0) {
System.out.println("waiting for SchedulerApplicationAttempt="
LOG.info("waiting for SchedulerApplicationAttempt="
+ attemptId + " added.");
}
tick++;
@ -966,7 +973,7 @@ private static void waitForSchedulerAppAttemptAdded(
public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
RMAppAttempt attempt = waitForAttemptScheduled(app, rm);
System.out.println("Launch AM " + attempt.getAppAttemptId());
LOG.info("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
@ -979,7 +986,7 @@ public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm)
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt = app.getCurrentAppAttempt();
waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
System.out.println("Launch AM " + attempt.getAppAttemptId());
LOG.info("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true);
MockAM am = new MockAM(rm.getRMContext(), rm.masterService,
attempt.getAppAttemptId());
@ -1039,7 +1046,6 @@ public void signalToContainer(ContainerId containerId,
client.signalToContainer(req);
}
/**
* Wait until an app is removed from scheduler.
* The timeout is 40 seconds.
@ -1049,25 +1055,13 @@ public void signalToContainer(ContainerId containerId,
*/
public void waitForAppRemovedFromScheduler(ApplicationId appId)
throws InterruptedException {
waitForAppRemovedFromScheduler(appId, TIMEOUT_MS_FOR_APP_REMOVED);
}
/**
* Wait until an app is removed from scheduler.
* @param appId the id of an app
* @param timeoutMsecs the length of timeout in milliseconds
* @throws InterruptedException
* if interrupted while waiting for app removed
*/
public void waitForAppRemovedFromScheduler(ApplicationId appId,
long timeoutMsecs) throws InterruptedException {
int timeWaiting = 0;
Map<ApplicationId, SchedulerApplication> apps =
((AbstractYarnScheduler) getResourceScheduler())
.getSchedulerApplications();
while (apps.containsKey(appId)) {
if (timeWaiting >= timeoutMsecs) {
if (timeWaiting >= TIMEOUT_MS_FOR_APP_REMOVED) {
break;
}
LOG.info("wait for app removed, " + appId);