YARN-3999. RM hangs on draing events. Contributed by Jian He

(cherry picked from commit 3ae716fa69)
This commit is contained in:
Xuan 2015-08-11 18:25:11 -07:00
parent 3dcff42437
commit 2ebdf5bfce
16 changed files with 104 additions and 93 deletions

View File

@ -83,11 +83,13 @@ public void start() {
public void stop() {
shouldRun = false;
monitorThread.interrupt();
try {
monitorThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (monitorThread != null) {
monitorThread.interrupt();
try {
monitorThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

View File

@ -719,6 +719,7 @@ Release 2.7.2 - UNRELEASED
YARN-3978. Configurably turn off the saving of container info in Generic AHS
(Eric Payne via jeagles)
OPTIMIZATIONS
BUG FIXES
@ -749,6 +750,8 @@ Release 2.7.2 - UNRELEASED
YARN-3990. AsyncDispatcher may overloaded with RMAppNodeUpdateEvent when
Node is connected/disconnected (Bibin A Chundatt via jlowe)
YARN-3999. RM hangs on draing events. (Jian He via xgong)
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES

View File

@ -1300,6 +1300,11 @@ private static void addDeprecatedKeys() {
public static final long DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
= 30 * 1000;
public static final String DISPATCHER_DRAIN_EVENTS_TIMEOUT =
YARN_PREFIX + "dispatcher.drain-events.timeout";
public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000;
/**
* CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
* entries

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
@ -138,9 +139,14 @@ protected void serviceStop() throws Exception {
if (drainEventsOnStop) {
blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
long endTime = System.currentTimeMillis() + getConfig()
.getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);
synchronized (waitForDrained) {
while (!drained && eventHandlingThread != null
&& eventHandlingThread.isAlive()) {
&& eventHandlingThread.isAlive()
&& System.currentTimeMillis() < endTime) {
waitForDrained.wait(1000);
LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
eventHandlingThread.getState());

View File

@ -90,6 +90,15 @@
<value>10</value>
</property>
<property>
<description>Timeout in milliseconds when YARN dispatcher tries to drain the
events. Typically, this happens when service is stopping. e.g. RM drains
the ATS events dispatcher when stopping.
</description>
<name>yarn.dispatcher.drain-events.timeout</name>
<value>300000</value>
</property>
<property>
<description>The expiry interval for application master reporting.</description>
<name>yarn.am.liveness-monitor.expiry-interval-ms</name>

View File

@ -18,18 +18,17 @@
package org.apache.hadoop.yarn.event;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Assert;
import org.junit.Test;
import static org.mockito.Mockito.*;
public class TestAsyncDispatcher {
/* This test checks whether dispatcher hangs on close if following two things
@ -58,5 +57,23 @@ public void testDispatcherOnCloseIfQueueEmpty() throws Exception {
eventQueue.isEmpty());
disp.close();
}
// Test dispatcher should timeout on draining events.
@Test(timeout=10000)
public void testDispatchStopOnTimeout() throws Exception {
BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<Event>();
eventQueue = spy(eventQueue);
// simulate dispatcher is not drained.
when(eventQueue.isEmpty()).thenReturn(false);
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 2000);
DrainDispatcher disp = new DrainDispatcher(eventQueue);
disp.init(conf);
disp.setDrainEventsOnStop();
disp.start();
disp.waitForEventThreadToWait();
disp.close();
}
}

View File

@ -92,8 +92,6 @@ public class RMActiveServiceContext {
private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private RMNodeLabelsManager nodeLabelManager;
private long epoch;
private Clock systemClock = new SystemClock();
@ -117,7 +115,6 @@ public RMActiveServiceContext(Dispatcher rmDispatcher,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
RMApplicationHistoryWriter rmApplicationHistoryWriter,
ResourceScheduler scheduler) {
this();
this.setContainerAllocationExpirer(containerAllocationExpirer);
@ -128,7 +125,6 @@ public RMActiveServiceContext(Dispatcher rmDispatcher,
this.setContainerTokenSecretManager(containerTokenSecretManager);
this.setNMTokenSecretManager(nmTokenSecretManager);
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
this.setScheduler(scheduler);
RMStateStore nullStore = new NullRMStateStore();
@ -368,32 +364,6 @@ public boolean isWorkPreservingRecoveryEnabled() {
return this.isWorkPreservingRecoveryEnabled;
}
@Private
@Unstable
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return rmApplicationHistoryWriter;
}
@Private
@Unstable
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
this.systemMetricsPublisher = systemMetricsPublisher;
}
@Private
@Unstable
public SystemMetricsPublisher getSystemMetricsPublisher() {
return systemMetricsPublisher;
}
@Private
@Unstable
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
}
@Private
@Unstable
public long getEpoch() {

View File

@ -68,6 +68,9 @@ public class RMContextImpl implements RMContext {
private Configuration yarnConfiguration;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
/**
* Default constructor. To be used in conjunction with setter methods for
* individual fields.
@ -87,7 +90,6 @@ public RMContextImpl(Dispatcher rmDispatcher,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
RMApplicationHistoryWriter rmApplicationHistoryWriter,
ResourceScheduler scheduler) {
this();
this.setDispatcher(rmDispatcher);
@ -95,7 +97,7 @@ public RMContextImpl(Dispatcher rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
delegationTokenRenewer, appTokenSecretManager,
containerTokenSecretManager, nmTokenSecretManager,
clientToAMTokenSecretManager, rmApplicationHistoryWriter,
clientToAMTokenSecretManager,
scheduler));
ConfigurationProvider provider = new LocalConfigurationProvider();
@ -112,8 +114,7 @@ public RMContextImpl(Dispatcher rmDispatcher,
AMRMTokenSecretManager appTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
this(
rmDispatcher,
containerAllocationExpirer,
@ -123,9 +124,7 @@ public RMContextImpl(Dispatcher rmDispatcher,
appTokenSecretManager,
containerTokenSecretManager,
nmTokenSecretManager,
clientToAMTokenSecretManager,
rmApplicationHistoryWriter,
null);
clientToAMTokenSecretManager, null);
}
@Override
@ -351,25 +350,25 @@ public boolean isWorkPreservingRecoveryEnabled() {
@Override
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return activeServiceContext.getRMApplicationHistoryWriter();
return this.rmApplicationHistoryWriter;
}
@Override
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
activeServiceContext.setSystemMetricsPublisher(systemMetricsPublisher);
this.systemMetricsPublisher = systemMetricsPublisher;
}
@Override
public SystemMetricsPublisher getSystemMetricsPublisher() {
return activeServiceContext.getSystemMetricsPublisher();
return this.systemMetricsPublisher;
}
@Override
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
activeServiceContext
.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
}
@Override

View File

@ -250,7 +250,7 @@ protected void serviceInit(Configuration conf) throws Exception {
adminService = createAdminService();
addService(adminService);
rmContext.setRMAdminService(adminService);
rmContext.setYarnConfiguration(conf);
createAndInitActiveServices();
@ -259,6 +259,15 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.RM_BIND_HOST,
WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
RMApplicationHistoryWriter rmApplicationHistoryWriter =
createRMApplicationHistoryWriter();
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
addService(systemMetricsPublisher);
rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
super.serviceInit(this.conf);
}
@ -411,7 +420,6 @@ protected void serviceInit(Configuration configuration) throws Exception {
rmContext.setActiveServiceContext(activeServiceContext);
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
rmSecretManagerService = createRMSecretManagerService();
addService(rmSecretManagerService);
@ -468,15 +476,6 @@ protected void serviceInit(Configuration configuration) throws Exception {
rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
}
RMApplicationHistoryWriter rmApplicationHistoryWriter =
createRMApplicationHistoryWriter();
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
addService(systemMetricsPublisher);
rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
// Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
@ -596,11 +595,13 @@ protected void serviceStart() throws Exception {
@Override
protected void serviceStop() throws Exception {
DefaultMetricsSystem.shutdown();
super.serviceStop();
if (pauseMonitor != null) {
pauseMonitor.stop();
}
DefaultMetricsSystem.shutdown();
if (rmContext != null) {
RMStateStore store = rmContext.getStateStore();
try {
@ -610,7 +611,6 @@ protected void serviceStop() throws Exception {
}
}
super.serviceStop();
}
protected void createPolicyMonitors() {
@ -1033,12 +1033,12 @@ synchronized void transitionToStandby(boolean initialize)
}
LOG.info("Transitioning to standby state");
if (rmContext.getHAServiceState() ==
HAServiceProtocol.HAServiceState.ACTIVE) {
HAServiceState state = rmContext.getHAServiceState();
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
stopActiveServices();
reinitialize(initialize);
}
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
LOG.info("Transitioned to standby state");
}

View File

@ -120,7 +120,7 @@ public RMContext mockRMContext(int n, long time) {
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext context = new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, null, null, null, null, writer) {
null, null, null, null, null) {
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return map;
@ -128,7 +128,8 @@ public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
};
((RMContextImpl)context).setStateStore(mock(RMStateStore.class));
metricsPublisher = mock(SystemMetricsPublisher.class);
((RMContextImpl)context).setSystemMetricsPublisher(metricsPublisher);
context.setSystemMetricsPublisher(metricsPublisher);
context.setRMApplicationHistoryWriter(writer);
return context;
}

View File

@ -66,6 +66,7 @@ public void setUp() throws Exception {
@After
public void tearDown() throws Exception {
resourceManager.stop();
}
private org.apache.hadoop.yarn.server.resourcemanager.NodeManager

View File

@ -87,9 +87,9 @@ public void setUp() throws Exception {
rmContext =
new RMContextImpl(rmDispatcher, null, null, null,
null, null, null, null, null,
new RMApplicationHistoryWriter());
null, null, null, null, null);
rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher());
rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
scheduler = mock(YarnScheduler.class);
doAnswer(

View File

@ -212,11 +212,11 @@ public void setUp() throws Exception {
renewer, new AMRMTokenSecretManager(conf, this.rmContext),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(),
writer);
new ClientToAMTokenSecretManagerInRM());
((RMContextImpl)realRMContext).setStateStore(store);
publisher = mock(SystemMetricsPublisher.class);
((RMContextImpl)realRMContext).setSystemMetricsPublisher(publisher);
realRMContext.setSystemMetricsPublisher(publisher);
realRMContext.setRMApplicationHistoryWriter(writer);
this.rmContext = spy(realRMContext);

View File

@ -258,14 +258,14 @@ public void setUp() throws Exception {
null, amRMTokenManager,
new RMContainerTokenSecretManager(conf),
nmTokenManager,
clientToAMTokenManager,
writer);
clientToAMTokenManager);
store = mock(RMStateStore.class);
((RMContextImpl) rmContext).setStateStore(store);
publisher = mock(SystemMetricsPublisher.class);
((RMContextImpl) rmContext).setSystemMetricsPublisher(publisher);
rmContext.setSystemMetricsPublisher(publisher);
rmContext.setRMApplicationHistoryWriter(writer);
scheduler = mock(YarnScheduler.class);
masterService = mock(ApplicationMasterService.class);
applicationMasterLauncher = mock(ApplicationMasterLauncher.class);

View File

@ -42,7 +42,6 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMActiveServiceContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@ -103,7 +102,7 @@ public EventHandler getEventHandler() {
new AMRMTokenSecretManager(conf, null),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), writer);
new ClientToAMTokenSecretManagerInRM());
RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
when(
nlm.getQueueResource(any(String.class), any(Set.class),
@ -117,8 +116,8 @@ public Resource answer(InvocationOnMock invocation) throws Throwable {
when(nlm.getResourceByLabel(any(String.class), any(Resource.class)))
.thenAnswer(new Answer<Resource>() {
@Override
public Resource answer(InvocationOnMock invocation) throws Throwable {
@Override public Resource answer(InvocationOnMock invocation)
throws Throwable {
Object[] args = invocation.getArguments();
return (Resource) args[1];
}
@ -126,7 +125,7 @@ public Resource answer(InvocationOnMock invocation) throws Throwable {
rmContext.setNodeLabelManager(nlm);
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
ResourceScheduler mockScheduler = mock(ResourceScheduler.class);
when(mockScheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());

View File

@ -172,7 +172,7 @@ public void testAppAttemptMetrics() throws Exception {
FifoScheduler scheduler = new FifoScheduler();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null, null, writer, scheduler);
null, null, null, null, null, null, null, scheduler);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
@ -218,10 +218,10 @@ public void testNodeLocalAssignment() throws Exception {
FifoScheduler scheduler = new FifoScheduler();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null, writer,
scheduler);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler);
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
rmContext.setRMApplicationHistoryWriter(
mock(RMApplicationHistoryWriter.class));
((RMContextImpl) rmContext).setYarnConfiguration(new YarnConfiguration());
scheduler.setRMContext(rmContext);
@ -300,10 +300,9 @@ public Map<NodeId, FiCaSchedulerNode> getNodes(){
}
};
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null, writer,
scheduler);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler);
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
((RMContextImpl) rmContext).setYarnConfiguration(new YarnConfiguration());
NullRMNodeLabelsManager nlm = new NullRMNodeLabelsManager();
nlm.init(new Configuration());