MAPREDUCE-5689. MRAppMaster does not preempt reducers when scheduled maps cannot be fulfilled. (lohit via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1555161 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-01-03 17:27:36 +00:00
parent d85c017d04
commit 1a8781f1f9
3 changed files with 20 additions and 1 deletions

View File

@ -267,6 +267,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5685. Fixed a bug with JobContext getCacheFiles API inside the MAPREDUCE-5685. Fixed a bug with JobContext getCacheFiles API inside the
WrappedReducer class. (Yi Song via vinodkv) WrappedReducer class. (Yi Song via vinodkv)
MAPREDUCE-5689. MRAppMaster does not preempt reducers when scheduled maps
cannot be fulfilled. (lohit via kasha)
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -229,7 +229,8 @@ protected synchronized void heartbeat() throws Exception {
int completedMaps = getJob().getCompletedMaps(); int completedMaps = getJob().getCompletedMaps();
int completedTasks = completedMaps + getJob().getCompletedReduces(); int completedTasks = completedMaps + getJob().getCompletedReduces();
if (lastCompletedTasks != completedTasks) { if ((lastCompletedTasks != completedTasks) ||
(scheduledRequests.maps.size() > 0)) {
lastCompletedTasks = completedTasks; lastCompletedTasks = completedTasks;
recalculateReduceSchedule = true; recalculateReduceSchedule = true;
} }

View File

@ -1604,6 +1604,21 @@ public void testReduceScheduling() throws Exception {
numPendingReduces, numPendingReduces,
maxReduceRampupLimit, reduceSlowStart); maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampDownReduces(anyInt()); verify(allocator).rampDownReduces(anyInt());
// Test reduce ramp-down for when there are scheduled maps
// Since we have two scheduled Maps, rampDownReducers
// should be invoked twice.
scheduledMaps = 2;
assignedReduces = 2;
doReturn(10 * 1024).when(allocator).getMemLimit();
allocator.scheduleReduces(
totalMaps, succeededMaps,
scheduledMaps, scheduledReduces,
assignedMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator, times(2)).rampDownReduces(anyInt());
} }
private static class RecalculateContainerAllocator extends MyContainerAllocator { private static class RecalculateContainerAllocator extends MyContainerAllocator {