MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many reducers complete consecutively. Contributed by Jason Lowe.
svn merge --ignore-ancestry -c 1400264 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1400266 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
edd3bc5cc2
commit
3d8fa1d6c4
|
@ -449,6 +449,9 @@ Release 0.23.5 - UNRELEASED
|
||||||
MAPREDUCE-4479. Fix parameter order in assertEquals() in
|
MAPREDUCE-4479. Fix parameter order in assertEquals() in
|
||||||
TestCombineInputFileFormat.java (Mariappan Asokan via bobby)
|
TestCombineInputFileFormat.java (Mariappan Asokan via bobby)
|
||||||
|
|
||||||
|
MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many
|
||||||
|
reducers complete consecutively. (Jason Lowe via vinodkv)
|
||||||
|
|
||||||
Release 0.23.4 - UNRELEASED
|
Release 0.23.4 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -22,7 +22,6 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.SortedRanges.Range;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
|
||||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
|
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
|
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
|
||||||
|
@ -253,31 +251,23 @@ public class TaskAttemptListenerImpl extends CompositeService
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MapTaskCompletionEventsUpdate getMapCompletionEvents(
|
public MapTaskCompletionEventsUpdate getMapCompletionEvents(
|
||||||
JobID jobIdentifier, int fromEventId, int maxEvents,
|
JobID jobIdentifier, int startIndex, int maxEvents,
|
||||||
TaskAttemptID taskAttemptID) throws IOException {
|
TaskAttemptID taskAttemptID) throws IOException {
|
||||||
LOG.info("MapCompletionEvents request from " + taskAttemptID.toString()
|
LOG.info("MapCompletionEvents request from " + taskAttemptID.toString()
|
||||||
+ ". fromEventID " + fromEventId + " maxEvents " + maxEvents);
|
+ ". startIndex " + startIndex + " maxEvents " + maxEvents);
|
||||||
|
|
||||||
// TODO: shouldReset is never used. See TT. Ask for Removal.
|
// TODO: shouldReset is never used. See TT. Ask for Removal.
|
||||||
boolean shouldReset = false;
|
boolean shouldReset = false;
|
||||||
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
|
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
|
||||||
TypeConverter.toYarn(taskAttemptID);
|
TypeConverter.toYarn(taskAttemptID);
|
||||||
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[] events =
|
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[] events =
|
||||||
context.getJob(attemptID.getTaskId().getJobId()).getTaskAttemptCompletionEvents(
|
context.getJob(attemptID.getTaskId().getJobId()).getMapAttemptCompletionEvents(
|
||||||
fromEventId, maxEvents);
|
startIndex, maxEvents);
|
||||||
|
|
||||||
taskHeartbeatHandler.progressing(attemptID);
|
taskHeartbeatHandler.progressing(attemptID);
|
||||||
|
|
||||||
// filter the events to return only map completion events in old format
|
|
||||||
List<TaskCompletionEvent> mapEvents = new ArrayList<TaskCompletionEvent>();
|
|
||||||
for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent event : events) {
|
|
||||||
if (TaskType.MAP.equals(event.getAttemptId().getTaskId().getTaskType())) {
|
|
||||||
mapEvents.add(TypeConverter.fromYarn(event));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return new MapTaskCompletionEventsUpdate(
|
return new MapTaskCompletionEventsUpdate(
|
||||||
mapEvents.toArray(new TaskCompletionEvent[0]), shouldReset);
|
TypeConverter.fromYarn(events), shouldReset);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -88,6 +88,9 @@ public interface Job {
|
||||||
TaskAttemptCompletionEvent[]
|
TaskAttemptCompletionEvent[]
|
||||||
getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
|
getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
|
||||||
|
|
||||||
|
TaskAttemptCompletionEvent[]
|
||||||
|
getMapAttemptCompletionEvents(int startIndex, int maxEvents);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return information for MR AppMasters (previously failed and current)
|
* @return information for MR AppMasters (previously failed and current)
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -190,6 +190,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
||||||
private int allowedMapFailuresPercent = 0;
|
private int allowedMapFailuresPercent = 0;
|
||||||
private int allowedReduceFailuresPercent = 0;
|
private int allowedReduceFailuresPercent = 0;
|
||||||
private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
|
private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
|
||||||
|
private List<TaskAttemptCompletionEvent> mapAttemptCompletionEvents;
|
||||||
private final List<String> diagnostics = new ArrayList<String>();
|
private final List<String> diagnostics = new ArrayList<String>();
|
||||||
|
|
||||||
//task/attempt related datastructures
|
//task/attempt related datastructures
|
||||||
|
@ -548,14 +549,28 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
||||||
@Override
|
@Override
|
||||||
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
|
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
|
||||||
int fromEventId, int maxEvents) {
|
int fromEventId, int maxEvents) {
|
||||||
|
return getAttemptCompletionEvents(taskAttemptCompletionEvents,
|
||||||
|
fromEventId, maxEvents);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
|
||||||
|
int startIndex, int maxEvents) {
|
||||||
|
return getAttemptCompletionEvents(mapAttemptCompletionEvents,
|
||||||
|
startIndex, maxEvents);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
|
||||||
|
List<TaskAttemptCompletionEvent> eventList,
|
||||||
|
int startIndex, int maxEvents) {
|
||||||
TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
|
TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
if (taskAttemptCompletionEvents.size() > fromEventId) {
|
if (eventList.size() > startIndex) {
|
||||||
int actualMax = Math.min(maxEvents,
|
int actualMax = Math.min(maxEvents,
|
||||||
(taskAttemptCompletionEvents.size() - fromEventId));
|
(eventList.size() - startIndex));
|
||||||
events = taskAttemptCompletionEvents.subList(fromEventId,
|
events = eventList.subList(startIndex,
|
||||||
actualMax + fromEventId).toArray(events);
|
actualMax + startIndex).toArray(events);
|
||||||
}
|
}
|
||||||
return events;
|
return events;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -1070,6 +1085,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
||||||
job.taskAttemptCompletionEvents =
|
job.taskAttemptCompletionEvents =
|
||||||
new ArrayList<TaskAttemptCompletionEvent>(
|
new ArrayList<TaskAttemptCompletionEvent>(
|
||||||
job.numMapTasks + job.numReduceTasks + 10);
|
job.numMapTasks + job.numReduceTasks + 10);
|
||||||
|
job.mapAttemptCompletionEvents =
|
||||||
|
new ArrayList<TaskAttemptCompletionEvent>(job.numMapTasks + 10);
|
||||||
|
|
||||||
job.allowedMapFailuresPercent =
|
job.allowedMapFailuresPercent =
|
||||||
job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
|
job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
|
||||||
|
@ -1341,6 +1358,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
||||||
//eventId is equal to index in the arraylist
|
//eventId is equal to index in the arraylist
|
||||||
tce.setEventId(job.taskAttemptCompletionEvents.size());
|
tce.setEventId(job.taskAttemptCompletionEvents.size());
|
||||||
job.taskAttemptCompletionEvents.add(tce);
|
job.taskAttemptCompletionEvents.add(tce);
|
||||||
|
if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) {
|
||||||
|
job.mapAttemptCompletionEvents.add(tce);
|
||||||
|
}
|
||||||
|
|
||||||
TaskAttemptId attemptId = tce.getAttemptId();
|
TaskAttemptId attemptId = tce.getAttemptId();
|
||||||
TaskId taskId = attemptId.getTaskId();
|
TaskId taskId = attemptId.getTaskId();
|
||||||
|
|
|
@ -17,20 +17,33 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
|
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||||
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestTaskAttemptListenerImpl {
|
public class TestTaskAttemptListenerImpl {
|
||||||
|
@ -115,4 +128,67 @@ public class TestTaskAttemptListenerImpl {
|
||||||
|
|
||||||
listener.stop();
|
listener.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetMapCompletionEvents() throws IOException {
|
||||||
|
TaskAttemptCompletionEvent[] empty = {};
|
||||||
|
TaskAttemptCompletionEvent[] taskEvents = {
|
||||||
|
createTce(0, true, TaskAttemptCompletionEventStatus.OBSOLETE),
|
||||||
|
createTce(1, false, TaskAttemptCompletionEventStatus.FAILED),
|
||||||
|
createTce(2, true, TaskAttemptCompletionEventStatus.SUCCEEDED),
|
||||||
|
createTce(3, false, TaskAttemptCompletionEventStatus.FAILED) };
|
||||||
|
TaskAttemptCompletionEvent[] mapEvents = { taskEvents[0], taskEvents[2] };
|
||||||
|
Job mockJob = mock(Job.class);
|
||||||
|
when(mockJob.getTaskAttemptCompletionEvents(0, 100))
|
||||||
|
.thenReturn(taskEvents);
|
||||||
|
when(mockJob.getTaskAttemptCompletionEvents(0, 2))
|
||||||
|
.thenReturn(Arrays.copyOfRange(taskEvents, 0, 2));
|
||||||
|
when(mockJob.getTaskAttemptCompletionEvents(2, 100))
|
||||||
|
.thenReturn(Arrays.copyOfRange(taskEvents, 2, 4));
|
||||||
|
when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(mapEvents);
|
||||||
|
when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(mapEvents);
|
||||||
|
when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(empty);
|
||||||
|
|
||||||
|
AppContext appCtx = mock(AppContext.class);
|
||||||
|
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
|
||||||
|
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
||||||
|
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
|
||||||
|
TaskAttemptListenerImpl listener =
|
||||||
|
new TaskAttemptListenerImpl(appCtx, secret) {
|
||||||
|
@Override
|
||||||
|
protected void registerHeartbeatHandler(Configuration conf) {
|
||||||
|
taskHeartbeatHandler = hbHandler;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
listener.init(conf);
|
||||||
|
listener.start();
|
||||||
|
|
||||||
|
JobID jid = new JobID("12345", 1);
|
||||||
|
TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
|
||||||
|
MapTaskCompletionEventsUpdate update =
|
||||||
|
listener.getMapCompletionEvents(jid, 0, 100, tid);
|
||||||
|
assertEquals(2, update.events.length);
|
||||||
|
update = listener.getMapCompletionEvents(jid, 0, 2, tid);
|
||||||
|
assertEquals(2, update.events.length);
|
||||||
|
update = listener.getMapCompletionEvents(jid, 2, 100, tid);
|
||||||
|
assertEquals(0, update.events.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static TaskAttemptCompletionEvent createTce(int eventId,
|
||||||
|
boolean isMap, TaskAttemptCompletionEventStatus status) {
|
||||||
|
JobId jid = MRBuilderUtils.newJobId(12345, 1, 1);
|
||||||
|
TaskId tid = MRBuilderUtils.newTaskId(jid, 0,
|
||||||
|
isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
|
||||||
|
: org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
|
||||||
|
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
|
||||||
|
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
TaskAttemptCompletionEvent tce = recordFactory
|
||||||
|
.newRecordInstance(TaskAttemptCompletionEvent.class);
|
||||||
|
tce.setEventId(eventId);
|
||||||
|
tce.setAttemptId(attemptId);
|
||||||
|
tce.setStatus(status);
|
||||||
|
return tce;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -555,6 +555,12 @@ public class MockJobs extends MockApps {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
|
||||||
|
int startIndex, int maxEvents) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<TaskId, Task> getTasks(TaskType taskType) {
|
public Map<TaskId, Task> getTasks(TaskType taskType) {
|
||||||
throw new UnsupportedOperationException("Not supported yet.");
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
|
|
@ -21,8 +21,6 @@ package org.apache.hadoop.mapreduce.v2.app;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||||
|
@ -40,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEv
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestFetchFailure {
|
public class TestFetchFailure {
|
||||||
|
@ -144,6 +143,15 @@ public class TestFetchFailure {
|
||||||
TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
|
TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
|
||||||
Assert.assertEquals("Event status not correct for reduce attempt1",
|
Assert.assertEquals("Event status not correct for reduce attempt1",
|
||||||
TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
|
TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
|
||||||
|
|
||||||
|
TaskAttemptCompletionEvent mapEvents[] =
|
||||||
|
job.getMapAttemptCompletionEvents(0, 2);
|
||||||
|
Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
|
||||||
|
Assert.assertArrayEquals("Unexpected map events",
|
||||||
|
Arrays.copyOfRange(events, 0, 2), mapEvents);
|
||||||
|
mapEvents = job.getMapAttemptCompletionEvents(2, 200);
|
||||||
|
Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
|
||||||
|
Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -440,6 +440,12 @@ public class TestRuntimeEstimators {
|
||||||
throw new UnsupportedOperationException("Not supported yet.");
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TaskAttemptCompletionEvent[]
|
||||||
|
getMapAttemptCompletionEvents(int startIndex, int maxEvents) {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getName() {
|
public String getName() {
|
||||||
throw new UnsupportedOperationException("Not supported yet.");
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.hs;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -81,6 +82,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
|
||||||
private Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
|
private Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
|
||||||
private Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
|
private Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
|
||||||
private List<TaskAttemptCompletionEvent> completionEvents = null;
|
private List<TaskAttemptCompletionEvent> completionEvents = null;
|
||||||
|
private List<TaskAttemptCompletionEvent> mapCompletionEvents = null;
|
||||||
private JobACLsManager aclsMgr;
|
private JobACLsManager aclsMgr;
|
||||||
|
|
||||||
|
|
||||||
|
@ -176,11 +178,28 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
|
||||||
if (completionEvents == null) {
|
if (completionEvents == null) {
|
||||||
constructTaskAttemptCompletionEvents();
|
constructTaskAttemptCompletionEvents();
|
||||||
}
|
}
|
||||||
|
return getAttemptCompletionEvents(completionEvents,
|
||||||
|
fromEventId, maxEvents);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
|
||||||
|
int startIndex, int maxEvents) {
|
||||||
|
if (mapCompletionEvents == null) {
|
||||||
|
constructTaskAttemptCompletionEvents();
|
||||||
|
}
|
||||||
|
return getAttemptCompletionEvents(mapCompletionEvents,
|
||||||
|
startIndex, maxEvents);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
|
||||||
|
List<TaskAttemptCompletionEvent> eventList,
|
||||||
|
int startIndex, int maxEvents) {
|
||||||
TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0];
|
TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0];
|
||||||
if (completionEvents.size() > fromEventId) {
|
if (eventList.size() > startIndex) {
|
||||||
int actualMax = Math.min(maxEvents,
|
int actualMax = Math.min(maxEvents,
|
||||||
(completionEvents.size() - fromEventId));
|
(eventList.size() - startIndex));
|
||||||
events = completionEvents.subList(fromEventId, actualMax + fromEventId)
|
events = eventList.subList(startIndex, actualMax + startIndex)
|
||||||
.toArray(events);
|
.toArray(events);
|
||||||
}
|
}
|
||||||
return events;
|
return events;
|
||||||
|
@ -190,11 +209,15 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
|
||||||
loadAllTasks();
|
loadAllTasks();
|
||||||
completionEvents = new LinkedList<TaskAttemptCompletionEvent>();
|
completionEvents = new LinkedList<TaskAttemptCompletionEvent>();
|
||||||
List<TaskAttempt> allTaskAttempts = new LinkedList<TaskAttempt>();
|
List<TaskAttempt> allTaskAttempts = new LinkedList<TaskAttempt>();
|
||||||
|
int numMapAttempts = 0;
|
||||||
for (TaskId taskId : tasks.keySet()) {
|
for (TaskId taskId : tasks.keySet()) {
|
||||||
Task task = tasks.get(taskId);
|
Task task = tasks.get(taskId);
|
||||||
for (TaskAttemptId taskAttemptId : task.getAttempts().keySet()) {
|
for (TaskAttemptId taskAttemptId : task.getAttempts().keySet()) {
|
||||||
TaskAttempt taskAttempt = task.getAttempts().get(taskAttemptId);
|
TaskAttempt taskAttempt = task.getAttempts().get(taskAttemptId);
|
||||||
allTaskAttempts.add(taskAttempt);
|
allTaskAttempts.add(taskAttempt);
|
||||||
|
if (task.getType() == TaskType.MAP) {
|
||||||
|
++numMapAttempts;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Collections.sort(allTaskAttempts, new Comparator<TaskAttempt>() {
|
Collections.sort(allTaskAttempts, new Comparator<TaskAttempt>() {
|
||||||
|
@ -223,6 +246,8 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
mapCompletionEvents =
|
||||||
|
new ArrayList<TaskAttemptCompletionEvent>(numMapAttempts);
|
||||||
int eventId = 0;
|
int eventId = 0;
|
||||||
for (TaskAttempt taskAttempt : allTaskAttempts) {
|
for (TaskAttempt taskAttempt : allTaskAttempts) {
|
||||||
|
|
||||||
|
@ -253,6 +278,9 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
|
||||||
.getAssignedContainerMgrAddress());
|
.getAssignedContainerMgrAddress());
|
||||||
tace.setStatus(taceStatus);
|
tace.setStatus(taceStatus);
|
||||||
completionEvents.add(tace);
|
completionEvents.add(tace);
|
||||||
|
if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP) {
|
||||||
|
mapCompletionEvents.add(tace);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -153,6 +153,12 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
|
||||||
|
int startIndex, int maxEvents) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
|
public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -125,6 +125,12 @@ public class MockHistoryJobs extends MockJobs {
|
||||||
return job.getTaskAttemptCompletionEvents(fromEventId, maxEvents);
|
return job.getTaskAttemptCompletionEvents(fromEventId, maxEvents);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
|
||||||
|
int startIndex, int maxEvents) {
|
||||||
|
return job.getMapAttemptCompletionEvents(startIndex, maxEvents);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<TaskId, Task> getTasks() {
|
public Map<TaskId, Task> getTasks() {
|
||||||
return job.getTasks();
|
return job.getTasks();
|
||||||
|
|
Loading…
Reference in New Issue