From dbb0a5f51f3ea25fcbfdb0dbb7bb46e760c9e414 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 29 Jun 2018 13:06:28 -0500 Subject: [PATCH] YARN-8451. Multiple NM heartbeat thread created when a slow NM resync with RM. Contributed by Botong Huang (cherry picked from commit 100470140d86eede0fa240a9aa93226f274ee4f5) --- .../yarn/server/nodemanager/NodeManager.java | 66 ++++++++++++------- .../nodemanager/TestNodeManagerResync.java | 56 ++++++++++++++++ 2 files changed, 98 insertions(+), 24 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 2748a8fb568..c8234bd2d47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -128,6 +128,7 @@ public class NodeManager extends CompositeService // the NM collector service is set only if the timeline service v.2 is enabled private NMCollectorService nmCollectorService; private NodeStatusUpdater nodeStatusUpdater; + private AtomicBoolean resyncingWithRM = new AtomicBoolean(false); private NodeResourceMonitor nodeResourceMonitor; private static CompositeServiceShutdownHook nodeManagerShutdownHook; private NMStateStoreService nmStore = null; @@ -393,7 +394,7 @@ public class NodeManager extends CompositeService addService(del); // NodeManager level dispatcher - this.dispatcher = new AsyncDispatcher("NM Event dispatcher"); + this.dispatcher = createNMDispatcher(); nodeHealthChecker = new NodeHealthCheckerService( @@ -517,31 +518,41 @@ public class NodeManager extends CompositeService } protected void resyncWithRM() { - //we do not want to block dispatcher thread here - new Thread() { - @Override - public void run() { - try { - if (!rmWorkPreservingRestartEnabled) { - LOG.info("Cleaning up running containers on resync"); - containerManager.cleanupContainersOnNMResync(); - // Clear all known collectors for resync. - if (context.getKnownCollectors() != null) { - context.getKnownCollectors().clear(); + // Create a thread for resync because we do not want to block dispatcher + // thread here. Also use locking to make sure only one thread is running at + // a time. + if (this.resyncingWithRM.getAndSet(true)) { + // Some other thread is already created for resyncing, do nothing + } else { + // We have got the lock, create a new thread + new Thread() { + @Override + public void run() { + try { + if (!rmWorkPreservingRestartEnabled) { + LOG.info("Cleaning up running containers on resync"); + containerManager.cleanupContainersOnNMResync(); + // Clear all known collectors for resync. + if (context.getKnownCollectors() != null) { + context.getKnownCollectors().clear(); + } + } else { + LOG.info("Preserving containers on resync"); + // Re-register known timeline collectors. + reregisterCollectors(); } - } else { - LOG.info("Preserving containers on resync"); - // Re-register known timeline collectors. - reregisterCollectors(); + ((NodeStatusUpdaterImpl) nodeStatusUpdater) + .rebootNodeStatusUpdaterAndRegisterWithRM(); + } catch (YarnRuntimeException e) { + LOG.error("Error while rebooting NodeStatusUpdater.", e); + shutDown(NodeManagerStatus.EXCEPTION.getExitCode()); + } finally { + // Release lock + resyncingWithRM.set(false); } - ((NodeStatusUpdaterImpl) nodeStatusUpdater) - .rebootNodeStatusUpdaterAndRegisterWithRM(); - } catch (YarnRuntimeException e) { - LOG.error("Error while rebooting NodeStatusUpdater.", e); - shutDown(NodeManagerStatus.EXCEPTION.getExitCode()); } - } - }.start(); + }.start(); + } } /** @@ -946,7 +957,14 @@ public class NodeManager extends CompositeService ContainerManagerImpl getContainerManager() { return containerManager; } - + + /** + * Unit test friendly. + */ + protected AsyncDispatcher createNMDispatcher() { + return new AsyncDispatcher("NM Event dispatcher"); + } + //For testing Dispatcher getNMDispatcher(){ return dispatcher; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index cf33775fcf2..b3f4e1bcb86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -37,6 +37,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; @@ -64,7 +65,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -107,6 +110,7 @@ public class TestNodeManagerResync { private FileContext localFS; private CyclicBarrier syncBarrier; private CyclicBarrier updateBarrier; + private AtomicInteger resyncThreadCount; private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false); private final NodeManagerEvent resyncEvent = @@ -125,6 +129,7 @@ public class TestNodeManagerResync { nmLocalDir.mkdirs(); syncBarrier = new CyclicBarrier(2); updateBarrier = new CyclicBarrier(2); + resyncThreadCount = new AtomicInteger(0); } @After @@ -185,6 +190,41 @@ public class TestNodeManagerResync { } } + @SuppressWarnings("resource") + @Test(timeout = 30000) + public void testNMMultipleResyncEvent() + throws IOException, InterruptedException { + TestNodeManager1 nm = new TestNodeManager1(false); + YarnConfiguration conf = createNMConfig(); + + int resyncEventCount = 4; + try { + nm.init(conf); + nm.start(); + Assert.assertEquals(1, nm.getNMRegistrationCount()); + for (int i = 0; i < resyncEventCount; i++) { + nm.getNMDispatcher().getEventHandler().handle(resyncEvent); + } + + DrainDispatcher dispatcher = (DrainDispatcher) nm.getNMDispatcher(); + dispatcher.await(); + LOG.info("NM dispatcher drained"); + + // Wait for the resync thread to finish + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + LOG.info("Barrier wait done for the resync thread"); + + // Resync should only happen once + Assert.assertEquals(2, nm.getNMRegistrationCount()); + Assert.assertFalse("NM shutdown called.", isNMShutdownCalled.get()); + } finally { + nm.stop(); + } + } + @SuppressWarnings("resource") @Test(timeout=10000) public void testNMshutdownWhenResyncThrowException() throws IOException, @@ -399,6 +439,11 @@ public class TestNodeManagerResync { existingCid = cId; } + @Override + protected AsyncDispatcher createNMDispatcher() { + return new DrainDispatcher(); + } + @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { @@ -410,6 +455,14 @@ public class TestNodeManagerResync { return registrationCount; } + @Override + protected void shutDown(int exitCode) { + synchronized (isNMShutdownCalled) { + isNMShutdownCalled.set(true); + isNMShutdownCalled.notify(); + } + } + class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater { public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher, @@ -428,6 +481,9 @@ public class TestNodeManagerResync { ConcurrentMap containers = getNMContext().getContainers(); + if (resyncThreadCount.incrementAndGet() > 1) { + throw new YarnRuntimeException("Multiple resync thread created!"); + } try { try { if (containersShouldBePreserved) {