From e82f961a3925aadf9e53a009820a48ba9e4f78b6 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Thu, 7 Apr 2016 17:05:29 -0700 Subject: [PATCH] YARN-4756. Unnecessary wait in Node Status Updater during reboot. (Eric Badger via kasha) --- .../nodemanager/NodeStatusUpdaterImpl.java | 1 + .../nodemanager/TestNodeManagerResync.java | 33 ++++++++++++------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index ad983fe1654..72769bfeca9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -284,6 +284,7 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { return; } this.isStopped = true; + sendOutofBandHeartBeat(); try { statusUpdater.join(); registerWithRM(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index e8c4634e77e..b3d44f526ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -108,6 +108,7 @@ public class TestNodeManagerResync { static final String user = "nobody"; private FileContext localFS; private CyclicBarrier syncBarrier; + private CyclicBarrier updateBarrier; private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false); private final NodeManagerEvent resyncEvent = @@ -125,6 +126,7 @@ public void setup() throws UnsupportedFileSystemException { remoteLogsDir.mkdirs(); nmLocalDir.mkdirs(); syncBarrier = new CyclicBarrier(2); + updateBarrier = new CyclicBarrier(2); } @After @@ -803,9 +805,11 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { .getContainerStatuses(gcsRequest).getContainerStatuses().get(0); assertEquals(Resource.newInstance(1024, 1), containerStatus.getCapability()); + updateBarrier.await(); // Call the actual rebootNodeStatusUpdaterAndRegisterWithRM(). // This function should be synchronized with // increaseContainersResource(). + updateBarrier.await(); super.rebootNodeStatusUpdaterAndRegisterWithRM(); // Check status after registerWithRM containerStatus = getContainerManager() @@ -831,17 +835,24 @@ public void run() { List increaseTokens = new ArrayList(); // Add increase request. Resource targetResource = Resource.newInstance(4096, 2); - try { - increaseTokens.add(getContainerToken(targetResource)); - IncreaseContainersResourceRequest increaseRequest = - IncreaseContainersResourceRequest.newInstance(increaseTokens); - IncreaseContainersResourceResponse increaseResponse = - getContainerManager() - .increaseContainersResource(increaseRequest); - Assert.assertEquals( - 1, increaseResponse.getSuccessfullyIncreasedContainers() - .size()); - Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty()); + try{ + try { + updateBarrier.await(); + increaseTokens.add(getContainerToken(targetResource)); + IncreaseContainersResourceRequest increaseRequest = + IncreaseContainersResourceRequest.newInstance(increaseTokens); + IncreaseContainersResourceResponse increaseResponse = + getContainerManager() + .increaseContainersResource(increaseRequest); + Assert.assertEquals( + 1, increaseResponse.getSuccessfullyIncreasedContainers() + .size()); + Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty()); + } catch (Exception e) { + e.printStackTrace(); + } finally { + updateBarrier.await(); + } } catch (Exception e) { e.printStackTrace(); }