From b6ba56457c6b01dae795c11d587c3fe3855ee707 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 30 Jun 2015 21:22:30 +0000 Subject: [PATCH] MAPREDUCE-6384. Add the last reporting reducer info for too many fetch failure diagnostics. Contributed by Chang Li --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../JobTaskAttemptFetchFailureEvent.java | 9 +- .../TaskAttemptTooManyFetchFailureEvent.java | 50 +++++++ .../mapreduce/v2/app/job/impl/JobImpl.java | 7 +- .../v2/app/job/impl/TaskAttemptImpl.java | 15 +- .../mapreduce/v2/app/TestFetchFailure.java | 31 +++-- .../v2/app/job/impl/TestTaskAttempt.java | 131 +++++++++--------- 7 files changed, 162 insertions(+), 84 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptTooManyFetchFailureEvent.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 785fce853f4..5a4d8263a18 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -364,6 +364,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6408. Queue name and user name should be printed on the job page. (Siqi Li via gera) + MAPREDUCE-6384. Add the last reporting reducer info for too many fetch + failure diagnostics (Chang Li via jlowe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java index 37e2034d6a0..787711cd8ad 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java @@ -28,13 +28,15 @@ public class JobTaskAttemptFetchFailureEvent extends JobEvent { private final TaskAttemptId reduce; private final List maps; + private final String hostname; public JobTaskAttemptFetchFailureEvent(TaskAttemptId reduce, - List maps) { - super(reduce.getTaskId().getJobId(), + List maps, String host) { + super(reduce.getTaskId().getJobId(), JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE); this.reduce = reduce; this.maps = maps; + this.hostname = host; } public List getMaps() { @@ -45,4 +47,7 @@ public class JobTaskAttemptFetchFailureEvent extends JobEvent { return reduce; } + public String getHost() { + return hostname; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptTooManyFetchFailureEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptTooManyFetchFailureEvent.java new file mode 100644 index 00000000000..662e712f7d0 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptTooManyFetchFailureEvent.java @@ -0,0 +1,50 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app.job.event; + +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; + +/** + * TaskAttemptTooManyFetchFailureEvent is used for TA_TOO_MANY_FETCH_FAILURE. + */ +public class TaskAttemptTooManyFetchFailureEvent extends TaskAttemptEvent { + private TaskAttemptId reduceID; + private String reduceHostname; + + /** + * Create a new TaskAttemptTooManyFetchFailureEvent. + * @param attemptId the id of the mapper task attempt + * @param reduceId the id of the reporting reduce task attempt. + * @param reduceHost the hostname of the reporting reduce task attempt. + */ + public TaskAttemptTooManyFetchFailureEvent(TaskAttemptId attemptId, + TaskAttemptId reduceId, String reduceHost) { + super(attemptId, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE); + this.reduceID = reduceId; + this.reduceHostname = reduceHost; + } + + public TaskAttemptId getReduceId() { + return reduceID; + } + + public String getReduceHost() { + return reduceHostname; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 6e9f13c3900..2c480197353 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -103,9 +103,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent; @@ -1914,8 +1913,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, && failureRate >= job.getMaxAllowedFetchFailuresFraction()) { LOG.info("Too many fetch-failures for output of task attempt: " + mapId + " ... raising fetch failure to map"); - job.eventHandler.handle(new TaskAttemptEvent(mapId, - TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)); + job.eventHandler.handle(new TaskAttemptTooManyFetchFailureEvent(mapId, + fetchfailureEvent.getReduce(), fetchfailureEvent.getHost())); job.fetchFailuresMapping.remove(mapId); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 3055a25e12d..3fa42fe7fec 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -95,6 +95,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; @@ -1916,12 +1917,17 @@ public abstract class TaskAttemptImpl implements @SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { + TaskAttemptTooManyFetchFailureEvent fetchFailureEvent = + (TaskAttemptTooManyFetchFailureEvent) event; // too many fetch failure can only happen for map tasks Preconditions .checkArgument(taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP); //add to diagnostic - taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt"); - + taskAttempt.addDiagnosticInfo("Too many fetch failures." + + " Failing the attempt. Last failure reported by " + + fetchFailureEvent.getReduceId() + + " from host " + fetchFailureEvent.getReduceHost()); + if (taskAttempt.getLaunchTime() != 0) { taskAttempt.eventHandler .handle(createJobCounterUpdateEventTAFailed(taskAttempt, true)); @@ -2225,8 +2231,11 @@ public abstract class TaskAttemptImpl implements //this only will happen in reduce attempt type if (taskAttempt.reportedStatus.fetchFailedMaps != null && taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) { + String hostname = taskAttempt.container == null ? "UNKNOWN" + : taskAttempt.container.getNodeId().getHost(); taskAttempt.eventHandler.handle(new JobTaskAttemptFetchFailureEvent( - taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps)); + taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps, + hostname)); } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java index 4e4e2e71f4c..8d25079b97d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java @@ -94,10 +94,10 @@ public class TestFetchFailure { app.waitForState(reduceAttempt, TaskAttemptState.RUNNING); //send 3 fetch failures from reduce to trigger map re execution - sendFetchFailure(app, reduceAttempt, mapAttempt1); - sendFetchFailure(app, reduceAttempt, mapAttempt1); - sendFetchFailure(app, reduceAttempt, mapAttempt1); - + sendFetchFailure(app, reduceAttempt, mapAttempt1, "host"); + sendFetchFailure(app, reduceAttempt, mapAttempt1, "host"); + sendFetchFailure(app, reduceAttempt, mapAttempt1, "host"); + //wait for map Task state move back to RUNNING app.waitForState(mapTask, TaskState.RUNNING); @@ -215,9 +215,9 @@ public class TestFetchFailure { app.waitForState(reduceAttempt, TaskAttemptState.RUNNING); //send 3 fetch failures from reduce to trigger map re execution - sendFetchFailure(app, reduceAttempt, mapAttempt1); - sendFetchFailure(app, reduceAttempt, mapAttempt1); - sendFetchFailure(app, reduceAttempt, mapAttempt1); + sendFetchFailure(app, reduceAttempt, mapAttempt1, "host"); + sendFetchFailure(app, reduceAttempt, mapAttempt1, "host"); + sendFetchFailure(app, reduceAttempt, mapAttempt1, "host"); //wait for map Task state move back to RUNNING app.waitForState(mapTask, TaskState.RUNNING); @@ -324,8 +324,8 @@ public class TestFetchFailure { updateStatus(app, reduceAttempt3, Phase.SHUFFLE); //send 2 fetch failures from reduce to prepare for map re execution - sendFetchFailure(app, reduceAttempt, mapAttempt1); - sendFetchFailure(app, reduceAttempt, mapAttempt1); + sendFetchFailure(app, reduceAttempt, mapAttempt1, "host1"); + sendFetchFailure(app, reduceAttempt2, mapAttempt1, "host2"); //We should not re-launch the map task yet assertEquals(TaskState.SUCCEEDED, mapTask.getState()); @@ -333,7 +333,7 @@ public class TestFetchFailure { updateStatus(app, reduceAttempt3, Phase.REDUCE); //send 3rd fetch failures from reduce to trigger map re execution - sendFetchFailure(app, reduceAttempt, mapAttempt1); + sendFetchFailure(app, reduceAttempt3, mapAttempt1, "host3"); //wait for map Task state move back to RUNNING app.waitForState(mapTask, TaskState.RUNNING); @@ -342,6 +342,11 @@ public class TestFetchFailure { Assert.assertEquals("Map TaskAttempt state not correct", TaskAttemptState.FAILED, mapAttempt1.getState()); + Assert.assertEquals(mapAttempt1.getDiagnostics().get(0), + "Too many fetch failures. Failing the attempt. " + + "Last failure reported by " + + reduceAttempt3.getID().toString() + " from host host3"); + Assert.assertEquals("Num attempts in Map Task not correct", 2, mapTask.getAttempts().size()); @@ -410,7 +415,6 @@ public class TestFetchFailure { Assert.assertEquals("Unexpected map event", convertedEvents[2], mapEvents[0]); } - private void updateStatus(MRApp app, TaskAttempt attempt, Phase phase) { TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus(); @@ -430,11 +434,12 @@ public class TestFetchFailure { } private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt, - TaskAttempt mapAttempt) { + TaskAttempt mapAttempt, String hostname) { app.getContext().getEventHandler().handle( new JobTaskAttemptFetchFailureEvent( reduceAttempt.getID(), - Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()}))); + Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()}), + hostname)); } static class MRAppWithHistory extends MRApp { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 79b88d846d1..a88a9358a8f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -70,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; @@ -507,6 +508,9 @@ public class TestTaskAttempt{ JobId jobId = MRBuilderUtils.newJobId(appId, 1); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + TaskId reduceTaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE); + TaskAttemptId reduceTAId = + MRBuilderUtils.newTaskAttemptId(reduceTaskId, 0); Path jobFile = mock(Path.class); MockEventHandler eventHandler = new MockEventHandler(); @@ -554,8 +558,8 @@ public class TestTaskAttempt{ assertEquals("Task attempt is not in succeeded state", taImpl.getState(), TaskAttemptState.SUCCEEDED); - taImpl.handle(new TaskAttemptEvent(attemptId, - TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)); + taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId, + reduceTAId, "Host")); assertEquals("Task attempt is not in FAILED state", taImpl.getState(), TaskAttemptState.FAILED); taImpl.handle(new TaskAttemptEvent(attemptId, @@ -735,72 +739,75 @@ public class TestTaskAttempt{ @Test public void testFetchFailureAttemptFinishTime() throws Exception{ - ApplicationId appId = ApplicationId.newInstance(1, 2); - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 0); - JobId jobId = MRBuilderUtils.newJobId(appId, 1); - TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); - TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); - Path jobFile = mock(Path.class); + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + TaskId reducetaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE); + TaskAttemptId reduceTAId = + MRBuilderUtils.newTaskAttemptId(reducetaskId, 0); + Path jobFile = mock(Path.class); - MockEventHandler eventHandler = new MockEventHandler(); - TaskAttemptListener taListener = mock(TaskAttemptListener.class); - when(taListener.getAddress()).thenReturn( - new InetSocketAddress("localhost", 0)); + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); - JobConf jobConf = new JobConf(); - jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); - jobConf.setBoolean("fs.file.impl.disable.cache", true); - jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); - jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); - TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); - when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); - AppContext appCtx = mock(AppContext.class); - ClusterInfo clusterInfo = mock(ClusterInfo.class); - when(appCtx.getClusterInfo()).thenReturn(clusterInfo); - setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); - TaskAttemptImpl taImpl = - new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, - splits, jobConf, taListener,mock(Token.class), new Credentials(), - new SystemClock(), appCtx); + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + splits, jobConf, taListener,mock(Token.class), new Credentials(), + new SystemClock(), appCtx); - NodeId nid = NodeId.newInstance("127.0.0.1", 0); - ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); - Container container = mock(Container.class); - when(container.getId()).thenReturn(contId); - when(container.getNodeId()).thenReturn(nid); - when(container.getNodeHttpAddress()).thenReturn("localhost:0"); - - taImpl.handle(new TaskAttemptEvent(attemptId, - TaskAttemptEventType.TA_SCHEDULE)); - taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, - container, mock(Map.class))); - taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); - taImpl.handle(new TaskAttemptEvent(attemptId, - TaskAttemptEventType.TA_DONE)); - taImpl.handle(new TaskAttemptEvent(attemptId, - TaskAttemptEventType.TA_CONTAINER_COMPLETED)); - - assertEquals("Task attempt is not in succeeded state", taImpl.getState(), - TaskAttemptState.SUCCEEDED); - - assertTrue("Task Attempt finish time is not greater than 0", - taImpl.getFinishTime() > 0); - - Long finishTime = taImpl.getFinishTime(); - Thread.sleep(5); - taImpl.handle(new TaskAttemptEvent(attemptId, - TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)); - - assertEquals("Task attempt is not in Too Many Fetch Failure state", - taImpl.getState(), TaskAttemptState.FAILED); - - assertEquals("After TA_TOO_MANY_FETCH_FAILURE," - + " Task attempt finish time is not the same ", - finishTime, Long.valueOf(taImpl.getFinishTime())); + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, + container, mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_DONE)); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); + + assertEquals("Task attempt is not in succeeded state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + + assertTrue("Task Attempt finish time is not greater than 0", + taImpl.getFinishTime() > 0); + + Long finishTime = taImpl.getFinishTime(); + Thread.sleep(5); + taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId, + reduceTAId, "Host")); + + assertEquals("Task attempt is not in Too Many Fetch Failure state", + taImpl.getState(), TaskAttemptState.FAILED); + + assertEquals("After TA_TOO_MANY_FETCH_FAILURE," + + " Task attempt finish time is not the same ", + finishTime, Long.valueOf(taImpl.getFinishTime())); } @Test