From 895029b2f2535f1ba8275be29fda16d0f80be790 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 25 Oct 2012 01:26:30 +0000 Subject: [PATCH] MAPREDUCE-4730. Fix Reducer's EventFetcher to scale the map-completion requests slowly to avoid HADOOP-8942. Contributed by Jason Lowe. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1401941 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/task/reduce/EventFetcher.java | 88 ++++++------- .../hadoop/mapreduce/task/reduce/Shuffle.java | 15 ++- .../task/reduce/TestEventFetcher.java | 116 ++++++++++++++++++ 4 files changed, 178 insertions(+), 44 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 9b7828d6569..088ac50f72c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -610,6 +610,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown. (Vinod Kumar Vavilapalli via jlowe) + MAPREDUCE-4730. Fix Reducer's EventFetcher to scale the map-completion + requests slowly to avoid HADOOP-8942. (Jason Lowe via vinodkv) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java index fd80ec2b1e9..acc85645f47 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java @@ -27,10 +27,8 @@ import org.apache.hadoop.mapred.TaskCompletionEvent; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapreduce.TaskAttemptID; -@SuppressWarnings("deprecation") class EventFetcher extends Thread { private static final long SLEEP_TIME = 1000; - private static final int MAX_EVENTS_TO_FETCH = 10000; private static final int MAX_RETRIES = 10; private static final int RETRY_PERIOD = 5000; private static final Log LOG = LogFactory.getLog(EventFetcher.class); @@ -38,7 +36,8 @@ class EventFetcher extends Thread { private final TaskAttemptID reduce; private final TaskUmbilicalProtocol umbilical; private final ShuffleScheduler scheduler; - private int fromEventId = 0; + private int fromEventIdx = 0; + private int maxEventsToFetch; private ExceptionReporter exceptionReporter = null; private int maxMapRuntime = 0; @@ -48,13 +47,15 @@ class EventFetcher extends Thread { public EventFetcher(TaskAttemptID reduce, TaskUmbilicalProtocol umbilical, ShuffleScheduler scheduler, - ExceptionReporter reporter) { + ExceptionReporter reporter, + int maxEventsToFetch) { setName("EventFetcher for fetching Map Completion Events"); setDaemon(true); this.reduce = reduce; this.umbilical = umbilical; this.scheduler = scheduler; exceptionReporter = reporter; + this.maxEventsToFetch = maxEventsToFetch; } @Override @@ -112,46 +113,47 @@ class EventFetcher extends Thread { * from a given event ID. * @throws IOException */ - private int getMapCompletionEvents() throws IOException { + protected int getMapCompletionEvents() throws IOException { int numNewMaps = 0; - - MapTaskCompletionEventsUpdate update = - umbilical.getMapCompletionEvents((org.apache.hadoop.mapred.JobID) - reduce.getJobID(), - fromEventId, - MAX_EVENTS_TO_FETCH, - (org.apache.hadoop.mapred.TaskAttemptID) - reduce); - TaskCompletionEvent events[] = update.getMapTaskCompletionEvents(); - LOG.debug("Got " + events.length + " map completion events from " + - fromEventId); - - // Check if the reset is required. - // Since there is no ordering of the task completion events at the - // reducer, the only option to sync with the new jobtracker is to reset - // the events index - if (update.shouldReset()) { - fromEventId = 0; - scheduler.resetKnownMaps(); - } - - // Update the last seen event ID - fromEventId += events.length; - - // Process the TaskCompletionEvents: - // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs. - // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop - // fetching from those maps. - // 3. Remove TIPFAILED maps from neededOutputs since we don't need their - // outputs at all. - for (TaskCompletionEvent event : events) { - switch (event.getTaskStatus()) { + TaskCompletionEvent events[] = null; + + do { + MapTaskCompletionEventsUpdate update = + umbilical.getMapCompletionEvents( + (org.apache.hadoop.mapred.JobID)reduce.getJobID(), + fromEventIdx, + maxEventsToFetch, + (org.apache.hadoop.mapred.TaskAttemptID)reduce); + events = update.getMapTaskCompletionEvents(); + LOG.debug("Got " + events.length + " map completion events from " + + fromEventIdx); + + // Check if the reset is required. + // Since there is no ordering of the task completion events at the + // reducer, the only option to sync with the new jobtracker is to reset + // the events index + if (update.shouldReset()) { + fromEventIdx = 0; + scheduler.resetKnownMaps(); + } + + // Update the last seen event ID + fromEventIdx += events.length; + + // Process the TaskCompletionEvents: + // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs. + // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop + // fetching from those maps. + // 3. Remove TIPFAILED maps from neededOutputs since we don't need their + // outputs at all. + for (TaskCompletionEvent event : events) { + switch (event.getTaskStatus()) { case SUCCEEDED: URI u = getBaseURI(event.getTaskTrackerHttp()); scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(), - u.toString(), - event.getTaskAttemptId()); + u.toString(), + event.getTaskAttemptId()); numNewMaps ++; int duration = event.getTaskRunTime(); if (duration > maxMapRuntime) { @@ -164,15 +166,17 @@ class EventFetcher extends Thread { case OBSOLETE: scheduler.obsoleteMapOutput(event.getTaskAttemptId()); LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + - " map-task: '" + event.getTaskAttemptId() + "'"); + " map-task: '" + event.getTaskAttemptId() + "'"); break; case TIPFAILED: scheduler.tipFailed(event.getTaskAttemptId().getTaskID()); LOG.info("Ignoring output of failed map TIP: '" + - event.getTaskAttemptId() + "'"); + event.getTaskAttemptId() + "'"); break; + } } - } + } while (events.length == maxEventsToFetch); + return numNewMaps; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java index e7d7d71d079..ceada74f793 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java @@ -40,9 +40,12 @@ import org.apache.hadoop.util.Progress; @InterfaceAudience.Private @InterfaceStability.Unstable -@SuppressWarnings({"deprecation", "unchecked", "rawtypes"}) +@SuppressWarnings({"unchecked", "rawtypes"}) public class Shuffle implements ExceptionReporter { private static final int PROGRESS_FREQUENCY = 2000; + private static final int MAX_EVENTS_TO_FETCH = 10000; + private static final int MIN_EVENTS_TO_FETCH = 100; + private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000; private final TaskAttemptID reduceId; private final JobConf jobConf; @@ -99,9 +102,17 @@ public class Shuffle implements ExceptionReporter { } public RawKeyValueIterator run() throws IOException, InterruptedException { + // Scale the maximum events we fetch per RPC call to mitigate OOM issues + // on the ApplicationMaster when a thundering herd of reducers fetch events + // TODO: This should not be necessary after HADOOP-8942 + int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH, + MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks()); + int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer); + // Start the map-completion events fetcher thread final EventFetcher eventFetcher = - new EventFetcher(reduceId, umbilical, scheduler, this); + new EventFetcher(reduceId, umbilical, scheduler, this, + maxEventsToFetch); eventFetcher.start(); // Start the map-output fetcher threads diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java new file mode 100644 index 00000000000..84ac656cf9b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.task.reduce; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.MapTaskCompletionEventsUpdate; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.TaskCompletionEvent; +import org.apache.hadoop.mapred.TaskUmbilicalProtocol; +import org.apache.hadoop.mapreduce.TaskType; +import org.junit.Test; +import org.mockito.InOrder; + +public class TestEventFetcher { + + @Test + public void testConsecutiveFetch() throws IOException { + final int MAX_EVENTS_TO_FETCH = 100; + TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 1); + + TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class); + when(umbilical.getMapCompletionEvents(any(JobID.class), + anyInt(), anyInt(), any(TaskAttemptID.class))) + .thenReturn(getMockedCompletionEventsUpdate(0, 0)); + when(umbilical.getMapCompletionEvents(any(JobID.class), + eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid))) + .thenReturn(getMockedCompletionEventsUpdate(0, MAX_EVENTS_TO_FETCH)); + when(umbilical.getMapCompletionEvents(any(JobID.class), + eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid))) + .thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH, + MAX_EVENTS_TO_FETCH)); + when(umbilical.getMapCompletionEvents(any(JobID.class), + eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid))) + .thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH*2, 3)); + + @SuppressWarnings("unchecked") + ShuffleScheduler scheduler = mock(ShuffleScheduler.class); + ExceptionReporter reporter = mock(ExceptionReporter.class); + + EventFetcherForTest ef = + new EventFetcherForTest(tid, umbilical, scheduler, + reporter, MAX_EVENTS_TO_FETCH); + ef.getMapCompletionEvents(); + + verify(reporter, never()).reportException(any(Throwable.class)); + InOrder inOrder = inOrder(umbilical); + inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class), + eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)); + inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class), + eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid)); + inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class), + eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid)); + verify(scheduler, times(MAX_EVENTS_TO_FETCH*2 + 3)).addKnownMapOutput( + anyString(), anyString(), any(TaskAttemptID.class)); + } + + private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate( + int startIdx, int numEvents) { + ArrayList tceList = + new ArrayList(numEvents); + for (int i = 0; i < numEvents; ++i) { + int eventIdx = startIdx + i; + TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx, + new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0), + eventIdx, true, TaskCompletionEvent.Status.SUCCEEDED, + "http://somehost:8888"); + tceList.add(tce); + } + TaskCompletionEvent[] events = {}; + return new MapTaskCompletionEventsUpdate(tceList.toArray(events), false); + } + + private static class EventFetcherForTest extends EventFetcher { + + public EventFetcherForTest(TaskAttemptID reduce, + TaskUmbilicalProtocol umbilical, ShuffleScheduler scheduler, + ExceptionReporter reporter, int maxEventsToFetch) { + super(reduce, umbilical, scheduler, reporter, maxEventsToFetch); + } + + @Override + public int getMapCompletionEvents() throws IOException { + return super.getMapCompletionEvents(); + } + + } +}