MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the number of map completion event type conversions. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1437103 13f79535-47bb-0310-9956-ffa450edef68
Author: Siddharth Seth
Date: 2013-01-22 19:07:08 +00:00
Parent: bb81a17e0b
Commit: 74ffc7a74d
11 changed files with 90 additions and 43 deletions
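
Why these conversions matter at scale: every reduce task polls the AM for new map completion events throughout its shuffle, so before this patch the AM re-converted up to maxEvents records from the YARN record type to the mapred TaskCompletionEvent type on every poll. A back-of-envelope sketch of the two strategies (the figures are hypothetical, not from the patch):

// Hypothetical numbers, for intuition only: conversions performed when
// converting on every read vs. converting once when an event is recorded.
public class ConversionCostSketch {
  public static void main(String[] args) {
    long reducers = 10000;        // reduce tasks polling the AM
    long pollsPerReducer = 100;   // heartbeats during the shuffle
    long mapEvents = 10000;       // completed map attempts (window size)

    // Old: each poll may re-convert the whole visible window of map events.
    long convertOnRead = reducers * pollsPerReducer * mapEvents;
    // New: each map completion event is converted exactly once, on record.
    long convertOnWrite = mapEvents;

    System.out.println("convert-on-read (worst case): " + convertOnRead);
    System.out.println("convert-on-write:             " + convertOnWrite);
  }
}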

CHANGES.txt

@@ -654,6 +654,9 @@ Release 0.23.7 - UNRELEASED

   OPTIMIZATIONS

+    MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the
+    number of map completion event type conversions. (Jason Lowe via sseth)
+
   BUG FIXES

     MAPREDUCE-4458. Warn if java.library.path is used for AM or Task

TaskAttemptListenerImpl.java

@@ -275,14 +275,13 @@ public MapTaskCompletionEventsUpdate getMapCompletionEvents(
     boolean shouldReset = false;
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
       TypeConverter.toYarn(taskAttemptID);
-    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[] events =
+    TaskCompletionEvent[] events =
         context.getJob(attemptID.getTaskId().getJobId()).getMapAttemptCompletionEvents(
             startIndex, maxEvents);

     taskHeartbeatHandler.progressing(attemptID);

-    return new MapTaskCompletionEventsUpdate(
-        TypeConverter.fromYarn(events), shouldReset);
+    return new MapTaskCompletionEventsUpdate(events, shouldReset);
   }

   @Override

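getMapCompletionEvents above is the hot read path: it now hands back the job's cached mapred-side events instead of calling TypeConverter.fromYarn on each poll. A minimal, self-contained sketch of the convert-once-on-write pattern (YarnEvent, MapredEvent, and convert are stand-ins, not Hadoop types):

import java.util.ArrayList;
import java.util.List;

// Sketch only: YarnEvent/MapredEvent/convert() stand in for the YARN record
// type, the mapred type, and TypeConverter.fromYarn respectively.
public class CachedConversionSketch {
  static class YarnEvent   { final int id; YarnEvent(int id)   { this.id = id; } }
  static class MapredEvent { final int id; MapredEvent(int id) { this.id = id; } }

  static MapredEvent convert(YarnEvent e) { return new MapredEvent(e.id); }

  private final List<MapredEvent> cached = new ArrayList<MapredEvent>();

  // Write path: pay the conversion cost exactly once per event.
  void record(YarnEvent e) { cached.add(convert(e)); }

  // Read path: slice the cached list; no conversion per poll.
  List<MapredEvent> fetch(int startIndex, int maxEvents) {
    int end = Math.min(cached.size(), startIndex + maxEvents);
    return startIndex >= end
        ? new ArrayList<MapredEvent>()
        : new ArrayList<MapredEvent>(cached.subList(startIndex, end));
  }

  public static void main(String[] args) {
    CachedConversionSketch events = new CachedConversionSketch();
    for (int i = 0; i < 5; i++) events.record(new YarnEvent(i));
    System.out.println(events.fetch(2, 100).size()); // prints 3
  }
}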
Job.java

@@ -24,6 +24,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;

@@ -88,7 +89,7 @@ public interface Job {
   TaskAttemptCompletionEvent[]
       getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);

-  TaskAttemptCompletionEvent[]
+  TaskCompletionEvent[]
       getMapAttemptCompletionEvents(int startIndex, int maxEvents);

   /**

JobImpl.java

@@ -43,6 +43,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;

@@ -130,6 +131,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private static final TaskAttemptCompletionEvent[]
     EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS = new TaskAttemptCompletionEvent[0];

+  private static final TaskCompletionEvent[]
+    EMPTY_TASK_COMPLETION_EVENTS = new TaskCompletionEvent[0];
+
   private static final Log LOG = LogFactory.getLog(JobImpl.class);

   //The maximum fraction of fetch failures allowed for a map

@@ -196,7 +200,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private int allowedMapFailuresPercent = 0;
   private int allowedReduceFailuresPercent = 0;
   private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
-  private List<TaskAttemptCompletionEvent> mapAttemptCompletionEvents;
+  private List<TaskCompletionEvent> mapAttemptCompletionEvents;
+  private List<Integer> taskCompletionIdxToMapCompletionIdx;
   private final List<String> diagnostics = new ArrayList<String>();

   //task/attempt related datastructures
@@ -684,27 +689,31 @@ public static Counters incrTaskCounters(
   @Override
   public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
       int fromEventId, int maxEvents) {
-    return getAttemptCompletionEvents(taskAttemptCompletionEvents,
-        fromEventId, maxEvents);
-  }
-
-  @Override
-  public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
-      int startIndex, int maxEvents) {
-    return getAttemptCompletionEvents(mapAttemptCompletionEvents,
-        startIndex, maxEvents);
-  }
-
-  private TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
-      List<TaskAttemptCompletionEvent> eventList,
-      int startIndex, int maxEvents) {
     TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
     readLock.lock();
     try {
-      if (eventList.size() > startIndex) {
+      if (taskAttemptCompletionEvents.size() > fromEventId) {
         int actualMax = Math.min(maxEvents,
-            (eventList.size() - startIndex));
-        events = eventList.subList(startIndex,
+            (taskAttemptCompletionEvents.size() - fromEventId));
+        events = taskAttemptCompletionEvents.subList(fromEventId,
+            actualMax + fromEventId).toArray(events);
+      }
+      return events;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public TaskCompletionEvent[] getMapAttemptCompletionEvents(
+      int startIndex, int maxEvents) {
+    TaskCompletionEvent[] events = EMPTY_TASK_COMPLETION_EVENTS;
+    readLock.lock();
+    try {
+      if (mapAttemptCompletionEvents.size() > startIndex) {
+        int actualMax = Math.min(maxEvents,
+            (mapAttemptCompletionEvents.size() - startIndex));
+        events = mapAttemptCompletionEvents.subList(startIndex,
             actualMax + startIndex).toArray(events);
       }
       return events;
@@ -1247,7 +1256,9 @@ public JobStateInternal transition(JobImpl job, JobEvent event) {
           new ArrayList<TaskAttemptCompletionEvent>(
               job.numMapTasks + job.numReduceTasks + 10);
       job.mapAttemptCompletionEvents =
-          new ArrayList<TaskAttemptCompletionEvent>(job.numMapTasks + 10);
+          new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
+      job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
+          job.numMapTasks + job.numReduceTasks + 10);

       job.allowedMapFailuresPercent =
           job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
@@ -1562,19 +1573,37 @@ public void transition(JobImpl job, JobEvent event) {
       //eventId is equal to index in the arraylist
       tce.setEventId(job.taskAttemptCompletionEvents.size());
       job.taskAttemptCompletionEvents.add(tce);
+      int mapEventIdx = -1;
       if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) {
-        job.mapAttemptCompletionEvents.add(tce);
+        // we track map completions separately from task completions because
+        // - getMapAttemptCompletionEvents uses index ranges specific to maps
+        // - type converting the same events over and over is expensive
+        mapEventIdx = job.mapAttemptCompletionEvents.size();
+        job.mapAttemptCompletionEvents.add(TypeConverter.fromYarn(tce));
       }
+      job.taskCompletionIdxToMapCompletionIdx.add(mapEventIdx);

       TaskAttemptId attemptId = tce.getAttemptId();
       TaskId taskId = attemptId.getTaskId();
       //make the previous completion event as obsolete if it exists
-      Object successEventNo =
-          job.successAttemptCompletionEventNoMap.remove(taskId);
+      Integer successEventNo =
+          job.successAttemptCompletionEventNoMap.remove(taskId);
       if (successEventNo != null) {
         TaskAttemptCompletionEvent successEvent =
-            job.taskAttemptCompletionEvents.get((Integer) successEventNo);
+            job.taskAttemptCompletionEvents.get(successEventNo);
         successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE);
+        int mapCompletionIdx =
+            job.taskCompletionIdxToMapCompletionIdx.get(successEventNo);
+        if (mapCompletionIdx >= 0) {
+          // update the corresponding TaskCompletionEvent for the map
+          TaskCompletionEvent mapEvent =
+              job.mapAttemptCompletionEvents.get(mapCompletionIdx);
+          job.mapAttemptCompletionEvents.set(mapCompletionIdx,
+              new TaskCompletionEvent(mapEvent.getEventId(),
+                  mapEvent.getTaskAttemptId(), mapEvent.idWithinJob(),
+                  mapEvent.isMapTask(), TaskCompletionEvent.Status.OBSOLETE,
+                  mapEvent.getTaskTrackerHttp()));
+        }
       }

       // if this attempt is not successful then why is the previous successful

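The new taskCompletionIdxToMapCompletionIdx list exists because the full event list and the map-only list grow at different rates, yet a later attempt can retroactively obsolete an earlier success recorded in both. A minimal sketch of that parallel-index bookkeeping (hypothetical names; String events stand in for the two record types):

import java.util.ArrayList;
import java.util.List;

// Sketch only: one entry is appended to allIdxToMapIdx per event, mapping
// "overall event index" to "map event index", or -1 for non-map events.
public class ParallelIndexSketch {
  private final List<String> allEvents = new ArrayList<String>();
  private final List<String> mapEvents = new ArrayList<String>();
  private final List<Integer> allIdxToMapIdx = new ArrayList<Integer>();

  // Returns the overall event id, mirroring tce.setEventId(...).
  int add(String event, boolean isMap) {
    int eventId = allEvents.size();
    allEvents.add(event);
    int mapIdx = -1;
    if (isMap) {
      mapIdx = mapEvents.size();
      mapEvents.add(event);          // the cached, converted form lives here
    }
    allIdxToMapIdx.add(mapIdx);      // parallel list: one entry per event
    return eventId;
  }

  // Mark an event OBSOLETE in both lists without scanning either.
  void obsolete(int eventId) {
    allEvents.set(eventId, allEvents.get(eventId) + " [OBSOLETE]");
    int mapIdx = allIdxToMapIdx.get(eventId);
    if (mapIdx >= 0) {
      mapEvents.set(mapIdx, mapEvents.get(mapIdx) + " [OBSOLETE]");
    }
  }

  public static void main(String[] args) {
    ParallelIndexSketch job = new ParallelIndexSketch();
    int first = job.add("map_0 attempt_0 SUCCEEDED", true);
    job.add("reduce_0 attempt_0 SUCCEEDED", false);
    job.add("map_0 attempt_1 SUCCEEDED", true);
    job.obsolete(first);             // patches both lists in O(1)
    System.out.println(job.mapEvents);
  }
}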
TestTaskAttemptListenerImpl.java

@@ -34,6 +34,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;

@@ -153,9 +154,12 @@ public void testGetMapCompletionEvents() throws IOException {
         .thenReturn(Arrays.copyOfRange(taskEvents, 0, 2));
     when(mockJob.getTaskAttemptCompletionEvents(2, 100))
         .thenReturn(Arrays.copyOfRange(taskEvents, 2, 4));
-    when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(mapEvents);
-    when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(mapEvents);
-    when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(empty);
+    when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(
+        TypeConverter.fromYarn(mapEvents));
+    when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(
+        TypeConverter.fromYarn(mapEvents));
+    when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(
+        TypeConverter.fromYarn(empty));

     AppContext appCtx = mock(AppContext.class);
     when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);

MockJobs.java

@@ -33,6 +33,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.JobACL;

@@ -556,7 +557,7 @@ public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
       }

       @Override
-      public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+      public TaskCompletionEvent[] getMapAttemptCompletionEvents(
           int startIndex, int maxEvents) {
         return null;
       }

TestFetchFailure.java

@@ -25,8 +25,10 @@
 import java.util.Iterator;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;

@@ -150,14 +152,16 @@ public void testFetchFailure() throws Exception {
     Assert.assertEquals("Event status not correct for reduce attempt1",
         TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());

-    TaskAttemptCompletionEvent mapEvents[] =
+    TaskCompletionEvent mapEvents[] =
       job.getMapAttemptCompletionEvents(0, 2);
+    TaskCompletionEvent convertedEvents[] = TypeConverter.fromYarn(events);
     Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
     Assert.assertArrayEquals("Unexpected map events",
-        Arrays.copyOfRange(events, 0, 2), mapEvents);
+        Arrays.copyOfRange(convertedEvents, 0, 2), mapEvents);
     mapEvents = job.getMapAttemptCompletionEvents(2, 200);
     Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
-    Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
+    Assert.assertEquals("Unexpected map event", convertedEvents[2],
+        mapEvents[0]);
   }

   /**

@@ -395,14 +399,16 @@ public void testFetchFailureMultipleReduces() throws Exception {
     Assert.assertEquals("Event status not correct for reduce attempt1",
         TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());

-    TaskAttemptCompletionEvent mapEvents[] =
+    TaskCompletionEvent mapEvents[] =
       job.getMapAttemptCompletionEvents(0, 2);
+    TaskCompletionEvent convertedEvents[] = TypeConverter.fromYarn(events);
     Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
     Assert.assertArrayEquals("Unexpected map events",
-        Arrays.copyOfRange(events, 0, 2), mapEvents);
+        Arrays.copyOfRange(convertedEvents, 0, 2), mapEvents);
     mapEvents = job.getMapAttemptCompletionEvents(2, 200);
     Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
-    Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
+    Assert.assertEquals("Unexpected map event", convertedEvents[2],
+        mapEvents[0]);
   }

TestRuntimeEstimators.java

@@ -32,6 +32,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;

@@ -441,7 +442,7 @@ public int getCompletedReduces() {
     }

     @Override
-    public TaskAttemptCompletionEvent[]
+    public TaskCompletionEvent[]
         getMapAttemptCompletionEvents(int startIndex, int maxEvents) {
       throw new UnsupportedOperationException("Not supported yet.");
     }

CompletedJob.java

@@ -36,6 +36,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobACLsManager;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.TaskID;

@@ -183,13 +184,13 @@ public synchronized TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
   }

   @Override
-  public synchronized TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+  public synchronized TaskCompletionEvent[] getMapAttemptCompletionEvents(
       int startIndex, int maxEvents) {
     if (mapCompletionEvents == null) {
       constructTaskAttemptCompletionEvents();
     }
-    return getAttemptCompletionEvents(mapCompletionEvents,
-        startIndex, maxEvents);
+    return TypeConverter.fromYarn(getAttemptCompletionEvents(
+        mapCompletionEvents, startIndex, maxEvents));
   }

   private static TaskAttemptCompletionEvent[] getAttemptCompletionEvents(

PartialJob.java

@@ -25,6 +25,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;

@@ -154,7 +155,7 @@ public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
   }

   @Override
-  public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+  public TaskCompletionEvent[] getMapAttemptCompletionEvents(
       int startIndex, int maxEvents) {
     return null;
   }

MockHistoryJobs.java

@@ -23,6 +23,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;

@@ -143,7 +144,7 @@ public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
   }

   @Override
-  public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+  public TaskCompletionEvent[] getMapAttemptCompletionEvents(
       int startIndex, int maxEvents) {
     return job.getMapAttemptCompletionEvents(startIndex, maxEvents);
   }