merge MAPREDUCE-4671 from trunk. AM does not tell the RM about container requests which are no longer needed. Contributed by Bikas Saha.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1443324 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2013-02-07 07:08:30 +00:00
parent 577aed0fcf
commit 288dd9b025
3 changed files with 47 additions and 12 deletions

View File

@ -12,6 +12,9 @@ Release 2.0.4-beta - UNRELEASED
BUG FIXES BUG FIXES
MAPREDUCE-4671. AM does not tell the RM about container requests which are
no longer needed. (Bikas Saha via sseth)
Release 2.0.3-alpha - 2013-02-06 Release 2.0.3-alpha - 2013-02-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -72,7 +72,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
remoteRequestsTable = remoteRequestsTable =
new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>(); new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(); // use custom comparator to make sure ResourceRequest objects differing only in
// numContainers dont end up as duplicates
private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
private final Set<ContainerId> release = new TreeSet<ContainerId>(); private final Set<ContainerId> release = new TreeSet<ContainerId>();
private boolean nodeBlacklistingEnabled; private boolean nodeBlacklistingEnabled;
@ -235,7 +238,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req); ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req);
zeroedRequest.setNumContainers(0); zeroedRequest.setNumContainers(0);
// to be sent to RM on next heartbeat // to be sent to RM on next heartbeat
ask.add(zeroedRequest); addResourceRequestToAsk(zeroedRequest);
} }
} }
// if all requests were still in ask queue // if all requests were still in ask queue
@ -320,7 +323,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1); remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
// Note this down for next interaction with ResourceManager // Note this down for next interaction with ResourceManager
ask.add(remoteRequest); addResourceRequestToAsk(remoteRequest);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("addResourceRequest:" + " applicationId=" LOG.debug("addResourceRequest:" + " applicationId="
+ applicationId.getId() + " priority=" + priority.getPriority() + applicationId.getId() + " priority=" + priority.getPriority()
@ -353,7 +356,12 @@ public abstract class RMContainerRequestor extends RMCommunicator {
+ remoteRequest.getNumContainers() + " #asks=" + ask.size()); + remoteRequest.getNumContainers() + " #asks=" + ask.size());
} }
if(remoteRequest.getNumContainers() > 0) {
// based on blacklisting comments above we can end up decrementing more
// than requested. so guard for that.
remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1); remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
}
if (remoteRequest.getNumContainers() == 0) { if (remoteRequest.getNumContainers() == 0) {
reqMap.remove(capability); reqMap.remove(capability);
if (reqMap.size() == 0) { if (reqMap.size() == 0) {
@ -362,13 +370,12 @@ public abstract class RMContainerRequestor extends RMCommunicator {
if (remoteRequests.size() == 0) { if (remoteRequests.size() == 0) {
remoteRequestsTable.remove(priority); remoteRequestsTable.remove(priority);
} }
//remove from ask if it may have
ask.remove(remoteRequest);
} else {
ask.add(remoteRequest);//this will override the request if ask doesn't
//already have it.
} }
// send the updated resource request to RM
// send 0 container count requests also to cancel previous requests
addResourceRequestToAsk(remoteRequest);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.info("AFTER decResourceRequest:" + " applicationId=" LOG.info("AFTER decResourceRequest:" + " applicationId="
+ applicationId.getId() + " priority=" + priority.getPriority() + applicationId.getId() + " priority=" + priority.getPriority()
@ -377,6 +384,16 @@ public abstract class RMContainerRequestor extends RMCommunicator {
} }
} }
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
// because objects inside the resource map can be deleted ask can end up
// containing an object that matches new resource object but with different
// numContainers. So exisintg values must be replaced explicitly
if(ask.contains(remoteRequest)) {
ask.remove(remoteRequest);
}
ask.add(remoteRequest);
}
protected void release(ContainerId containerId) { protected void release(ContainerId containerId) {
release.add(containerId); release.add(containerId);
} }

View File

@ -167,6 +167,7 @@ public class TestRMContainerAllocator {
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule(); List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
dispatcher.await(); dispatcher.await();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size());
// send another request with different resource and priority // send another request with different resource and priority
ContainerRequestEvent event3 = createReq(jobId, 3, 1024, ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
@ -178,6 +179,7 @@ public class TestRMContainerAllocator {
assigned = allocator.schedule(); assigned = allocator.schedule();
dispatcher.await(); dispatcher.await();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size());
// update resources in scheduler // update resources in scheduler
nodeManager1.nodeHeartbeat(true); // Node heartbeat nodeManager1.nodeHeartbeat(true); // Node heartbeat
@ -187,8 +189,14 @@ public class TestRMContainerAllocator {
assigned = allocator.schedule(); assigned = allocator.schedule();
dispatcher.await(); dispatcher.await();
Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size());
checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
assigned, false); assigned, false);
// check that the assigned container requests are cancelled
assigned = allocator.schedule();
dispatcher.await();
Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size());
} }
@Test @Test
@ -446,6 +454,10 @@ public class TestRMContainerAllocator {
protected ResourceScheduler createScheduler() { protected ResourceScheduler createScheduler() {
return new MyFifoScheduler(this.getRMContext()); return new MyFifoScheduler(this.getRMContext());
} }
MyFifoScheduler getMyFifoScheduler() {
return (MyFifoScheduler) scheduler;
}
} }
@Test @Test
@ -1195,6 +1207,8 @@ public class TestRMContainerAllocator {
} }
} }
List<ResourceRequest> lastAsk = null;
// override this to copy the objects otherwise FifoScheduler updates the // override this to copy the objects otherwise FifoScheduler updates the
// numContainers in same objects as kept by RMContainerAllocator // numContainers in same objects as kept by RMContainerAllocator
@Override @Override
@ -1208,6 +1222,7 @@ public class TestRMContainerAllocator {
.getNumContainers()); .getNumContainers());
askCopy.add(reqCopy); askCopy.add(reqCopy);
} }
lastAsk = ask;
return super.allocate(applicationAttemptId, askCopy, release); return super.allocate(applicationAttemptId, askCopy, release);
} }
} }