MAPREDUCE-4893. Fixed MR ApplicationMaster to do optimal assignment of containers to get maximum locality. Contributed by Bikas Saha.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1440749 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6384684299
commit
28c308d5e8
|
@ -217,6 +217,9 @@ Release 2.0.3-alpha - Unreleased
|
|||
|
||||
OPTIMIZATIONS
|
||||
|
||||
MAPREDUCE-4893. Fixed MR ApplicationMaster to do optimal assignment of
|
||||
containers to get maximum locality. (Bikas Saha via vinodkv)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
MAPREDUCE-4607. Race condition in ReduceTask completion can result in Task
|
||||
|
|
|
@ -747,7 +747,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
addContainerReq(req);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
// this method will change the list of allocatedContainers.
|
||||
private void assign(List<Container> allocatedContainers) {
|
||||
Iterator<Container> it = allocatedContainers.iterator();
|
||||
LOG.info("Got allocated containers " + allocatedContainers.size());
|
||||
|
@ -788,84 +788,97 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
+ reduces.isEmpty());
|
||||
isAssignable = false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Container allocated at unwanted priority: " + priority +
|
||||
". Returning to RM...");
|
||||
isAssignable = false;
|
||||
}
|
||||
|
||||
boolean blackListed = false;
|
||||
ContainerRequest assigned = null;
|
||||
if(!isAssignable) {
|
||||
// release container if we could not assign it
|
||||
containerNotAssigned(allocated);
|
||||
it.remove();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (isAssignable) {
|
||||
// do not assign if allocated container is on a
|
||||
// blacklisted host
|
||||
String allocatedHost = allocated.getNodeId().getHost();
|
||||
blackListed = isNodeBlacklisted(allocatedHost);
|
||||
if (blackListed) {
|
||||
// we need to request for a new container
|
||||
// and release the current one
|
||||
LOG.info("Got allocated container on a blacklisted "
|
||||
+ " host "+allocatedHost
|
||||
+". Releasing container " + allocated);
|
||||
// do not assign if allocated container is on a
|
||||
// blacklisted host
|
||||
String allocatedHost = allocated.getNodeId().getHost();
|
||||
if (isNodeBlacklisted(allocatedHost)) {
|
||||
// we need to request for a new container
|
||||
// and release the current one
|
||||
LOG.info("Got allocated container on a blacklisted "
|
||||
+ " host "+allocatedHost
|
||||
+". Releasing container " + allocated);
|
||||
|
||||
// find the request matching this allocated container
|
||||
// and replace it with a new one
|
||||
ContainerRequest toBeReplacedReq =
|
||||
getContainerReqToReplace(allocated);
|
||||
if (toBeReplacedReq != null) {
|
||||
LOG.info("Placing a new container request for task attempt "
|
||||
+ toBeReplacedReq.attemptID);
|
||||
ContainerRequest newReq =
|
||||
getFilteredContainerRequest(toBeReplacedReq);
|
||||
decContainerReq(toBeReplacedReq);
|
||||
if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
|
||||
TaskType.MAP) {
|
||||
maps.put(newReq.attemptID, newReq);
|
||||
}
|
||||
else {
|
||||
reduces.put(newReq.attemptID, newReq);
|
||||
}
|
||||
addContainerReq(newReq);
|
||||
// find the request matching this allocated container
|
||||
// and replace it with a new one
|
||||
ContainerRequest toBeReplacedReq =
|
||||
getContainerReqToReplace(allocated);
|
||||
if (toBeReplacedReq != null) {
|
||||
LOG.info("Placing a new container request for task attempt "
|
||||
+ toBeReplacedReq.attemptID);
|
||||
ContainerRequest newReq =
|
||||
getFilteredContainerRequest(toBeReplacedReq);
|
||||
decContainerReq(toBeReplacedReq);
|
||||
if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
|
||||
TaskType.MAP) {
|
||||
maps.put(newReq.attemptID, newReq);
|
||||
}
|
||||
else {
|
||||
LOG.info("Could not map allocated container to a valid request."
|
||||
+ " Releasing allocated container " + allocated);
|
||||
reduces.put(newReq.attemptID, newReq);
|
||||
}
|
||||
addContainerReq(newReq);
|
||||
}
|
||||
else {
|
||||
assigned = assign(allocated);
|
||||
if (assigned != null) {
|
||||
// Update resource requests
|
||||
decContainerReq(assigned);
|
||||
|
||||
// send the container-assigned event to task attempt
|
||||
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
|
||||
assigned.attemptID, allocated, applicationACLs));
|
||||
|
||||
assignedRequests.add(allocated, assigned.attemptID);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info("Assigned container (" + allocated + ") "
|
||||
+ " to task " + assigned.attemptID + " on node "
|
||||
+ allocated.getNodeId().toString());
|
||||
}
|
||||
}
|
||||
else {
|
||||
//not assigned to any request, release the container
|
||||
LOG.info("Releasing unassigned and invalid container "
|
||||
+ allocated + ". RM has gone crazy, someone go look!"
|
||||
+ " Hey RM, if you are so rich, go donate to non-profits!");
|
||||
}
|
||||
LOG.info("Could not map allocated container to a valid request."
|
||||
+ " Releasing allocated container " + allocated);
|
||||
}
|
||||
|
||||
// release container if we could not assign it
|
||||
containerNotAssigned(allocated);
|
||||
it.remove();
|
||||
continue;
|
||||
}
|
||||
|
||||
// release container if it was blacklisted
|
||||
// or if we could not assign it
|
||||
if (blackListed || assigned == null) {
|
||||
containersReleased++;
|
||||
release(allocated.getId());
|
||||
}
|
||||
}
|
||||
|
||||
assignContainers(allocatedContainers);
|
||||
|
||||
// release container if we could not assign it
|
||||
it = allocatedContainers.iterator();
|
||||
while (it.hasNext()) {
|
||||
Container allocated = it.next();
|
||||
LOG.info("Releasing unassigned and invalid container "
|
||||
+ allocated + ". RM may have assignment issues");
|
||||
containerNotAssigned(allocated);
|
||||
}
|
||||
}
|
||||
|
||||
private ContainerRequest assign(Container allocated) {
|
||||
@SuppressWarnings("unchecked")
|
||||
private void containerAssigned(Container allocated,
|
||||
ContainerRequest assigned) {
|
||||
// Update resource requests
|
||||
decContainerReq(assigned);
|
||||
|
||||
// send the container-assigned event to task attempt
|
||||
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
|
||||
assigned.attemptID, allocated, applicationACLs));
|
||||
|
||||
assignedRequests.add(allocated, assigned.attemptID);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info("Assigned container (" + allocated + ") "
|
||||
+ " to task " + assigned.attemptID + " on node "
|
||||
+ allocated.getNodeId().toString());
|
||||
}
|
||||
}
|
||||
|
||||
private void containerNotAssigned(Container allocated) {
|
||||
containersReleased++;
|
||||
release(allocated.getId());
|
||||
}
|
||||
|
||||
private ContainerRequest assignWithoutLocality(Container allocated) {
|
||||
ContainerRequest assigned = null;
|
||||
|
||||
Priority priority = allocated.getPriority();
|
||||
|
@ -877,18 +890,24 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
LOG.debug("Assigning container " + allocated + " to reduce");
|
||||
}
|
||||
assigned = assignToReduce(allocated);
|
||||
} else if (PRIORITY_MAP.equals(priority)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Assigning container " + allocated + " to map");
|
||||
}
|
||||
assigned = assignToMap(allocated);
|
||||
} else {
|
||||
LOG.warn("Container allocated at unwanted priority: " + priority +
|
||||
". Returning to RM...");
|
||||
}
|
||||
|
||||
return assigned;
|
||||
}
|
||||
|
||||
private void assignContainers(List<Container> allocatedContainers) {
|
||||
Iterator<Container> it = allocatedContainers.iterator();
|
||||
while (it.hasNext()) {
|
||||
Container allocated = it.next();
|
||||
ContainerRequest assigned = assignWithoutLocality(allocated);
|
||||
if (assigned != null) {
|
||||
containerAssigned(allocated, assigned);
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
|
||||
assignMapsWithLocality(allocatedContainers);
|
||||
}
|
||||
|
||||
private ContainerRequest getContainerReqToReplace(Container allocated) {
|
||||
LOG.info("Finding containerReq for allocated container: " + allocated);
|
||||
|
@ -959,11 +978,15 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private ContainerRequest assignToMap(Container allocated) {
|
||||
//try to assign to maps if present
|
||||
//first by host, then by rack, followed by *
|
||||
ContainerRequest assigned = null;
|
||||
while (assigned == null && maps.size() > 0) {
|
||||
private void assignMapsWithLocality(List<Container> allocatedContainers) {
|
||||
// try to assign to all nodes first to match node local
|
||||
Iterator<Container> it = allocatedContainers.iterator();
|
||||
while(it.hasNext() && maps.size() > 0){
|
||||
Container allocated = it.next();
|
||||
Priority priority = allocated.getPriority();
|
||||
assert PRIORITY_MAP.equals(priority);
|
||||
// "if (maps.containsKey(tId))" below should be almost always true.
|
||||
// hence this while loop would almost always have O(1) complexity
|
||||
String host = allocated.getNodeId().getHost();
|
||||
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
|
||||
while (list != null && list.size() > 0) {
|
||||
|
@ -972,7 +995,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
}
|
||||
TaskAttemptId tId = list.removeFirst();
|
||||
if (maps.containsKey(tId)) {
|
||||
assigned = maps.remove(tId);
|
||||
ContainerRequest assigned = maps.remove(tId);
|
||||
containerAssigned(allocated, assigned);
|
||||
it.remove();
|
||||
JobCounterUpdateEvent jce =
|
||||
new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
|
||||
jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
|
||||
|
@ -984,39 +1009,56 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
break;
|
||||
}
|
||||
}
|
||||
if (assigned == null) {
|
||||
String rack = RackResolver.resolve(host).getNetworkLocation();
|
||||
list = mapsRackMapping.get(rack);
|
||||
while (list != null && list.size() > 0) {
|
||||
TaskAttemptId tId = list.removeFirst();
|
||||
if (maps.containsKey(tId)) {
|
||||
assigned = maps.remove(tId);
|
||||
JobCounterUpdateEvent jce =
|
||||
new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
|
||||
jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
|
||||
eventHandler.handle(jce);
|
||||
rackLocalAssigned++;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Assigned based on rack match " + rack);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (assigned == null && maps.size() > 0) {
|
||||
TaskAttemptId tId = maps.keySet().iterator().next();
|
||||
assigned = maps.remove(tId);
|
||||
}
|
||||
|
||||
// try to match all rack local
|
||||
it = allocatedContainers.iterator();
|
||||
while(it.hasNext() && maps.size() > 0){
|
||||
Container allocated = it.next();
|
||||
Priority priority = allocated.getPriority();
|
||||
assert PRIORITY_MAP.equals(priority);
|
||||
// "if (maps.containsKey(tId))" below should be almost always true.
|
||||
// hence this while loop would almost always have O(1) complexity
|
||||
String host = allocated.getNodeId().getHost();
|
||||
String rack = RackResolver.resolve(host).getNetworkLocation();
|
||||
LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
|
||||
while (list != null && list.size() > 0) {
|
||||
TaskAttemptId tId = list.removeFirst();
|
||||
if (maps.containsKey(tId)) {
|
||||
ContainerRequest assigned = maps.remove(tId);
|
||||
containerAssigned(allocated, assigned);
|
||||
it.remove();
|
||||
JobCounterUpdateEvent jce =
|
||||
new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
|
||||
jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
|
||||
jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
|
||||
eventHandler.handle(jce);
|
||||
rackLocalAssigned++;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Assigned based on * match");
|
||||
LOG.debug("Assigned based on rack match " + rack);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return assigned;
|
||||
|
||||
// assign remaining
|
||||
it = allocatedContainers.iterator();
|
||||
while(it.hasNext() && maps.size() > 0){
|
||||
Container allocated = it.next();
|
||||
Priority priority = allocated.getPriority();
|
||||
assert PRIORITY_MAP.equals(priority);
|
||||
TaskAttemptId tId = maps.keySet().iterator().next();
|
||||
ContainerRequest assigned = maps.remove(tId);
|
||||
containerAssigned(allocated, assigned);
|
||||
it.remove();
|
||||
JobCounterUpdateEvent jce =
|
||||
new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
|
||||
jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
|
||||
eventHandler.handle(jce);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Assigned based on * match");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -190,6 +190,92 @@ public class TestRMContainerAllocator {
|
|||
checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
|
||||
assigned, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMapNodeLocality() throws Exception {
|
||||
// test checks that ordering of allocated containers list from the RM does
|
||||
// not affect the map->container assignment done by the AM. If there is a
|
||||
// node local container available for a map then it should be assigned to
|
||||
// that container and not a rack-local container that happened to be seen
|
||||
// earlier in the allocated containers list from the RM.
|
||||
// Regression test for MAPREDUCE-4893
|
||||
LOG.info("Running testMapNodeLocality");
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
MyResourceManager rm = new MyResourceManager(conf);
|
||||
rm.start();
|
||||
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
|
||||
.getDispatcher();
|
||||
|
||||
// Submit the application
|
||||
RMApp app = rm.submitApp(1024);
|
||||
dispatcher.await();
|
||||
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
|
||||
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
|
||||
.getAppAttemptId();
|
||||
rm.sendAMLaunched(appAttemptId);
|
||||
dispatcher.await();
|
||||
|
||||
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
|
||||
Job mockJob = mock(Job.class);
|
||||
when(mockJob.getReport()).thenReturn(
|
||||
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
|
||||
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
|
||||
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
|
||||
appAttemptId, mockJob);
|
||||
|
||||
// add resources to scheduler
|
||||
MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps
|
||||
rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node
|
||||
MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map
|
||||
dispatcher.await();
|
||||
|
||||
// create the container requests for maps
|
||||
ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
|
||||
new String[] { "h1" });
|
||||
allocator.sendRequest(event1);
|
||||
ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
|
||||
new String[] { "h1" });
|
||||
allocator.sendRequest(event2);
|
||||
ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
|
||||
new String[] { "h2" });
|
||||
allocator.sendRequest(event3);
|
||||
|
||||
// this tells the scheduler about the requests
|
||||
// as nodes are not added, no allocations
|
||||
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
|
||||
dispatcher.await();
|
||||
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
||||
|
||||
// update resources in scheduler
|
||||
// Node heartbeat from rack-local first. This makes node h3 the first in the
|
||||
// list of allocated containers but it should not be assigned to task1.
|
||||
nodeManager3.nodeHeartbeat(true);
|
||||
// Node heartbeat from node-local next. This allocates 2 node local
|
||||
// containers for task1 and task2. These should be matched with those tasks.
|
||||
nodeManager1.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
|
||||
assigned = allocator.schedule();
|
||||
dispatcher.await();
|
||||
checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
|
||||
assigned, false);
|
||||
// remove the rack-local assignment that should have happened for task3
|
||||
for(TaskAttemptContainerAssignedEvent event : assigned) {
|
||||
if(event.getTaskAttemptID().equals(event3.getAttemptID())) {
|
||||
assigned.remove(event);
|
||||
Assert.assertTrue(
|
||||
event.getContainer().getNodeId().getHost().equals("h3"));
|
||||
break;
|
||||
}
|
||||
}
|
||||
checkAssignments(new ContainerRequestEvent[] { event1, event2},
|
||||
assigned, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResource() throws Exception {
|
||||
|
@ -1202,7 +1288,7 @@ public class TestRMContainerAllocator {
|
|||
if (checkHostMatch) {
|
||||
Assert.assertTrue("Not assigned to requested host", Arrays.asList(
|
||||
request.getHosts()).contains(
|
||||
assigned.getContainer().getNodeId().toString()));
|
||||
assigned.getContainer().getNodeId().getHost()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue