diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index ba3d6a4b494..bc67c513a7e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -331,6 +331,12 @@ ScheduledRequests getScheduledRequests() { return scheduledRequests; } + @Private + @VisibleForTesting + int getNumOfPendingReduces() { + return pendingReduces.size(); + } + public boolean getIsReduceStarted() { return reduceStarted; } @@ -510,15 +516,20 @@ boolean preemptReducesIfNeeded() { } // The pending mappers haven't been waiting for too long. Let us see if - // the headroom can fit a mapper. - Resource availableResourceForMap = getAvailableResources(); + // there are enough resources for a mapper to run. This is calculated by + // excluding scheduled reducers from headroom and comparing it against + // resources required to run one mapper. + Resource scheduledReducesResource = Resources.multiply( + reduceResourceRequest, scheduledRequests.reduces.size()); + Resource availableResourceForMap = + Resources.subtract(getAvailableResources(), scheduledReducesResource); if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap, mapResourceRequest, getSchedulerResourceTypes()) > 0) { - // the available headroom is enough to run a mapper + // Enough room to run a mapper return false; } - // Available headroom is not enough to run mapper. See if we should hold + // Available resources are not enough to run mapper. 
See if we should hold // off before preempting reducers and preempt if okay. return preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs); } @@ -960,11 +971,6 @@ public Resource getResourceLimit() { Resources.add(assignedMapResource, assignedReduceResource)); } - @VisibleForTesting - public int getNumOfPendingReduces() { - return pendingReduces.size(); - } - @Private @VisibleForTesting class ScheduledRequests { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index ba3b698a0f0..10ea9525969 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -3183,6 +3183,128 @@ public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() } } + /** + * Tests that the resources of scheduled reducers are excluded from the + * headroom when deciding whether a map task can run. 
+ */ + @Test + public void testExcludeSchedReducesFromHeadroom() throws Exception { + LOG.info("Running testExcludeSchedReducesFromHeadroom"); + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, -1); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = + (DrainDispatcher) rm.getRMContext().getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 1260); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + Task mockTask = mock(Task.class); + TaskAttempt mockTaskAttempt = mock(TaskAttempt.class); + when(mockJob.getTask((TaskId)any())).thenReturn(mockTask); + when(mockTask.getAttempt((TaskAttemptId)any())).thenReturn(mockTaskAttempt); + when(mockTaskAttempt.getProgress()).thenReturn(0.01f); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + + MockNM nodeManager = rm.registerNode("h1:1234", 4096); + dispatcher.await(); + // Register nodes to RM. + MockNM nodeManager2 = rm.registerNode("h2:1234", 1024); + dispatcher.await(); + + // Request 2 maps and 1 reducer (some on nodes which are not registered). 
+ ContainerRequestEvent event1 = + createReq(jobId, 1, 1024, new String[] { "h1" }); + allocator.sendRequest(event1); + ContainerRequestEvent event2 = + createReq(jobId, 2, 1024, new String[] { "h2" }); + allocator.sendRequest(event2); + ContainerRequestEvent event3 = + createReq(jobId, 3, 1024, new String[] { "h1" }, false, true); + allocator.sendRequest(event3); + + // This will tell the scheduler about the requests but there will be no + // allocations as nodes are not added. + allocator.schedule(); + dispatcher.await(); + + // Request for another reducer on h3 which has not registered. + ContainerRequestEvent event4 = + createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); + allocator.sendRequest(event4); + + allocator.schedule(); + dispatcher.await(); + + // Update resources in scheduler through node heartbeat from h1. + nodeManager.nodeHeartbeat(true); + dispatcher.await(); + + rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(3072, 3)); + allocator.schedule(); + dispatcher.await(); + + // Two maps are assigned. + Assert.assertEquals(2, allocator.getAssignedRequests().maps.size()); + // Send deallocate request for map so that no maps are assigned after this. + ContainerAllocatorEvent deallocate1 = createDeallocateEvent(jobId, 1, false); + allocator.sendDeallocate(deallocate1); + ContainerAllocatorEvent deallocate2 = createDeallocateEvent(jobId, 2, false); + allocator.sendDeallocate(deallocate2); + // No map should be assigned. + Assert.assertEquals(0, allocator.getAssignedRequests().maps.size()); + + nodeManager.nodeHeartbeat(true); + dispatcher.await(); + + rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1)); + allocator.schedule(); + dispatcher.await(); + + // h2 heartbeats. + nodeManager2.nodeHeartbeat(true); + dispatcher.await(); + + // Send request for one more mapper. 
+ ContainerRequestEvent event5 = + createReq(jobId, 5, 1024, new String[] { "h1" }); + allocator.sendRequest(event5); + + rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2)); + allocator.schedule(); + dispatcher.await(); + // One reducer is assigned and one map is scheduled + Assert.assertEquals(1, allocator.getScheduledRequests().maps.size()); + Assert.assertEquals(1, allocator.getAssignedRequests().reduces.size()); + // Headroom enough to run a mapper if headroom is taken as it is but won't be + // enough if scheduled reducers' resources are deducted. + rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1260, 2)); + allocator.schedule(); + dispatcher.await(); + // After allocate response, the one assigned reducer is preempted and killed + Assert.assertEquals(1, MyContainerAllocator.getTaskAttemptKillEvents().size()); + Assert.assertEquals(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC, + MyContainerAllocator.getTaskAttemptKillEvents().get(0).getMessage()); + Assert.assertEquals(1, allocator.getNumOfPendingReduces()); + } + private static class MockScheduler implements ApplicationMasterProtocol { ApplicationAttemptId attemptId; long nextContainerId = 10;