diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index a54ba67f187..d2dcb9f44d4 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -164,6 +164,9 @@ Release 2.0.4-beta - UNRELEASED BUG FIXES + MAPREDUCE-4671. AM does not tell the RM about container requests which are + no longer needed. (Bikas Saha via sseth) + Release 2.0.3-alpha - 2013-02-06 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 59c9795e2d8..5f3a7f5f960 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -72,7 +72,10 @@ public abstract class RMContainerRequestor extends RMCommunicator { remoteRequestsTable = new TreeMap>>(); - private final Set ask = new TreeSet(); + // use custom comparator to make sure ResourceRequest objects differing only in + // numContainers dont end up as duplicates + private final Set ask = new TreeSet( + new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator()); private final Set release = new TreeSet(); private boolean nodeBlacklistingEnabled; @@ -235,7 +238,7 @@ protected void containerFailedOnHost(String hostName) { ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req); zeroedRequest.setNumContainers(0); // to be sent to RM on next heartbeat - ask.add(zeroedRequest); + addResourceRequestToAsk(zeroedRequest); } } // if all requests were still in ask queue @@ -320,7 +323,7 @@ private void addResourceRequest(Priority priority, String resourceName, remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1); // Note this down for next interaction with ResourceManager - ask.add(remoteRequest); + addResourceRequestToAsk(remoteRequest); if (LOG.isDebugEnabled()) { LOG.debug("addResourceRequest:" + " applicationId=" + applicationId.getId() + " priority=" + priority.getPriority() @@ -353,7 +356,12 @@ private void decResourceRequest(Priority priority, String resourceName, + remoteRequest.getNumContainers() + " #asks=" + ask.size()); } - remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1); + if(remoteRequest.getNumContainers() > 0) { + // based on blacklisting comments above we can end up decrementing more + // than requested. so guard for that. + remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1); + } + if (remoteRequest.getNumContainers() == 0) { reqMap.remove(capability); if (reqMap.size() == 0) { @@ -362,13 +370,12 @@ private void decResourceRequest(Priority priority, String resourceName, if (remoteRequests.size() == 0) { remoteRequestsTable.remove(priority); } - //remove from ask if it may have - ask.remove(remoteRequest); - } else { - ask.add(remoteRequest);//this will override the request if ask doesn't - //already have it. } + // send the updated resource request to RM + // send 0 container count requests also to cancel previous requests + addResourceRequestToAsk(remoteRequest); + if (LOG.isDebugEnabled()) { LOG.info("AFTER decResourceRequest:" + " applicationId=" + applicationId.getId() + " priority=" + priority.getPriority() @@ -376,6 +383,16 @@ private void decResourceRequest(Priority priority, String resourceName, + remoteRequest.getNumContainers() + " #asks=" + ask.size()); } } + + private void addResourceRequestToAsk(ResourceRequest remoteRequest) { + // because objects inside the resource map can be deleted ask can end up + // containing an object that matches new resource object but with different + // numContainers. So exisintg values must be replaced explicitly + if(ask.contains(remoteRequest)) { + ask.remove(remoteRequest); + } + ask.add(remoteRequest); + } protected void release(ContainerId containerId) { release.add(containerId); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index bd00e1b1608..47845a0aa6f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -167,6 +167,7 @@ public void testSimple() throws Exception { List assigned = allocator.schedule(); dispatcher.await(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size()); // send another request with different resource and priority ContainerRequestEvent event3 = createReq(jobId, 3, 1024, @@ -178,7 +179,8 @@ public void testSimple() throws Exception { assigned = allocator.schedule(); dispatcher.await(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); - + Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size()); + // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat nodeManager2.nodeHeartbeat(true); // Node heartbeat @@ -187,8 +189,14 @@ public void testSimple() throws Exception { assigned = allocator.schedule(); dispatcher.await(); + Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size()); checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, assigned, false); + + // check that the assigned container requests are cancelled + assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size()); } @Test @@ -422,7 +430,7 @@ public void testMapReduceScheduling() throws Exception { } private static class MyResourceManager extends MockRM { - + public MyResourceManager(Configuration conf) { super(conf); } @@ -446,6 +454,10 @@ public void handle(SchedulerEvent event) { protected ResourceScheduler createScheduler() { return new MyFifoScheduler(this.getRMContext()); } + + MyFifoScheduler getMyFifoScheduler() { + return (MyFifoScheduler) scheduler; + } } @Test @@ -1194,7 +1206,9 @@ public MyFifoScheduler(RMContext rmContext) { assert (false); } } - + + List lastAsk = null; + // override this to copy the objects otherwise FifoScheduler updates the // numContainers in same objects as kept by RMContainerAllocator @Override @@ -1208,6 +1222,7 @@ public synchronized Allocation allocate( .getNumContainers()); askCopy.add(reqCopy); } + lastAsk = ask; return super.allocate(applicationAttemptId, askCopy, release); } }