diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 0f4b59bd3f2..459cd3b5ca9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -527,12 +527,7 @@ private boolean preemptReducersForHangingMapRequests(long pendingThreshold) { } private void clearAllPendingReduceRequests() { - LOG.info("Ramping down all scheduled reduces:" - + scheduledRequests.reduces.size()); - for (ContainerRequest req : scheduledRequests.reduces.values()) { - pendingReduces.add(req); - } - scheduledRequests.reduces.clear(); + rampDownReduces(Integer.MAX_VALUE); } private void preemptReducer(int hangingMapRequests) { @@ -704,9 +699,13 @@ public void rampUpReduces(int rampUp) { @Private public void rampDownReduces(int rampDown) { //remove from the scheduled and move back to pending - for (int i = 0; i < rampDown; i++) { + while (rampDown > 0) { ContainerRequest request = scheduledRequests.removeReduce(); + if (request == null) { + return; + } pendingReduces.add(request); + rampDown--; } } @@ -956,6 +955,11 @@ public Resource getResourceLimit() { Resources.add(assignedMapResource, assignedReduceResource)); } + @VisibleForTesting + public int getNumOfPendingReduces() { + return pendingReduces.size(); + } + @Private @VisibleForTesting class ScheduledRequests { @@ -971,8 +975,9 @@ class ScheduledRequests { @VisibleForTesting final Map maps = new LinkedHashMap(); - - private final LinkedHashMap reduces = + + @VisibleForTesting + final LinkedHashMap reduces = new LinkedHashMap(); boolean remove(TaskAttemptId tId) { @@ -1372,7 +1377,8 @@ private void assignMapsWithLocality(List allocatedContainers) { class AssignedRequests { private final Map containerToAttemptMap = new HashMap(); - private final LinkedHashMap maps = + @VisibleForTesting + final LinkedHashMap maps = new LinkedHashMap(); @VisibleForTesting final LinkedHashMap reduces = diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index a639d5579a6..0e87f29017b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -562,4 +562,10 @@ protected void setRequestLimit(Priority priority, Resource capability, public Set getBlacklistedNodes() { return blacklistedNodes; } + + @Private + @VisibleForTesting + Set getAsk() { + return ask; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index a2aaa9d704b..0124f7ca80d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -1899,6 +1899,7 @@ private static AppContext createAppContext( when(context.getApplicationID()).thenReturn(appId); when(context.getApplicationAttemptId()).thenReturn(appAttemptId); when(context.getJob(isA(JobId.class))).thenReturn(job); + when(context.getClock()).thenReturn(new ControlledClock()); when(context.getClusterInfo()).thenReturn( new ClusterInfo(Resource.newInstance(10240, 1))); when(context.getEventHandler()).thenReturn(new EventHandler() { @@ -2893,6 +2894,128 @@ public void testAttemptNotFoundCausesRMCommunicatorException() allocator.schedule(); } + @Test + public void testUpdateAskOnRampDownAllReduces() throws Exception { + LOG.info("Running testUpdateAskOnRampDownAllReduces"); + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = + (DrainDispatcher) rm.getRMContext().getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 1260); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + // Use a controlled clock to advance time for test. + ControlledClock clock = (ControlledClock)allocator.getContext().getClock(); + clock.setTime(System.currentTimeMillis()); + + // Register nodes to RM. + MockNM nodeManager = rm.registerNode("h1:1234", 1024); + dispatcher.await(); + + // Request 2 maps and 1 reducer(sone on nodes which are not registered). + ContainerRequestEvent event1 = + createReq(jobId, 1, 1024, new String[] { "h1" }); + allocator.sendRequest(event1); + ContainerRequestEvent event2 = + createReq(jobId, 2, 1024, new String[] { "h2" }); + allocator.sendRequest(event2); + ContainerRequestEvent event3 = + createReq(jobId, 3, 1024, new String[] { "h2" }, false, true); + allocator.sendRequest(event3); + + // This will tell the scheduler about the requests but there will be no + // allocations as nodes are not added. + allocator.schedule(); + dispatcher.await(); + + // Advance clock so that maps can be considered as hanging. + clock.setTime(System.currentTimeMillis() + 500000L); + + // Request for another reducer on h3 which has not registered. + ContainerRequestEvent event4 = + createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); + allocator.sendRequest(event4); + + allocator.schedule(); + dispatcher.await(); + + // Update resources in scheduler through node heartbeat from h1. + nodeManager.nodeHeartbeat(true); + dispatcher.await(); + + rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1)); + allocator.schedule(); + dispatcher.await(); + + // One map is assigned. + Assert.assertEquals(1, allocator.getAssignedRequests().maps.size()); + // Send deallocate request for map so that no maps are assigned after this. + ContainerAllocatorEvent deallocate = createDeallocateEvent(jobId, 1, false); + allocator.sendDeallocate(deallocate); + // Now one reducer should be scheduled and one should be pending. + Assert.assertEquals(1, allocator.getScheduledRequests().reduces.size()); + Assert.assertEquals(1, allocator.getNumOfPendingReduces()); + // No map should be assigned and one should be scheduled. + Assert.assertEquals(1, allocator.getScheduledRequests().maps.size()); + Assert.assertEquals(0, allocator.getAssignedRequests().maps.size()); + + Assert.assertEquals(6, allocator.getAsk().size()); + for (ResourceRequest req : allocator.getAsk()) { + boolean isReduce = + req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE); + if (isReduce) { + // 1 reducer each asked on h2, * and default-rack + Assert.assertTrue((req.getResourceName().equals("*") || + req.getResourceName().equals("/default-rack") || + req.getResourceName().equals("h2")) && req.getNumContainers() == 1); + } else { //map + // 0 mappers asked on h1 and 1 each on * and default-rack + Assert.assertTrue(((req.getResourceName().equals("*") || + req.getResourceName().equals("/default-rack")) && + req.getNumContainers() == 1) || (req.getResourceName().equals("h1") + && req.getNumContainers() == 0)); + } + } + // On next allocate request to scheduler, headroom reported will be 0. + rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(0, 0)); + allocator.schedule(); + dispatcher.await(); + // After allocate response from scheduler, all scheduled reduces are ramped + // down and move to pending. 3 asks are also updated with 0 containers to + // indicate ramping down of reduces to scheduler. + Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size()); + Assert.assertEquals(2, allocator.getNumOfPendingReduces()); + Assert.assertEquals(3, allocator.getAsk().size()); + for (ResourceRequest req : allocator.getAsk()) { + Assert.assertEquals( + RMContainerAllocator.PRIORITY_REDUCE, req.getPriority()); + Assert.assertTrue(req.getResourceName().equals("*") || + req.getResourceName().equals("/default-rack") || + req.getResourceName().equals("h2")); + Assert.assertEquals(Resource.newInstance(1024, 1), req.getCapability()); + Assert.assertEquals(0, req.getNumContainers()); + } + } + private static class MockScheduler implements ApplicationMasterProtocol { ApplicationAttemptId attemptId; long nextContainerId = 10;