svn merge -c 1354181 FIXES: MAPREDUCE-4228. mapreduce.job.reduce.slowstart.completedmaps is not working properly (Jason Lowe via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1354182 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
eea9ffe8ac
commit
4bb61ecbd7
|
@ -498,6 +498,9 @@ Release 0.23.3 - UNRELEASED
|
|||
|
||||
MAPREDUCE-4295. RM crashes due to DNS issue (tgraves)
|
||||
|
||||
MAPREDUCE-4228. mapreduce.job.reduce.slowstart.completedmaps is not working
|
||||
properly (Jason Lowe via bobby)
|
||||
|
||||
Release 0.23.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -417,15 +417,6 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
|
||||
LOG.info("Recalculating schedule...");
|
||||
|
||||
//if all maps are assigned, then ramp up all reduces irrespective of the
|
||||
//headroom
|
||||
if (scheduledMaps == 0 && numPendingReduces > 0) {
|
||||
LOG.info("All maps assigned. " +
|
||||
"Ramping up all remaining reduces:" + numPendingReduces);
|
||||
scheduleAllReduces();
|
||||
return;
|
||||
}
|
||||
|
||||
//check for slow start
|
||||
if (!getIsReduceStarted()) {//not set yet
|
||||
int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart *
|
||||
|
@ -441,6 +432,15 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
}
|
||||
}
|
||||
|
||||
//if all maps are assigned, then ramp up all reduces irrespective of the
|
||||
//headroom
|
||||
if (scheduledMaps == 0 && numPendingReduces > 0) {
|
||||
LOG.info("All maps assigned. " +
|
||||
"Ramping up all remaining reduces:" + numPendingReduces);
|
||||
scheduleAllReduces();
|
||||
return;
|
||||
}
|
||||
|
||||
float completedMapPercent = 0f;
|
||||
if (totalMaps != 0) {//support for 0 maps
|
||||
completedMapPercent = (float)completedMaps/totalMaps;
|
||||
|
@ -498,7 +498,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
}
|
||||
}
|
||||
|
||||
private void scheduleAllReduces() {
|
||||
@Private
|
||||
public void scheduleAllReduces() {
|
||||
for (ContainerRequest req : pendingReduces) {
|
||||
scheduledRequests.addReduce(req);
|
||||
}
|
||||
|
|
|
@ -18,15 +18,24 @@
|
|||
|
||||
package org.apache.hadoop.mapreduce.v2.app;
|
||||
|
||||
import static org.mockito.Matchers.anyFloat;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.mockito.Mockito.doCallRealMethod;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
@ -65,9 +74,10 @@ import org.apache.hadoop.yarn.api.AMRMProtocol;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
|
@ -76,13 +86,11 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
@ -428,29 +436,21 @@ public class TestRMContainerAllocator {
|
|||
|
||||
// Finish off 1 map.
|
||||
Iterator<Task> it = job.getTasks().values().iterator();
|
||||
finishNextNTasks(mrApp, it, 1);
|
||||
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
|
||||
allocator.schedule();
|
||||
rmDispatcher.await();
|
||||
Assert.assertEquals(0.095f, job.getProgress(), 0.001f);
|
||||
Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f);
|
||||
|
||||
// Finish off 7 more so that map-progress is 80%
|
||||
finishNextNTasks(mrApp, it, 7);
|
||||
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 7);
|
||||
allocator.schedule();
|
||||
rmDispatcher.await();
|
||||
Assert.assertEquals(0.41f, job.getProgress(), 0.001f);
|
||||
Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f);
|
||||
|
||||
// Finish off the 2 remaining maps
|
||||
finishNextNTasks(mrApp, it, 2);
|
||||
|
||||
// Wait till all reduce-attempts request for containers
|
||||
for (Task t : job.getTasks().values()) {
|
||||
if (t.getType() == TaskType.REDUCE) {
|
||||
mrApp.waitForState(t.getAttempts().values().iterator().next(),
|
||||
TaskAttemptState.UNASSIGNED);
|
||||
}
|
||||
}
|
||||
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
|
||||
|
||||
allocator.schedule();
|
||||
rmDispatcher.await();
|
||||
|
@ -467,7 +467,7 @@ public class TestRMContainerAllocator {
|
|||
}
|
||||
|
||||
// Finish off 2 reduces
|
||||
finishNextNTasks(mrApp, it, 2);
|
||||
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
|
||||
|
||||
allocator.schedule();
|
||||
rmDispatcher.await();
|
||||
|
@ -475,7 +475,7 @@ public class TestRMContainerAllocator {
|
|||
Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
|
||||
|
||||
// Finish off the remaining 8 reduces.
|
||||
finishNextNTasks(mrApp, it, 8);
|
||||
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 8);
|
||||
allocator.schedule();
|
||||
rmDispatcher.await();
|
||||
// Remaining is JobCleanup
|
||||
|
@ -483,17 +483,26 @@ public class TestRMContainerAllocator {
|
|||
Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
|
||||
}
|
||||
|
||||
private void finishNextNTasks(MRApp mrApp, Iterator<Task> it, int nextN)
|
||||
throws Exception {
|
||||
private void finishNextNTasks(DrainDispatcher rmDispatcher, MockNM node,
|
||||
MRApp mrApp, Iterator<Task> it, int nextN) throws Exception {
|
||||
Task task;
|
||||
for (int i=0; i<nextN; i++) {
|
||||
task = it.next();
|
||||
finishTask(mrApp, task);
|
||||
finishTask(rmDispatcher, node, mrApp, task);
|
||||
}
|
||||
}
|
||||
|
||||
private void finishTask(MRApp mrApp, Task task) throws Exception {
|
||||
private void finishTask(DrainDispatcher rmDispatcher, MockNM node,
|
||||
MRApp mrApp, Task task) throws Exception {
|
||||
TaskAttempt attempt = task.getAttempts().values().iterator().next();
|
||||
List<ContainerStatus> contStatus = new ArrayList<ContainerStatus>(1);
|
||||
contStatus.add(BuilderUtils.newContainerStatus(attempt.getAssignedContainerID(),
|
||||
ContainerState.COMPLETE, "", 0));
|
||||
Map<ApplicationId,List<ContainerStatus>> statusUpdate =
|
||||
new HashMap<ApplicationId,List<ContainerStatus>>(1);
|
||||
statusUpdate.put(mrApp.getAppID(), contStatus);
|
||||
node.nodeHeartbeat(statusUpdate, true);
|
||||
rmDispatcher.await();
|
||||
mrApp.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
|
||||
mrApp.waitForState(task, TaskState.SUCCEEDED);
|
||||
|
@ -576,21 +585,21 @@ public class TestRMContainerAllocator {
|
|||
Iterator<Task> it = job.getTasks().values().iterator();
|
||||
|
||||
// Finish off 1 map so that map-progress is 10%
|
||||
finishNextNTasks(mrApp, it, 1);
|
||||
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
|
||||
allocator.schedule();
|
||||
rmDispatcher.await();
|
||||
Assert.assertEquals(0.14f, job.getProgress(), 0.001f);
|
||||
Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f);
|
||||
|
||||
// Finish off 5 more map so that map-progress is 60%
|
||||
finishNextNTasks(mrApp, it, 5);
|
||||
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 5);
|
||||
allocator.schedule();
|
||||
rmDispatcher.await();
|
||||
Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
|
||||
Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
|
||||
|
||||
// Finish off remaining map so that map-progress is 100%
|
||||
finishNextNTasks(mrApp, it, 4);
|
||||
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 4);
|
||||
allocator.schedule();
|
||||
rmDispatcher.await();
|
||||
Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
|
||||
|
@ -1338,6 +1347,18 @@ public class TestRMContainerAllocator {
|
|||
maxReduceRampupLimit, reduceSlowStart);
|
||||
verify(allocator, never()).setIsReduceStarted(true);
|
||||
|
||||
// verify slow-start still in effect when no more maps need to
|
||||
// be scheduled but some have yet to complete
|
||||
allocator.scheduleReduces(
|
||||
totalMaps, succeededMaps,
|
||||
0, scheduledReduces,
|
||||
totalMaps - succeededMaps, assignedReduces,
|
||||
mapResourceReqt, reduceResourceReqt,
|
||||
numPendingReduces,
|
||||
maxReduceRampupLimit, reduceSlowStart);
|
||||
verify(allocator, never()).setIsReduceStarted(true);
|
||||
verify(allocator, never()).scheduleAllReduces();
|
||||
|
||||
succeededMaps = 3;
|
||||
allocator.scheduleReduces(
|
||||
totalMaps, succeededMaps,
|
||||
|
|
Loading…
Reference in New Issue