diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0a6989f8eae..d5f14408c43 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -573,6 +573,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4299. Terasort hangs with MR2 FifoScheduler (Tom White via bobby) + MAPREDUCE-4437. Race in MR ApplicationMaster can cause reducers to never be + scheduled (Jason Lowe via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index b28916c9b5f..f8ebfcfc6d2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.app.rm; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -47,9 +46,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; -import org.apache.hadoop.mapreduce.v2.app.job.Job; -import org.apache.hadoop.mapreduce.v2.app.job.Task; -import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; @@ -131,6 +127,7 @@ public class RMContainerAllocator extends RMContainerRequestor private int containersReleased = 0; private int hostLocalAssigned = 0; private int rackLocalAssigned = 0; + private int lastCompletedTasks = 0; private boolean recalculateReduceSchedule = false; private int mapResourceReqt;//memory @@ -214,11 +211,18 @@ public class RMContainerAllocator extends RMContainerRequestor scheduledRequests.assign(allocatedContainers); LOG.info("After Assign: " + getStat()); } - + + int completedMaps = getJob().getCompletedMaps(); + int completedTasks = completedMaps + getJob().getCompletedReduces(); + if (lastCompletedTasks != completedTasks) { + lastCompletedTasks = completedTasks; + recalculateReduceSchedule = true; + } + if (recalculateReduceSchedule) { preemptReducesIfNeeded(); scheduleReduces( - getJob().getTotalMaps(), getJob().getCompletedMaps(), + getJob().getTotalMaps(), completedMaps, scheduledRequests.maps.size(), scheduledRequests.reduces.size(), assignedRequests.maps.size(), assignedRequests.reduces.size(), mapResourceReqt, reduceResourceReqt, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 4109ed4dd68..e252c6f6dc3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -1409,7 +1409,63 @@ public class TestRMContainerAllocator { maxReduceRampupLimit, reduceSlowStart); verify(allocator).rampDownReduces(anyInt()); } + + private static class RecalculateContainerAllocator extends MyContainerAllocator { + public boolean recalculatedReduceSchedule = false; + + public RecalculateContainerAllocator(MyResourceManager rm, + Configuration conf, ApplicationAttemptId appAttemptId, Job job) { + super(rm, conf, appAttemptId, job); + } + + @Override + public void scheduleReduces(int totalMaps, int completedMaps, + int scheduledMaps, int scheduledReduces, int assignedMaps, + int assignedReduces, int mapResourceReqt, int reduceResourceReqt, + int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) { + recalculatedReduceSchedule = true; + } + } + @Test + public void testCompletedTasksRecalculateSchedule() throws Exception { + LOG.info("Running testCompletedTasksRecalculateSchedule"); + + Configuration conf = new Configuration(); + final MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job job = mock(Job.class); + when(job.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false)); + doReturn(10).when(job).getTotalMaps(); + doReturn(10).when(job).getTotalReduces(); + doReturn(0).when(job).getCompletedMaps(); + RecalculateContainerAllocator allocator = + new RecalculateContainerAllocator(rm, conf, appAttemptId, job); + allocator.schedule(); + + allocator.recalculatedReduceSchedule = false; + allocator.schedule(); + Assert.assertFalse("Unexpected recalculate of reduce schedule", + allocator.recalculatedReduceSchedule); + + doReturn(1).when(job).getCompletedMaps(); + allocator.schedule(); + Assert.assertTrue("Expected recalculate of reduce schedule", + allocator.recalculatedReduceSchedule); + } + public static void main(String[] args) throws Exception { TestRMContainerAllocator t = new TestRMContainerAllocator(); t.testSimple(); @@ -1418,6 +1474,7 @@ public class TestRMContainerAllocator { t.testReportedAppProgress(); t.testReportedAppProgressWithOnlyMaps(); t.testBlackListedNodes(); + t.testCompletedTasksRecalculateSchedule(); } }