YARN-8451. Multiple NM heartbeat thread created when a slow NM resync with RM. Contributed by Botong Huang

(cherry picked from commit 100470140d)
This commit is contained in:
Jason Lowe 2018-06-29 13:06:28 -05:00
parent 33e6eec7b8
commit 14c7dc3c1e
2 changed files with 98 additions and 24 deletions

View File

@ -128,6 +128,7 @@ public class NodeManager extends CompositeService
// the NM collector service is set only if the timeline service v.2 is enabled
private NMCollectorService nmCollectorService;
private NodeStatusUpdater nodeStatusUpdater;
private AtomicBoolean resyncingWithRM = new AtomicBoolean(false);
private NodeResourceMonitor nodeResourceMonitor;
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
private NMStateStoreService nmStore = null;
@ -371,7 +372,7 @@ public class NodeManager extends CompositeService
addService(del);
// NodeManager level dispatcher
this.dispatcher = new AsyncDispatcher("NM Event dispatcher");
this.dispatcher = createNMDispatcher();
dirsHandler = new LocalDirsHandlerService(metrics);
nodeHealthChecker =
@ -489,31 +490,41 @@ public class NodeManager extends CompositeService
}
protected void resyncWithRM() {
//we do not want to block dispatcher thread here
new Thread() {
@Override
public void run() {
try {
if (!rmWorkPreservingRestartEnabled) {
LOG.info("Cleaning up running containers on resync");
containerManager.cleanupContainersOnNMResync();
// Clear all known collectors for resync.
if (context.getKnownCollectors() != null) {
context.getKnownCollectors().clear();
// Create a thread for resync because we do not want to block dispatcher
// thread here. Also use locking to make sure only one thread is running at
// a time.
if (this.resyncingWithRM.getAndSet(true)) {
// Some other thread is already created for resyncing, do nothing
} else {
// We have got the lock, create a new thread
new Thread() {
@Override
public void run() {
try {
if (!rmWorkPreservingRestartEnabled) {
LOG.info("Cleaning up running containers on resync");
containerManager.cleanupContainersOnNMResync();
// Clear all known collectors for resync.
if (context.getKnownCollectors() != null) {
context.getKnownCollectors().clear();
}
} else {
LOG.info("Preserving containers on resync");
// Re-register known timeline collectors.
reregisterCollectors();
}
} else {
LOG.info("Preserving containers on resync");
// Re-register known timeline collectors.
reregisterCollectors();
((NodeStatusUpdaterImpl) nodeStatusUpdater)
.rebootNodeStatusUpdaterAndRegisterWithRM();
} catch (YarnRuntimeException e) {
LOG.error("Error while rebooting NodeStatusUpdater.", e);
shutDown(NodeManagerStatus.EXCEPTION.getExitCode());
} finally {
// Release lock
resyncingWithRM.set(false);
}
((NodeStatusUpdaterImpl) nodeStatusUpdater)
.rebootNodeStatusUpdaterAndRegisterWithRM();
} catch (YarnRuntimeException e) {
LOG.error("Error while rebooting NodeStatusUpdater.", e);
shutDown(NodeManagerStatus.EXCEPTION.getExitCode());
}
}
}.start();
}.start();
}
}
/**
@ -861,7 +872,14 @@ public class NodeManager extends CompositeService
ContainerManagerImpl getContainerManager() {
return containerManager;
}
/**
* Unit test friendly.
*/
protected AsyncDispatcher createNMDispatcher() {
return new AsyncDispatcher("NM Event dispatcher");
}
//For testing
Dispatcher getNMDispatcher(){
return dispatcher;

View File

@ -37,6 +37,7 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
@ -64,7 +65,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -107,6 +110,7 @@ public class TestNodeManagerResync {
private FileContext localFS;
private CyclicBarrier syncBarrier;
private CyclicBarrier updateBarrier;
private AtomicInteger resyncThreadCount;
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
private final NodeManagerEvent resyncEvent =
@ -125,6 +129,7 @@ public class TestNodeManagerResync {
nmLocalDir.mkdirs();
syncBarrier = new CyclicBarrier(2);
updateBarrier = new CyclicBarrier(2);
resyncThreadCount = new AtomicInteger(0);
}
@After
@ -185,6 +190,41 @@ public class TestNodeManagerResync {
}
}
@SuppressWarnings("resource")
@Test(timeout = 30000)
public void testNMMultipleResyncEvent()
throws IOException, InterruptedException {
TestNodeManager1 nm = new TestNodeManager1(false);
YarnConfiguration conf = createNMConfig();
int resyncEventCount = 4;
try {
nm.init(conf);
nm.start();
Assert.assertEquals(1, nm.getNMRegistrationCount());
for (int i = 0; i < resyncEventCount; i++) {
nm.getNMDispatcher().getEventHandler().handle(resyncEvent);
}
DrainDispatcher dispatcher = (DrainDispatcher) nm.getNMDispatcher();
dispatcher.await();
LOG.info("NM dispatcher drained");
// Wait for the resync thread to finish
try {
syncBarrier.await();
} catch (BrokenBarrierException e) {
}
LOG.info("Barrier wait done for the resync thread");
// Resync should only happen once
Assert.assertEquals(2, nm.getNMRegistrationCount());
Assert.assertFalse("NM shutdown called.", isNMShutdownCalled.get());
} finally {
nm.stop();
}
}
@SuppressWarnings("resource")
@Test(timeout=10000)
public void testNMshutdownWhenResyncThrowException() throws IOException,
@ -399,6 +439,11 @@ public class TestNodeManagerResync {
existingCid = cId;
}
@Override
protected AsyncDispatcher createNMDispatcher() {
return new DrainDispatcher();
}
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
@ -410,6 +455,14 @@ public class TestNodeManagerResync {
return registrationCount;
}
@Override
protected void shutDown(int exitCode) {
synchronized (isNMShutdownCalled) {
isNMShutdownCalled.set(true);
isNMShutdownCalled.notify();
}
}
class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater {
public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher,
@ -428,6 +481,9 @@ public class TestNodeManagerResync {
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.Container> containers =
getNMContext().getContainers();
if (resyncThreadCount.incrementAndGet() > 1) {
throw new YarnRuntimeException("Multiple resync thread created!");
}
try {
try {
if (containersShouldBePreserved) {