MAPREDUCE-4730. Fix Reducer's EventFetcher to scale the map-completion requests slowly to avoid HADOOP-8942. Contributed by Jason Lowe.

svn merge --ignore-ancestry -c 1401941 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1401942 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-10-25 01:27:17 +00:00
parent 33cb0be07c
commit 79a14bb25f
4 changed files with 178 additions and 44 deletions

View File

@ -463,6 +463,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown. MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown.
(Vinod Kumar Vavilapalli via jlowe) (Vinod Kumar Vavilapalli via jlowe)
MAPREDUCE-4730. Fix Reducer's EventFetcher to scale the map-completion
requests slowly to avoid HADOOP-8942. (Jason Lowe via vinodkv)
Release 0.23.4 - UNRELEASED Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -27,10 +27,8 @@ import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
@SuppressWarnings("deprecation")
class EventFetcher<K,V> extends Thread { class EventFetcher<K,V> extends Thread {
private static final long SLEEP_TIME = 1000; private static final long SLEEP_TIME = 1000;
private static final int MAX_EVENTS_TO_FETCH = 10000;
private static final int MAX_RETRIES = 10; private static final int MAX_RETRIES = 10;
private static final int RETRY_PERIOD = 5000; private static final int RETRY_PERIOD = 5000;
private static final Log LOG = LogFactory.getLog(EventFetcher.class); private static final Log LOG = LogFactory.getLog(EventFetcher.class);
@ -38,7 +36,8 @@ class EventFetcher<K,V> extends Thread {
private final TaskAttemptID reduce; private final TaskAttemptID reduce;
private final TaskUmbilicalProtocol umbilical; private final TaskUmbilicalProtocol umbilical;
private final ShuffleScheduler<K,V> scheduler; private final ShuffleScheduler<K,V> scheduler;
private int fromEventId = 0; private int fromEventIdx = 0;
private int maxEventsToFetch;
private ExceptionReporter exceptionReporter = null; private ExceptionReporter exceptionReporter = null;
private int maxMapRuntime = 0; private int maxMapRuntime = 0;
@ -48,13 +47,15 @@ class EventFetcher<K,V> extends Thread {
public EventFetcher(TaskAttemptID reduce, public EventFetcher(TaskAttemptID reduce,
TaskUmbilicalProtocol umbilical, TaskUmbilicalProtocol umbilical,
ShuffleScheduler<K,V> scheduler, ShuffleScheduler<K,V> scheduler,
ExceptionReporter reporter) { ExceptionReporter reporter,
int maxEventsToFetch) {
setName("EventFetcher for fetching Map Completion Events"); setName("EventFetcher for fetching Map Completion Events");
setDaemon(true); setDaemon(true);
this.reduce = reduce; this.reduce = reduce;
this.umbilical = umbilical; this.umbilical = umbilical;
this.scheduler = scheduler; this.scheduler = scheduler;
exceptionReporter = reporter; exceptionReporter = reporter;
this.maxEventsToFetch = maxEventsToFetch;
} }
@Override @Override
@ -112,46 +113,47 @@ class EventFetcher<K,V> extends Thread {
* from a given event ID. * from a given event ID.
* @throws IOException * @throws IOException
*/ */
private int getMapCompletionEvents() throws IOException { protected int getMapCompletionEvents() throws IOException {
int numNewMaps = 0; int numNewMaps = 0;
TaskCompletionEvent events[] = null;
MapTaskCompletionEventsUpdate update =
umbilical.getMapCompletionEvents((org.apache.hadoop.mapred.JobID) do {
reduce.getJobID(), MapTaskCompletionEventsUpdate update =
fromEventId, umbilical.getMapCompletionEvents(
MAX_EVENTS_TO_FETCH, (org.apache.hadoop.mapred.JobID)reduce.getJobID(),
(org.apache.hadoop.mapred.TaskAttemptID) fromEventIdx,
reduce); maxEventsToFetch,
TaskCompletionEvent events[] = update.getMapTaskCompletionEvents(); (org.apache.hadoop.mapred.TaskAttemptID)reduce);
LOG.debug("Got " + events.length + " map completion events from " + events = update.getMapTaskCompletionEvents();
fromEventId); LOG.debug("Got " + events.length + " map completion events from " +
fromEventIdx);
// Check if the reset is required.
// Since there is no ordering of the task completion events at the // Check if the reset is required.
// reducer, the only option to sync with the new jobtracker is to reset // Since there is no ordering of the task completion events at the
// the events index // reducer, the only option to sync with the new jobtracker is to reset
if (update.shouldReset()) { // the events index
fromEventId = 0; if (update.shouldReset()) {
scheduler.resetKnownMaps(); fromEventIdx = 0;
} scheduler.resetKnownMaps();
}
// Update the last seen event ID
fromEventId += events.length; // Update the last seen event ID
fromEventIdx += events.length;
// Process the TaskCompletionEvents:
// 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs. // Process the TaskCompletionEvents:
// 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
// fetching from those maps. // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
// 3. Remove TIPFAILED maps from neededOutputs since we don't need their // fetching from those maps.
// outputs at all. // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
for (TaskCompletionEvent event : events) { // outputs at all.
switch (event.getTaskStatus()) { for (TaskCompletionEvent event : events) {
switch (event.getTaskStatus()) {
case SUCCEEDED: case SUCCEEDED:
URI u = getBaseURI(event.getTaskTrackerHttp()); URI u = getBaseURI(event.getTaskTrackerHttp());
scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(), scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(),
u.toString(), u.toString(),
event.getTaskAttemptId()); event.getTaskAttemptId());
numNewMaps ++; numNewMaps ++;
int duration = event.getTaskRunTime(); int duration = event.getTaskRunTime();
if (duration > maxMapRuntime) { if (duration > maxMapRuntime) {
@ -164,15 +166,17 @@ class EventFetcher<K,V> extends Thread {
case OBSOLETE: case OBSOLETE:
scheduler.obsoleteMapOutput(event.getTaskAttemptId()); scheduler.obsoleteMapOutput(event.getTaskAttemptId());
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
" map-task: '" + event.getTaskAttemptId() + "'"); " map-task: '" + event.getTaskAttemptId() + "'");
break; break;
case TIPFAILED: case TIPFAILED:
scheduler.tipFailed(event.getTaskAttemptId().getTaskID()); scheduler.tipFailed(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" + LOG.info("Ignoring output of failed map TIP: '" +
event.getTaskAttemptId() + "'"); event.getTaskAttemptId() + "'");
break; break;
}
} }
} } while (events.length == maxEventsToFetch);
return numNewMaps; return numNewMaps;
} }

View File

@ -40,9 +40,12 @@ import org.apache.hadoop.util.Progress;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
@SuppressWarnings({"deprecation", "unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public class Shuffle<K, V> implements ExceptionReporter { public class Shuffle<K, V> implements ExceptionReporter {
private static final int PROGRESS_FREQUENCY = 2000; private static final int PROGRESS_FREQUENCY = 2000;
private static final int MAX_EVENTS_TO_FETCH = 10000;
private static final int MIN_EVENTS_TO_FETCH = 100;
private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
private final TaskAttemptID reduceId; private final TaskAttemptID reduceId;
private final JobConf jobConf; private final JobConf jobConf;
@ -99,9 +102,17 @@ public class Shuffle<K, V> implements ExceptionReporter {
} }
public RawKeyValueIterator run() throws IOException, InterruptedException { public RawKeyValueIterator run() throws IOException, InterruptedException {
// Scale the maximum events we fetch per RPC call to mitigate OOM issues
// on the ApplicationMaster when a thundering herd of reducers fetch events
// TODO: This should not be necessary after HADOOP-8942
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread // Start the map-completion events fetcher thread
final EventFetcher<K,V> eventFetcher = final EventFetcher<K,V> eventFetcher =
new EventFetcher<K,V>(reduceId, umbilical, scheduler, this); new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start(); eventFetcher.start();
// Start the map-output fetcher threads // Start the map-output fetcher threads

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.task.reduce;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.MapTaskCompletionEventsUpdate;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.TaskType;
import org.junit.Test;
import org.mockito.InOrder;
public class TestEventFetcher {
@Test
public void testConsecutiveFetch() throws IOException {
final int MAX_EVENTS_TO_FETCH = 100;
TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 1);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
anyInt(), anyInt(), any(TaskAttemptID.class)))
.thenReturn(getMockedCompletionEventsUpdate(0, 0));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(0, MAX_EVENTS_TO_FETCH));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH,
MAX_EVENTS_TO_FETCH));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH*2, 3));
@SuppressWarnings("unchecked")
ShuffleScheduler<String,String> scheduler = mock(ShuffleScheduler.class);
ExceptionReporter reporter = mock(ExceptionReporter.class);
EventFetcherForTest<String,String> ef =
new EventFetcherForTest<String,String>(tid, umbilical, scheduler,
reporter, MAX_EVENTS_TO_FETCH);
ef.getMapCompletionEvents();
verify(reporter, never()).reportException(any(Throwable.class));
InOrder inOrder = inOrder(umbilical);
inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid));
inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid));
inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid));
verify(scheduler, times(MAX_EVENTS_TO_FETCH*2 + 3)).addKnownMapOutput(
anyString(), anyString(), any(TaskAttemptID.class));
}
private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
int startIdx, int numEvents) {
ArrayList<TaskCompletionEvent> tceList =
new ArrayList<TaskCompletionEvent>(numEvents);
for (int i = 0; i < numEvents; ++i) {
int eventIdx = startIdx + i;
TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
eventIdx, true, TaskCompletionEvent.Status.SUCCEEDED,
"http://somehost:8888");
tceList.add(tce);
}
TaskCompletionEvent[] events = {};
return new MapTaskCompletionEventsUpdate(tceList.toArray(events), false);
}
private static class EventFetcherForTest<K,V> extends EventFetcher<K,V> {
public EventFetcherForTest(TaskAttemptID reduce,
TaskUmbilicalProtocol umbilical, ShuffleScheduler<K,V> scheduler,
ExceptionReporter reporter, int maxEventsToFetch) {
super(reduce, umbilical, scheduler, reporter, maxEventsToFetch);
}
@Override
public int getMapCompletionEvents() throws IOException {
return super.getMapCompletionEvents();
}
}
}