MAPREDUCE-3460. MR AM can hang if containers are allocated on a node blacklisted by the AM. (Contributed by Hitesh Shah and Robert Joseph Evans)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1209737 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 2011-12-02 22:18:38 +00:00
parent c5777e0375
commit a3f37e15f7
5 changed files with 175 additions and 6 deletions
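The scenario behind the fix: the AM blacklists a node after repeated task-attempt failures there, but the RM may still hand back containers on that node. If the allocator drops such a container without filing a replacement request, the waiting task attempt never receives a container and the job hangs. Below is a minimal sketch of the pattern this patch introduces; Container, ContainerRequest and allocatedContainers appear in the diff that follows, while blacklist, release, findMatchingRequest, resubmit and assign are hypothetical stand-ins, not the actual Hadoop methods:

    // Sketch only; the real logic lives in RMContainerAllocator below.
    for (Container allocated : allocatedContainers) {
      String host = allocated.getNodeId().getHost();
      if (blacklist.contains(host)) {
        // Releasing alone is not enough: the RM has already counted this
        // allocation against the AM's outstanding asks, so an equivalent
        // request must be re-filed or the task attempt starves (the hang).
        release(allocated.getId());
        ContainerRequest stale = findMatchingRequest(allocated);
        if (stale != null) {
          resubmit(stale); // ask again; the new request avoids the bad host
        }
      } else {
        assign(allocated); // normal path: match the container to a task attempt
      }
    }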

CHANGES.txt

@@ -220,6 +220,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3443. JobClient and Job should function in the context of the
UGI which created them. (Mahadev Konar via sseth)
MAPREDUCE-3460. MR AM can hang if containers are allocated on a node
blacklisted by the AM. (Hitesh Shah and Robert Joseph Evans via sseth)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

RMContainerAllocator.java

@@ -87,7 +87,7 @@ public class RMContainerAllocator extends RMContainerRequestor
}
/*
- Vocabulory Used:
+ Vocabulary Used:
pending -> requests which are NOT yet sent to RM
scheduled -> requests which are sent to RM but not yet assigned
assigned -> requests which are assigned to a container
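As a reading aid, the lifecycle named in this comment can be summarized in an illustrative (non-Hadoop) enum:

    // Illustrative only, not part of the patch.
    enum RequestState {
      PENDING,    // created in the AM, not yet sent to the ResourceManager
      SCHEDULED,  // sent to the RM, no container assigned yet
      ASSIGNED    // matched to an allocated container
    }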
@@ -565,6 +565,7 @@ public class RMContainerAllocator extends RMContainerRequestor
if (event.getEarlierAttemptFailed()) {
earlierFailedMaps.add(event.getAttemptID());
request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
} else {
for (String host : event.getHosts()) {
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
@@ -603,7 +604,9 @@ public class RMContainerAllocator extends RMContainerRequestor
containersAllocated += allocatedContainers.size();
while (it.hasNext()) {
Container allocated = it.next();
LOG.info("Assigning container " + allocated);
LOG.info("Assigning container " + allocated.getId() +
" with priority " + allocated.getPriority() +
" to NM " + allocated.getNodeId());
// check if allocated container meets memory requirements
// and whether we have any scheduled tasks that need
@@ -645,7 +648,8 @@ public class RMContainerAllocator extends RMContainerRequestor
// we need to request for a new container
// and release the current one
LOG.info("Got allocated container on a blacklisted "
+ " host. Releasing container " + allocated);
+ " host "+allocated.getNodeId().getHost()
+". Releasing container " + allocated);
// find the request matching this allocated container
// and replace it with a new one
@@ -727,10 +731,20 @@ public class RMContainerAllocator extends RMContainerRequestor
}
private ContainerRequest getContainerReqToReplace(Container allocated) {
LOG.info("Finding containerReq for allocated container: " + allocated);
Priority priority = allocated.getPriority();
ContainerRequest toBeReplaced = null;
- if (PRIORITY_FAST_FAIL_MAP.equals(priority)
-     || PRIORITY_MAP.equals(priority)) {
+ if (PRIORITY_FAST_FAIL_MAP.equals(priority)) {
LOG.info("Replacing FAST_FAIL_MAP container " + allocated.getId());
Iterator<TaskAttemptId> iter = earlierFailedMaps.iterator();
while (toBeReplaced == null && iter.hasNext()) {
toBeReplaced = maps.get(iter.next());
}
LOG.info("Found replacement: " + toBeReplaced);
return toBeReplaced;
}
else if (PRIORITY_MAP.equals(priority)) {
LOG.info("Replacing MAP container " + allocated.getId());
// allocated container was for a map
String host = allocated.getNodeId().getHost();
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
@@ -749,6 +763,7 @@ public class RMContainerAllocator extends RMContainerRequestor
TaskAttemptId tId = reduces.keySet().iterator().next();
toBeReplaced = reduces.remove(tId);
}
LOG.info("Found replacement: " + toBeReplaced);
return toBeReplaced;
}
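A condensed restatement of the lookup above: the allocated container's priority decides which outstanding request is swapped out. The sketch below is a simplified stand-in, not the Hadoop class; it reduces TaskAttemptId to String and replaces the Priority constants with booleans (the test comments further down identify the fast-fail-map priority as 5 and the map priority as 20):

    import java.util.*;

    class ReplacementPicker {
      Deque<String> earlierFailedMaps = new ArrayDeque<>();
      Map<String, String> maps = new LinkedHashMap<>();     // attemptId -> request
      Map<String, String> reduces = new LinkedHashMap<>();
      Map<String, LinkedList<String>> mapsHostMapping = new HashMap<>();

      String pick(boolean fastFailMap, boolean map, String host) {
        if (fastFailMap) {
          // First earlier-failed map whose request is still outstanding.
          for (String attempt : earlierFailedMaps)
            if (maps.containsKey(attempt)) return maps.get(attempt);
          return null;
        }
        if (map) {
          // Prefer a map that asked for this container's host, else any map.
          LinkedList<String> local = mapsHostMapping.get(host);
          if (local != null && !local.isEmpty()) {
            String attempt = local.removeLast();
            if (maps.containsKey(attempt)) return maps.remove(attempt);
          }
          return maps.isEmpty() ? null : maps.remove(maps.keySet().iterator().next());
        }
        // Reduce-priority container: hand back any outstanding reduce request.
        return reduces.isEmpty() ? null : reduces.remove(reduces.keySet().iterator().next());
      }
    }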
@@ -758,7 +773,7 @@ public class RMContainerAllocator extends RMContainerRequestor
//try to assign to earlierFailedMaps if present
ContainerRequest assigned = null;
while (assigned == null && earlierFailedMaps.size() > 0) {
TaskAttemptId tId = earlierFailedMaps.removeFirst();
if (maps.containsKey(tId)) {
assigned = maps.remove(tId);
JobCounterUpdateEvent jce =

RMContainerRequestor.java

@@ -105,6 +105,13 @@ public abstract class RMContainerRequestor extends RMCommunicator {
this.priority = priority;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("AttemptId[").append(attemptID).append("]");
sb.append("Capability[").append(capability).append("]");
sb.append("Priority[").append(priority).append("]");
return sb.toString();
}
}
@Override

TestRMContainerAllocator.java

@@ -580,6 +580,135 @@ public class TestRMContainerAllocator {
}
}
@Test
public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
LOG.info("Running testBlackListedNodesWithSchedulingToThatNode");
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
// add resources to scheduler
MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
dispatcher.await();
LOG.info("Requesting 1 Containers _1 on H1");
// create the container request
ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
new String[] { "h1" });
allocator.sendRequest(event1);
LOG.info("RM Heartbeat (to send the container requests)");
// this tells the scheduler about the requests
// as nodes are not added, no allocations
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
dispatcher.await();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
LOG.info("h1 Heartbeat (To actually schedule the containers)");
// update resources in scheduler
nodeManager1.nodeHeartbeat(true); // Node heartbeat
dispatcher.await();
LOG.info("RM Heartbeat (To process the scheduled containers)");
assigned = allocator.schedule();
dispatcher.await();
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
LOG.info("Failing container _1 on H1 (should blacklist the node)");
// Send an event to blacklist node h1
ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
allocator.sendFailure(f1);
//At this stage, a request should be created for a fast fail map
//Create a FAST_FAIL request for a previously failed map.
ContainerRequestEvent event1f = createReq(jobId, 1, 1024,
new String[] { "h1" }, true, false);
allocator.sendRequest(event1f);
//Update the Scheduler with the new requests.
assigned = allocator.schedule();
dispatcher.await();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// send another request with different resource and priority
ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
new String[] { "h1", "h3" });
allocator.sendRequest(event3);
//Allocator is aware of prio:5 container, and prio:20 (h1+h3) container.
//RM is only aware of the prio:5 container
LOG.info("h1 Heartbeat (To actually schedule the containers)");
// update resources in scheduler
nodeManager1.nodeHeartbeat(true); // Node heartbeat
dispatcher.await();
LOG.info("RM Heartbeat (To process the scheduled containers)");
assigned = allocator.schedule();
dispatcher.await();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
//RMContainerAllocator gets assigned a p:5 on a blacklisted node.
//Send a release for the p:5 container + another request.
LOG.info("RM Heartbeat (To process the re-scheduled containers)");
assigned = allocator.schedule();
dispatcher.await();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
//Heartbeat from H3 to schedule on this host.
LOG.info("h3 Heartbeat (To re-schedule the containers)");
nodeManager3.nodeHeartbeat(true); // Node heartbeat
dispatcher.await();
LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
assigned = allocator.schedule();
dispatcher.await();
// For debugging
for (TaskAttemptContainerAssignedEvent assig : assigned) {
LOG.info(assig.getTaskAttemptID() +
" assgined to " + assig.getContainer().getId() +
" with priority " + assig.getContainer().getPriority());
}
Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
// validate that all containers are assigned to h3
for (TaskAttemptContainerAssignedEvent assig : assigned) {
Assert.assertEquals("Assigned container " + assig.getContainer().getId()
+ " host not correct", "h3", assig.getContainer().getNodeId().getHost());
}
}
private static class MyFifoScheduler extends FifoScheduler {
public MyFifoScheduler(RMContext rmContext) {

ContainerPBImpl.java

@@ -340,6 +340,21 @@ public class ContainerPBImpl extends ProtoBase<ContainerProto> implements Contai
return ((ContainerStatusPBImpl)t).getProto();
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Container: [");
sb.append("ContainerId: ").append(getId()).append(", ");
sb.append("NodeId: ").append(getNodeId()).append(", ");
sb.append("NodeHttpAddress: ").append(getNodeHttpAddress()).append(", ");
sb.append("Resource: ").append(getResource()).append(", ");
sb.append("Priority: ").append(getPriority()).append(", ");
sb.append("State: ").append(getState()).append(", ");
sb.append("Token: ").append(getContainerToken()).append(", ");
sb.append("Status: ").append(getContainerStatus());
sb.append("]");
return sb.toString();
}
//TODO Comparator
@Override
public int compareTo(Container other) {
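For reference, the string the new ContainerPBImpl.toString() builds renders along these lines, shown wrapped here; every field value is invented for illustration, and the exact ContainerId, Resource and Status renderings come from those types' own toString() implementations:

    Container: [ContainerId: container_1322868965449_0001_01_000002, NodeId: h3:1234,
    NodeHttpAddress: h3:9999, Resource: memory: 1024, Priority: 20,
    State: NEW, Token: null, Status: ...]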