MAPREDUCE-4228. mapreduce.job.reduce.slowstart.completedmaps is not working properly (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1354181 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-06-26 19:17:33 +00:00
parent 0270889b4e
commit 03f2f9b580
3 changed files with 61 additions and 36 deletions

View File

@ -619,6 +619,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4295. RM crashes due to DNS issue (tgraves)
MAPREDUCE-4228. mapreduce.job.reduce.slowstart.completedmaps is not working
properly (Jason Lowe via bobby)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -417,15 +417,6 @@ public void scheduleReduces(
LOG.info("Recalculating schedule...");
//if all maps are assigned, then ramp up all reduces irrespective of the
//headroom
if (scheduledMaps == 0 && numPendingReduces > 0) {
LOG.info("All maps assigned. " +
"Ramping up all remaining reduces:" + numPendingReduces);
scheduleAllReduces();
return;
}
//check for slow start
if (!getIsReduceStarted()) {//not set yet
int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart *
@ -441,6 +432,15 @@ public void scheduleReduces(
}
}
//if all maps are assigned, then ramp up all reduces irrespective of the
//headroom
if (scheduledMaps == 0 && numPendingReduces > 0) {
LOG.info("All maps assigned. " +
"Ramping up all remaining reduces:" + numPendingReduces);
scheduleAllReduces();
return;
}
float completedMapPercent = 0f;
if (totalMaps != 0) {//support for 0 maps
completedMapPercent = (float)completedMaps/totalMaps;
@ -498,7 +498,8 @@ public void scheduleReduces(
}
}
private void scheduleAllReduces() {
@Private
public void scheduleAllReduces() {
for (ContainerRequest req : pendingReduces) {
scheduledRequests.addReduce(req);
}

View File

@ -18,15 +18,24 @@
package org.apache.hadoop.mapreduce.v2.app;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import junit.framework.Assert;
@ -65,9 +74,10 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
@ -76,13 +86,11 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Test;
@ -428,29 +436,21 @@ protected ContainerAllocator createContainerAllocator(
// Finish off 1 map.
Iterator<Task> it = job.getTasks().values().iterator();
finishNextNTasks(mrApp, it, 1);
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
allocator.schedule();
rmDispatcher.await();
Assert.assertEquals(0.095f, job.getProgress(), 0.001f);
Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f);
// Finish off 7 more so that map-progress is 80%
finishNextNTasks(mrApp, it, 7);
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 7);
allocator.schedule();
rmDispatcher.await();
Assert.assertEquals(0.41f, job.getProgress(), 0.001f);
Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f);
// Finish off the 2 remaining maps
finishNextNTasks(mrApp, it, 2);
// Wait till all reduce-attempts request for containers
for (Task t : job.getTasks().values()) {
if (t.getType() == TaskType.REDUCE) {
mrApp.waitForState(t.getAttempts().values().iterator().next(),
TaskAttemptState.UNASSIGNED);
}
}
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
allocator.schedule();
rmDispatcher.await();
@ -467,7 +467,7 @@ protected ContainerAllocator createContainerAllocator(
}
// Finish off 2 reduces
finishNextNTasks(mrApp, it, 2);
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
allocator.schedule();
rmDispatcher.await();
@ -475,7 +475,7 @@ protected ContainerAllocator createContainerAllocator(
Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
// Finish off the remaining 8 reduces.
finishNextNTasks(mrApp, it, 8);
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 8);
allocator.schedule();
rmDispatcher.await();
// Remaining is JobCleanup
@ -483,19 +483,28 @@ protected ContainerAllocator createContainerAllocator(
Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
}
private void finishNextNTasks(MRApp mrApp, Iterator<Task> it, int nextN)
throws Exception {
private void finishNextNTasks(DrainDispatcher rmDispatcher, MockNM node,
MRApp mrApp, Iterator<Task> it, int nextN) throws Exception {
Task task;
for (int i=0; i<nextN; i++) {
task = it.next();
finishTask(mrApp, task);
finishTask(rmDispatcher, node, mrApp, task);
}
}
private void finishTask(MRApp mrApp, Task task) throws Exception {
private void finishTask(DrainDispatcher rmDispatcher, MockNM node,
MRApp mrApp, Task task) throws Exception {
TaskAttempt attempt = task.getAttempts().values().iterator().next();
List<ContainerStatus> contStatus = new ArrayList<ContainerStatus>(1);
contStatus.add(BuilderUtils.newContainerStatus(attempt.getAssignedContainerID(),
ContainerState.COMPLETE, "", 0));
Map<ApplicationId,List<ContainerStatus>> statusUpdate =
new HashMap<ApplicationId,List<ContainerStatus>>(1);
statusUpdate.put(mrApp.getAppID(), contStatus);
node.nodeHeartbeat(statusUpdate, true);
rmDispatcher.await();
mrApp.getContext().getEventHandler().handle(
new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
mrApp.waitForState(task, TaskState.SUCCEEDED);
}
@ -576,21 +585,21 @@ protected ContainerAllocator createContainerAllocator(
Iterator<Task> it = job.getTasks().values().iterator();
// Finish off 1 map so that map-progress is 10%
finishNextNTasks(mrApp, it, 1);
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
allocator.schedule();
rmDispatcher.await();
Assert.assertEquals(0.14f, job.getProgress(), 0.001f);
Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f);
// Finish off 5 more map so that map-progress is 60%
finishNextNTasks(mrApp, it, 5);
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 5);
allocator.schedule();
rmDispatcher.await();
Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
// Finish off remaining map so that map-progress is 100%
finishNextNTasks(mrApp, it, 4);
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 4);
allocator.schedule();
rmDispatcher.await();
Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
@ -1338,6 +1347,18 @@ public void testReduceScheduling() throws Exception {
maxReduceRampupLimit, reduceSlowStart);
verify(allocator, never()).setIsReduceStarted(true);
// verify slow-start still in effect when no more maps need to
// be scheduled but some have yet to complete
allocator.scheduleReduces(
totalMaps, succeededMaps,
0, scheduledReduces,
totalMaps - succeededMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator, never()).setIsReduceStarted(true);
verify(allocator, never()).scheduleAllReduces();
succeededMaps = 3;
allocator.scheduleReduces(
totalMaps, succeededMaps,