diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 17caf5a0de5..473ea7dd579 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -313,6 +313,10 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4099 amendment. ApplicationMaster will remove staging directory after the history service is stopped. (Jason Lowe via sseth) + MAPREDUCE-3932. Fix the TaskAttempt state machine to handle + CONTIANER_LAUNCHED and CONTIANER_LAUNCH_FAILED events in additional + states. (Robert Joseph Evans via sseth) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 98472d33cf1..f2f7a6c848c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -347,6 +347,8 @@ JobEventType.JOB_KILL, new KillTasksTransition()) JobEventType.JOB_DIAGNOSTIC_UPDATE, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.INTERNAL_ERROR)) + .addTransition(JobState.ERROR, JobState.ERROR, + JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) // create the topology tables .installTopology(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 53c7211d62d..7ac334c8ffc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -316,7 +316,9 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition()) TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, + // Container launch events can arrive late TaskAttemptEventType.TA_CONTAINER_LAUNCHED, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_TIMED_OUT)) @@ -338,6 +340,7 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition()) TaskAttemptEventType.TA_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_CONTAINER_LAUNCHED, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_TIMED_OUT)) @@ -359,7 +362,10 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition()) TaskAttemptEventType.TA_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, - TaskAttemptEventType.TA_FAILMSG)) + TaskAttemptEventType.TA_FAILMSG, + // Container launch events can arrive late + TaskAttemptEventType.TA_CONTAINER_LAUNCHED, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)) // Transitions from KILL_TASK_CLEANUP .addTransition(TaskAttemptState.KILL_TASK_CLEANUP, @@ -377,7 +383,10 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition()) TaskAttemptEventType.TA_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, - TaskAttemptEventType.TA_FAILMSG)) + TaskAttemptEventType.TA_FAILMSG, + // Container launch events can arrive late + TaskAttemptEventType.TA_CONTAINER_LAUNCHED, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)) // Transitions from SUCCEEDED .addTransition(TaskAttemptState.SUCCEEDED, //only possible for map attempts @@ -405,7 +414,9 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition()) TaskAttemptEventType.TA_ASSIGNED, TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_UPDATE, + // Container launch events can arrive late TaskAttemptEventType.TA_CONTAINER_LAUNCHED, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG)) @@ -420,7 +431,9 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition()) TaskAttemptEventType.TA_ASSIGNED, TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_UPDATE, + // Container launch events can arrive late TaskAttemptEventType.TA_CONTAINER_LAUNCHED, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG)) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 1b54a1ae667..e5ad3fd8226 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -68,6 +69,9 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; @@ -81,9 +85,12 @@ import org.apache.hadoop.yarn.ClusterInfo; import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; @@ -91,12 +98,16 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; -@SuppressWarnings("unchecked") +@SuppressWarnings({"unchecked", "rawtypes"}) public class TestTaskAttempt{ - - @SuppressWarnings("rawtypes") @Test public void testAttemptContainerRequest() throws Exception { + //WARNING: This test must run first. This is because there is an + // optimization where the credentials passed in are cached statically so + // they do not need to be recomputed when creating a new + // ContainerLaunchContext. if other tests run first this code will cache + // their credentials and this test will fail trying to look for the + // credentials it inserted in. final Text SECRET_KEY_ALIAS = new Text("secretkeyalias"); final byte[] SECRET_KEY = ("secretkey").getBytes(); Map acls = @@ -125,7 +136,7 @@ public void testAttemptContainerRequest() throws Exception { Token jobToken = new Token( ("tokenid").getBytes(), ("tokenpw").getBytes(), new Text("tokenkind"), new Text("tokenservice")); - + TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, mock(TaskSplitMetaInfo.class), jobConf, taListener, @@ -134,7 +145,7 @@ public void testAttemptContainerRequest() throws Exception { jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString()); ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1); - + ContainerLaunchContext launchCtx = TaskAttemptImpl.createContainerLaunchContext(acls, containerId, jobConf, jobToken, taImpl.createRemoteTask(), @@ -185,7 +196,6 @@ public void testMRAppHistoryForReduce() throws Exception { testMRAppHistory(app); } - @SuppressWarnings("rawtypes") @Test public void testSingleRackRequest() throws Exception { TaskAttemptImpl.RequestContainerTransition rct = @@ -213,11 +223,10 @@ public void testSingleRackRequest() throws Exception { ContainerRequestEvent cre = (ContainerRequestEvent) arg.getAllValues().get(1); String[] requestedRacks = cre.getRacks(); - //Only a single occurance of /DefaultRack + //Only a single occurrence of /DefaultRack assertEquals(1, requestedRacks.length); } - @SuppressWarnings("rawtypes") @Test public void testHostResolveAttempt() throws Exception { TaskAttemptImpl.RequestContainerTransition rct = @@ -316,14 +325,12 @@ public void verifySlotMillis(int mapMemMb, int reduceMemMb, .getValue()); } - @SuppressWarnings("rawtypes") private TaskAttemptImpl createMapTaskAttemptImplForTest( EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) { Clock clock = new SystemClock(); return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock); } - @SuppressWarnings("rawtypes") private TaskAttemptImpl createMapTaskAttemptImplForTest( EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) { ApplicationId appId = BuilderUtils.newApplicationId(1, 1); @@ -394,4 +401,67 @@ public void handle(JobHistoryEvent event) { }; } } + + @Test + public void testLaunchFailedWhileKilling() throws Exception { + ApplicationId appId = BuilderUtils.newApplicationId(1, 2); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(appId, 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); + + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + splits, jobConf, taListener, + mock(OutputCommitter.class), mock(Token.class), new Credentials(), + new SystemClock(), null); + + NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, + container, mock(Map.class))); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_KILL)); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)); + assertFalse(eventHandler.internalError); + } + + public static class MockEventHandler implements EventHandler { + public boolean internalError; + + @Override + public void handle(Event event) { + if (event instanceof JobEvent) { + JobEvent je = ((JobEvent) event); + if (JobEventType.INTERNAL_ERROR == je.getType()) { + internalError = true; + } + } + } + + }; }