MAPREDUCE-6384. Add the last reporting reducer info for too many fetch failure diagnostics. Contributed by Chang Li

Jason Lowe 2015-06-30 21:22:30 +00:00
parent 147e020c7a
commit b6ba56457c
7 changed files with 162 additions and 84 deletions

File: CHANGES.txt

@@ -364,6 +364,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6408. Queue name and user name should be printed on the job page.
     (Siqi Li via gera)
 
+    MAPREDUCE-6384. Add the last reporting reducer info for too many fetch
+    failure diagnostics (Chang Li via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES

File: JobTaskAttemptFetchFailureEvent.java

@@ -28,13 +28,15 @@ public class JobTaskAttemptFetchFailureEvent extends JobEvent {
   private final TaskAttemptId reduce;
   private final List<TaskAttemptId> maps;
+  private final String hostname;
 
   public JobTaskAttemptFetchFailureEvent(TaskAttemptId reduce,
-      List<TaskAttemptId> maps) {
+      List<TaskAttemptId> maps, String host) {
     super(reduce.getTaskId().getJobId(),
         JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE);
     this.reduce = reduce;
     this.maps = maps;
+    this.hostname = host;
   }
 
   public List<TaskAttemptId> getMaps() {
@@ -45,4 +47,7 @@ public class JobTaskAttemptFetchFailureEvent extends JobEvent {
     return reduce;
   }
 
+  public String getHost() {
+    return hostname;
+  }
 }
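
With the widened constructor, the job-level event now carries the reporting reducer's host alongside the list of failed maps, and getHost() exposes it to the JobImpl handler. A minimal, hedged sketch of building and reading the new field, reusing the MRBuilderUtils helpers the tests below rely on (the IDs and host value are illustrative, and FetchFailureEventSketch is not part of this patch):

    import java.util.Arrays;
    import org.apache.hadoop.mapreduce.v2.api.records.JobId;
    import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
    import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
    import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
    import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
    import org.apache.hadoop.yarn.api.records.ApplicationId;

    public class FetchFailureEventSketch {
      public static void main(String[] args) {
        JobId jobId = MRBuilderUtils.newJobId(ApplicationId.newInstance(1, 2), 1);
        TaskAttemptId map0 = MRBuilderUtils.newTaskAttemptId(
            MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP), 0);
        TaskAttemptId reduce0 = MRBuilderUtils.newTaskAttemptId(
            MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE), 0);
        // The third argument is the new field: the host of the reduce
        // attempt that reported the fetch failures.
        JobTaskAttemptFetchFailureEvent event =
            new JobTaskAttemptFetchFailureEvent(
                reduce0, Arrays.asList(map0), "node42.example.com");
        System.out.println("failed maps " + event.getMaps()
            + " reported from " + event.getHost());
      }
    }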

File: TaskAttemptTooManyFetchFailureEvent.java (new file)

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+/**
+ * TaskAttemptTooManyFetchFailureEvent is used for TA_TOO_MANY_FETCH_FAILURE.
+ */
+public class TaskAttemptTooManyFetchFailureEvent extends TaskAttemptEvent {
+  private TaskAttemptId reduceID;
+  private String reduceHostname;
+
+  /**
+   * Create a new TaskAttemptTooManyFetchFailureEvent.
+   * @param attemptId the id of the mapper task attempt
+   * @param reduceId the id of the reporting reduce task attempt.
+   * @param reduceHost the hostname of the reporting reduce task attempt.
+   */
+  public TaskAttemptTooManyFetchFailureEvent(TaskAttemptId attemptId,
+      TaskAttemptId reduceId, String reduceHost) {
+    super(attemptId, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE);
+    this.reduceID = reduceId;
+    this.reduceHostname = reduceHost;
+  }
+
+  public TaskAttemptId getReduceId() {
+    return reduceID;
+  }
+
+  public String getReduceHost() {
+    return reduceHostname;
+  }
+}
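
Since the new class bakes TA_TOO_MANY_FETCH_FAILURE into its super() call, it flows through the existing state-machine wiring as a plain TaskAttemptEvent; the receiving transition downcasts to recover the reducer details. A short hedged fragment, reusing map0 and reduce0 from the sketch above:

    // The event self-selects its type, so no TaskAttemptEventType is passed in.
    TaskAttemptTooManyFetchFailureEvent ev =
        new TaskAttemptTooManyFetchFailureEvent(map0, reduce0, "node42.example.com");
    // Handlers typed against the base class recover the extras by downcast,
    // mirroring the TaskAttemptImpl transition below.
    TaskAttemptEvent base = ev;
    TaskAttemptTooManyFetchFailureEvent details =
        (TaskAttemptTooManyFetchFailureEvent) base;
    String blame = details.getReduceId() + " from host " + details.getReduceHost();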

File: JobImpl.java

@@ -103,9 +103,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
@@ -1914,8 +1913,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           && failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
         LOG.info("Too many fetch-failures for output of task attempt: " +
             mapId + " ... raising fetch failure to map");
-        job.eventHandler.handle(new TaskAttemptEvent(mapId,
-            TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+        job.eventHandler.handle(new TaskAttemptTooManyFetchFailureEvent(mapId,
+            fetchfailureEvent.getReduce(), fetchfailureEvent.getHost()));
         job.fetchFailuresMapping.remove(mapId);
       }
     }
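
For context, JobImpl raises this event only after repeated reports cross both an absolute and a fractional threshold. The sketch below is a simplified paraphrase of the surrounding method, not verbatim code; fetchFailures, shufflingReduceTasks, and maxFetchFailuresNotifications stand in for the actual fields and getters:

    // Hedged paraphrase of the gate around the changed lines above.
    float failureRate = shufflingReduceTasks == 0
        ? 1.0f : (float) fetchFailures / shufflingReduceTasks;
    if (fetchFailures >= maxFetchFailuresNotifications
        && failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
      // New in this patch: forward the reporting reducer's id and host so
      // the failed map attempt's diagnostics can name its accuser.
      job.eventHandler.handle(new TaskAttemptTooManyFetchFailureEvent(mapId,
          fetchfailureEvent.getReduce(), fetchfailureEvent.getHost()));
    }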

File: TaskAttemptImpl.java

@@ -95,6 +95,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
@@ -1916,12 +1917,17 @@ public abstract class TaskAttemptImpl implements
     @SuppressWarnings("unchecked")
     @Override
     public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+      TaskAttemptTooManyFetchFailureEvent fetchFailureEvent =
+          (TaskAttemptTooManyFetchFailureEvent) event;
       // too many fetch failure can only happen for map tasks
       Preconditions
           .checkArgument(taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP);
       //add to diagnostic
-      taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
+      taskAttempt.addDiagnosticInfo("Too many fetch failures."
+          + " Failing the attempt. Last failure reported by " +
+          fetchFailureEvent.getReduceId() +
+          " from host " + fetchFailureEvent.getReduceHost());
       if (taskAttempt.getLaunchTime() != 0) {
         taskAttempt.eventHandler
             .handle(createJobCounterUpdateEventTAFailed(taskAttempt, true));
@@ -2225,8 +2231,11 @@ public abstract class TaskAttemptImpl implements
       //this only will happen in reduce attempt type
       if (taskAttempt.reportedStatus.fetchFailedMaps != null &&
           taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) {
+        String hostname = taskAttempt.container == null ? "UNKNOWN"
+            : taskAttempt.container.getNodeId().getHost();
         taskAttempt.eventHandler.handle(new JobTaskAttemptFetchFailureEvent(
-            taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps));
+            taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps,
+            hostname));
       }
     }
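
Net effect: the map attempt's diagnostic message grows from the terse "Too Many fetch failures.Failing the attempt" to one that names the reporting reducer and its node, along these lines (the attempt id format here is illustrative):

    Too many fetch failures. Failing the attempt. Last failure reported by
    attempt_1434986536174_0001_r_000000_0 from host node42.example.com

The "UNKNOWN" fallback in the reduce-side hunk guards the case where the reduce attempt no longer holds a container reference when its status update is processed.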

File: TestFetchFailure.java

@@ -94,10 +94,10 @@ public class TestFetchFailure {
     app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
 
     //send 3 fetch failures from reduce to trigger map re execution
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
 
     //wait for map Task state move back to RUNNING
     app.waitForState(mapTask, TaskState.RUNNING);
@@ -215,9 +215,9 @@ public class TestFetchFailure {
     app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
 
     //send 3 fetch failures from reduce to trigger map re execution
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
 
     //wait for map Task state move back to RUNNING
     app.waitForState(mapTask, TaskState.RUNNING);
@@ -324,8 +324,8 @@ public class TestFetchFailure {
     updateStatus(app, reduceAttempt3, Phase.SHUFFLE);
 
     //send 2 fetch failures from reduce to prepare for map re execution
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host1");
+    sendFetchFailure(app, reduceAttempt2, mapAttempt1, "host2");
 
     //We should not re-launch the map task yet
     assertEquals(TaskState.SUCCEEDED, mapTask.getState());
@@ -333,7 +333,7 @@ public class TestFetchFailure {
     updateStatus(app, reduceAttempt3, Phase.REDUCE);
 
     //send 3rd fetch failures from reduce to trigger map re execution
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt3, mapAttempt1, "host3");
 
     //wait for map Task state move back to RUNNING
     app.waitForState(mapTask, TaskState.RUNNING);
@@ -342,6 +342,11 @@ public class TestFetchFailure {
     Assert.assertEquals("Map TaskAttempt state not correct",
         TaskAttemptState.FAILED, mapAttempt1.getState());
+    Assert.assertEquals(mapAttempt1.getDiagnostics().get(0),
+        "Too many fetch failures. Failing the attempt. "
+        + "Last failure reported by "
+        + reduceAttempt3.getID().toString() + " from host host3");
+
     Assert.assertEquals("Num attempts in Map Task not correct",
         2, mapTask.getAttempts().size());
@@ -410,7 +415,6 @@ public class TestFetchFailure {
     Assert.assertEquals("Unexpected map event", convertedEvents[2],
         mapEvents[0]);
   }
-
   private void updateStatus(MRApp app, TaskAttempt attempt, Phase phase) {
     TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
@@ -430,11 +434,12 @@ public class TestFetchFailure {
   }
 
   private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
-      TaskAttempt mapAttempt) {
+      TaskAttempt mapAttempt, String hostname) {
     app.getContext().getEventHandler().handle(
         new JobTaskAttemptFetchFailureEvent(
             reduceAttempt.getID(),
-            Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()})));
+            Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()}),
+            hostname));
   }
 
   static class MRAppWithHistory extends MRApp {

File: TestTaskAttempt.java

@@ -70,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
@@ -507,6 +508,9 @@ public class TestTaskAttempt{
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
     TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    TaskId reduceTaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
+    TaskAttemptId reduceTAId =
+        MRBuilderUtils.newTaskAttemptId(reduceTaskId, 0);
     Path jobFile = mock(Path.class);
 
     MockEventHandler eventHandler = new MockEventHandler();
@@ -554,8 +558,8 @@ public class TestTaskAttempt{
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
-    taImpl.handle(new TaskAttemptEvent(attemptId,
-        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+    taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId,
+        reduceTAId, "Host"));
     assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
         TaskAttemptState.FAILED);
     taImpl.handle(new TaskAttemptEvent(attemptId,
@@ -735,72 +739,75 @@ public class TestTaskAttempt{
   @Test
   public void testFetchFailureAttemptFinishTime() throws Exception{
     ApplicationId appId = ApplicationId.newInstance(1, 2);
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(appId, 0);
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
     TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    TaskId reducetaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
+    TaskAttemptId reduceTAId =
+        MRBuilderUtils.newTaskAttemptId(reducetaskId, 0);
     Path jobFile = mock(Path.class);
 
     MockEventHandler eventHandler = new MockEventHandler();
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
     when(taListener.getAddress()).thenReturn(
         new InetSocketAddress("localhost", 0));
 
     JobConf jobConf = new JobConf();
     jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
     jobConf.setBoolean("fs.file.impl.disable.cache", true);
     jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
     jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
 
     TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
     when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
 
     AppContext appCtx = mock(AppContext.class);
     ClusterInfo clusterInfo = mock(ClusterInfo.class);
     when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
     setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,mock(Token.class), new Credentials(),
           new SystemClock(), appCtx);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
     ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
 
     taImpl.handle(new TaskAttemptEvent(attemptId,
         TaskAttemptEventType.TA_SCHEDULE));
     taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
         container, mock(Map.class)));
     taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
     taImpl.handle(new TaskAttemptEvent(attemptId,
         TaskAttemptEventType.TA_DONE));
     taImpl.handle(new TaskAttemptEvent(attemptId,
         TaskAttemptEventType.TA_CONTAINER_COMPLETED));
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
 
     assertTrue("Task Attempt finish time is not greater than 0",
         taImpl.getFinishTime() > 0);
 
     Long finishTime = taImpl.getFinishTime();
     Thread.sleep(5);
-    taImpl.handle(new TaskAttemptEvent(attemptId,
-        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+    taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId,
+        reduceTAId, "Host"));
 
     assertEquals("Task attempt is not in Too Many Fetch Failure state",
         taImpl.getState(), TaskAttemptState.FAILED);
 
     assertEquals("After TA_TOO_MANY_FETCH_FAILURE,"
         + " Task attempt finish time is not the same ",
         finishTime, Long.valueOf(taImpl.getFinishTime()));
   }
 
   @Test