From 3d8fa1d6c487d4acb3608f5e6b416565316e61ad Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 19 Oct 2012 20:20:03 +0000 Subject: [PATCH] MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many reducers complete consecutively. Contributed by Jason Lowe. svn merge --ignore-ancestry -c 1400264 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1400266 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapred/TaskAttemptListenerImpl.java | 20 ++--- .../hadoop/mapreduce/v2/app/job/Job.java | 3 + .../mapreduce/v2/app/job/impl/JobImpl.java | 28 ++++++- .../mapred/TestTaskAttemptListenerImpl.java | 76 +++++++++++++++++++ .../hadoop/mapreduce/v2/app/MockJobs.java | 6 ++ .../mapreduce/v2/app/TestFetchFailure.java | 12 ++- .../v2/app/TestRuntimeEstimators.java | 6 ++ .../hadoop/mapreduce/v2/hs/CompletedJob.java | 34 ++++++++- .../hadoop/mapreduce/v2/hs/PartialJob.java | 6 ++ .../mapreduce/v2/hs/MockHistoryJobs.java | 6 ++ 11 files changed, 176 insertions(+), 24 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 64db181fe40..e670471742e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -449,6 +449,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4479. Fix parameter order in assertEquals() in TestCombineInputFileFormat.java (Mariappan Asokan via bobby) + MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many + reducers complete consecutively. (Jason Lowe via vinodkv) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index fdcec65a90a..c061dc93b49 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.SortedRanges.Range; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; -import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; @@ -253,31 +251,23 @@ public class TaskAttemptListenerImpl extends CompositeService @Override public MapTaskCompletionEventsUpdate getMapCompletionEvents( - JobID jobIdentifier, int fromEventId, int maxEvents, + JobID jobIdentifier, int startIndex, int maxEvents, TaskAttemptID taskAttemptID) throws IOException { LOG.info("MapCompletionEvents request from " + taskAttemptID.toString() - + ". fromEventID " + fromEventId + " maxEvents " + maxEvents); + + ". startIndex " + startIndex + " maxEvents " + maxEvents); // TODO: shouldReset is never used. See TT. Ask for Removal. boolean shouldReset = false; org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(taskAttemptID); org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[] events = - context.getJob(attemptID.getTaskId().getJobId()).getTaskAttemptCompletionEvents( - fromEventId, maxEvents); + context.getJob(attemptID.getTaskId().getJobId()).getMapAttemptCompletionEvents( + startIndex, maxEvents); taskHeartbeatHandler.progressing(attemptID); - - // filter the events to return only map completion events in old format - List mapEvents = new ArrayList(); - for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent event : events) { - if (TaskType.MAP.equals(event.getAttemptId().getTaskId().getTaskType())) { - mapEvents.add(TypeConverter.fromYarn(event)); - } - } return new MapTaskCompletionEventsUpdate( - mapEvents.toArray(new TaskCompletionEvent[0]), shouldReset); + TypeConverter.fromYarn(events), shouldReset); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java index 5fd47158eb3..ffa245bfb40 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java @@ -88,6 +88,9 @@ public interface Job { TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(int fromEventId, int maxEvents); + TaskAttemptCompletionEvent[] + getMapAttemptCompletionEvents(int startIndex, int maxEvents); + /** * @return information for MR AppMasters (previously failed and current) */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 2fd9757ff60..3053f1901f8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -190,6 +190,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private int allowedMapFailuresPercent = 0; private int allowedReduceFailuresPercent = 0; private List taskAttemptCompletionEvents; + private List mapAttemptCompletionEvents; private final List diagnostics = new ArrayList(); //task/attempt related datastructures @@ -548,14 +549,28 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, @Override public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents( int fromEventId, int maxEvents) { + return getAttemptCompletionEvents(taskAttemptCompletionEvents, + fromEventId, maxEvents); + } + + @Override + public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents( + int startIndex, int maxEvents) { + return getAttemptCompletionEvents(mapAttemptCompletionEvents, + startIndex, maxEvents); + } + + private TaskAttemptCompletionEvent[] getAttemptCompletionEvents( + List eventList, + int startIndex, int maxEvents) { TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS; readLock.lock(); try { - if (taskAttemptCompletionEvents.size() > fromEventId) { + if (eventList.size() > startIndex) { int actualMax = Math.min(maxEvents, - (taskAttemptCompletionEvents.size() - fromEventId)); - events = taskAttemptCompletionEvents.subList(fromEventId, - actualMax + fromEventId).toArray(events); + (eventList.size() - startIndex)); + events = eventList.subList(startIndex, + actualMax + startIndex).toArray(events); } return events; } finally { @@ -1070,6 +1085,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, job.taskAttemptCompletionEvents = new ArrayList( job.numMapTasks + job.numReduceTasks + 10); + job.mapAttemptCompletionEvents = + new ArrayList(job.numMapTasks + 10); job.allowedMapFailuresPercent = job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0); @@ -1341,6 +1358,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, //eventId is equal to index in the arraylist tce.setEventId(job.taskAttemptCompletionEvents.size()); job.taskAttemptCompletionEvents.add(tce); + if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) { + job.mapAttemptCompletionEvents.add(tce); + } TaskAttemptId attemptId = tce.getAttemptId(); TaskId taskId = attemptId.getTaskId(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index 8737864e413..dc623e5b153 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -17,20 +17,33 @@ */ package org.apache.hadoop.mapred; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.Arrays; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.junit.Test; public class TestTaskAttemptListenerImpl { @@ -115,4 +128,67 @@ public class TestTaskAttemptListenerImpl { listener.stop(); } + + @Test + public void testGetMapCompletionEvents() throws IOException { + TaskAttemptCompletionEvent[] empty = {}; + TaskAttemptCompletionEvent[] taskEvents = { + createTce(0, true, TaskAttemptCompletionEventStatus.OBSOLETE), + createTce(1, false, TaskAttemptCompletionEventStatus.FAILED), + createTce(2, true, TaskAttemptCompletionEventStatus.SUCCEEDED), + createTce(3, false, TaskAttemptCompletionEventStatus.FAILED) }; + TaskAttemptCompletionEvent[] mapEvents = { taskEvents[0], taskEvents[2] }; + Job mockJob = mock(Job.class); + when(mockJob.getTaskAttemptCompletionEvents(0, 100)) + .thenReturn(taskEvents); + when(mockJob.getTaskAttemptCompletionEvents(0, 2)) + .thenReturn(Arrays.copyOfRange(taskEvents, 0, 2)); + when(mockJob.getTaskAttemptCompletionEvents(2, 100)) + .thenReturn(Arrays.copyOfRange(taskEvents, 2, 4)); + when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(mapEvents); + when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(mapEvents); + when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(empty); + + AppContext appCtx = mock(AppContext.class); + when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); + JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); + TaskAttemptListenerImpl listener = + new TaskAttemptListenerImpl(appCtx, secret) { + @Override + protected void registerHeartbeatHandler(Configuration conf) { + taskHeartbeatHandler = hbHandler; + } + }; + Configuration conf = new Configuration(); + listener.init(conf); + listener.start(); + + JobID jid = new JobID("12345", 1); + TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0); + MapTaskCompletionEventsUpdate update = + listener.getMapCompletionEvents(jid, 0, 100, tid); + assertEquals(2, update.events.length); + update = listener.getMapCompletionEvents(jid, 0, 2, tid); + assertEquals(2, update.events.length); + update = listener.getMapCompletionEvents(jid, 2, 100, tid); + assertEquals(0, update.events.length); + } + + private static TaskAttemptCompletionEvent createTce(int eventId, + boolean isMap, TaskAttemptCompletionEventStatus status) { + JobId jid = MRBuilderUtils.newJobId(12345, 1, 1); + TaskId tid = MRBuilderUtils.newTaskId(jid, 0, + isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP + : org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0); + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + TaskAttemptCompletionEvent tce = recordFactory + .newRecordInstance(TaskAttemptCompletionEvent.class); + tce.setEventId(eventId); + tce.setAttemptId(attemptId); + tce.setStatus(status); + return tce; + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java index a9e4e4910c0..638a8da86ca 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java @@ -555,6 +555,12 @@ public class MockJobs extends MockApps { return null; } + @Override + public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents( + int startIndex, int maxEvents) { + return null; + } + @Override public Map getTasks(TaskType taskType) { throw new UnsupportedOperationException("Not supported yet."); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java index bc895a4ff10..cdc8537c103 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java @@ -21,8 +21,6 @@ package org.apache.hadoop.mapreduce.v2.app; import java.util.Arrays; import java.util.Iterator; -import junit.framework.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; @@ -40,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEv import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.yarn.event.EventHandler; +import org.junit.Assert; import org.junit.Test; public class TestFetchFailure { @@ -144,6 +143,15 @@ public class TestFetchFailure { TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus()); Assert.assertEquals("Event status not correct for reduce attempt1", TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus()); + + TaskAttemptCompletionEvent mapEvents[] = + job.getMapAttemptCompletionEvents(0, 2); + Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length); + Assert.assertArrayEquals("Unexpected map events", + Arrays.copyOfRange(events, 0, 2), mapEvents); + mapEvents = job.getMapAttemptCompletionEvents(2, 200); + Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length); + Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 80c48233c2f..be897fa37db 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -440,6 +440,12 @@ public class TestRuntimeEstimators { throw new UnsupportedOperationException("Not supported yet."); } + @Override + public TaskAttemptCompletionEvent[] + getMapAttemptCompletionEvents(int startIndex, int maxEvents) { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public String getName() { throw new UnsupportedOperationException("Not supported yet."); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java index 1801a1ed87a..cfa7e290595 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java @@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.hs; import java.io.IOException; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -81,6 +82,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job private Map mapTasks = new HashMap(); private Map reduceTasks = new HashMap(); private List completionEvents = null; + private List mapCompletionEvents = null; private JobACLsManager aclsMgr; @@ -176,11 +178,28 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job if (completionEvents == null) { constructTaskAttemptCompletionEvents(); } + return getAttemptCompletionEvents(completionEvents, + fromEventId, maxEvents); + } + + @Override + public synchronized TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents( + int startIndex, int maxEvents) { + if (mapCompletionEvents == null) { + constructTaskAttemptCompletionEvents(); + } + return getAttemptCompletionEvents(mapCompletionEvents, + startIndex, maxEvents); + } + + private static TaskAttemptCompletionEvent[] getAttemptCompletionEvents( + List eventList, + int startIndex, int maxEvents) { TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0]; - if (completionEvents.size() > fromEventId) { + if (eventList.size() > startIndex) { int actualMax = Math.min(maxEvents, - (completionEvents.size() - fromEventId)); - events = completionEvents.subList(fromEventId, actualMax + fromEventId) + (eventList.size() - startIndex)); + events = eventList.subList(startIndex, actualMax + startIndex) .toArray(events); } return events; @@ -190,11 +209,15 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job loadAllTasks(); completionEvents = new LinkedList(); List allTaskAttempts = new LinkedList(); + int numMapAttempts = 0; for (TaskId taskId : tasks.keySet()) { Task task = tasks.get(taskId); for (TaskAttemptId taskAttemptId : task.getAttempts().keySet()) { TaskAttempt taskAttempt = task.getAttempts().get(taskAttemptId); allTaskAttempts.add(taskAttempt); + if (task.getType() == TaskType.MAP) { + ++numMapAttempts; + } } } Collections.sort(allTaskAttempts, new Comparator() { @@ -223,6 +246,8 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job } }); + mapCompletionEvents = + new ArrayList(numMapAttempts); int eventId = 0; for (TaskAttempt taskAttempt : allTaskAttempts) { @@ -253,6 +278,9 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job .getAssignedContainerMgrAddress()); tace.setStatus(taceStatus); completionEvents.add(tace); + if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP) { + mapCompletionEvents.add(tace); + } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java index dd5bb01a401..0bfffac1b07 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java @@ -153,6 +153,12 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job { return null; } + @Override + public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents( + int startIndex, int maxEvents) { + return null; + } + @Override public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) { return true; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java index 74ca32c98bb..da983948d10 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java @@ -125,6 +125,12 @@ public class MockHistoryJobs extends MockJobs { return job.getTaskAttemptCompletionEvents(fromEventId, maxEvents); } + @Override + public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents( + int startIndex, int maxEvents) { + return job.getMapAttemptCompletionEvents(startIndex, maxEvents); + } + @Override public Map getTasks() { return job.getTasks();