MAPREDUCE-3932. Fix the TaskAttempt state machine to handle CONTIANER_LAUNCHED and CONTIANER_LAUNCH_FAILED events in additional states. (Contributed by Robert Joseph Evans)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1324903 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2012-04-11 18:20:08 +00:00
parent 415e7fecb1
commit 462f9a5249
4 changed files with 101 additions and 12 deletions

View File

@ -214,6 +214,10 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4099 amendment. ApplicationMaster will remove staging directory
after the history service is stopped. (Jason Lowe via sseth)
MAPREDUCE-3932. Fix the TaskAttempt state machine to handle
CONTIANER_LAUNCHED and CONTIANER_LAUNCH_FAILED events in additional
states. (Robert Joseph Evans via sseth)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -347,6 +347,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.INTERNAL_ERROR))
.addTransition(JobState.ERROR, JobState.ERROR,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
// create the topology tables
.installTopology();

View File

@ -316,7 +316,9 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_COMMIT_PENDING,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_TIMED_OUT))
@ -338,6 +340,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_TIMED_OUT))
@ -359,7 +362,10 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG))
TaskAttemptEventType.TA_FAILMSG,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
// Transitions from KILL_TASK_CLEANUP
.addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
@ -377,7 +383,10 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG))
TaskAttemptEventType.TA_FAILMSG,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
// Transitions from SUCCEEDED
.addTransition(TaskAttemptState.SUCCEEDED, //only possible for map attempts
@ -405,7 +414,9 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_ASSIGNED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG))
@ -420,7 +431,9 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_ASSIGNED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG))

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@ -68,6 +69,9 @@ import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@ -81,9 +85,12 @@ import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
@ -91,12 +98,16 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestTaskAttempt{
@SuppressWarnings("rawtypes")
@Test
public void testAttemptContainerRequest() throws Exception {
//WARNING: This test must run first. This is because there is an
// optimization where the credentials passed in are cached statically so
// they do not need to be recomputed when creating a new
// ContainerLaunchContext. if other tests run first this code will cache
// their credentials and this test will fail trying to look for the
// credentials it inserted in.
final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
final byte[] SECRET_KEY = ("secretkey").getBytes();
Map<ApplicationAccessType, String> acls =
@ -185,7 +196,6 @@ public class TestTaskAttempt{
testMRAppHistory(app);
}
@SuppressWarnings("rawtypes")
@Test
public void testSingleRackRequest() throws Exception {
TaskAttemptImpl.RequestContainerTransition rct =
@ -213,11 +223,10 @@ public class TestTaskAttempt{
ContainerRequestEvent cre =
(ContainerRequestEvent) arg.getAllValues().get(1);
String[] requestedRacks = cre.getRacks();
//Only a single occurance of /DefaultRack
//Only a single occurrence of /DefaultRack
assertEquals(1, requestedRacks.length);
}
@SuppressWarnings("rawtypes")
@Test
public void testHostResolveAttempt() throws Exception {
TaskAttemptImpl.RequestContainerTransition rct =
@ -316,14 +325,12 @@ public class TestTaskAttempt{
.getValue());
}
@SuppressWarnings("rawtypes")
private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
Clock clock = new SystemClock();
return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
}
@SuppressWarnings("rawtypes")
private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
@ -394,4 +401,67 @@ public class TestTaskAttempt{
};
}
}
@Test
public void testLaunchFailedWhileKilling() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
Path jobFile = mock(Path.class);
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
JobConf jobConf = new JobConf();
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
jobConf.setBoolean("fs.file.impl.disable.cache", true);
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
mock(OutputCommitter.class), mock(Token.class), new Credentials(),
new SystemClock(), null);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_SCHEDULE));
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
container, mock(Map.class)));
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_KILL));
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
assertFalse(eventHandler.internalError);
}
public static class MockEventHandler implements EventHandler {
public boolean internalError;
@Override
public void handle(Event event) {
if (event instanceof JobEvent) {
JobEvent je = ((JobEvent) event);
if (JobEventType.INTERNAL_ERROR == je.getType()) {
internalError = true;
}
}
}
};
}