YARN-4807. MockAM#waitForState sleep duration is too long. (Yufei Gu via kasha)
This commit is contained in:
parent
7b1c37a13a
commit
185c3d4de1
|
@ -74,31 +74,18 @@ public class MockAM {
|
|||
this.amRMProtocol = amRMProtocol;
|
||||
}
|
||||
|
||||
public void waitForState(RMAppAttemptState finalState) throws Exception {
|
||||
/**
|
||||
* Wait until an attempt has reached a specified state.
|
||||
* The timeout is 40 seconds.
|
||||
* @param finalState the attempt state waited
|
||||
* @throws InterruptedException
|
||||
* if interrupted while waiting for the state transition
|
||||
*/
|
||||
private void waitForState(RMAppAttemptState finalState)
|
||||
throws InterruptedException {
|
||||
RMApp app = context.getRMApps().get(attemptId.getApplicationId());
|
||||
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
|
||||
final int timeoutMsecs = 40000;
|
||||
final int minWaitMsecs = 1000;
|
||||
final int waitMsPerLoop = 500;
|
||||
int loop = 0;
|
||||
while (!finalState.equals(attempt.getAppAttemptState())
|
||||
&& waitMsPerLoop * loop < timeoutMsecs) {
|
||||
LOG.info("AppAttempt : " + attemptId + " State is : " +
|
||||
attempt.getAppAttemptState() + " Waiting for state : " +
|
||||
finalState);
|
||||
Thread.yield();
|
||||
Thread.sleep(waitMsPerLoop);
|
||||
loop++;
|
||||
}
|
||||
int waitedMsecs = waitMsPerLoop * loop;
|
||||
if (minWaitMsecs > waitedMsecs) {
|
||||
Thread.sleep(minWaitMsecs - waitedMsecs);
|
||||
}
|
||||
LOG.info("Attempt State is : " + attempt.getAppAttemptState());
|
||||
if (waitedMsecs >= timeoutMsecs) {
|
||||
Assert.fail("Attempt state is not correct (timedout): expected: "
|
||||
+ finalState + " actual: " + attempt.getAppAttemptState());
|
||||
}
|
||||
MockRM.waitForState(attempt, finalState);
|
||||
}
|
||||
|
||||
public RegisterApplicationMasterResponse registerAppAttempt()
|
||||
|
|
|
@ -107,8 +107,12 @@ public class MockRM extends ResourceManager {
|
|||
|
||||
static final Logger LOG = Logger.getLogger(MockRM.class);
|
||||
static final String ENABLE_WEBAPP = "mockrm.webapp.enabled";
|
||||
private static final int SECOND = 1000;
|
||||
private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND;
|
||||
private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 10 * SECOND;
|
||||
private static final int WAIT_MS_PER_LOOP = 10;
|
||||
|
||||
final private boolean useNullRMNodeLabelsManager;
|
||||
private final boolean useNullRMNodeLabelsManager;
|
||||
|
||||
public MockRM() {
|
||||
this(new YarnConfiguration());
|
||||
|
@ -160,106 +164,107 @@ public class MockRM extends ResourceManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until an application has reached a specified state.
|
||||
* The timeout is 80 seconds.
|
||||
* @param appId the id of an application
|
||||
* @param finalState the application state waited
|
||||
* @throws InterruptedException
|
||||
* if interrupted while waiting for the state transition
|
||||
*/
|
||||
public void waitForState(ApplicationId appId, RMAppState finalState)
|
||||
throws Exception {
|
||||
throws InterruptedException {
|
||||
RMApp app = getRMContext().getRMApps().get(appId);
|
||||
Assert.assertNotNull("app shouldn't be null", app);
|
||||
final int timeoutMsecs = 80000;
|
||||
final int waitMsPerLoop = 500;
|
||||
int loop = 0;
|
||||
while (!finalState.equals(app.getState()) &&
|
||||
((waitMsPerLoop * loop) < timeoutMsecs)) {
|
||||
final int timeoutMsecs = 80 * SECOND;
|
||||
int timeWaiting = 0;
|
||||
while (!finalState.equals(app.getState())) {
|
||||
if (timeWaiting >= timeoutMsecs) {
|
||||
break;
|
||||
}
|
||||
|
||||
LOG.info("App : " + appId + " State is : " + app.getState() +
|
||||
" Waiting for state : " + finalState);
|
||||
Thread.yield();
|
||||
Thread.sleep(waitMsPerLoop);
|
||||
loop++;
|
||||
" Waiting for state : " + finalState);
|
||||
Thread.sleep(WAIT_MS_PER_LOOP);
|
||||
timeWaiting += WAIT_MS_PER_LOOP;
|
||||
}
|
||||
int waitedMsecs = waitMsPerLoop * loop;
|
||||
|
||||
LOG.info("App State is : " + app.getState());
|
||||
if (waitedMsecs >= timeoutMsecs) {
|
||||
Assert.fail("App state is not correct (timedout): expected: " +
|
||||
finalState + " actual: " + app.getState() +
|
||||
" for the application " + appId);
|
||||
}
|
||||
Assert.assertEquals("App State is not correct (timeout).", finalState,
|
||||
app.getState());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until an attempt has reached a specified state.
|
||||
* The timeout is 40 seconds.
|
||||
* @param attemptId the id of an attempt
|
||||
* @param finalState the attempt state waited
|
||||
* @throws InterruptedException
|
||||
* if interrupted while waiting for the state transition
|
||||
*/
|
||||
public void waitForState(ApplicationAttemptId attemptId,
|
||||
RMAppAttemptState finalState)
|
||||
throws Exception {
|
||||
waitForState(attemptId, finalState, 40000);
|
||||
RMAppAttemptState finalState) throws InterruptedException {
|
||||
waitForState(attemptId, finalState, TIMEOUT_MS_FOR_ATTEMPT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until an attempt has reached a specified state.
|
||||
* The timeout can be specified by the parameter.
|
||||
* @param attemptId the id of an attempt
|
||||
* @param finalState the attempt state waited
|
||||
* @param timeoutMsecs the length of timeout in milliseconds
|
||||
* @throws InterruptedException
|
||||
* if interrupted while waiting for the state transition
|
||||
*/
|
||||
public void waitForState(ApplicationAttemptId attemptId,
|
||||
RMAppAttemptState finalState, int timeoutMsecs) throws Exception {
|
||||
RMAppAttemptState finalState, int timeoutMsecs)
|
||||
throws InterruptedException {
|
||||
RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
|
||||
Assert.assertNotNull("app shouldn't be null", app);
|
||||
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
|
||||
final int minWaitMsecs = 1000;
|
||||
final int waitMsPerLoop = 10;
|
||||
int loop = 0;
|
||||
while (!finalState.equals(attempt.getAppAttemptState())
|
||||
&& waitMsPerLoop * loop < timeoutMsecs) {
|
||||
LOG.info("AppAttempt : " + attemptId + " State is : " +
|
||||
MockRM.waitForState(attempt, finalState, timeoutMsecs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until an attempt has reached a specified state.
|
||||
* The timeout is 40 seconds.
|
||||
* @param attempt an attempt
|
||||
* @param finalState the attempt state waited
|
||||
* @throws InterruptedException
|
||||
* if interrupted while waiting for the state transition
|
||||
*/
|
||||
public static void waitForState(RMAppAttempt attempt,
|
||||
RMAppAttemptState finalState) throws InterruptedException {
|
||||
waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until an attempt has reached a specified state.
|
||||
* The timeout can be specified by the parameter.
|
||||
* @param attempt an attempt
|
||||
* @param finalState the attempt state waited
|
||||
* @param timeoutMsecs the length of timeout in milliseconds
|
||||
* @throws InterruptedException
|
||||
* if interrupted while waiting for the state transition
|
||||
*/
|
||||
public static void waitForState(RMAppAttempt attempt,
|
||||
RMAppAttemptState finalState, int timeoutMsecs)
|
||||
throws InterruptedException {
|
||||
int timeWaiting = 0;
|
||||
while (!finalState.equals(attempt.getAppAttemptState())) {
|
||||
if (timeWaiting >= timeoutMsecs) {
|
||||
break;
|
||||
}
|
||||
|
||||
LOG.info("AppAttempt : " + attempt.getAppAttemptId() + " State is : " +
|
||||
attempt.getAppAttemptState() + " Waiting for state : " + finalState);
|
||||
Thread.yield();
|
||||
Thread.sleep(waitMsPerLoop);
|
||||
loop++;
|
||||
}
|
||||
int waitedMsecs = waitMsPerLoop * loop;
|
||||
if (minWaitMsecs > waitedMsecs) {
|
||||
Thread.sleep(minWaitMsecs - waitedMsecs);
|
||||
Thread.sleep(WAIT_MS_PER_LOOP);
|
||||
timeWaiting += WAIT_MS_PER_LOOP;
|
||||
}
|
||||
|
||||
LOG.info("Attempt State is : " + attempt.getAppAttemptState());
|
||||
if (waitedMsecs >= timeoutMsecs) {
|
||||
Assert.fail("Attempt state is not correct (timedout): expected: "
|
||||
+ finalState + " actual: " + attempt.getAppAttemptState()+
|
||||
" for the application attempt " + attemptId);
|
||||
}
|
||||
}
|
||||
|
||||
public void waitForContainerState(ContainerId containerId,
|
||||
RMContainerState state) throws Exception {
|
||||
// This method will assert if state is not expected after timeout.
|
||||
Assert.assertTrue(waitForContainerState(containerId, state, 8 * 1000));
|
||||
}
|
||||
|
||||
public boolean waitForContainerState(ContainerId containerId,
|
||||
RMContainerState containerState, int timeoutMillisecs) throws Exception {
|
||||
RMContainer container = getResourceScheduler().getRMContainer(containerId);
|
||||
int timeoutSecs = 0;
|
||||
while (((container == null) || !containerState.equals(container.getState()))
|
||||
&& timeoutSecs++ < timeoutMillisecs / 100) {
|
||||
if(container == null){
|
||||
container = getResourceScheduler().getRMContainer(containerId);
|
||||
}
|
||||
System.out.println("Container : " + containerId +
|
||||
" Waiting for state : " + containerState);
|
||||
|
||||
Thread.sleep(100);
|
||||
|
||||
if (timeoutMillisecs <= timeoutSecs * 100) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println("Container State is : " + container.getState());
|
||||
Assert.assertEquals("Container state is not correct (timedout)",
|
||||
containerState, container.getState());
|
||||
return true;
|
||||
}
|
||||
|
||||
public void waitForContainerAllocated(MockNM nm, ContainerId containerId)
|
||||
throws Exception {
|
||||
int timeoutSecs = 0;
|
||||
while (getResourceScheduler().getRMContainer(containerId) == null
|
||||
&& timeoutSecs++ < 40) {
|
||||
System.out.println("Waiting for" + containerId + " to be allocated.");
|
||||
nm.nodeHeartbeat(true);
|
||||
Thread.sleep(200);
|
||||
}
|
||||
Assert.assertNotNull("Failed in waiting for " + containerId + " " +
|
||||
"allocation.", getResourceScheduler().getRMContainer(containerId));
|
||||
Assert.assertEquals("Attempt state is not correct (timeout).", finalState,
|
||||
attempt.getState());
|
||||
}
|
||||
|
||||
public void waitForContainerToComplete(RMAppAttempt attempt,
|
||||
|
@ -273,7 +278,7 @@ public class MockRM extends ResourceManager {
|
|||
return;
|
||||
}
|
||||
}
|
||||
Thread.sleep(200);
|
||||
Thread.sleep(WAIT_MS_PER_LOOP);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -285,58 +290,109 @@ public class MockRM extends ResourceManager {
|
|||
System.out.println("Application " + appId
|
||||
+ " is waiting for AM to restart. Current has "
|
||||
+ app.getAppAttempts().size() + " attempts.");
|
||||
Thread.sleep(200);
|
||||
Thread.sleep(WAIT_MS_PER_LOOP);
|
||||
}
|
||||
return launchAndRegisterAM(app, this, nm);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until a container has reached a specified state.
|
||||
* The timeout is 10 seconds.
|
||||
* @param nm A mock nodemanager
|
||||
* @param containerId the id of a container
|
||||
* @param containerState the container state waited
|
||||
* @return if reach the state before timeout; false otherwise.
|
||||
* @throws Exception
|
||||
* if interrupted while waiting for the state transition
|
||||
* or an unexpected error while MockNM is hearbeating.
|
||||
*/
|
||||
public boolean waitForState(MockNM nm, ContainerId containerId,
|
||||
RMContainerState containerState) throws Exception {
|
||||
// default is wait for 30,000 ms
|
||||
return waitForState(nm, containerId, containerState, 30 * 1000);
|
||||
return waitForState(nm, containerId, containerState,
|
||||
TIMEOUT_MS_FOR_CONTAINER_AND_NODE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until a container has reached a specified state.
|
||||
* The timeout is specified by the parameter.
|
||||
* @param nm A mock nodemanager
|
||||
* @param containerId the id of a container
|
||||
* @param containerState the container state waited
|
||||
* @param timeoutMsecs the length of timeout in milliseconds
|
||||
* @return if reach the state before timeout; false otherwise.
|
||||
* @throws Exception
|
||||
* if interrupted while waiting for the state transition
|
||||
* or an unexpected error while MockNM is hearbeating.
|
||||
*/
|
||||
public boolean waitForState(MockNM nm, ContainerId containerId,
|
||||
RMContainerState containerState, int timeoutMillisecs) throws Exception {
|
||||
RMContainerState containerState, int timeoutMsecs) throws Exception {
|
||||
return waitForState(Arrays.asList(nm), containerId, containerState,
|
||||
timeoutMillisecs);
|
||||
timeoutMsecs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until a container has reached a specified state.
|
||||
* The timeout is 10 seconds.
|
||||
* @param nms array of mock nodemanagers
|
||||
* @param containerId the id of a container
|
||||
* @param containerState the container state waited
|
||||
* @return if reach the state before timeout; false otherwise.
|
||||
* @throws Exception
|
||||
* if interrupted while waiting for the state transition
|
||||
* or an unexpected error while MockNM is hearbeating.
|
||||
*/
|
||||
public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
|
||||
RMContainerState containerState, int timeoutMillisecs) throws Exception {
|
||||
RMContainerState containerState) throws Exception {
|
||||
return waitForState(nms, containerId, containerState,
|
||||
TIMEOUT_MS_FOR_CONTAINER_AND_NODE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until a container has reached a specified state.
|
||||
* The timeout is specified by the parameter.
|
||||
* @param nms array of mock nodemanagers
|
||||
* @param containerId the id of a container
|
||||
* @param containerState the container state waited
|
||||
* @param timeoutMsecs the length of timeout in milliseconds
|
||||
* @return if reach the state before timeout; false otherwise.
|
||||
* @throws Exception
|
||||
* if interrupted while waiting for the state transition
|
||||
* or an unexpected error while MockNM is hearbeating.
|
||||
*/
|
||||
public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
|
||||
RMContainerState containerState, int timeoutMsecs) throws Exception {
|
||||
RMContainer container = getResourceScheduler().getRMContainer(containerId);
|
||||
int timeoutSecs = 0;
|
||||
while(container == null && timeoutSecs++ < timeoutMillisecs / 100) {
|
||||
int timeWaiting = 0;
|
||||
while (container == null) {
|
||||
if (timeWaiting >= timeoutMsecs) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (MockNM nm : nms) {
|
||||
nm.nodeHeartbeat(true);
|
||||
}
|
||||
container = getResourceScheduler().getRMContainer(containerId);
|
||||
System.out.println("Waiting for container " + containerId + " to be "
|
||||
+ containerState + ", container is null right now.");
|
||||
Thread.sleep(100);
|
||||
Thread.sleep(WAIT_MS_PER_LOOP);
|
||||
timeWaiting += WAIT_MS_PER_LOOP;
|
||||
}
|
||||
|
||||
if (timeoutMillisecs <= timeoutSecs * 100) {
|
||||
while (!containerState.equals(container.getState())) {
|
||||
if (timeWaiting >= timeoutMsecs) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Assert.assertNotNull("Container shouldn't be null", container);
|
||||
while (!containerState.equals(container.getState())
|
||||
&& timeoutSecs++ < timeoutMillisecs / 100) {
|
||||
|
||||
System.out.println("Container : " + containerId + " State is : "
|
||||
+ container.getState() + " Waiting for state : " + containerState);
|
||||
for (MockNM nm : nms) {
|
||||
nm.nodeHeartbeat(true);
|
||||
}
|
||||
Thread.sleep(100);
|
||||
|
||||
if (timeoutMillisecs <= timeoutSecs * 100) {
|
||||
return false;
|
||||
}
|
||||
Thread.sleep(WAIT_MS_PER_LOOP);
|
||||
timeWaiting += WAIT_MS_PER_LOOP;
|
||||
}
|
||||
|
||||
System.out.println("Container State is : " + container.getState());
|
||||
Assert.assertEquals("Container state is not correct (timedout)",
|
||||
containerState, container.getState());
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -640,16 +696,30 @@ public class MockRM extends ResourceManager {
|
|||
node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE));
|
||||
}
|
||||
|
||||
public void NMwaitForState(NodeId nodeid, NodeState finalState)
|
||||
throws Exception {
|
||||
RMNode node = getRMContext().getRMNodes().get(nodeid);
|
||||
/**
|
||||
* Wait until a node has reached a specified state.
|
||||
* The timeout is 10 seconds.
|
||||
* @param nodeId the id of a node
|
||||
* @param finalState the node state waited
|
||||
* @throws InterruptedException
|
||||
* if interrupted while waiting for the state transition
|
||||
*/
|
||||
public void waitForState(NodeId nodeId, NodeState finalState)
|
||||
throws InterruptedException {
|
||||
RMNode node = getRMContext().getRMNodes().get(nodeId);
|
||||
Assert.assertNotNull("node shouldn't be null", node);
|
||||
int timeoutSecs = 0;
|
||||
while (!finalState.equals(node.getState()) && timeoutSecs++ < 20) {
|
||||
int timeWaiting = 0;
|
||||
while (!finalState.equals(node.getState())) {
|
||||
if (timeWaiting >= TIMEOUT_MS_FOR_CONTAINER_AND_NODE) {
|
||||
break;
|
||||
}
|
||||
|
||||
System.out.println("Node State is : " + node.getState()
|
||||
+ " Waiting for state : " + finalState);
|
||||
Thread.sleep(500);
|
||||
Thread.sleep(WAIT_MS_PER_LOOP);
|
||||
timeWaiting += WAIT_MS_PER_LOOP;
|
||||
}
|
||||
|
||||
System.out.println("Node State is : " + node.getState());
|
||||
Assert.assertEquals("Node state is not correct (timedout)", finalState,
|
||||
node.getState());
|
||||
|
@ -673,7 +743,7 @@ public class MockRM extends ResourceManager {
|
|||
public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId)
|
||||
throws Exception {
|
||||
MockAM am = new MockAM(getRMContext(), masterService, appAttemptId);
|
||||
am.waitForState(RMAppAttemptState.ALLOCATED);
|
||||
waitForState(appAttemptId, RMAppAttemptState.ALLOCATED);
|
||||
//create and set AMRMToken
|
||||
Token<AMRMTokenIdentifier> amrmToken =
|
||||
this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
||||
|
@ -692,7 +762,7 @@ public class MockRM extends ResourceManager {
|
|||
public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId)
|
||||
throws Exception {
|
||||
MockAM am = new MockAM(getRMContext(), masterService, appAttemptId);
|
||||
am.waitForState(RMAppAttemptState.ALLOCATED);
|
||||
waitForState(am.getApplicationAttemptId(), RMAppAttemptState.ALLOCATED);
|
||||
getRMContext().getDispatcher().getEventHandler()
|
||||
.handle(new RMAppAttemptEvent(appAttemptId,
|
||||
RMAppAttemptEventType.LAUNCH_FAILED, "Failed"));
|
||||
|
@ -842,9 +912,9 @@ public class MockRM extends ResourceManager {
|
|||
FinishApplicationMasterRequest.newInstance(
|
||||
FinalApplicationStatus.SUCCEEDED, "", "");
|
||||
am.unregisterAppAttempt(req,true);
|
||||
am.waitForState(RMAppAttemptState.FINISHING);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
|
||||
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
|
||||
}
|
||||
|
||||
|
|
|
@ -124,7 +124,7 @@ public class TestApplicationCleanup {
|
|||
am.unregisterAppAttempt();
|
||||
NodeHeartbeatResponse resp = nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
|
||||
ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
|
||||
//currently only containers are cleaned via this
|
||||
//AM container is cleaned via container launcher
|
||||
|
|
|
@ -190,7 +190,7 @@ public class TestApplicationMasterLauncher {
|
|||
|
||||
//complete the AM container to finish the app normally
|
||||
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
|
||||
waitCount = 0;
|
||||
while (containerManager.cleanedup == false && waitCount++ < 20) {
|
||||
|
@ -199,7 +199,7 @@ public class TestApplicationMasterLauncher {
|
|||
}
|
||||
Assert.assertTrue(containerManager.cleanedup);
|
||||
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
|
@ -248,14 +248,13 @@ public class TestApplicationMasterLauncher {
|
|||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120);
|
||||
|
||||
RMApp app = rm.submitApp(2000);
|
||||
final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
|
||||
.getAppAttemptId();
|
||||
|
||||
// kick the scheduling
|
||||
nm1.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
|
||||
rm.waitForState(appAttemptId, RMAppAttemptState.LAUNCHED, 500);
|
||||
MockRM.waitForState(app.getCurrentAppAttempt(),
|
||||
RMAppAttemptState.LAUNCHED, 500);
|
||||
}
|
||||
|
||||
|
||||
|
@ -310,7 +309,7 @@ public class TestApplicationMasterLauncher {
|
|||
am.unregisterAppAttempt();
|
||||
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
|
||||
ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
|
||||
try {
|
||||
amrs = am.allocate(new ArrayList<ResourceRequest>(),
|
||||
|
|
|
@ -270,7 +270,7 @@ public class TestApplicationMasterService {
|
|||
am1.registerAppAttempt();
|
||||
|
||||
am1.unregisterAppAttempt(req, false);
|
||||
am1.waitForState(RMAppAttemptState.FINISHING);
|
||||
rm.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
|
||||
} finally {
|
||||
if (rm != null) {
|
||||
rm.stop();
|
||||
|
@ -310,6 +310,9 @@ public class TestApplicationMasterService {
|
|||
rm.start();
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
|
||||
RMApp app1 = rm.submitApp(2048);
|
||||
//Wait to make sure the attempt has the right state
|
||||
//TODO explore a better way than sleeping for a while (YARN-4929)
|
||||
Thread.sleep(1000);
|
||||
nm1.nodeHeartbeat(true);
|
||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
|
|
|
@ -188,7 +188,7 @@ public class TestClientRMService {
|
|||
MockNM lostNode = rm.registerNode("host2:1235", 1024);
|
||||
rm.sendNodeStarted(lostNode);
|
||||
lostNode.nodeHeartbeat(true);
|
||||
rm.NMwaitForState(lostNode.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(lostNode.getNodeId(), NodeState.RUNNING);
|
||||
rm.sendNodeLost(lostNode);
|
||||
|
||||
// Create a client.
|
||||
|
@ -214,7 +214,7 @@ public class TestClientRMService {
|
|||
|
||||
// Now make the node unhealthy.
|
||||
node.nodeHeartbeat(false);
|
||||
rm.NMwaitForState(node.getNodeId(), NodeState.UNHEALTHY);
|
||||
rm.waitForState(node.getNodeId(), NodeState.UNHEALTHY);
|
||||
|
||||
// Call again
|
||||
nodeReports = client.getClusterNodes(request).getNodeReports();
|
||||
|
|
|
@ -308,7 +308,7 @@ public class TestContainerResourceUsage {
|
|||
app.getCurrentAppAttempt().getMasterContainer().getId();
|
||||
nm.nodeHeartbeat(am0.getApplicationAttemptId(),
|
||||
amContainerId.getContainerId(), ContainerState.COMPLETE);
|
||||
am0.waitForState(RMAppAttemptState.FAILED);
|
||||
rm.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
|
||||
long memorySeconds = 0;
|
||||
long vcoreSeconds = 0;
|
||||
|
@ -347,6 +347,8 @@ public class TestContainerResourceUsage {
|
|||
.equals(am0.getApplicationAttemptId()));
|
||||
|
||||
// launch the new AM
|
||||
//TODO explore a better way than sleeping for a while (YARN-4929)
|
||||
Thread.sleep(1000);
|
||||
nm.nodeHeartbeat(true);
|
||||
MockAM am1 = rm.sendAMLaunched(attempt2.getAppAttemptId());
|
||||
am1.registerAppAttempt();
|
||||
|
|
|
@ -134,7 +134,7 @@ public class TestRM extends ParameterizedSchedulerTestBase {
|
|||
am.registerAppAttempt();
|
||||
am.unregisterAppAttempt();
|
||||
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
|
@ -191,7 +191,7 @@ public class TestRM extends ParameterizedSchedulerTestBase {
|
|||
|
||||
am.unregisterAppAttempt();
|
||||
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
@ -398,7 +398,7 @@ public class TestRM extends ParameterizedSchedulerTestBase {
|
|||
}
|
||||
nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1,
|
||||
ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
Assert.assertFalse(nmTokenSecretManager
|
||||
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
|
||||
} finally {
|
||||
|
@ -498,7 +498,7 @@ public class TestRM extends ParameterizedSchedulerTestBase {
|
|||
RMApp app2 = rm1.submitApp(200);
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
||||
nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am2.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);
|
||||
|
||||
// a killed app
|
||||
|
@ -545,7 +545,7 @@ public class TestRM extends ParameterizedSchedulerTestBase {
|
|||
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
||||
nm1
|
||||
.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am2.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
||||
// before new attempt is launched, the app report returns the invalid AM
|
||||
|
@ -610,7 +610,7 @@ public class TestRM extends ParameterizedSchedulerTestBase {
|
|||
// a failed app
|
||||
RMApp application = rm.submitApp(200);
|
||||
MockAM am = MockRM.launchAM(application, rm, nm1);
|
||||
am.waitForState(RMAppAttemptState.LAUNCHED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
|
||||
nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.RUNNING);
|
||||
rm.waitForState(application.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
||||
|
|
|
@ -463,7 +463,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
|
||||
// fail the AM by sending CONTAINER_FINISHED event without registering.
|
||||
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am0.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
|
||||
ApplicationStateData appState = rmAppState.get(app0.getApplicationId());
|
||||
// assert the AM failed state is saved.
|
||||
|
@ -517,7 +517,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
MockAM am1 = launchAM(app1, rm1, nm1);
|
||||
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
// Fail first AM.
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
|
||||
// launch another AM.
|
||||
MockAM am2 = launchAM(app1, rm1, nm1);
|
||||
|
@ -688,7 +688,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
FinishApplicationMasterRequest.newInstance(
|
||||
FinalApplicationStatus.SUCCEEDED, "", "");
|
||||
am0.unregisterAppAttempt(req, true);
|
||||
am0.waitForState(RMAppAttemptState.FINISHING);
|
||||
rm1.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
|
||||
// app final state is not saved. This guarantees that RMApp cannot be
|
||||
// recovered via its own saved state, but only via the event notification
|
||||
// from the RMAppAttempt on recovery.
|
||||
|
@ -729,7 +729,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
|
||||
// fail the AM by sending CONTAINER_FINISHED event without registering.
|
||||
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am0.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
|
||||
|
||||
// assert the app/attempt failed state is saved.
|
||||
|
@ -924,7 +924,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
MockAM am1 = launchAM(app1, rm1, nm1);
|
||||
// fail the AM by sending CONTAINER_FINISHED event without registering.
|
||||
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||
|
||||
// a killed app.
|
||||
|
@ -1005,7 +1005,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
|
||||
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
||||
throws Exception {
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
|
||||
nm.nodeHeartbeat(true);
|
||||
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
||||
am.registerAppAttempt();
|
||||
|
@ -1043,9 +1043,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
am.unregisterAppAttempt(req,true);
|
||||
am.waitForState(RMAppAttemptState.FINISHING);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
|
||||
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
|
||||
// check that app/attempt is saved with the final state
|
||||
ApplicationStateData appState = rmAppState.get(rmApp.getApplicationId());
|
||||
|
@ -2232,7 +2232,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
rm2.waitForState(applicationId, RMAppState.ACCEPTED);
|
||||
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
|
||||
|
||||
//Wait to make sure the loadedApp0 has the right number of attempts
|
||||
//TODO explore a better way than sleeping for a while (YARN-4929)
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(2, loadedApp0.getAppAttempts().size());
|
||||
rm2.waitForState(appAttemptId2, RMAppAttemptState.SCHEDULED);
|
||||
|
||||
|
@ -2340,7 +2342,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
|
||||
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1,
|
||||
ContainerState.COMPLETE);
|
||||
am0.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(cs,
|
||||
am0.getApplicationAttemptId());
|
||||
|
||||
|
@ -2353,7 +2355,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
// kill app0-attempt
|
||||
cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(
|
||||
app0.getCurrentAppAttempt().getMasterContainer().getId()));
|
||||
am0.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(cs,
|
||||
am0.getApplicationAttemptId());
|
||||
}
|
||||
|
@ -2434,7 +2436,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
throws Exception {
|
||||
MockAM am = launchAM(app, rm, nm);
|
||||
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FAILED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
return am;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -106,7 +106,7 @@ public class TestSignalContainer {
|
|||
|
||||
am.unregisterAppAttempt();
|
||||
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
|
|
@ -446,6 +446,9 @@ public class TestRMApplicationHistoryWriter {
|
|||
MockNM nm = rm.registerNode("127.0.0.1:1234", 1024 * 10100);
|
||||
|
||||
RMApp app = rm.submitApp(1024);
|
||||
//Wait to make sure the attempt has the right state
|
||||
//TODO explore a better way than sleeping for a while (YARN-4929)
|
||||
Thread.sleep(1000);
|
||||
nm.nodeHeartbeat(true);
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
||||
|
@ -470,9 +473,9 @@ public class TestRMApplicationHistoryWriter {
|
|||
Assert.assertEquals(request, allocatedSize);
|
||||
|
||||
am.unregisterAppAttempt();
|
||||
am.waitForState(RMAppAttemptState.FINISHING);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
|
||||
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
|
||||
NodeHeartbeatResponse resp = nm.nodeHeartbeat(true);
|
||||
List<ContainerId> cleaned = resp.getContainersToCleanup();
|
||||
|
|
|
@ -98,7 +98,7 @@ public class TestAMRMRPCNodeUpdates {
|
|||
|
||||
private void syncNodeLost(MockNM nm) throws Exception {
|
||||
rm.sendNodeStarted(nm);
|
||||
rm.NMwaitForState(nm.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm.getNodeId(), NodeState.RUNNING);
|
||||
rm.sendNodeLost(nm);
|
||||
dispatcher.await();
|
||||
}
|
||||
|
|
|
@ -118,7 +118,6 @@ public class TestAMRestart {
|
|||
nm1.nodeHeartbeat(true);
|
||||
ContainerId containerId5 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 5);
|
||||
rm1.waitForContainerAllocated(nm1, containerId5);
|
||||
rm1.waitForState(nm1, containerId5, RMContainerState.ALLOCATED);
|
||||
|
||||
// 6th container is in Reserved state.
|
||||
|
@ -141,7 +140,7 @@ public class TestAMRestart {
|
|||
|
||||
// fail the AM by sending CONTAINER_FINISHED event without registering.
|
||||
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
|
||||
// wait for some time. previous AM's running containers should still remain
|
||||
// in scheduler even though am failed
|
||||
|
@ -326,7 +325,7 @@ public class TestAMRestart {
|
|||
|
||||
// fail am1
|
||||
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
||||
// restart the am
|
||||
|
@ -365,7 +364,7 @@ public class TestAMRestart {
|
|||
|
||||
// fail am2.
|
||||
nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am2.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
||||
// restart am
|
||||
|
@ -488,7 +487,7 @@ public class TestAMRestart {
|
|||
BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE,
|
||||
"", exitStatus, Resources.createResource(200));
|
||||
currentNode.containerStatus(containerStatus);
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
||||
// restart the am
|
||||
|
@ -570,7 +569,7 @@ public class TestAMRestart {
|
|||
// Preempt the first attempt;
|
||||
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
|
||||
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||
am1.getApplicationAttemptId());
|
||||
|
||||
|
@ -589,7 +588,7 @@ public class TestAMRestart {
|
|||
ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
|
||||
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2));
|
||||
|
||||
am2.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||
am2.getApplicationAttemptId());
|
||||
|
||||
|
@ -612,7 +611,7 @@ public class TestAMRestart {
|
|||
Collections.singletonList(containerStatus));
|
||||
nm1.nodeHeartbeat(conts, true);
|
||||
|
||||
am3.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||
am3.getApplicationAttemptId());
|
||||
|
||||
|
@ -634,7 +633,7 @@ public class TestAMRestart {
|
|||
// nm1 heartbeats to report unhealthy
|
||||
// This will mimic ContainerExitStatus.ABORT
|
||||
nm1.nodeHeartbeat(false);
|
||||
am4.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am4.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||
am4.getApplicationAttemptId());
|
||||
|
||||
|
@ -651,7 +650,7 @@ public class TestAMRestart {
|
|||
// fail the AM normally
|
||||
nm2
|
||||
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am5.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am5.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||
am5.getApplicationAttemptId());
|
||||
|
||||
|
@ -696,7 +695,7 @@ public class TestAMRestart {
|
|||
// Forcibly preempt the am container;
|
||||
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
|
||||
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
||||
|
@ -823,30 +822,33 @@ public class TestAMRestart {
|
|||
// Fail current attempt normally
|
||||
nm1.nodeHeartbeat(am.getApplicationAttemptId(),
|
||||
1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
// launch the second attempt
|
||||
rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
||||
Assert.assertEquals(2, app.getAppAttempts().size());
|
||||
Assert.assertTrue(((RMAppAttemptImpl) app.getCurrentAppAttempt())
|
||||
.mayBeLastAttempt());
|
||||
MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1);
|
||||
am_2.waitForState(RMAppAttemptState.RUNNING);
|
||||
rm1.waitForState(am_2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
||||
nm1.nodeHeartbeat(am_2.getApplicationAttemptId(),
|
||||
1, ContainerState.COMPLETE);
|
||||
am_2.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am_2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
// current app should be failed.
|
||||
rm1.waitForState(app.getApplicationId(), RMAppState.FAILED);
|
||||
|
||||
ControlledClock clock = new ControlledClock();
|
||||
// set window size to 10s
|
||||
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000);;
|
||||
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000);
|
||||
app1.setSystemClock(clock);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
// Fail attempt1 normally
|
||||
nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
|
||||
1, ContainerState.COMPLETE);
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
//Wait to make sure attempt1 be removed in State Store
|
||||
//TODO explore a better way than sleeping for a while (YARN-4929)
|
||||
Thread.sleep(15 * 1000);
|
||||
|
||||
// launch the second attempt
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
@ -855,14 +857,14 @@ public class TestAMRestart {
|
|||
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
||||
Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
am2.waitForState(RMAppAttemptState.RUNNING);
|
||||
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
||||
|
||||
// wait for 10 seconds
|
||||
clock.setTime(System.currentTimeMillis() + 10*1000);
|
||||
// Fail attempt2 normally
|
||||
nm1.nodeHeartbeat(am2.getApplicationAttemptId(),
|
||||
1, ContainerState.COMPLETE);
|
||||
am2.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
|
||||
// can launch the third attempt successfully
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
@ -870,7 +872,7 @@ public class TestAMRestart {
|
|||
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
|
||||
clock.reset();
|
||||
MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
am3.waitForState(RMAppAttemptState.RUNNING);
|
||||
rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
||||
|
||||
// Restart rm.
|
||||
@SuppressWarnings("resource")
|
||||
|
@ -891,6 +893,9 @@ public class TestAMRestart {
|
|||
nm1.registerNode(Collections.singletonList(status), null);
|
||||
|
||||
rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
|
||||
//Wait to make sure attempt3 be removed in State Store
|
||||
//TODO explore a better way than sleeping for a while (YARN-4929)
|
||||
Thread.sleep(15 * 1000);
|
||||
Assert.assertEquals(2, app1State.getAttemptCount());
|
||||
|
||||
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
@ -904,7 +909,7 @@ public class TestAMRestart {
|
|||
// Fail attempt4 normally
|
||||
nm1
|
||||
.nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am4.waitForState(RMAppAttemptState.FAILED);
|
||||
rm2.waitForState(am4.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
Assert.assertEquals(2, app1State.getAttemptCount());
|
||||
|
||||
// can launch the 5th attempt successfully
|
||||
|
@ -913,12 +918,12 @@ public class TestAMRestart {
|
|||
MockAM am5 =
|
||||
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm1);
|
||||
clock.reset();
|
||||
am5.waitForState(RMAppAttemptState.RUNNING);
|
||||
rm2.waitForState(am5.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
||||
|
||||
// Fail attempt5 normally
|
||||
nm1
|
||||
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am5.waitForState(RMAppAttemptState.FAILED);
|
||||
rm2.waitForState(am5.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
Assert.assertEquals(2, app1State.getAttemptCount());
|
||||
|
||||
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||
|
@ -991,7 +996,7 @@ public class TestAMRestart {
|
|||
// fail the AM by sending CONTAINER_FINISHED event without registering.
|
||||
nm1.nodeHeartbeat(
|
||||
am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
|
||||
// wait for app to start a new attempt.
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
|
|
@ -85,7 +85,7 @@ public class TestNodesListManager {
|
|||
am.registerAppAttempt();
|
||||
am.unregisterAppAttempt();
|
||||
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
|
||||
// Create submitted App
|
||||
RMApp subrmApp = rm.submitApp(200);
|
||||
|
|
|
@ -475,7 +475,6 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
|||
nm2.nodeHeartbeat(true);
|
||||
ContainerId containerId3 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
|
||||
rm1.waitForContainerAllocated(nm2, containerId3);
|
||||
rm1.waitForState(nm2, containerId3, RMContainerState.ALLOCATED);
|
||||
|
||||
// NodeManager restart
|
||||
|
@ -563,7 +562,6 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
|||
node.nodeHeartbeat(true);
|
||||
ContainerId allocatedContainerID =
|
||||
ContainerId.newContainerId(applicationAttemptOneID, 3);
|
||||
rm.waitForContainerAllocated(node, allocatedContainerID);
|
||||
rm.waitForState(node, allocatedContainerID, RMContainerState.ALLOCATED);
|
||||
RMContainer allocatedContainer =
|
||||
rm.getResourceScheduler().getRMContainer(allocatedContainerID);
|
||||
|
@ -576,8 +574,7 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
|||
|
||||
// AM crashes, and a new app-attempt gets created
|
||||
node.nodeHeartbeat(applicationAttemptOneID, 1, ContainerState.COMPLETE);
|
||||
rm.waitForContainerState(am1ContainerID, RMContainerState.COMPLETED,
|
||||
30 * 1000);
|
||||
rm.waitForState(node, am1ContainerID, RMContainerState.COMPLETED, 30 * 1000);
|
||||
RMAppAttempt rmAppAttempt2 = MockRM.waitForAttemptScheduled(rmApp, rm);
|
||||
ApplicationAttemptId applicationAttemptTwoID =
|
||||
rmAppAttempt2.getAppAttemptId();
|
||||
|
|
|
@ -1093,7 +1093,7 @@ public class TestCapacityScheduler {
|
|||
ContainerId containerId2 =
|
||||
ContainerId.newContainerId(applicationAttemptId, 2);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId2,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
|
||||
// Acquire the container
|
||||
allocateRequest = AllocateRequest.newInstance(1, 0.0f, null, null, null);
|
||||
|
@ -2579,7 +2579,7 @@ public class TestCapacityScheduler {
|
|||
ContainerId containerId =
|
||||
ContainerId.newContainerId(am.getApplicationAttemptId(), cId);
|
||||
Assert.assertTrue(rm.waitForState(nm, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2816,10 +2816,10 @@ public class TestCapacityScheduler {
|
|||
ContainerId containerId =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
|
||||
Assert.assertTrue(rm.waitForState(nm2, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
|
||||
checkPendingResource(rm, "a1", 0 * GB, null);
|
||||
checkPendingResource(rm, "a", 0 * GB, null);
|
||||
|
@ -3161,7 +3161,7 @@ public class TestCapacityScheduler {
|
|||
ContainerId containerId3 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId3,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
// Acquire them
|
||||
am1.allocate(null, null);
|
||||
sentRMContainerLaunched(rm,
|
||||
|
|
|
@ -181,7 +181,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
|
|||
// request a container.
|
||||
am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
|
||||
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
rm.waitForState(nm1, containerId, RMContainerState.ALLOCATED, 10 * 1000);
|
||||
rm.waitForState(nm1, containerId, RMContainerState.ALLOCATED);
|
||||
appResourceUsageReport =
|
||||
rm.getResourceScheduler().getAppResourceUsageReport(
|
||||
am1.getApplicationAttemptId());
|
||||
|
@ -242,7 +242,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
|
|||
containerId1 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||
containerId2 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId2,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
|
||||
// check used resource:
|
||||
// queue-a used x=1G, ""=1G
|
||||
|
@ -415,12 +415,12 @@ public class TestCapacitySchedulerNodeLabelUpdate {
|
|||
ContainerId containerId =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
am1.allocate("*", GB, 1, new ArrayList<ContainerId>());
|
||||
containerId =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
|
||||
Assert.assertTrue(rm.waitForState(nm2, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
|
||||
// app2
|
||||
RMApp app2 = rm.submitApp(GB, "app", "u2", null, "a");
|
||||
|
@ -431,7 +431,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
|
|||
containerId =
|
||||
ContainerId.newContainerId(am2.getApplicationAttemptId(), 3);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
|
||||
// check used resource:
|
||||
// queue-a used x=1G, ""=1G
|
||||
|
@ -513,7 +513,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
|
|||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||
containerId2 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId2,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
|
||||
// check used resource:
|
||||
// queue-a used x=2G
|
||||
|
|
|
@ -254,7 +254,7 @@ public class TestContainerResizing {
|
|||
ContainerId containerId2 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm1.waitForState(nm1, containerId2,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
// Acquire them, and NM report RUNNING
|
||||
am1.allocate(null, null);
|
||||
sentRMContainerLaunched(rm1, containerId2);
|
||||
|
@ -359,7 +359,7 @@ public class TestContainerResizing {
|
|||
ContainerId containerId2 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm1.waitForState(nm1, containerId2,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
// Acquire them, and NM report RUNNING
|
||||
am1.allocate(null, null);
|
||||
sentRMContainerLaunched(rm1, containerId2);
|
||||
|
@ -436,7 +436,7 @@ public class TestContainerResizing {
|
|||
ContainerId containerId2 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm1.waitForState(nm1, containerId2,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
// Acquire them, and NM report RUNNING
|
||||
am1.allocate(null, null);
|
||||
sentRMContainerLaunched(rm1, containerId2);
|
||||
|
@ -547,7 +547,7 @@ public class TestContainerResizing {
|
|||
ContainerId containerId2 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm1.waitForState(nm1, containerId2,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
// Acquire them, and NM report RUNNING
|
||||
am1.allocate(null, null);
|
||||
sentRMContainerLaunched(rm1, containerId2);
|
||||
|
@ -659,11 +659,12 @@ public class TestContainerResizing {
|
|||
ContainerId containerId2 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm1.waitForState(nm1, containerId2,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
// Acquire them, and NM report RUNNING
|
||||
am1.allocate(null, null);
|
||||
sentRMContainerLaunched(rm1, containerId2);
|
||||
rm1.waitForContainerState(containerId2, RMContainerState.RUNNING);
|
||||
rm1.waitForState(Arrays.asList(nm1, nm2), containerId2,
|
||||
RMContainerState.RUNNING);
|
||||
|
||||
// am1 asks to change its AM container from 2GB to 8GB
|
||||
am1.sendContainerResizingRequest(Arrays.asList(
|
||||
|
@ -757,8 +758,7 @@ public class TestContainerResizing {
|
|||
ContainerId containerId2 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(
|
||||
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED,
|
||||
10 * 1000));
|
||||
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED));
|
||||
// Acquire them, and NM report RUNNING
|
||||
am1.allocate(null, null);
|
||||
sentRMContainerLaunched(rm1, containerId2);
|
||||
|
@ -834,7 +834,7 @@ public class TestContainerResizing {
|
|||
ContainerId lastContainerId = ContainerId.newContainerId(
|
||||
am.getApplicationAttemptId(), startContainerId + nContainer - 1);
|
||||
Assert.assertTrue(rm.waitForState(nm, lastContainerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
// Acquire them, and NM report RUNNING
|
||||
am.allocate(null, null);
|
||||
|
||||
|
@ -842,7 +842,7 @@ public class TestContainerResizing {
|
|||
+ nContainer; cId++) {
|
||||
sentRMContainerLaunched(rm,
|
||||
ContainerId.newContainerId(am.getApplicationAttemptId(), cId));
|
||||
rm.waitForContainerState(
|
||||
rm.waitForState(nm,
|
||||
ContainerId.newContainerId(am.getApplicationAttemptId(), cId),
|
||||
RMContainerState.RUNNING);
|
||||
}
|
||||
|
|
|
@ -180,13 +180,13 @@ public class TestNodeLabelContainerAllocation {
|
|||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
|
||||
Assert.assertTrue(rm1.waitForState(nm3, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
// Cannot allocate 2nd label=empty container
|
||||
containerId =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
|
||||
am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
|
||||
Assert.assertFalse(rm1.waitForState(nm3, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
|
||||
// A has default user limit = 100, so it can use all resource in label = x
|
||||
// We can allocate floor(8000 / 1024) = 7 containers
|
||||
|
@ -195,7 +195,7 @@ public class TestNodeLabelContainerAllocation {
|
|||
ContainerId.newContainerId(am1.getApplicationAttemptId(), id);
|
||||
am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
|
||||
Assert.assertTrue(rm1.waitForState(nm1, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
}
|
||||
rm1.close();
|
||||
}
|
||||
|
@ -270,7 +270,7 @@ public class TestNodeLabelContainerAllocation {
|
|||
containerId =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L);
|
||||
Assert.assertTrue(rm1.waitForState(nm2, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
||||
"h2");
|
||||
|
||||
|
@ -285,9 +285,9 @@ public class TestNodeLabelContainerAllocation {
|
|||
am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
|
||||
containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
|
||||
Assert.assertFalse(rm1.waitForState(nm4, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
Assert.assertFalse(rm1.waitForState(nm5, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
|
||||
// launch an app to queue b2
|
||||
RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
|
||||
|
@ -298,9 +298,9 @@ public class TestNodeLabelContainerAllocation {
|
|||
am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
|
||||
containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
|
||||
Assert.assertFalse(rm1.waitForState(nm1, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
Assert.assertTrue(rm1.waitForState(nm3, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
|
||||
"h3");
|
||||
|
||||
|
@ -309,7 +309,7 @@ public class TestNodeLabelContainerAllocation {
|
|||
am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z");
|
||||
containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L);
|
||||
Assert.assertTrue(rm1.waitForState(nm4, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
|
||||
"h4");
|
||||
|
||||
|
@ -349,9 +349,9 @@ public class TestNodeLabelContainerAllocation {
|
|||
containerId =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertFalse(rm1.waitForState(nm2, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
Assert.assertTrue(rm1.waitForState(nm1, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
||||
"h1");
|
||||
|
||||
|
@ -364,9 +364,9 @@ public class TestNodeLabelContainerAllocation {
|
|||
am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
|
||||
containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
|
||||
Assert.assertFalse(rm1.waitForState(nm1, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
Assert.assertTrue(rm1.waitForState(nm2, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
|
||||
"h2");
|
||||
|
||||
|
@ -379,9 +379,9 @@ public class TestNodeLabelContainerAllocation {
|
|||
am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
|
||||
containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
|
||||
Assert.assertFalse(rm1.waitForState(nm2, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
Assert.assertTrue(rm1.waitForState(nm3, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
|
||||
"h3");
|
||||
|
||||
|
@ -425,9 +425,9 @@ public class TestNodeLabelContainerAllocation {
|
|||
containerId =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertFalse(rm1.waitForState(nm3, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
Assert.assertTrue(rm1.waitForState(nm1, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
||||
"h1");
|
||||
|
||||
|
@ -440,9 +440,9 @@ public class TestNodeLabelContainerAllocation {
|
|||
am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
|
||||
containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
|
||||
Assert.assertFalse(rm1.waitForState(nm3, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
Assert.assertTrue(rm1.waitForState(nm2, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
|
||||
"h2");
|
||||
|
||||
|
@ -455,9 +455,9 @@ public class TestNodeLabelContainerAllocation {
|
|||
am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
|
||||
containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
|
||||
Assert.assertFalse(rm1.waitForState(nm2, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
Assert.assertTrue(rm1.waitForState(nm3, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
|
||||
"h3");
|
||||
|
||||
|
@ -780,7 +780,7 @@ public class TestNodeLabelContainerAllocation {
|
|||
am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), "");
|
||||
am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y");
|
||||
Assert.assertTrue(rm1.waitForState(nm1, nextContainerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
|
||||
// Check pending resource for am2, priority=1 doesn't get allocated before
|
||||
// priority=2 allocated
|
||||
|
@ -834,7 +834,7 @@ public class TestNodeLabelContainerAllocation {
|
|||
nextContainerId =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), i);
|
||||
Assert.assertTrue(rm1.waitForState(Arrays.asList(nm1, nm2),
|
||||
nextContainerId, RMContainerState.ALLOCATED, 10 * 1000));
|
||||
nextContainerId, RMContainerState.ALLOCATED));
|
||||
}
|
||||
// no more container allocated on nm1
|
||||
checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 0);
|
||||
|
|
|
@ -166,7 +166,7 @@ public class TestWorkPreservingRMRestartForNodeLabel {
|
|||
containerId =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm1.waitForState(nm1, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
checkRMContainerLabelExpression(ContainerId.newContainerId(
|
||||
am1.getApplicationAttemptId(), 1), rm1, "x");
|
||||
checkRMContainerLabelExpression(ContainerId.newContainerId(
|
||||
|
@ -181,7 +181,7 @@ public class TestWorkPreservingRMRestartForNodeLabel {
|
|||
am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
|
||||
containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm1.waitForState(nm2, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
checkRMContainerLabelExpression(ContainerId.newContainerId(
|
||||
am2.getApplicationAttemptId(), 1), rm1, "y");
|
||||
checkRMContainerLabelExpression(ContainerId.newContainerId(
|
||||
|
@ -196,7 +196,7 @@ public class TestWorkPreservingRMRestartForNodeLabel {
|
|||
am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
|
||||
containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm1.waitForState(nm3, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED));
|
||||
checkRMContainerLabelExpression(ContainerId.newContainerId(
|
||||
am3.getApplicationAttemptId(), 1), rm1, "");
|
||||
checkRMContainerLabelExpression(ContainerId.newContainerId(
|
||||
|
|
|
@ -1460,7 +1460,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
|||
while (true) {
|
||||
// fail the AM by sending CONTAINER_FINISHED event without registering.
|
||||
amNodeManager.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FAILED);
|
||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
if (numAttempt == maxAppAttempts) {
|
||||
rm.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||
break;
|
||||
|
|
|
@ -133,13 +133,13 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||
MockNM nm1 = rm.registerNode("h1:1234", 5120);
|
||||
MockNM nm2 = rm.registerNode("h2:1235", 5121);
|
||||
rm.sendNodeStarted(nm1);
|
||||
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);
|
||||
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm2.getNodeId(), NodeState.NEW);
|
||||
|
||||
MockNM nm3 = rm.registerNode("h3:1236", 5122);
|
||||
rm.NMwaitForState(nm3.getNodeId(), NodeState.NEW);
|
||||
rm.waitForState(nm3.getNodeId(), NodeState.NEW);
|
||||
rm.sendNodeStarted(nm3);
|
||||
rm.NMwaitForState(nm3.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);
|
||||
RMNodeImpl node = (RMNodeImpl) rm.getRMContext().getRMNodes()
|
||||
.get(nm3.getNodeId());
|
||||
NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(false,
|
||||
|
@ -147,7 +147,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||
NodeStatus nodeStatus = NodeStatus.newInstance(nm3.getNodeId(), 1,
|
||||
new ArrayList<ContainerStatus>(), null, nodeHealth, null, null, null);
|
||||
node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeStatus, null));
|
||||
rm.NMwaitForState(nm3.getNodeId(), NodeState.UNHEALTHY);
|
||||
rm.waitForState(nm3.getNodeId(), NodeState.UNHEALTHY);
|
||||
|
||||
ClientResponse response =
|
||||
r.path("ws").path("v1").path("cluster").path("nodes")
|
||||
|
@ -169,8 +169,8 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||
MockNM nm1 = rm.registerNode("h1:1234", 5120);
|
||||
MockNM nm2 = rm.registerNode("h2:1235", 5121);
|
||||
rm.sendNodeStarted(nm1);
|
||||
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);
|
||||
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm2.getNodeId(), NodeState.NEW);
|
||||
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
.path("nodes").queryParam("states", NodeState.NEW.toString())
|
||||
|
@ -250,8 +250,8 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||
MockNM nm2 = rm.registerNode("h2:1234", 5120);
|
||||
rm.sendNodeStarted(nm1);
|
||||
rm.sendNodeStarted(nm2);
|
||||
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.NMwaitForState(nm2.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
|
||||
rm.sendNodeLost(nm1);
|
||||
rm.sendNodeLost(nm2);
|
||||
|
||||
|
@ -284,8 +284,8 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||
MockNM nm2 = rm.registerNode("h2:1234", 5120);
|
||||
rm.sendNodeStarted(nm1);
|
||||
rm.sendNodeStarted(nm2);
|
||||
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.NMwaitForState(nm2.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
|
||||
rm.sendNodeLost(nm1);
|
||||
rm.sendNodeLost(nm2);
|
||||
|
||||
|
@ -314,8 +314,8 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||
MockNM nm1 = rm.registerNode("h1:1234", 5120);
|
||||
MockNM nm2 = rm.registerNode("h2:1235", 5121);
|
||||
rm.sendNodeStarted(nm1);
|
||||
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);
|
||||
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm2.getNodeId(), NodeState.NEW);
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
.path("nodes").queryParam("states", "running")
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
|
@ -334,8 +334,8 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||
MockNM nm1 = rm.registerNode("h1:1234", 5120);
|
||||
MockNM nm2 = rm.registerNode("h2:1235", 5121);
|
||||
rm.sendNodeStarted(nm1);
|
||||
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);
|
||||
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm2.getNodeId(), NodeState.NEW);
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
.path("nodes").queryParam("states", "UNHEALTHY")
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
|
@ -352,8 +352,8 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||
MockNM nm2 = rm.registerNode("h2:1235", 5121);
|
||||
rm.sendNodeStarted(nm1);
|
||||
rm.sendNodeStarted(nm2);
|
||||
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.NMwaitForState(nm2.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
|
||||
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
.path(path).accept(media).get(ClientResponse.class);
|
||||
|
@ -623,8 +623,8 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||
MockNM nm3 = rm.registerNode("h3:1236", 5122);
|
||||
rm.sendNodeStarted(nm1);
|
||||
rm.sendNodeStarted(nm3);
|
||||
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);
|
||||
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm2.getNodeId(), NodeState.NEW);
|
||||
rm.sendNodeLost(nm3);
|
||||
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
|
@ -645,7 +645,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||
WebResource r = resource();
|
||||
MockNM nm1 = rm.registerNode("h1:1234", 5120);
|
||||
rm.sendNodeStarted(nm1);
|
||||
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
|
||||
RMNodeImpl node = (RMNodeImpl) rm.getRMContext().getRMNodes()
|
||||
.get(nm1.getNodeId());
|
||||
|
@ -659,7 +659,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||
new ArrayList<ContainerStatus>(), null, nodeHealth, containerResource,
|
||||
nodeResource, null);
|
||||
node.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus, null));
|
||||
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
.path("nodes").accept(MediaType.APPLICATION_JSON)
|
||||
|
|
Loading…
Reference in New Issue