svn merge -c 1362209 FIXES: MAPREDUCE-4437. Race in MR ApplicationMaster can cause reducers to never be scheduled (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1362213 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-07-16 19:13:10 +00:00
parent d3b1109de8
commit 0d1867fa9d
3 changed files with 70 additions and 6 deletions

View File

@ -573,6 +573,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4299. Terasort hangs with MR2 FifoScheduler (Tom White via MAPREDUCE-4299. Terasort hangs with MR2 FifoScheduler (Tom White via
bobby) bobby)
MAPREDUCE-4437. Race in MR ApplicationMaster can cause reducers to never be
scheduled (Jason Lowe via bobby)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.app.rm;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -47,9 +46,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@ -131,6 +127,7 @@ public class RMContainerAllocator extends RMContainerRequestor
private int containersReleased = 0; private int containersReleased = 0;
private int hostLocalAssigned = 0; private int hostLocalAssigned = 0;
private int rackLocalAssigned = 0; private int rackLocalAssigned = 0;
private int lastCompletedTasks = 0;
private boolean recalculateReduceSchedule = false; private boolean recalculateReduceSchedule = false;
private int mapResourceReqt;//memory private int mapResourceReqt;//memory
@ -214,11 +211,18 @@ public class RMContainerAllocator extends RMContainerRequestor
scheduledRequests.assign(allocatedContainers); scheduledRequests.assign(allocatedContainers);
LOG.info("After Assign: " + getStat()); LOG.info("After Assign: " + getStat());
} }
int completedMaps = getJob().getCompletedMaps();
int completedTasks = completedMaps + getJob().getCompletedReduces();
if (lastCompletedTasks != completedTasks) {
lastCompletedTasks = completedTasks;
recalculateReduceSchedule = true;
}
if (recalculateReduceSchedule) { if (recalculateReduceSchedule) {
preemptReducesIfNeeded(); preemptReducesIfNeeded();
scheduleReduces( scheduleReduces(
getJob().getTotalMaps(), getJob().getCompletedMaps(), getJob().getTotalMaps(), completedMaps,
scheduledRequests.maps.size(), scheduledRequests.reduces.size(), scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(), assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceReqt, reduceResourceReqt, mapResourceReqt, reduceResourceReqt,

View File

@ -1409,7 +1409,63 @@ public class TestRMContainerAllocator {
maxReduceRampupLimit, reduceSlowStart); maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampDownReduces(anyInt()); verify(allocator).rampDownReduces(anyInt());
} }
private static class RecalculateContainerAllocator extends MyContainerAllocator {
public boolean recalculatedReduceSchedule = false;
public RecalculateContainerAllocator(MyResourceManager rm,
Configuration conf, ApplicationAttemptId appAttemptId, Job job) {
super(rm, conf, appAttemptId, job);
}
@Override
public void scheduleReduces(int totalMaps, int completedMaps,
int scheduledMaps, int scheduledReduces, int assignedMaps,
int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
recalculatedReduceSchedule = true;
}
}
@Test
public void testCompletedTasksRecalculateSchedule() throws Exception {
LOG.info("Running testCompletedTasksRecalculateSchedule");
Configuration conf = new Configuration();
final MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job job = mock(Job.class);
when(job.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
doReturn(10).when(job).getTotalMaps();
doReturn(10).when(job).getTotalReduces();
doReturn(0).when(job).getCompletedMaps();
RecalculateContainerAllocator allocator =
new RecalculateContainerAllocator(rm, conf, appAttemptId, job);
allocator.schedule();
allocator.recalculatedReduceSchedule = false;
allocator.schedule();
Assert.assertFalse("Unexpected recalculate of reduce schedule",
allocator.recalculatedReduceSchedule);
doReturn(1).when(job).getCompletedMaps();
allocator.schedule();
Assert.assertTrue("Expected recalculate of reduce schedule",
allocator.recalculatedReduceSchedule);
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator(); TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple(); t.testSimple();
@ -1418,6 +1474,7 @@ public class TestRMContainerAllocator {
t.testReportedAppProgress(); t.testReportedAppProgress();
t.testReportedAppProgressWithOnlyMaps(); t.testReportedAppProgressWithOnlyMaps();
t.testBlackListedNodes(); t.testBlackListedNodes();
t.testCompletedTasksRecalculateSchedule();
} }
} }