merge MAPREDUCE-4946 from trunk. Fix a performance problem for large jobs by reducing the number of map completion event type conversions. Contributed by Jason Lowe
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1437104 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b486f7e2de
commit
b981b59c06
|
@ -495,6 +495,9 @@ Release 0.23.7 - UNRELEASED
|
|||
|
||||
OPTIMIZATIONS
|
||||
|
||||
MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the
|
||||
number of map completion event type conversions. (Jason Lowe via sseth)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
MAPREDUCE-4458. Warn if java.library.path is used for AM or Task
|
||||
|
|
|
@ -275,14 +275,13 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|||
boolean shouldReset = false;
|
||||
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
|
||||
TypeConverter.toYarn(taskAttemptID);
|
||||
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[] events =
|
||||
TaskCompletionEvent[] events =
|
||||
context.getJob(attemptID.getTaskId().getJobId()).getMapAttemptCompletionEvents(
|
||||
startIndex, maxEvents);
|
||||
|
||||
taskHeartbeatHandler.progressing(attemptID);
|
||||
|
||||
return new MapTaskCompletionEventsUpdate(
|
||||
TypeConverter.fromYarn(events), shouldReset);
|
||||
return new MapTaskCompletionEventsUpdate(events, shouldReset);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Map;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
|
@ -88,7 +89,7 @@ public interface Job {
|
|||
TaskAttemptCompletionEvent[]
|
||||
getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
|
||||
|
||||
TaskAttemptCompletionEvent[]
|
||||
TaskCompletionEvent[]
|
||||
getMapAttemptCompletionEvents(int startIndex, int maxEvents);
|
||||
|
||||
/**
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.JobACLsManager;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
|
@ -130,6 +131,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
private static final TaskAttemptCompletionEvent[]
|
||||
EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS = new TaskAttemptCompletionEvent[0];
|
||||
|
||||
private static final TaskCompletionEvent[]
|
||||
EMPTY_TASK_COMPLETION_EVENTS = new TaskCompletionEvent[0];
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(JobImpl.class);
|
||||
|
||||
//The maximum fraction of fetch failures allowed for a map
|
||||
|
@ -196,7 +200,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
private int allowedMapFailuresPercent = 0;
|
||||
private int allowedReduceFailuresPercent = 0;
|
||||
private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
|
||||
private List<TaskAttemptCompletionEvent> mapAttemptCompletionEvents;
|
||||
private List<TaskCompletionEvent> mapAttemptCompletionEvents;
|
||||
private List<Integer> taskCompletionIdxToMapCompletionIdx;
|
||||
private final List<String> diagnostics = new ArrayList<String>();
|
||||
|
||||
//task/attempt related datastructures
|
||||
|
@ -684,27 +689,31 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
@Override
|
||||
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
|
||||
int fromEventId, int maxEvents) {
|
||||
return getAttemptCompletionEvents(taskAttemptCompletionEvents,
|
||||
fromEventId, maxEvents);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
|
||||
int startIndex, int maxEvents) {
|
||||
return getAttemptCompletionEvents(mapAttemptCompletionEvents,
|
||||
startIndex, maxEvents);
|
||||
}
|
||||
|
||||
private TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
|
||||
List<TaskAttemptCompletionEvent> eventList,
|
||||
int startIndex, int maxEvents) {
|
||||
TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
|
||||
readLock.lock();
|
||||
try {
|
||||
if (eventList.size() > startIndex) {
|
||||
if (taskAttemptCompletionEvents.size() > fromEventId) {
|
||||
int actualMax = Math.min(maxEvents,
|
||||
(eventList.size() - startIndex));
|
||||
events = eventList.subList(startIndex,
|
||||
(taskAttemptCompletionEvents.size() - fromEventId));
|
||||
events = taskAttemptCompletionEvents.subList(fromEventId,
|
||||
actualMax + fromEventId).toArray(events);
|
||||
}
|
||||
return events;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskCompletionEvent[] getMapAttemptCompletionEvents(
|
||||
int startIndex, int maxEvents) {
|
||||
TaskCompletionEvent[] events = EMPTY_TASK_COMPLETION_EVENTS;
|
||||
readLock.lock();
|
||||
try {
|
||||
if (mapAttemptCompletionEvents.size() > startIndex) {
|
||||
int actualMax = Math.min(maxEvents,
|
||||
(mapAttemptCompletionEvents.size() - startIndex));
|
||||
events = mapAttemptCompletionEvents.subList(startIndex,
|
||||
actualMax + startIndex).toArray(events);
|
||||
}
|
||||
return events;
|
||||
|
@ -1247,7 +1256,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
new ArrayList<TaskAttemptCompletionEvent>(
|
||||
job.numMapTasks + job.numReduceTasks + 10);
|
||||
job.mapAttemptCompletionEvents =
|
||||
new ArrayList<TaskAttemptCompletionEvent>(job.numMapTasks + 10);
|
||||
new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
|
||||
job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
|
||||
job.numMapTasks + job.numReduceTasks + 10);
|
||||
|
||||
job.allowedMapFailuresPercent =
|
||||
job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
|
||||
|
@ -1562,19 +1573,37 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
//eventId is equal to index in the arraylist
|
||||
tce.setEventId(job.taskAttemptCompletionEvents.size());
|
||||
job.taskAttemptCompletionEvents.add(tce);
|
||||
int mapEventIdx = -1;
|
||||
if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) {
|
||||
job.mapAttemptCompletionEvents.add(tce);
|
||||
// we track map completions separately from task completions because
|
||||
// - getMapAttemptCompletionEvents uses index ranges specific to maps
|
||||
// - type converting the same events over and over is expensive
|
||||
mapEventIdx = job.mapAttemptCompletionEvents.size();
|
||||
job.mapAttemptCompletionEvents.add(TypeConverter.fromYarn(tce));
|
||||
}
|
||||
job.taskCompletionIdxToMapCompletionIdx.add(mapEventIdx);
|
||||
|
||||
TaskAttemptId attemptId = tce.getAttemptId();
|
||||
TaskId taskId = attemptId.getTaskId();
|
||||
//make the previous completion event as obsolete if it exists
|
||||
Object successEventNo =
|
||||
job.successAttemptCompletionEventNoMap.remove(taskId);
|
||||
Integer successEventNo =
|
||||
job.successAttemptCompletionEventNoMap.remove(taskId);
|
||||
if (successEventNo != null) {
|
||||
TaskAttemptCompletionEvent successEvent =
|
||||
job.taskAttemptCompletionEvents.get((Integer) successEventNo);
|
||||
job.taskAttemptCompletionEvents.get(successEventNo);
|
||||
successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE);
|
||||
int mapCompletionIdx =
|
||||
job.taskCompletionIdxToMapCompletionIdx.get(successEventNo);
|
||||
if (mapCompletionIdx >= 0) {
|
||||
// update the corresponding TaskCompletionEvent for the map
|
||||
TaskCompletionEvent mapEvent =
|
||||
job.mapAttemptCompletionEvents.get(mapCompletionIdx);
|
||||
job.mapAttemptCompletionEvents.set(mapCompletionIdx,
|
||||
new TaskCompletionEvent(mapEvent.getEventId(),
|
||||
mapEvent.getTaskAttemptId(), mapEvent.idWithinJob(),
|
||||
mapEvent.isMapTask(), TaskCompletionEvent.Status.OBSOLETE,
|
||||
mapEvent.getTaskTrackerHttp()));
|
||||
}
|
||||
}
|
||||
|
||||
// if this attempt is not successful then why is the previous successful
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.Arrays;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
|
||||
|
@ -153,9 +154,12 @@ public class TestTaskAttemptListenerImpl {
|
|||
.thenReturn(Arrays.copyOfRange(taskEvents, 0, 2));
|
||||
when(mockJob.getTaskAttemptCompletionEvents(2, 100))
|
||||
.thenReturn(Arrays.copyOfRange(taskEvents, 2, 4));
|
||||
when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(mapEvents);
|
||||
when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(mapEvents);
|
||||
when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(empty);
|
||||
when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(
|
||||
TypeConverter.fromYarn(mapEvents));
|
||||
when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(
|
||||
TypeConverter.fromYarn(mapEvents));
|
||||
when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(
|
||||
TypeConverter.fromYarn(empty));
|
||||
|
||||
AppContext appCtx = mock(AppContext.class);
|
||||
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileContext;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.JobACLsManager;
|
||||
import org.apache.hadoop.mapred.ShuffleHandler;
|
||||
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.FileSystemCounter;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
|
@ -556,7 +557,7 @@ public class MockJobs extends MockApps {
|
|||
}
|
||||
|
||||
@Override
|
||||
public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
|
||||
public TaskCompletionEvent[] getMapAttemptCompletionEvents(
|
||||
int startIndex, int maxEvents) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -25,8 +25,10 @@ import java.util.Arrays;
|
|||
import java.util.Iterator;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
|
@ -150,14 +152,16 @@ public class TestFetchFailure {
|
|||
Assert.assertEquals("Event status not correct for reduce attempt1",
|
||||
TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
|
||||
|
||||
TaskAttemptCompletionEvent mapEvents[] =
|
||||
TaskCompletionEvent mapEvents[] =
|
||||
job.getMapAttemptCompletionEvents(0, 2);
|
||||
TaskCompletionEvent convertedEvents[] = TypeConverter.fromYarn(events);
|
||||
Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
|
||||
Assert.assertArrayEquals("Unexpected map events",
|
||||
Arrays.copyOfRange(events, 0, 2), mapEvents);
|
||||
Arrays.copyOfRange(convertedEvents, 0, 2), mapEvents);
|
||||
mapEvents = job.getMapAttemptCompletionEvents(2, 200);
|
||||
Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
|
||||
Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
|
||||
Assert.assertEquals("Unexpected map event", convertedEvents[2],
|
||||
mapEvents[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -395,14 +399,16 @@ public class TestFetchFailure {
|
|||
Assert.assertEquals("Event status not correct for reduce attempt1",
|
||||
TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
|
||||
|
||||
TaskAttemptCompletionEvent mapEvents[] =
|
||||
TaskCompletionEvent mapEvents[] =
|
||||
job.getMapAttemptCompletionEvents(0, 2);
|
||||
TaskCompletionEvent convertedEvents[] = TypeConverter.fromYarn(events);
|
||||
Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
|
||||
Assert.assertArrayEquals("Unexpected map events",
|
||||
Arrays.copyOfRange(events, 0, 2), mapEvents);
|
||||
Arrays.copyOfRange(convertedEvents, 0, 2), mapEvents);
|
||||
mapEvents = job.getMapAttemptCompletionEvents(2, 200);
|
||||
Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
|
||||
Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
|
||||
Assert.assertEquals("Unexpected map event", convertedEvents[2],
|
||||
mapEvents[0]);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
|
@ -441,7 +442,7 @@ public class TestRuntimeEstimators {
|
|||
}
|
||||
|
||||
@Override
|
||||
public TaskAttemptCompletionEvent[]
|
||||
public TaskCompletionEvent[]
|
||||
getMapAttemptCompletionEvents(int startIndex, int maxEvents) {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.JobACLsManager;
|
||||
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.TaskID;
|
||||
|
@ -183,13 +184,13 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
|
||||
public synchronized TaskCompletionEvent[] getMapAttemptCompletionEvents(
|
||||
int startIndex, int maxEvents) {
|
||||
if (mapCompletionEvents == null) {
|
||||
constructTaskAttemptCompletionEvents();
|
||||
}
|
||||
return getAttemptCompletionEvents(mapCompletionEvents,
|
||||
startIndex, maxEvents);
|
||||
return TypeConverter.fromYarn(getAttemptCompletionEvents(
|
||||
mapCompletionEvents, startIndex, maxEvents));
|
||||
}
|
||||
|
||||
private static TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
|
@ -154,7 +155,7 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
|
|||
}
|
||||
|
||||
@Override
|
||||
public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
|
||||
public TaskCompletionEvent[] getMapAttemptCompletionEvents(
|
||||
int startIndex, int maxEvents) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Map;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
|
@ -143,7 +144,7 @@ public class MockHistoryJobs extends MockJobs {
|
|||
}
|
||||
|
||||
@Override
|
||||
public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
|
||||
public TaskCompletionEvent[] getMapAttemptCompletionEvents(
|
||||
int startIndex, int maxEvents) {
|
||||
return job.getMapAttemptCompletionEvents(startIndex, maxEvents);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue