diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5cce59ba04f..36dc62f5f5f 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -619,6 +619,9 @@ Release 0.23.3 - UNRELEASED
 
     MAPREDUCE-4295. RM crashes due to DNS issue (tgraves)
 
+    MAPREDUCE-4228. mapreduce.job.reduce.slowstart.completedmaps is not working
+    properly (Jason Lowe via bobby)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index efef456f109..a0ba0e4c694 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -417,15 +417,6 @@ public void scheduleReduces(
 
     LOG.info("Recalculating schedule...");
 
-    //if all maps are assigned, then ramp up all reduces irrespective of the
-    //headroom
-    if (scheduledMaps == 0 && numPendingReduces > 0) {
-      LOG.info("All maps assigned. " +
-          "Ramping up all remaining reduces:" + numPendingReduces);
-      scheduleAllReduces();
-      return;
-    }
-
     //check for slow start
     if (!getIsReduceStarted()) {//not set yet
       int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart *
@@ -441,6 +432,15 @@ public void scheduleReduces(
       }
     }
 
+    //if all maps are assigned, then ramp up all reduces irrespective of the
+    //headroom
+    if (scheduledMaps == 0 && numPendingReduces > 0) {
+      LOG.info("All maps assigned. " +
+          "Ramping up all remaining reduces:" + numPendingReduces);
+      scheduleAllReduces();
+      return;
+    }
+
     float completedMapPercent = 0f;
     if (totalMaps != 0) {//support for 0 maps
       completedMapPercent = (float)completedMaps/totalMaps;
@@ -498,7 +498,8 @@ public void scheduleReduces(
     }
   }
 
-  private void scheduleAllReduces() {
+  @Private
+  public void scheduleAllReduces() {
     for (ContainerRequest req : pendingReduces) {
       scheduledRequests.addReduce(req);
     }
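The core of the change above is an ordering fix inside scheduleReduces(): before the patch, the "all maps assigned, ramp up every reduce" shortcut ran before the slow-start check, so a job whose maps were all assigned containers (but not yet finished) could start reduces well before the configured completedmaps fraction was reached. The following standalone sketch illustrates only the reordered decision; the method name, parameters, and boolean return are illustrative stand-ins, not the allocator's actual API, which mutates internal state rather than returning a value.

// Simplified, standalone illustration of the reordered checks in
// scheduleReduces(); the signature and boolean return are hypothetical.
public class SlowStartOrderingSketch {

  static boolean shouldRampUpAllReduces(int totalMaps, int completedMaps,
      int scheduledMaps, int numPendingReduces, float reduceSlowStart,
      boolean reduceStarted) {
    // Slow-start gate first: until enough maps have completed, reduces must
    // not start, even if every map already has a container assigned.
    if (!reduceStarted) {
      int completedMapsForReduceSlowstart =
          (int) Math.ceil(reduceSlowStart * totalMaps);
      if (completedMaps < completedMapsForReduceSlowstart) {
        return false;
      }
    }
    // Only after the gate passes: with no map requests left to schedule and
    // reduces pending, ramp up all reduces regardless of headroom.
    return scheduledMaps == 0 && numPendingReduces > 0;
  }

  public static void main(String[] args) {
    // 10 maps, slowstart 0.5: with 2 maps done the gate blocks the ramp-up,
    // with 6 maps done it no longer does.
    System.out.println(shouldRampUpAllReduces(10, 2, 0, 5, 0.5f, false)); // false
    System.out.println(shouldRampUpAllReduces(10, 6, 0, 5, 0.5f, false)); // true
  }
}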
" + + "Ramping up all remaining reduces:" + numPendingReduces); + scheduleAllReduces(); + return; + } + float completedMapPercent = 0f; if (totalMaps != 0) {//support for 0 maps completedMapPercent = (float)completedMaps/totalMaps; @@ -498,7 +498,8 @@ public void scheduleReduces( } } - private void scheduleAllReduces() { + @Private + public void scheduleAllReduces() { for (ContainerRequest req : pendingReduces) { scheduledRequests.addReduce(req); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 98bc020ecb4..33306f45937 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -18,15 +18,24 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.mockito.Matchers.anyFloat; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import junit.framework.Assert; @@ -65,9 +74,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; @@ -76,13 +86,11 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; -import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.After; import org.junit.Test; @@ -428,29 +436,21 @@ protected ContainerAllocator createContainerAllocator( // Finish off 1 map. 
     Iterator<Task> it = job.getTasks().values().iterator();
-    finishNextNTasks(mrApp, it, 1);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
     allocator.schedule();
     rmDispatcher.await();
     Assert.assertEquals(0.095f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f);
 
     // Finish off 7 more so that map-progress is 80%
-    finishNextNTasks(mrApp, it, 7);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 7);
     allocator.schedule();
     rmDispatcher.await();
     Assert.assertEquals(0.41f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f);
 
     // Finish off the 2 remaining maps
-    finishNextNTasks(mrApp, it, 2);
-
-    // Wait till all reduce-attempts request for containers
-    for (Task t : job.getTasks().values()) {
-      if (t.getType() == TaskType.REDUCE) {
-        mrApp.waitForState(t.getAttempts().values().iterator().next(),
-            TaskAttemptState.UNASSIGNED);
-      }
-    }
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
 
     allocator.schedule();
     rmDispatcher.await();
@@ -467,7 +467,7 @@ protected ContainerAllocator createContainerAllocator(
     }
 
     // Finish off 2 reduces
-    finishNextNTasks(mrApp, it, 2);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
 
     allocator.schedule();
     rmDispatcher.await();
@@ -475,7 +475,7 @@ protected ContainerAllocator createContainerAllocator(
     Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
 
     // Finish off the remaining 8 reduces.
-    finishNextNTasks(mrApp, it, 8);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 8);
     allocator.schedule();
     rmDispatcher.await();
     // Remaining is JobCleanup
@@ -483,19 +483,28 @@ protected ContainerAllocator createContainerAllocator(
     Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
   }
 
-  private void finishNextNTasks(MRApp mrApp, Iterator<Task> it, int nextN)
-      throws Exception {
+  private void finishNextNTasks(DrainDispatcher rmDispatcher, MockNM node,
+      MRApp mrApp, Iterator<Task> it, int nextN) throws Exception {
     Task task;
     for (int i=0; i<nextN; i++) {
       task = it.next();
-      finishTask(mrApp, task);
+      finishTask(rmDispatcher, node, mrApp, task);
     }
   }
 
-  private void finishTask(MRApp mrApp, Task task) throws Exception {
+  private void finishTask(DrainDispatcher rmDispatcher, MockNM node, MRApp mrApp,
+      Task task) throws Exception {
     TaskAttempt attempt = task.getAttempts().values().iterator().next();
+    List<ContainerStatus> contStatus = new ArrayList<ContainerStatus>(1);
+    contStatus.add(BuilderUtils.newContainerStatus(attempt.getAssignedContainerID(),
+        ContainerState.COMPLETE, "", 0));
+    Map<ApplicationId, List<ContainerStatus>> statusUpdate =
+        new HashMap<ApplicationId, List<ContainerStatus>>(1);
+    statusUpdate.put(mrApp.getAppID(), contStatus);
+    node.nodeHeartbeat(statusUpdate, true);
+    rmDispatcher.await();
     mrApp.getContext().getEventHandler().handle(
-          new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
+        new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
     mrApp.waitForState(task, TaskState.SUCCEEDED);
   }
@@ -576,21 +585,21 @@ protected ContainerAllocator createContainerAllocator(
     Iterator<Task> it = job.getTasks().values().iterator();
 
     // Finish off 1 map so that map-progress is 10%
-    finishNextNTasks(mrApp, it, 1);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
    allocator.schedule();
     rmDispatcher.await();
     Assert.assertEquals(0.14f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f);
 
     // Finish off 5 more map so that map-progress is 60%
-    finishNextNTasks(mrApp, it, 5);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 5);
     allocator.schedule();
     rmDispatcher.await();
     Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
 
     // Finish off remaining map so that map-progress is 100%
-    finishNextNTasks(mrApp, it, 4);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 4);
     allocator.schedule();
     rmDispatcher.await();
     Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
@@ -1338,6 +1347,18 @@ public void testReduceScheduling() throws Exception {
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator, never()).setIsReduceStarted(true);
 
+    // verify slow-start still in effect when no more maps need to
+    // be scheduled but some have yet to complete
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps,
+        0, scheduledReduces,
+        totalMaps - succeededMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, never()).setIsReduceStarted(true);
+    verify(allocator, never()).scheduleAllReduces();
+
     succeededMaps = 3;
     allocator.scheduleReduces(
         totalMaps, succeededMaps,
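The new test block above drives the mocked allocator with all maps scheduled but not yet completed, then uses Mockito's never() to check that neither setIsReduceStarted(true) nor the now-@Private-but-public scheduleAllReduces() fired. The sketch below shows that same verification pattern on a self-contained, hypothetical Scheduler class (not the real RMContainerAllocator, whose scheduleReduces takes many more parameters): the real decision logic runs via doCallRealMethod() while the ramp-up method stays a recorded no-op that can be verified.

import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.junit.Test;

public class TestSlowStartVerificationSketch {

  // Hypothetical stand-in for the allocator: scheduleReduces() holds the
  // decision logic, scheduleAllReduces() is the ramp-up action under test.
  public static class Scheduler {
    public void scheduleReduces(int totalMaps, int completedMaps,
        float reduceSlowStart, int numPendingReduces) {
      int completedMapsForReduceSlowstart =
          (int) Math.ceil(reduceSlowStart * totalMaps);
      if (completedMaps < completedMapsForReduceSlowstart) {
        return; // slow-start threshold not met: do not start reduces
      }
      if (numPendingReduces > 0) {
        scheduleAllReduces();
      }
    }

    public void scheduleAllReduces() {
      // the real allocator would move every pending reduce request here
    }
  }

  @Test
  public void reducesNotRampedUpBeforeSlowStart() {
    Scheduler scheduler = mock(Scheduler.class);
    // Run the real decision logic while scheduleAllReduces() stays a recorded
    // no-op, so its invocations can be counted.
    doCallRealMethod().when(scheduler).scheduleReduces(10, 2, 0.5f, 5);
    doCallRealMethod().when(scheduler).scheduleReduces(10, 6, 0.5f, 5);

    scheduler.scheduleReduces(10, 2, 0.5f, 5);   // only 2 of 10 maps done
    verify(scheduler, never()).scheduleAllReduces();

    scheduler.scheduleReduces(10, 6, 0.5f, 5);   // threshold (5) now reached
    verify(scheduler, times(1)).scheduleAllReduces();
  }
}

The reworked finishTask helper in the test diff also reports a COMPLETE ContainerStatus through a node heartbeat before sending TA_DONE, presumably so the allocator observes container completions the way a live cluster would report them.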