diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 459cd3b5ca9..d2d1b4ececf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -281,14 +281,18 @@ public class RMContainerAllocator extends RMContainerRequestor } if (recalculateReduceSchedule) { - preemptReducesIfNeeded(); - scheduleReduces( - getJob().getTotalMaps(), completedMaps, - scheduledRequests.maps.size(), scheduledRequests.reduces.size(), - assignedRequests.maps.size(), assignedRequests.reduces.size(), - mapResourceRequest, reduceResourceRequest, - pendingReduces.size(), - maxReduceRampupLimit, reduceSlowStart); + boolean reducerPreempted = preemptReducesIfNeeded(); + + if (!reducerPreempted) { + // Only schedule new reducers if no reducer preemption happens for + // this heartbeat + scheduleReduces(getJob().getTotalMaps(), completedMaps, + scheduledRequests.maps.size(), scheduledRequests.reduces.size(), + assignedRequests.maps.size(), assignedRequests.reduces.size(), + mapResourceRequest, reduceResourceRequest, pendingReduces.size(), + maxReduceRampupLimit, reduceSlowStart); + } + recalculateReduceSchedule = false; } @@ -475,30 +479,30 @@ public class RMContainerAllocator extends RMContainerRequestor @Private @VisibleForTesting - void preemptReducesIfNeeded() { + boolean preemptReducesIfNeeded() { if (reduceResourceRequest.equals(Resources.none())) { - return; // no reduces + return false; // no reduces } if (assignedRequests.maps.size() > 0) { // there are assigned mappers - return; + return false; } if (scheduledRequests.maps.size() <= 0) { // there are no pending requests for mappers - return; + return false; } + // At this point: // we have pending mappers and all assigned resources are taken by reducers - if (reducerUnconditionalPreemptionDelayMs >= 0) { // Unconditional preemption is enabled. // If mappers are pending for longer than the configured threshold, // preempt reducers irrespective of what the headroom is. if (preemptReducersForHangingMapRequests( reducerUnconditionalPreemptionDelayMs)) { - return; + return true; } } @@ -508,12 +512,12 @@ public class RMContainerAllocator extends RMContainerRequestor if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap, mapResourceRequest, getSchedulerResourceTypes()) > 0) { // the available headroom is enough to run a mapper - return; + return false; } // Available headroom is not enough to run mapper. See if we should hold // off before preempting reducers and preempt if okay. - preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs); + return preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs); } private boolean preemptReducersForHangingMapRequests(long pendingThreshold) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 0124f7ca80d..83f03ca83f1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -3016,6 +3016,133 @@ public class TestRMContainerAllocator { } } + + @Test + public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() + throws Exception { + LOG.info("Running testAvoidAskMoreReducersWhenReducerPreemptionIsRequired"); + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = + (DrainDispatcher) rm.getRMContext().getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 1260); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + // Use a controlled clock to advance time for test. + ControlledClock clock = (ControlledClock)allocator.getContext().getClock(); + clock.setTime(System.currentTimeMillis()); + + // Register nodes to RM. + MockNM nodeManager = rm.registerNode("h1:1234", 1024); + dispatcher.await(); + + // Request 2 maps and 1 reducer(sone on nodes which are not registered). + ContainerRequestEvent event1 = + createReq(jobId, 1, 1024, new String[] { "h1" }); + allocator.sendRequest(event1); + ContainerRequestEvent event2 = + createReq(jobId, 2, 1024, new String[] { "h2" }); + allocator.sendRequest(event2); + ContainerRequestEvent event3 = + createReq(jobId, 3, 1024, new String[] { "h2" }, false, true); + allocator.sendRequest(event3); + + // This will tell the scheduler about the requests but there will be no + // allocations as nodes are not added. + allocator.schedule(); + dispatcher.await(); + + // Advance clock so that maps can be considered as hanging. + clock.setTime(System.currentTimeMillis() + 500000L); + + // Request for another reducer on h3 which has not registered. + ContainerRequestEvent event4 = + createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); + allocator.sendRequest(event4); + + allocator.schedule(); + dispatcher.await(); + + // Update resources in scheduler through node heartbeat from h1. + nodeManager.nodeHeartbeat(true); + dispatcher.await(); + + rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1)); + allocator.schedule(); + dispatcher.await(); + + // One map is assigned. + Assert.assertEquals(1, allocator.getAssignedRequests().maps.size()); + // Send deallocate request for map so that no maps are assigned after this. + ContainerAllocatorEvent deallocate = createDeallocateEvent(jobId, 1, false); + allocator.sendDeallocate(deallocate); + // Now one reducer should be scheduled and one should be pending. + Assert.assertEquals(1, allocator.getScheduledRequests().reduces.size()); + Assert.assertEquals(1, allocator.getNumOfPendingReduces()); + // No map should be assigned and one should be scheduled. + Assert.assertEquals(1, allocator.getScheduledRequests().maps.size()); + Assert.assertEquals(0, allocator.getAssignedRequests().maps.size()); + + Assert.assertEquals(6, allocator.getAsk().size()); + for (ResourceRequest req : allocator.getAsk()) { + boolean isReduce = + req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE); + if (isReduce) { + // 1 reducer each asked on h2, * and default-rack + Assert.assertTrue((req.getResourceName().equals("*") || + req.getResourceName().equals("/default-rack") || + req.getResourceName().equals("h2")) && req.getNumContainers() == 1); + } else { //map + // 0 mappers asked on h1 and 1 each on * and default-rack + Assert.assertTrue(((req.getResourceName().equals("*") || + req.getResourceName().equals("/default-rack")) && + req.getNumContainers() == 1) || (req.getResourceName().equals("h1") + && req.getNumContainers() == 0)); + } + } + + clock.setTime(System.currentTimeMillis() + 500000L + 10 * 60 * 1000); + + // On next allocate request to scheduler, headroom reported will be 2048. + rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 0)); + allocator.schedule(); + dispatcher.await(); + // After allocate response from scheduler, all scheduled reduces are ramped + // down and move to pending. 3 asks are also updated with 0 containers to + // indicate ramping down of reduces to scheduler. + Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size()); + Assert.assertEquals(2, allocator.getNumOfPendingReduces()); + Assert.assertEquals(3, allocator.getAsk().size()); + for (ResourceRequest req : allocator.getAsk()) { + Assert.assertEquals( + RMContainerAllocator.PRIORITY_REDUCE, req.getPriority()); + Assert.assertTrue(req.getResourceName().equals("*") || + req.getResourceName().equals("/default-rack") || + req.getResourceName().equals("h2")); + Assert.assertEquals(Resource.newInstance(1024, 1), req.getCapability()); + Assert.assertEquals(0, req.getNumContainers()); + } + } + private static class MockScheduler implements ApplicationMasterProtocol { ApplicationAttemptId attemptId; long nextContainerId = 10;