YARN-8451. Multiple NM heartbeat thread created when a slow NM resync with RM. Contributed by Botong Huang

(cherry picked from commit 100470140d)
This commit is contained in:
Jason Lowe 2018-06-29 13:06:28 -05:00
parent 00406f583b
commit dbb0a5f51f
2 changed files with 98 additions and 24 deletions

View File

@ -128,6 +128,7 @@ public class NodeManager extends CompositeService
// the NM collector service is set only if the timeline service v.2 is enabled // the NM collector service is set only if the timeline service v.2 is enabled
private NMCollectorService nmCollectorService; private NMCollectorService nmCollectorService;
private NodeStatusUpdater nodeStatusUpdater; private NodeStatusUpdater nodeStatusUpdater;
private AtomicBoolean resyncingWithRM = new AtomicBoolean(false);
private NodeResourceMonitor nodeResourceMonitor; private NodeResourceMonitor nodeResourceMonitor;
private static CompositeServiceShutdownHook nodeManagerShutdownHook; private static CompositeServiceShutdownHook nodeManagerShutdownHook;
private NMStateStoreService nmStore = null; private NMStateStoreService nmStore = null;
@ -393,7 +394,7 @@ public class NodeManager extends CompositeService
addService(del); addService(del);
// NodeManager level dispatcher // NodeManager level dispatcher
this.dispatcher = new AsyncDispatcher("NM Event dispatcher"); this.dispatcher = createNMDispatcher();
nodeHealthChecker = nodeHealthChecker =
new NodeHealthCheckerService( new NodeHealthCheckerService(
@ -517,31 +518,41 @@ public class NodeManager extends CompositeService
} }
protected void resyncWithRM() { protected void resyncWithRM() {
//we do not want to block dispatcher thread here // Create a thread for resync because we do not want to block dispatcher
new Thread() { // thread here. Also use locking to make sure only one thread is running at
@Override // a time.
public void run() { if (this.resyncingWithRM.getAndSet(true)) {
try { // Some other thread is already created for resyncing, do nothing
if (!rmWorkPreservingRestartEnabled) { } else {
LOG.info("Cleaning up running containers on resync"); // We have got the lock, create a new thread
containerManager.cleanupContainersOnNMResync(); new Thread() {
// Clear all known collectors for resync. @Override
if (context.getKnownCollectors() != null) { public void run() {
context.getKnownCollectors().clear(); try {
if (!rmWorkPreservingRestartEnabled) {
LOG.info("Cleaning up running containers on resync");
containerManager.cleanupContainersOnNMResync();
// Clear all known collectors for resync.
if (context.getKnownCollectors() != null) {
context.getKnownCollectors().clear();
}
} else {
LOG.info("Preserving containers on resync");
// Re-register known timeline collectors.
reregisterCollectors();
} }
} else { ((NodeStatusUpdaterImpl) nodeStatusUpdater)
LOG.info("Preserving containers on resync"); .rebootNodeStatusUpdaterAndRegisterWithRM();
// Re-register known timeline collectors. } catch (YarnRuntimeException e) {
reregisterCollectors(); LOG.error("Error while rebooting NodeStatusUpdater.", e);
shutDown(NodeManagerStatus.EXCEPTION.getExitCode());
} finally {
// Release lock
resyncingWithRM.set(false);
} }
((NodeStatusUpdaterImpl) nodeStatusUpdater)
.rebootNodeStatusUpdaterAndRegisterWithRM();
} catch (YarnRuntimeException e) {
LOG.error("Error while rebooting NodeStatusUpdater.", e);
shutDown(NodeManagerStatus.EXCEPTION.getExitCode());
} }
} }.start();
}.start(); }
} }
/** /**
@ -946,7 +957,14 @@ public class NodeManager extends CompositeService
ContainerManagerImpl getContainerManager() { ContainerManagerImpl getContainerManager() {
return containerManager; return containerManager;
} }
/**
* Unit test friendly.
*/
protected AsyncDispatcher createNMDispatcher() {
return new AsyncDispatcher("NM Event dispatcher");
}
//For testing //For testing
Dispatcher getNMDispatcher(){ Dispatcher getNMDispatcher(){
return dispatcher; return dispatcher;

View File

@ -37,6 +37,7 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -64,7 +65,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -107,6 +110,7 @@ public class TestNodeManagerResync {
private FileContext localFS; private FileContext localFS;
private CyclicBarrier syncBarrier; private CyclicBarrier syncBarrier;
private CyclicBarrier updateBarrier; private CyclicBarrier updateBarrier;
private AtomicInteger resyncThreadCount;
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false); private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
private final NodeManagerEvent resyncEvent = private final NodeManagerEvent resyncEvent =
@ -125,6 +129,7 @@ public class TestNodeManagerResync {
nmLocalDir.mkdirs(); nmLocalDir.mkdirs();
syncBarrier = new CyclicBarrier(2); syncBarrier = new CyclicBarrier(2);
updateBarrier = new CyclicBarrier(2); updateBarrier = new CyclicBarrier(2);
resyncThreadCount = new AtomicInteger(0);
} }
@After @After
@ -185,6 +190,41 @@ public class TestNodeManagerResync {
} }
} }
@SuppressWarnings("resource")
@Test(timeout = 30000)
public void testNMMultipleResyncEvent()
throws IOException, InterruptedException {
TestNodeManager1 nm = new TestNodeManager1(false);
YarnConfiguration conf = createNMConfig();
int resyncEventCount = 4;
try {
nm.init(conf);
nm.start();
Assert.assertEquals(1, nm.getNMRegistrationCount());
for (int i = 0; i < resyncEventCount; i++) {
nm.getNMDispatcher().getEventHandler().handle(resyncEvent);
}
DrainDispatcher dispatcher = (DrainDispatcher) nm.getNMDispatcher();
dispatcher.await();
LOG.info("NM dispatcher drained");
// Wait for the resync thread to finish
try {
syncBarrier.await();
} catch (BrokenBarrierException e) {
}
LOG.info("Barrier wait done for the resync thread");
// Resync should only happen once
Assert.assertEquals(2, nm.getNMRegistrationCount());
Assert.assertFalse("NM shutdown called.", isNMShutdownCalled.get());
} finally {
nm.stop();
}
}
@SuppressWarnings("resource") @SuppressWarnings("resource")
@Test(timeout=10000) @Test(timeout=10000)
public void testNMshutdownWhenResyncThrowException() throws IOException, public void testNMshutdownWhenResyncThrowException() throws IOException,
@ -399,6 +439,11 @@ public class TestNodeManagerResync {
existingCid = cId; existingCid = cId;
} }
@Override
protected AsyncDispatcher createNMDispatcher() {
return new DrainDispatcher();
}
@Override @Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
@ -410,6 +455,14 @@ public class TestNodeManagerResync {
return registrationCount; return registrationCount;
} }
@Override
protected void shutDown(int exitCode) {
synchronized (isNMShutdownCalled) {
isNMShutdownCalled.set(true);
isNMShutdownCalled.notify();
}
}
class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater { class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater {
public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher, public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher,
@ -428,6 +481,9 @@ public class TestNodeManagerResync {
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.Container> containers = .containermanager.container.Container> containers =
getNMContext().getContainers(); getNMContext().getContainers();
if (resyncThreadCount.incrementAndGet() > 1) {
throw new YarnRuntimeException("Multiple resync thread created!");
}
try { try {
try { try {
if (containersShouldBePreserved) { if (containersShouldBePreserved) {