MAPREDUCE-3463. Second AM fails to recover properly when first AM is killed with java.lang.IllegalArgumentException causing lost job. (Siddharth Seth via mahadev) - Merging r1208994 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1208995 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2011-12-01 08:37:14 +00:00
parent ca3fb12d09
commit 2ef6051da9
6 changed files with 79 additions and 14 deletions

View File

@ -159,6 +159,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3488. Streaming jobs are failing because the main class MAPREDUCE-3488. Streaming jobs are failing because the main class
isnt set in the pom files. (mahadev) isnt set in the pom files. (mahadev)
MAPREDUCE-3463. Second AM fails to recover properly when first AM is killed with
java.lang.IllegalArgumentException causing lost job. (Siddharth Seth via mahadev)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01

View File

@ -217,8 +217,7 @@ public class MRAppMaster extends CompositeService {
&& appAttemptID.getAttemptId() > 1) { && appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. " LOG.info("Recovery is enabled. "
+ "Will try to recover from previous life on best effort basis."); + "Will try to recover from previous life on best effort basis.");
recoveryServ = new RecoveryService(appAttemptID, clock, recoveryServ = createRecoveryService(context);
committer);
addIfService(recoveryServ); addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher(); dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock(); clock = recoveryServ.getClock();
@ -425,6 +424,15 @@ public class MRAppMaster extends CompositeService {
return new JobFinishEventHandler(); return new JobFinishEventHandler();
} }
/**
* Create the recovery service.
* <p>
* This implementation builds a {@link RecoveryService} from the application
* attempt id and clock exposed by {@code appContext}, together with the
* committer returned by {@code getCommitter()}. The method is protected, so
* a subclass may override it to return a different {@link Recovery}.
*
* @param appContext context supplying the application attempt id and clock.
* @return an instance of the recovery service.
*/
protected Recovery createRecoveryService(AppContext appContext) {
return new RecoveryService(appContext.getApplicationAttemptId(),
appContext.getClock(), getCommitter());
}
/** Create and initialize (but don't start) a single job. */ /** Create and initialize (but don't start) a single job. */
protected Job createJob(Configuration conf) { protected Job createJob(Configuration conf) {

View File

@ -76,8 +76,6 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
@ -97,8 +95,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
public class RecoveryService extends CompositeService implements Recovery { public class RecoveryService extends CompositeService implements Recovery {
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(RecoveryService.class); private static final Log LOG = LogFactory.getLog(RecoveryService.class);
private final ApplicationAttemptId applicationAttemptId; private final ApplicationAttemptId applicationAttemptId;
@ -120,7 +116,7 @@ public class RecoveryService extends CompositeService implements Recovery {
super("RecoveringDispatcher"); super("RecoveringDispatcher");
this.applicationAttemptId = applicationAttemptId; this.applicationAttemptId = applicationAttemptId;
this.committer = committer; this.committer = committer;
this.dispatcher = new RecoveryDispatcher(); this.dispatcher = createRecoveryDispatcher();
this.clock = new ControlledClock(clock); this.clock = new ControlledClock(clock);
addService((Service) dispatcher); addService((Service) dispatcher);
} }
@ -209,17 +205,32 @@ public class RecoveryService extends CompositeService implements Recovery {
LOG.info("Read completed tasks from history " LOG.info("Read completed tasks from history "
+ completedTasks.size()); + completedTasks.size());
} }
/**
* Create the dispatcher for this recovery service.
* Protected so that a subclass may return a differently configured dispatcher.
*
* @return a new RecoveryDispatcher built with its no-argument constructor.
*/
protected Dispatcher createRecoveryDispatcher() {
return new RecoveryDispatcher();
}
/**
* Create a recovery dispatcher with an explicit exit-on-exception setting.
*
* @param exitOnException passed unchanged to the RecoveryDispatcher
* constructor.
* @return a new RecoveryDispatcher.
*/
protected Dispatcher createRecoveryDispatcher(boolean exitOnException) {
return new RecoveryDispatcher(exitOnException);
}
@SuppressWarnings("rawtypes")
class RecoveryDispatcher extends AsyncDispatcher { class RecoveryDispatcher extends AsyncDispatcher {
private final EventHandler actualHandler; private final EventHandler actualHandler;
private final EventHandler handler; private final EventHandler handler;
RecoveryDispatcher() { RecoveryDispatcher(boolean exitOnException) {
super(exitOnException);
actualHandler = super.getEventHandler(); actualHandler = super.getEventHandler();
handler = new InterceptingEventHandler(actualHandler); handler = new InterceptingEventHandler(actualHandler);
} }
// No-argument constructor: delegates to RecoveryDispatcher(boolean) with
// exitOnException set to false.
RecoveryDispatcher() {
this(false);
}
@Override @Override
@SuppressWarnings("unchecked")
public void dispatch(Event event) { public void dispatch(Event event) {
if (recoveryMode) { if (recoveryMode) {
if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) { if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
@ -267,6 +278,10 @@ public class RecoveryService extends CompositeService implements Recovery {
} }
} }
} }
realDispatch(event);
}
public void realDispatch(Event event) {
super.dispatch(event); super.dispatch(event);
} }
@ -281,6 +296,7 @@ public class RecoveryService extends CompositeService implements Recovery {
return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id)); return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
} }
@SuppressWarnings({"rawtypes", "unchecked"})
private class InterceptingEventHandler implements EventHandler { private class InterceptingEventHandler implements EventHandler {
EventHandler actualHandler; EventHandler actualHandler;
@ -407,7 +423,9 @@ public class RecoveryService extends CompositeService implements Recovery {
LOG.info("Sending assigned event to " + yarnAttemptID); LOG.info("Sending assigned event to " + yarnAttemptID);
ContainerId cId = attemptInfo.getContainerId(); ContainerId cId = attemptInfo.getContainerId();
NodeId nodeId = ConverterUtils.toNodeId(attemptInfo.getHostname()); NodeId nodeId =
ConverterUtils.toNodeId(attemptInfo.getHostname() + ":"
+ attemptInfo.getPort());
// Resource/Priority/ApplicationACLs are only needed while launching the // Resource/Priority/ApplicationACLs are only needed while launching the
// container on an NM, these are already completed tasks, so setting them // container on an NM, these are already completed tasks, so setting them
// to null // to null

View File

@ -52,7 +52,12 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test; import org.junit.Test;
@ -407,6 +412,13 @@ public class TestRecovery {
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount); super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
} }
// Returns a RecoveryServiceWithCustomDispatcher built from the context's
// application attempt id and clock plus the committer from getCommitter().
@Override
protected Recovery createRecoveryService(AppContext appContext) {
return new RecoveryServiceWithCustomDispatcher(
appContext.getApplicationAttemptId(), appContext.getClock(),
getCommitter());
}
@Override @Override
protected ContainerLauncher createContainerLauncher(AppContext context) { protected ContainerLauncher createContainerLauncher(AppContext context) {
MockContainerLauncher launcher = new MockContainerLauncher(); MockContainerLauncher launcher = new MockContainerLauncher();
@ -422,7 +434,22 @@ public class TestRecovery {
return eventHandler; return eventHandler;
} }
} }
/**
* RecoveryService variant used by this test whose recovery dispatcher is
* always created with exitOnException set to false.
*/
class RecoveryServiceWithCustomDispatcher extends RecoveryService {
// Forwards all arguments unchanged to the RecoveryService constructor.
public RecoveryServiceWithCustomDispatcher(
ApplicationAttemptId applicationAttemptId, Clock clock,
OutputCommitter committer) {
super(applicationAttemptId, clock, committer);
}
// Explicitly requests a dispatcher with exitOnException == false via the
// boolean overload on the superclass.
@Override
public Dispatcher createRecoveryDispatcher() {
return super.createRecoveryDispatcher(false);
}
}
public static void main(String[] arg) throws Exception { public static void main(String[] arg) throws Exception {
TestRecovery test = new TestRecovery(); TestRecovery test = new TestRecovery();
test.testCrashed(); test.testCrashed();

View File

@ -45,18 +45,25 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
private Thread eventHandlingThread; private Thread eventHandlingThread;
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers; protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
private boolean exitOnDispatchException;
public AsyncDispatcher() { public AsyncDispatcher() {
this(new HashMap<Class<? extends Enum>, EventHandler>(), this(new HashMap<Class<? extends Enum>, EventHandler>(),
new LinkedBlockingQueue<Event>()); new LinkedBlockingQueue<Event>(), true);
}
public AsyncDispatcher(boolean exitOnException) {
this(new HashMap<Class<? extends Enum>, EventHandler>(),
new LinkedBlockingQueue<Event>(), exitOnException);
} }
AsyncDispatcher( AsyncDispatcher(
Map<Class<? extends Enum>, EventHandler> eventDispatchers, Map<Class<? extends Enum>, EventHandler> eventDispatchers,
BlockingQueue<Event> eventQueue) { BlockingQueue<Event> eventQueue, boolean exitOnException) {
super("Dispatcher"); super("Dispatcher");
this.eventQueue = eventQueue; this.eventQueue = eventQueue;
this.eventDispatchers = eventDispatchers; this.eventDispatchers = eventDispatchers;
this.exitOnDispatchException = exitOnException;
} }
Runnable createThread() { Runnable createThread() {
@ -118,7 +125,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
catch (Throwable t) { catch (Throwable t) {
//TODO Maybe log the state of the queue //TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread. Exiting..", t); LOG.fatal("Error in dispatcher thread. Exiting..", t);
System.exit(-1); if (exitOnDispatchException) {
System.exit(-1);
}
} }
} }

View File

@ -36,7 +36,7 @@ public class DrainDispatcher extends AsyncDispatcher {
} }
private DrainDispatcher(BlockingQueue<Event> eventQueue) { private DrainDispatcher(BlockingQueue<Event> eventQueue) {
super(new HashMap<Class<? extends Enum>, EventHandler>(), eventQueue); super(new HashMap<Class<? extends Enum>, EventHandler>(), eventQueue, true);
this.queue = eventQueue; this.queue = eventQueue;
} }