MAPREDUCE-3634. Fixed all daemons to crash instead of hanging around when their EventHandlers get exceptions. (vinodkv)

svn merge --ignore-ancestry -c 1291598 ../../trunk


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1291600 13f79535-47bb-0310-9956-ffa450edef68
Author: Vinod Kumar Vavilapalli   Date: 2012-02-21 05:11:29 +00:00
parent 197d6d34ca
commit 518fed3ac0
22 changed files with 173 additions and 117 deletions
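
The essence of the patch, condensed into a small sketch before the per-file hunks: every daemon flips a new dispatcher flag on its Configuration at init time, and the async dispatcher reads that flag and calls System.exit(-1) when an event handler throws, instead of letting the dispatcher thread die quietly while the daemon hangs. The class below is an invented illustration; only the key name, the default value, and the overall flow come from the hunks that follow.

// Sketch only, condensed from the Dispatcher/AsyncDispatcher hunks below.
// "SketchDispatcher" and its method bodies are illustrative, not real classes.
class SketchDispatcher {
  static final String EXIT_ON_ERROR_KEY = "yarn.dispatcher.exit-on-error";
  static final boolean EXIT_ON_ERROR_DEFAULT = false; // tests keep the default

  private boolean exitOnDispatchException;

  // Daemons (MRAppMaster, NodeManager, ResourceManager, JobHistoryServer)
  // call conf.setBoolean(EXIT_ON_ERROR_KEY, true) before init().
  public void init(org.apache.hadoop.conf.Configuration conf) {
    this.exitOnDispatchException =
        conf.getBoolean(EXIT_ON_ERROR_KEY, EXIT_ON_ERROR_DEFAULT);
  }

  void dispatch(Runnable handlerInvocation) {
    try {
      handlerInvocation.run(); // hand the event to its registered handler
    } catch (Throwable t) {
      // Before the patch the dispatcher thread simply died and the daemon
      // hung; with the flag set, the daemon now crashes visibly.
      if (exitOnDispatchException) {
        System.exit(-1);
      }
    }
  }
}

The per-file changes below plumb this flag through the daemons, delete the old constructor-based exit-on-error wiring, and adjust the tests accordingly.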

View File

@@ -19,6 +19,7 @@ Release 0.23.2 - UNRELEASED
   OPTIMIZATIONS
   BUG FIXES
     MAPREDUCE-3862. Nodemanager can appear to hang on shutdown due to lingering
     DeletionService threads (Jason Lowe via bobby)
@@ -34,6 +35,9 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-3583. Change pid to String and stime to BigInteger in order to
     avoid NumberFormatException caused by overflow. (Zhihong Yu via szetszwo)
+    MAPREDUCE-3634. Fixed all daemons to crash instead of hanging around when
+    their EventHandlers get exceptions. (vinodkv)
 Release 0.23.1 - 2012-02-17
   INCOMPATIBLE CHANGES

View File

@@ -188,6 +188,8 @@ public class MRAppMaster extends CompositeService {
   @Override
   public void init(final Configuration conf) {
+    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
     downloadTokensAndSetupUGI(conf);
     context = new RunningAppContext(conf);

View File

@@ -61,7 +61,8 @@ public class ContainerLauncherEvent
   @Override
   public String toString() {
-    return super.toString() + " for taskAttempt " + taskAttemptID;
+    return super.toString() + " for container " + containerID + " taskAttempt "
+        + taskAttemptID;
   }
   @Override

View File

@@ -334,7 +334,6 @@ public class ContainerLauncherImpl extends AbstractService implements
           LOG.error("Returning, interrupted : " + e);
           return;
         }
         int poolSize = launcherPool.getCorePoolSize();
         // See if we need up the pool size only if haven't reached the

View File

@@ -217,25 +217,17 @@ public class RecoveryService extends CompositeService implements Recovery {
     return new RecoveryDispatcher();
   }
-  protected Dispatcher createRecoveryDispatcher(boolean exitOnException) {
-    return new RecoveryDispatcher(exitOnException);
-  }
   @SuppressWarnings("rawtypes")
   class RecoveryDispatcher extends AsyncDispatcher {
     private final EventHandler actualHandler;
     private final EventHandler handler;
-    RecoveryDispatcher(boolean exitOnException) {
-      super(exitOnException);
+    RecoveryDispatcher() {
+      super();
       actualHandler = super.getEventHandler();
       handler = new InterceptingEventHandler(actualHandler);
     }
-    RecoveryDispatcher() {
-      this(false);
-    }
     @Override
     @SuppressWarnings("unchecked")
     public void dispatch(Event event) {

View File

@@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

View File

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.Map;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
@@ -49,6 +51,7 @@ import org.junit.Test;
  * Tests the state machine with respect to Job/Task/TaskAttempt failure
  * scenarios.
  */
+@SuppressWarnings("unchecked")
 public class TestFail {
   @Test
@@ -247,10 +250,17 @@ public class TestFail {
       //when attempt times out, heartbeat handler will send the lost event
       //leading to Attempt failure
       return new TaskAttemptListenerImpl(getContext(), null) {
+        @Override
         public void startRpcServer(){};
+        @Override
         public void stopRpcServer(){};
+        @Override
+        public InetSocketAddress getAddress() {
+          return NetUtils.createSocketAddr("localhost", 1234);
+        }
         public void init(Configuration conf) {
-          conf.setInt("mapreduce.task.timeout", 1*1000);//reduce timeout
+          conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout
+          conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000);
           super.init(conf);
         }
       };

View File

@@ -54,12 +54,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
-import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
-import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
@@ -724,13 +719,6 @@ public class TestRecovery {
       super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
     }
-    @Override
-    protected Recovery createRecoveryService(AppContext appContext) {
-      return new RecoveryServiceWithCustomDispatcher(
-          appContext.getApplicationAttemptId(), appContext.getClock(),
-          getCommitter());
-    }
     @Override
     protected ContainerLauncher createContainerLauncher(AppContext context) {
       MockContainerLauncher launcher = new MockContainerLauncher() {
@@ -757,21 +745,6 @@ public class TestRecovery {
     }
   }
-  static class RecoveryServiceWithCustomDispatcher extends RecoveryService {
-    public RecoveryServiceWithCustomDispatcher(
-        ApplicationAttemptId applicationAttemptId, Clock clock,
-        OutputCommitter committer) {
-      super(applicationAttemptId, clock, committer);
-    }
-    @Override
-    public Dispatcher createRecoveryDispatcher() {
-      return super.createRecoveryDispatcher(false);
-    }
-  }
   public static void main(String[] arg) throws Exception {
     TestRecovery test = new TestRecovery();
     test.testCrashed();

View File

@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
 import junit.framework.Assert;
@@ -64,8 +65,6 @@ public class TestContainerLauncher {
         appId, 3);
     JobId jobId = MRBuilderUtils.newJobId(appId, 8);
     TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP);
-    TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
-    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10);
     AppContext context = mock(AppContext.class);
     CustomContainerLauncher containerLauncher = new CustomContainerLauncher(
@@ -83,6 +82,8 @@ public class TestContainerLauncher {
     containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
     for (int i = 0; i < 10; i++) {
+      ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, i);
+      TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, i);
       containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
           containerId, "host" + i + ":1234", null,
           ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
@@ -92,9 +93,21 @@ public class TestContainerLauncher {
     Assert.assertNull(containerLauncher.foundErrors);
     // Same set of hosts, so no change
+    containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
     containerLauncher.finishEventHandling = true;
+    int timeOut = 0;
+    while (containerLauncher.numEventsProcessed.get() < 10 && timeOut++ < 200) {
+      LOG.info("Waiting for number of events processed to become " + 10
+          + ". It is now " + containerLauncher.numEventsProcessed.get()
+          + ". Timeout is " + timeOut);
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals(10, containerLauncher.numEventsProcessed.get());
+    containerLauncher.finishEventHandling = false;
     for (int i = 0; i < 10; i++) {
+      ContainerId containerId =
+          BuilderUtils.newContainerId(appAttemptId, i + 10);
+      TaskAttemptId taskAttemptId =
+          MRBuilderUtils.newTaskAttemptId(taskId, i + 10);
       containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
           containerId, "host" + i + ":1234", null,
           ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
@@ -106,14 +119,16 @@ public class TestContainerLauncher {
     // Different hosts, there should be an increase in core-thread-pool size to
     // 21(11hosts+10buffer)
     // Core pool size should be 21 but the live pool size should be only 11.
-    containerLauncher.expectedCorePoolSize = 12 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
-    for (int i = 1; i <= 2; i++) {
-      containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
-          containerId, "host1" + i + ":1234", null,
-          ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
-    }
-    waitForEvents(containerLauncher, 22);
-    Assert.assertEquals(12, threadPool.getPoolSize());
+    containerLauncher.expectedCorePoolSize =
+        11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
+    containerLauncher.finishEventHandling = false;
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21);
+    TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21);
+    containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
+        containerId, "host11:1234", null,
+        ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
+    waitForEvents(containerLauncher, 21);
+    Assert.assertEquals(11, threadPool.getPoolSize());
     Assert.assertNull(containerLauncher.foundErrors);
     containerLauncher.stop();
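
For context on what the pool-size assertions above are pinning down: the core thread pool of the container launcher starts at INITIAL_POOL_SIZE (10) and is only bumped once the number of distinct NodeManager hosts outgrows it, at which point it becomes the host count plus a buffer of ten, while the live pool only ever grows to the number of threads actually in use. The sketch below is a guess at that rule reverse-engineered from the test's numbers, not the real ContainerLauncherImpl code.

// Illustrative only; "nextCorePoolSize" is an invented helper, not
// ContainerLauncherImpl's actual sizing logic.
final class PoolSizeSketch {
  static int nextCorePoolSize(int currentCorePoolSize, int distinctHosts,
      int initialPoolSize) {
    if (distinctHosts <= currentCorePoolSize) {
      return currentCorePoolSize;             // 10 hosts -> stays at 10
    }
    return distinctHosts + initialPoolSize;   // 11 hosts -> 21 (11 + 10 buffer)
  }
}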
@@ -172,15 +187,15 @@ public class TestContainerLauncher {
   private void waitForEvents(CustomContainerLauncher containerLauncher,
       int expectedNumEvents) throws InterruptedException {
-    int timeOut = 20;
-    while (expectedNumEvents != containerLauncher.numEventsProcessed
-        || timeOut++ < 20) {
+    int timeOut = 0;
+    while (containerLauncher.numEventsProcessing.get() < expectedNumEvents
+        && timeOut++ < 20) {
       LOG.info("Waiting for number of events to become " + expectedNumEvents
-          + ". It is now " + containerLauncher.numEventsProcessed);
+          + ". It is now " + containerLauncher.numEventsProcessing.get());
       Thread.sleep(1000);
     }
-    Assert
-        .assertEquals(expectedNumEvents, containerLauncher.numEventsProcessed);
+    Assert.assertEquals(expectedNumEvents,
+        containerLauncher.numEventsProcessing.get());
   }
   @Test
@@ -244,9 +259,11 @@ public class TestContainerLauncher {
   private final class CustomContainerLauncher extends ContainerLauncherImpl {
     private volatile int expectedCorePoolSize = 0;
-    private volatile int numEventsProcessed = 0;
+    private AtomicInteger numEventsProcessing = new AtomicInteger(0);
+    private AtomicInteger numEventsProcessed = new AtomicInteger(0);
     private volatile String foundErrors = null;
     private volatile boolean finishEventHandling;
     private CustomContainerLauncher(AppContext context) {
       super(context);
     }
@@ -255,8 +272,38 @@ public class TestContainerLauncher {
       return super.launcherPool;
     }
+    private final class CustomEventProcessor extends
+        ContainerLauncherImpl.EventProcessor {
+      private final ContainerLauncherEvent event;
+      private CustomEventProcessor(ContainerLauncherEvent event) {
+        super(event);
+        this.event = event;
+      }
+      @Override
+      public void run() {
+        // do nothing substantial
+        LOG.info("Processing the event " + event.toString());
+        numEventsProcessing.incrementAndGet();
+        // Stall
+        while (!finishEventHandling) {
+          synchronized (this) {
+            try {
+              wait(1000);
+            } catch (InterruptedException e) {
+              ;
+            }
+          }
+        }
+        numEventsProcessed.incrementAndGet();
+      }
+    }
     protected ContainerLauncherImpl.EventProcessor createEventProcessor(
-        ContainerLauncherEvent event) {
+        final ContainerLauncherEvent event) {
       // At this point of time, the EventProcessor is being created and so no
       // additional threads would have been created.
@@ -266,23 +313,7 @@ public class TestContainerLauncher {
             + launcherPool.getCorePoolSize();
       }
-      return new ContainerLauncherImpl.EventProcessor(event) {
-        @Override
-        public void run() {
-          // do nothing substantial
-          numEventsProcessed++;
-          // Stall
-          synchronized(this) {
-            try {
-              while(!finishEventHandling) {
-                wait(1000);
-              }
-            } catch (InterruptedException e) {
-              ;
-            }
-          }
-        }
-      };
+      return new CustomEventProcessor(event);
     }
   }

View File

@@ -30,6 +30,7 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.service.CompositeService;
 /******************************************************************
@@ -51,6 +52,9 @@ public class JobHistoryServer extends CompositeService {
   @Override
   public synchronized void init(Configuration conf) {
     Configuration config = new YarnConfiguration(conf);
+    config.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
     try {
       doSecureLogin(conf);
     } catch(IOException ie) {

View File

@@ -53,7 +53,6 @@ public class TestJobHistoryEvents {
   @Test
   public void testHistoryEvents() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(MRJobConfig.USER_NAME, "test");
     MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true);
     app.submit(conf);
     Job job = app.getContext().getAllJobs().values().iterator().next();
@@ -102,7 +101,6 @@ public class TestJobHistoryEvents {
   public void testEventsFlushOnStop() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(MRJobConfig.USER_NAME, "test");
     MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
         .getClass().getName(), true);
     app.submit(conf);

View File

@@ -193,6 +193,12 @@
     <Method name="dispatch" />
     <Bug pattern="DM_EXIT" />
   </Match>
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$SchedulerEventDispatcher$EventProcessor" />
+    <Method name="run" />
+    <Bug pattern="DM_EXIT" />
+  </Match>
   <!-- Ignore heartbeat exception when killing localizer -->
   <Match>

View File

@@ -27,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -48,22 +49,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
   private boolean exitOnDispatchException;
   public AsyncDispatcher() {
-    this(new HashMap<Class<? extends Enum>, EventHandler>(),
-        new LinkedBlockingQueue<Event>(), true);
+    this(new LinkedBlockingQueue<Event>());
   }
-  public AsyncDispatcher(boolean exitOnException) {
-    this(new HashMap<Class<? extends Enum>, EventHandler>(),
-        new LinkedBlockingQueue<Event>(), exitOnException);
-  }
-  AsyncDispatcher(
-      Map<Class<? extends Enum>, EventHandler> eventDispatchers,
-      BlockingQueue<Event> eventQueue, boolean exitOnException) {
+  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
     super("Dispatcher");
     this.eventQueue = eventQueue;
-    this.eventDispatchers = eventDispatchers;
-    this.exitOnDispatchException = exitOnException;
+    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
   }
   Runnable createThread() {
@@ -86,6 +78,14 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
     };
   }
+  @Override
+  public synchronized void init(Configuration conf) {
+    this.exitOnDispatchException =
+        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+    super.init(conf);
+  }
   @Override
   public void start() {
     //start all the components
@@ -103,7 +103,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
     try {
       eventHandlingThread.join();
     } catch (InterruptedException ie) {
-      LOG.debug("Interrupted Exception while stopping", ie);
+      LOG.warn("Interrupted Exception while stopping", ie);
     }
   }
@@ -126,8 +126,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
     }
     catch (Throwable t) {
       //TODO Maybe log the state of the queue
-      LOG.fatal("Error in dispatcher thread. Exiting..", t);
+      LOG.fatal("Error in dispatcher thread", t);
       if (exitOnDispatchException) {
+        LOG.info("Exiting, bbye..");
         System.exit(-1);
       }
     }
@@ -177,6 +178,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
       try {
         eventQueue.put(event);
       } catch (InterruptedException e) {
+        LOG.warn("AsyncDispatcher thread interrupted", e);
        throw new YarnException(e);
       }
     };

View File

@@ -23,8 +23,18 @@ package org.apache.hadoop.yarn.event;
  * event handlers based on event types.
  *
  */
+@SuppressWarnings("rawtypes")
 public interface Dispatcher {
+  // Configuration to make sure dispatcher crashes but doesn't do system-exit in
+  // case of errors. By default, it should be false, so that tests are not
+  // affected. For all daemons it should be explicitly set to true so that
+  // daemons can crash instead of hanging around.
+  public static final String DISPATCHER_EXIT_ON_ERROR_KEY =
+      "yarn.dispatcher.exit-on-error";
+  public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;
   EventHandler getEventHandler();
   void register(Class<? extends Enum> eventType, EventHandler handler);
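
The comment added above spells out the contract: the default stays false, so an EventHandler that throws inside a unit test only logs and fails that dispatch, while daemons opt in explicitly and exit the JVM. A minimal sketch of the test-side expectation follows; the enum, handler, and class names are invented for illustration, only the key, its default, and the AsyncDispatcher/Dispatcher methods come from this patch and its surrounding API.

// Sketch, not an actual test from the patch. With a default Configuration,
// a throwing handler must NOT bring the JVM down.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;

public class ExitOnErrorDefaultSketch {
  enum SketchEventType { FAIL }

  public static void main(String[] args) throws Exception {
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(new Configuration()); // yarn.dispatcher.exit-on-error stays false
    dispatcher.register(SketchEventType.class, new EventHandler<Event>() {
      @Override
      public void handle(Event event) {
        throw new RuntimeException("boom"); // logged as fatal, no System.exit
      }
    });
    dispatcher.start();
    // ... dispatch an event of SketchEventType.FAIL and observe the JVM stays up ...
    dispatcher.stop();
  }
}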

View File

@@ -17,10 +17,8 @@
  */
 package org.apache.hadoop.yarn.event;
-import java.util.HashMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 @SuppressWarnings("rawtypes")
 public class DrainDispatcher extends AsyncDispatcher {
@@ -36,7 +34,7 @@ public class DrainDispatcher extends AsyncDispatcher {
   }
   private DrainDispatcher(BlockingQueue<Event> eventQueue) {
-    super(new HashMap<Class<? extends Enum>, EventHandler>(), eventQueue, true);
+    super(eventQueue);
     this.queue = eventQueue;
   }

View File

@@ -99,6 +99,8 @@ public class NodeManager extends CompositeService implements
   @Override
   public void init(Configuration conf) {
+    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
     Context context = new NMContext();
     // Create the secretManager if need be.

View File

@@ -137,6 +137,7 @@ public class ContainerManagerImpl extends CompositeService implements
     this.context = context;
     this.dirsHandler = dirsHandler;
+    // ContainerManager level dispatcher.
     dispatcher = new AsyncDispatcher();
     this.deletionService = deletionContext;
     this.metrics = metrics;

View File

@@ -376,7 +376,7 @@ public class TestApplication {
     WrappedApplication(int id, long timestamp, String user, int numContainers) {
       dispatcher = new DrainDispatcher();
-      dispatcher.init(null);
+      dispatcher.init(new Configuration());
       localizerBus = mock(EventHandler.class);
       launcherBus = mock(EventHandler.class);

View File

@@ -517,7 +517,7 @@ public class TestContainer {
     WrappedContainer(int appId, long timestamp, int id, String user,
         boolean withLocalRes, boolean withServiceData) {
       dispatcher = new DrainDispatcher();
-      dispatcher.init(null);
+      dispatcher.init(new Configuration());
       localizerBus = mock(EventHandler.class);
       launcherBus = mock(EventHandler.class);

View File

@@ -17,6 +17,15 @@
  */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -29,19 +38,14 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.junit.Test;
-import static org.junit.Assert.*;
 import org.mockito.ArgumentMatcher;
-import static org.mockito.Mockito.*;
 public class TestLocalizedResource {
@@ -62,7 +66,7 @@ public class TestLocalizedResource {
   @SuppressWarnings("unchecked") // mocked generic
   public void testNotification() throws Exception {
     DrainDispatcher dispatcher = new DrainDispatcher();
-    dispatcher.init(null);
+    dispatcher.init(new Configuration());
     try {
       dispatcher.start();
       EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
@@ -175,7 +179,7 @@ public class TestLocalizedResource {
   @Test
   public void testDirectLocalization() throws Exception {
     DrainDispatcher dispatcher = new DrainDispatcher();
-    dispatcher.init(null);
+    dispatcher.init(new Configuration());
     try {
       dispatcher.start();
       LocalResource apiRsrc = createMockResource();

View File

@@ -18,8 +18,23 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
-import java.net.InetSocketAddress;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyShort;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,7 +48,6 @@ import java.util.Set;
 import junit.framework.Assert;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,6 +56,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -60,7 +75,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
-import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -81,13 +95,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Test;
-import static org.junit.Assert.*;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
-import static org.mockito.Mockito.*;
 public class TestResourceLocalizationService {
@@ -98,11 +108,11 @@ public class TestResourceLocalizationService {
   public void testLocalizationInit() throws Exception {
     final Configuration conf = new Configuration();
     AsyncDispatcher dispatcher = new AsyncDispatcher();
-    dispatcher.init(null);
+    dispatcher.init(new Configuration());
     ContainerExecutor exec = mock(ContainerExecutor.class);
     DeletionService delService = spy(new DeletionService(exec));
-    delService.init(null);
+    delService.init(new Configuration());
     delService.start();
     AbstractFileSystem spylfs =
@@ -371,7 +381,7 @@ public class TestResourceLocalizationService {
     DeletionService delServiceReal = new DeletionService(exec);
     DeletionService delService = spy(delServiceReal);
-    delService.init(null);
+    delService.init(new Configuration());
     delService.start();
     ResourceLocalizationService rawService =

View File

@@ -131,6 +131,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
     this.conf = conf;
+    this.conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
     this.rmDispatcher = createDispatcher();
     addIfService(this.rmDispatcher);
@@ -265,6 +267,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
     private final BlockingQueue<SchedulerEvent> eventQueue =
         new LinkedBlockingQueue<SchedulerEvent>();
     private final Thread eventProcessor;
+    private volatile boolean stopped = false;
     public SchedulerEventDispatcher(ResourceScheduler scheduler) {
       super(SchedulerEventDispatcher.class.getName());
@@ -285,7 +288,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
         SchedulerEvent event;
-        while (!Thread.currentThread().isInterrupted()) {
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
          try {
            event = eventQueue.take();
          } catch (InterruptedException e) {
@@ -296,9 +299,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
          try {
            scheduler.handle(event);
          } catch (Throwable t) {
-            LOG.error("Error in handling event type " + event.getType()
+            LOG.fatal("Error in handling event type " + event.getType()
                + " to the scheduler", t);
-            return; // TODO: Kill RM.
+            if (getConfig().getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+                Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR)) {
+              LOG.info("Exiting, bbye..");
+              System.exit(-1);
+            }
          }
        }
      }
@@ -306,6 +313,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
     @Override
     public synchronized void stop() {
+      this.stopped = true;
       this.eventProcessor.interrupt();
       try {
         this.eventProcessor.join();
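
The SchedulerEventDispatcher change above also introduces a standard stoppable-worker pattern: a volatile stopped flag checked alongside the interrupt status, set in stop() before interrupting and joining the thread, so shutdown no longer depends on the handling code happening to observe the interrupt. A generic sketch of that pattern follows; the names are invented and this is not the ResourceManager code itself.

// Generic sketch of the stop pattern used above, under the assumption that
// events are pulled off a blocking queue by a single worker thread.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class StoppableWorker<E> {
  private final BlockingQueue<E> queue = new LinkedBlockingQueue<E>();
  private volatile boolean stopped = false;
  private final Thread worker = new Thread(new Runnable() {
    @Override
    public void run() {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        E event;
        try {
          event = queue.take();   // interrupt() from stop() wakes this up
        } catch (InterruptedException e) {
          return;                 // asked to stop while waiting for work
        }
        // ... handle the event; a Throwable here may System.exit(-1) when the
        // exit-on-error flag is set, mirroring the dispatcher behaviour ...
      }
    }
  });

  void start() { worker.start(); }

  void stop() throws InterruptedException {
    stopped = true;       // make the loop condition false
    worker.interrupt();   // unblock queue.take()
    worker.join();        // wait for a clean exit
  }
}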