diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c00aed6bd95..f54f14c7db7 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1277,6 +1277,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2874. Fix formatting of ApplicationId in web-ui. (Eric Payne via acmurthy) + MAPREDUCE-2995. Better handling of expired containers in MapReduce + ApplicationMaster. (vinodkv via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index c1feb7d77d3..3d4dcb5ed0e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -204,6 +204,11 @@ TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition( .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.FAILED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, new DeallocateContainerTransition(TaskAttemptState.FAILED, false)) + .addTransition(TaskAttemptState.ASSIGNED, + TaskAttemptState.FAIL_CONTAINER_CLEANUP, + TaskAttemptEventType.TA_CONTAINER_COMPLETED, + CLEANUP_CONTAINER_TRANSITION) + // ^ If RM kills the container due to expiry, preemption etc. .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) @@ -925,7 +930,8 @@ public void handle(TaskAttemptEvent event) { try { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state", e); + LOG.error("Can't handle this event at current state for " + + this.attemptId, e); eventHandler.handle(new JobDiagnosticsUpdateEvent( this.attemptId.getTaskId().getJobId(), "Invalid event " + event.getType() + " on TaskAttempt " + this.attemptId)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index cd2a540b978..71f8823e687 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -528,7 +528,8 @@ public void handle(TaskEvent event) { try { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state", e); + LOG.error("Can't handle this event at current state for " + + this.taskId, e); internalError(event.getType()); } if (oldState != getState()) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java index 4744035f5e2..e08d4f2a316 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.app; +import java.io.IOException; import java.util.Iterator; import java.util.Map; @@ -36,6 +37,12 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; +import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; +import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; +import org.apache.hadoop.yarn.api.ContainerManager; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerToken; import org.junit.Test; /** @@ -160,6 +167,74 @@ public void testTimedOutTask() throws Exception { } } + @Test + public void testTaskFailWithUnusedContainer() throws Exception { + MRApp app = new FailingTaskWithUnusedContainer(); + Configuration conf = new Configuration(); + int maxAttempts = 1; + conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts); + // disable uberization (requires entire job to be reattempted, so max for + // subtask attempts is overridden to 1) + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Map tasks = job.getTasks(); + Assert.assertEquals("Num tasks is not correct", 1, tasks.size()); + Task task = tasks.values().iterator().next(); + app.waitForState(task, TaskState.SCHEDULED); + Map attempts = tasks.values().iterator() + .next().getAttempts(); + Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts + .size()); + TaskAttempt attempt = attempts.values().iterator().next(); + app.waitForState(attempt, TaskAttemptState.ASSIGNED); + app.getDispatcher().getEventHandler().handle( + new TaskAttemptEvent(attempt.getID(), + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); + app.waitForState(job, JobState.FAILED); + } + + static class FailingTaskWithUnusedContainer extends MRApp { + + public FailingTaskWithUnusedContainer() { + super(1, 0, false, "TaskFailWithUnsedContainer", true); + } + + protected ContainerLauncher createContainerLauncher(AppContext context, + boolean isLocal) { + return new ContainerLauncherImpl(context) { + @Override + public void handle(ContainerLauncherEvent event) { + + switch (event.getType()) { + case CONTAINER_REMOTE_LAUNCH: + super.handle(event); + break; + case CONTAINER_REMOTE_CLEANUP: + getContext().getEventHandler().handle( + new TaskAttemptEvent(event.getTaskAttemptID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + break; + } + } + + @Override + protected ContainerManager getCMProxy(ContainerId containerID, + String containerManagerBindAddr, ContainerToken containerToken) + throws IOException { + try { + synchronized (this) { + wait(); // Just hang the thread simulating a very slow NM. + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + return null; + } + }; + }; + } + static class TimeOutTaskMRApp extends MRApp { TimeOutTaskMRApp(int maps, int reduces) { super(maps, reduces, false, "TimeOutTaskMRApp", true); @@ -232,5 +307,6 @@ public static void main(String[] args) throws Exception { t.testTimedOutTask(); t.testMapFailureMaxPercent(); t.testReduceFailureMaxPercent(); + t.testTaskFailWithUnusedContainer(); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 8dfe799a041..8e7ebd9fac7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -46,7 +46,7 @@ public class SchedulerUtils { "Container of a completed application"; public static final String EXPIRED_CONTAINER = - "Container expired since it unused"; + "Container expired since it was unused"; public static final String UNRESERVED_CONTAINER = "Container reservation no longer required.";