MAPREDUCE-3490. Fixed MapReduce AM to count failed maps also towards Reduce ramp up. Contributed by Sharad Agarwal and Arun C Murthy.

svn merge -c 1227226 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1227227 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-01-04 17:12:06 +00:00
parent aa4147c428
commit 16b1c5c24d
3 changed files with 146 additions and 49 deletions

View File

@ -335,6 +335,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3608. Fixed compile issue with MAPREDUCE-3522. (mahadev via
acmurthy)
MAPREDUCE-3490. Fixed MapReduce AM to count failed maps also towards Reduce
ramp up. (Sharad Agarwal and Arun C Murthy via vinodkv)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -33,6 +33,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobID;
@ -122,8 +123,6 @@ added to the pending and are ramped up (added to scheduled) based
private boolean recalculateReduceSchedule = false;
private int mapResourceReqt;//memory
private int reduceResourceReqt;//memory
private int completedMaps = 0;
private int completedReduces = 0;
private boolean reduceStarted = false;
private float maxReduceRampupLimit = 0;
@ -169,7 +168,13 @@ protected synchronized void heartbeat() throws Exception {
if (recalculateReduceSchedule) {
preemptReducesIfNeeded();
scheduleReduces();
scheduleReduces(
getJob().getTotalMaps(), getJob().getCompletedMaps(),
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceReqt, reduceResourceReqt,
pendingReduces.size(),
maxReduceRampupLimit, reduceSlowStart);
recalculateReduceSchedule = false;
}
}
@ -180,6 +185,14 @@ public void stop() {
LOG.info("Final Stats: " + getStat());
}
public boolean getIsReduceStarted() {
return reduceStarted;
}
public void setIsReduceStarted(boolean reduceStarted) {
this.reduceStarted = reduceStarted;
}
@SuppressWarnings("unchecked")
@Override
public synchronized void handle(ContainerAllocatorEvent event) {
@ -319,10 +332,17 @@ private void preemptReducesIfNeeded() {
}
}
}
private void scheduleReduces() {
@Private
public void scheduleReduces(
int totalMaps, int completedMaps,
int scheduledMaps, int scheduledReduces,
int assignedMaps, int assignedReduces,
int mapResourceReqt, int reduceResourceReqt,
int numPendingReduces,
float maxReduceRampupLimit, float reduceSlowStart) {
if (pendingReduces.size() == 0) {
if (numPendingReduces == 0) {
return;
}
@ -330,29 +350,25 @@ private void scheduleReduces() {
//if all maps are assigned, then ramp up all reduces irrespective of the
//headroom
if (scheduledRequests.maps.size() == 0 && pendingReduces.size() > 0) {
LOG.info("All maps assigned. Ramping up all remaining reduces:" + pendingReduces.size());
for (ContainerRequest req : pendingReduces) {
scheduledRequests.addReduce(req);
}
pendingReduces.clear();
if (scheduledMaps == 0 && numPendingReduces > 0) {
LOG.info("All maps assigned. " +
"Ramping up all remaining reduces:" + numPendingReduces);
scheduleAllReduces();
return;
}
int totalMaps = assignedRequests.maps.size() + completedMaps + scheduledRequests.maps.size();
//check for slow start
if (!reduceStarted) {//not set yet
if (!getIsReduceStarted()) {//not set yet
int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart *
totalMaps);
if(completedMaps < completedMapsForReduceSlowstart) {
LOG.info("Reduce slow start threshold not met. " +
"completedMapsForReduceSlowstart " + completedMapsForReduceSlowstart);
"completedMapsForReduceSlowstart " +
completedMapsForReduceSlowstart);
return;
} else {
LOG.info("Reduce slow start threshold reached. Scheduling reduces.");
reduceStarted = true;
setIsReduceStarted(true);
}
}
@ -363,20 +379,21 @@ private void scheduleReduces() {
completedMapPercent = 1;
}
int netScheduledMapMem = scheduledRequests.maps.size() * mapResourceReqt
+ assignedRequests.maps.size() * mapResourceReqt;
int netScheduledMapMem =
(scheduledMaps + assignedMaps) * mapResourceReqt;
int netScheduledReduceMem = scheduledRequests.reduces.size()
* reduceResourceReqt + assignedRequests.reduces.size()
* reduceResourceReqt;
int netScheduledReduceMem =
(scheduledReduces + assignedReduces) * reduceResourceReqt;
int finalMapMemLimit = 0;
int finalReduceMemLimit = 0;
// ramp up the reduces based on completed map percentage
int totalMemLimit = getMemLimit();
int idealReduceMemLimit = Math.min((int)(completedMapPercent * totalMemLimit),
(int) (maxReduceRampupLimit * totalMemLimit));
int idealReduceMemLimit =
Math.min(
(int)(completedMapPercent * totalMemLimit),
(int) (maxReduceRampupLimit * totalMemLimit));
int idealMapMemLimit = totalMemLimit - idealReduceMemLimit;
// check if there aren't enough maps scheduled, give the free map capacity
@ -397,29 +414,46 @@ private void scheduleReduces() {
" netScheduledMapMem:" + netScheduledMapMem +
" netScheduledReduceMem:" + netScheduledReduceMem);
int rampUp = (finalReduceMemLimit - netScheduledReduceMem)
/ reduceResourceReqt;
int rampUp =
(finalReduceMemLimit - netScheduledReduceMem) / reduceResourceReqt;
if (rampUp > 0) {
rampUp = Math.min(rampUp, pendingReduces.size());
rampUp = Math.min(rampUp, numPendingReduces);
LOG.info("Ramping up " + rampUp);
//more reduce to be scheduled
for (int i = 0; i < rampUp; i++) {
ContainerRequest request = pendingReduces.removeFirst();
scheduledRequests.addReduce(request);
}
rampUpReduces(rampUp);
} else if (rampUp < 0){
int rampDown = -1 * rampUp;
rampDown = Math.min(rampDown, scheduledRequests.reduces.size());
rampDown = Math.min(rampDown, scheduledReduces);
LOG.info("Ramping down " + rampDown);
//remove from the scheduled and move back to pending
for (int i = 0; i < rampDown; i++) {
ContainerRequest request = scheduledRequests.removeReduce();
pendingReduces.add(request);
}
rampDownReduces(rampDown);
}
}
private void scheduleAllReduces() {
for (ContainerRequest req : pendingReduces) {
scheduledRequests.addReduce(req);
}
pendingReduces.clear();
}
@Private
public void rampUpReduces(int rampUp) {
//more reduce to be scheduled
for (int i = 0; i < rampUp; i++) {
ContainerRequest request = pendingReduces.removeFirst();
scheduledRequests.addReduce(request);
}
}
@Private
public void rampDownReduces(int rampDown) {
//remove from the scheduled and move back to pending
for (int i = 0; i < rampDown; i++) {
ContainerRequest request = scheduledRequests.removeReduce();
pendingReduces.add(request);
}
}
/**
* Synchronized to avoid findbugs warnings
*/
@ -429,8 +463,8 @@ private synchronized String getStat() {
" ScheduledReduces:" + scheduledRequests.reduces.size() +
" AssignedMaps:" + assignedRequests.maps.size() +
" AssignedReduces:" + assignedRequests.reduces.size() +
" completedMaps:" + completedMaps +
" completedReduces:" + completedReduces +
" completedMaps:" + getJob().getCompletedMaps() +
" completedReduces:" + getJob().getCompletedReduces() +
" containersAllocated:" + containersAllocated +
" containersReleased:" + containersReleased +
" hostLocalAssigned:" + hostLocalAssigned +
@ -497,11 +531,7 @@ private List<Container> getResources() throws Exception {
+ cont.getContainerId());
} else {
assignedRequests.remove(attemptID);
if (attemptID.getTaskId().getTaskType().equals(TaskType.MAP)) {
completedMaps++;
} else {
completedReduces++;
}
// send the container completed event to Task attempt
eventHandler.handle(new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_CONTAINER_COMPLETED));
@ -514,7 +544,8 @@ private List<Container> getResources() throws Exception {
return newContainers;
}
private int getMemLimit() {
@Private
public int getMemLimit() {
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
return headRoom + assignedRequests.maps.size() * mapResourceReqt +
assignedRequests.reduces.size() * reduceResourceReqt;

View File

@ -19,8 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.ArrayList;
@ -1218,6 +1217,70 @@ protected void startAllocatorThread() {
}
@Test
public void testReduceScheduling() throws Exception {
int totalMaps = 10;
int succeededMaps = 1;
int scheduledMaps = 10;
int scheduledReduces = 0;
int assignedMaps = 2;
int assignedReduces = 0;
int mapResourceReqt = 1024;
int reduceResourceReqt = 2*1024;
int numPendingReduces = 4;
float maxReduceRampupLimit = 0.5f;
float reduceSlowStart = 0.2f;
RMContainerAllocator allocator = mock(RMContainerAllocator.class);
doCallRealMethod().when(allocator).
scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(),
anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat());
// Test slow-start
allocator.scheduleReduces(
totalMaps, succeededMaps,
scheduledMaps, scheduledReduces,
assignedMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator, never()).setIsReduceStarted(true);
succeededMaps = 3;
allocator.scheduleReduces(
totalMaps, succeededMaps,
scheduledMaps, scheduledReduces,
assignedMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator, times(1)).setIsReduceStarted(true);
// Test reduce ramp-up
doReturn(100 * 1024).when(allocator).getMemLimit();
allocator.scheduleReduces(
totalMaps, succeededMaps,
scheduledMaps, scheduledReduces,
assignedMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampUpReduces(anyInt());
verify(allocator, never()).rampDownReduces(anyInt());
// Test reduce ramp-down
scheduledReduces = 3;
doReturn(10 * 1024).when(allocator).getMemLimit();
allocator.scheduleReduces(
totalMaps, succeededMaps,
scheduledMaps, scheduledReduces,
assignedMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampDownReduces(anyInt());
}
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();