YARN-4756. Unnecessary wait in Node Status Updater during reboot. (Eric Badger via kasha)
(cherry picked from commite82f961a39
) (cherry picked from commit2b97a50eec
)
This commit is contained in:
parent
413e5b4926
commit
ddb1407980
|
@ -284,6 +284,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.isStopped = true;
|
this.isStopped = true;
|
||||||
|
sendOutofBandHeartBeat();
|
||||||
try {
|
try {
|
||||||
statusUpdater.join();
|
statusUpdater.join();
|
||||||
registerWithRM();
|
registerWithRM();
|
||||||
|
|
|
@ -108,6 +108,7 @@ public class TestNodeManagerResync {
|
||||||
static final String user = "nobody";
|
static final String user = "nobody";
|
||||||
private FileContext localFS;
|
private FileContext localFS;
|
||||||
private CyclicBarrier syncBarrier;
|
private CyclicBarrier syncBarrier;
|
||||||
|
private CyclicBarrier updateBarrier;
|
||||||
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
|
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
|
||||||
private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
|
private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
|
||||||
private final NodeManagerEvent resyncEvent =
|
private final NodeManagerEvent resyncEvent =
|
||||||
|
@ -125,6 +126,7 @@ public class TestNodeManagerResync {
|
||||||
remoteLogsDir.mkdirs();
|
remoteLogsDir.mkdirs();
|
||||||
nmLocalDir.mkdirs();
|
nmLocalDir.mkdirs();
|
||||||
syncBarrier = new CyclicBarrier(2);
|
syncBarrier = new CyclicBarrier(2);
|
||||||
|
updateBarrier = new CyclicBarrier(2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -803,9 +805,11 @@ public class TestNodeManagerResync {
|
||||||
.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
|
.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
|
||||||
assertEquals(Resource.newInstance(1024, 1),
|
assertEquals(Resource.newInstance(1024, 1),
|
||||||
containerStatus.getCapability());
|
containerStatus.getCapability());
|
||||||
|
updateBarrier.await();
|
||||||
// Call the actual rebootNodeStatusUpdaterAndRegisterWithRM().
|
// Call the actual rebootNodeStatusUpdaterAndRegisterWithRM().
|
||||||
// This function should be synchronized with
|
// This function should be synchronized with
|
||||||
// increaseContainersResource().
|
// increaseContainersResource().
|
||||||
|
updateBarrier.await();
|
||||||
super.rebootNodeStatusUpdaterAndRegisterWithRM();
|
super.rebootNodeStatusUpdaterAndRegisterWithRM();
|
||||||
// Check status after registerWithRM
|
// Check status after registerWithRM
|
||||||
containerStatus = getContainerManager()
|
containerStatus = getContainerManager()
|
||||||
|
@ -831,17 +835,24 @@ public class TestNodeManagerResync {
|
||||||
List<Token> increaseTokens = new ArrayList<Token>();
|
List<Token> increaseTokens = new ArrayList<Token>();
|
||||||
// Add increase request.
|
// Add increase request.
|
||||||
Resource targetResource = Resource.newInstance(4096, 2);
|
Resource targetResource = Resource.newInstance(4096, 2);
|
||||||
try {
|
try{
|
||||||
increaseTokens.add(getContainerToken(targetResource));
|
try {
|
||||||
IncreaseContainersResourceRequest increaseRequest =
|
updateBarrier.await();
|
||||||
IncreaseContainersResourceRequest.newInstance(increaseTokens);
|
increaseTokens.add(getContainerToken(targetResource));
|
||||||
IncreaseContainersResourceResponse increaseResponse =
|
IncreaseContainersResourceRequest increaseRequest =
|
||||||
getContainerManager()
|
IncreaseContainersResourceRequest.newInstance(increaseTokens);
|
||||||
.increaseContainersResource(increaseRequest);
|
IncreaseContainersResourceResponse increaseResponse =
|
||||||
Assert.assertEquals(
|
getContainerManager()
|
||||||
1, increaseResponse.getSuccessfullyIncreasedContainers()
|
.increaseContainersResource(increaseRequest);
|
||||||
.size());
|
Assert.assertEquals(
|
||||||
Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty());
|
1, increaseResponse.getSuccessfullyIncreasedContainers()
|
||||||
|
.size());
|
||||||
|
Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty());
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
updateBarrier.await();
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue