From 2ef6051da93843789f09b26ad69e666db227da98 Mon Sep 17 00:00:00 2001
From: Mahadev Konar
Date: Thu, 1 Dec 2011 08:37:14 +0000
Subject: [PATCH] MAPREDUCE-3463. Second AM fails to recover properly when
 first AM is killed with java.lang.IllegalArgumentException causing lost
 job. (Siddharth Seth via mahadev) - Merging r1208994 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1208995 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt           |  3 ++
 .../hadoop/mapreduce/v2/app/MRAppMaster.java   | 12 +++++--
 .../v2/app/recover/RecoveryService.java        | 32 +++++++++++++++----
 .../hadoop/mapreduce/v2/app/TestRecovery.java  | 29 ++++++++++++++++-
 .../hadoop/yarn/event/AsyncDispatcher.java     | 15 +++++++--
 .../hadoop/yarn/event/DrainDispatcher.java     |  2 +-
 6 files changed, 79 insertions(+), 14 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index e4a1ffe8d2c..f1fb6f5d2ac 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -159,6 +159,9 @@ Release 0.23.1 - Unreleased
 
     MAPREDUCE-3488. Streaming jobs are failing because the main class isnt set
     in the pom files. (mahadev)
+
+    MAPREDUCE-3463. Second AM fails to recover properly when first AM is killed with
+    java.lang.IllegalArgumentException causing lost job. (Siddharth Seth via mahadev)
 
 Release 0.23.0 - 2011-11-01
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 7a6b86a0f80..800dfa9d365 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -217,8 +217,7 @@ public class MRAppMaster extends CompositeService {
         && appAttemptID.getAttemptId() > 1) {
       LOG.info("Recovery is enabled. "
           + "Will try to recover from previous life on best effort basis.");
-      recoveryServ = new RecoveryService(appAttemptID, clock,
-          committer);
+      recoveryServ = createRecoveryService(context);
       addIfService(recoveryServ);
       dispatcher = recoveryServ.getDispatcher();
       clock = recoveryServ.getClock();
@@ -425,6 +424,15 @@ public class MRAppMaster extends CompositeService {
     return new JobFinishEventHandler();
   }
 
+  /**
+   * Create the recovery service.
+   * @return an instance of the recovery service.
+   */
+  protected Recovery createRecoveryService(AppContext appContext) {
+    return new RecoveryService(appContext.getApplicationAttemptId(),
+        appContext.getClock(), getCommitter());
+  }
+
   /** Create and initialize (but don't start) a single job. 
    */
   protected Job createJob(Configuration conf) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
index 843e666c873..30cbdae67b0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
@@ -76,8 +76,6 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -97,8 +95,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class RecoveryService extends CompositeService implements Recovery {
 
-  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
   private static final Log LOG = LogFactory.getLog(RecoveryService.class);
 
   private final ApplicationAttemptId applicationAttemptId;
@@ -120,7 +116,7 @@ public class RecoveryService extends CompositeService implements Recovery {
     super("RecoveringDispatcher");
     this.applicationAttemptId = applicationAttemptId;
     this.committer = committer;
-    this.dispatcher = new RecoveryDispatcher();
+    this.dispatcher = createRecoveryDispatcher();
     this.clock = new ControlledClock(clock);
     addService((Service) dispatcher);
   }
@@ -209,17 +205,32 @@ public class RecoveryService extends CompositeService implements Recovery {
     LOG.info("Read completed tasks from history "
         + completedTasks.size());
   }
+
+  protected Dispatcher createRecoveryDispatcher() {
+    return new RecoveryDispatcher();
+  }
+
+  protected Dispatcher createRecoveryDispatcher(boolean exitOnException) {
+    return new RecoveryDispatcher(exitOnException);
+  }
+
   @SuppressWarnings("rawtypes")
   class RecoveryDispatcher extends AsyncDispatcher {
     private final EventHandler actualHandler;
     private final EventHandler handler;
 
-    RecoveryDispatcher() {
+    RecoveryDispatcher(boolean exitOnException) {
+      super(exitOnException);
       actualHandler = super.getEventHandler();
       handler = new InterceptingEventHandler(actualHandler);
     }
 
+    RecoveryDispatcher() {
+      this(false);
+    }
+
     @Override
+    @SuppressWarnings("unchecked")
     public void dispatch(Event event) {
       if (recoveryMode) {
         if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
@@ -267,6 +278,10 @@ public class RecoveryService extends CompositeService implements Recovery {
           }
         }
       }
+      realDispatch(event);
+    }
+
+    public void realDispatch(Event event) {
       super.dispatch(event);
     }
 
@@ -281,6 +296,7 @@ public class RecoveryService extends CompositeService implements Recovery {
     return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
   }
 
+  @SuppressWarnings({"rawtypes", "unchecked"})
   private class InterceptingEventHandler implements EventHandler {
     EventHandler actualHandler;
 
@@ -407,7 +423,9 @@ public class RecoveryService extends CompositeService implements Recovery {
LOG.info("Sending assigned event to " + yarnAttemptID); ContainerId cId = attemptInfo.getContainerId(); - NodeId nodeId = ConverterUtils.toNodeId(attemptInfo.getHostname()); + NodeId nodeId = + ConverterUtils.toNodeId(attemptInfo.getHostname() + ":" + + attemptInfo.getPort()); // Resource/Priority/ApplicationACLs are only needed while launching the // container on an NM, these are already completed tasks, so setting them // to null diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 277b097da4f..ec492de7fed 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -52,7 +52,12 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; +import org.apache.hadoop.mapreduce.v2.app.recover.Recovery; +import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.Clock; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.junit.Test; @@ -407,6 +412,13 @@ public class TestRecovery { super(maps, reduces, autoComplete, testName, cleanOnStart, startCount); } + @Override + protected Recovery createRecoveryService(AppContext appContext) { + return new RecoveryServiceWithCustomDispatcher( + appContext.getApplicationAttemptId(), appContext.getClock(), + getCommitter()); + } + @Override protected ContainerLauncher createContainerLauncher(AppContext context) { MockContainerLauncher launcher = new MockContainerLauncher(); @@ -422,7 +434,22 @@ public class TestRecovery { return eventHandler; } } - + + class RecoveryServiceWithCustomDispatcher extends RecoveryService { + + public RecoveryServiceWithCustomDispatcher( + ApplicationAttemptId applicationAttemptId, Clock clock, + OutputCommitter committer) { + super(applicationAttemptId, clock, committer); + } + + @Override + public Dispatcher createRecoveryDispatcher() { + return super.createRecoveryDispatcher(false); + } + + } + public static void main(String[] arg) throws Exception { TestRecovery test = new TestRecovery(); test.testCrashed(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 278d671ea50..8a5fceecb09 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -45,18 +45,25 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { private Thread eventHandlingThread; protected final Map, EventHandler> 
eventDispatchers; + private boolean exitOnDispatchException; public AsyncDispatcher() { this(new HashMap, EventHandler>(), - new LinkedBlockingQueue()); + new LinkedBlockingQueue(), true); + } + + public AsyncDispatcher(boolean exitOnException) { + this(new HashMap, EventHandler>(), + new LinkedBlockingQueue(), exitOnException); } AsyncDispatcher( Map, EventHandler> eventDispatchers, - BlockingQueue eventQueue) { + BlockingQueue eventQueue, boolean exitOnException) { super("Dispatcher"); this.eventQueue = eventQueue; this.eventDispatchers = eventDispatchers; + this.exitOnDispatchException = exitOnException; } Runnable createThread() { @@ -118,7 +125,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { catch (Throwable t) { //TODO Maybe log the state of the queue LOG.fatal("Error in dispatcher thread. Exiting..", t); - System.exit(-1); + if (exitOnDispatchException) { + System.exit(-1); + } } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index 8a61f6f5763..20d7dfca94c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -36,7 +36,7 @@ public class DrainDispatcher extends AsyncDispatcher { } private DrainDispatcher(BlockingQueue eventQueue) { - super(new HashMap, EventHandler>(), eventQueue); + super(new HashMap, EventHandler>(), eventQueue, true); this.queue = eventQueue; }
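
Illustrative sketch (editorial addition, not part of the patch): the IllegalArgumentException named in the title comes from the recovery path building a NodeId out of a bare hostname. ConverterUtils.toNodeId() expects a "host:port" string, which is why the RecoveryService change above appends attemptInfo.getPort(), and why AsyncDispatcher gains an exitOnDispatchException flag so a failure while replaying history no longer calls System.exit(-1). The host and port below are made-up example values:

    import org.apache.hadoop.yarn.api.records.NodeId;
    import org.apache.hadoop.yarn.util.ConverterUtils;

    public class NodeIdParseSketch {
      public static void main(String[] args) {
        // A "host:port" string parses into a NodeId.
        NodeId ok = ConverterUtils.toNodeId("nm-host.example.com:45454");
        System.out.println("Parsed " + ok.getHost() + ":" + ok.getPort());

        // A bare hostname, as the pre-patch recovery code passed, is rejected with an
        // IllegalArgumentException; before this patch that exception reached the
        // dispatcher's catch(Throwable) block, which exited the AM and lost the job.
        try {
          ConverterUtils.toNodeId("nm-host.example.com");
        } catch (IllegalArgumentException e) {
          System.out.println("Recovery would have failed here: " + e.getMessage());
        }
      }
    }

Note that both the production RecoveryDispatcher() constructor and the test's RecoveryServiceWithCustomDispatcher build the dispatcher with exitOnException set to false, so any remaining recovery-time error now surfaces as an exception instead of a JVM exit.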