diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 403ba142b82..0e05a286a8e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -220,6 +220,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3443. JobClient and Job should function in the context of the UGI which created them. (Mahadev Konar via sseth) + MAPREDUCE-3460. MR AM can hang if containers are allocated on a node + blacklisted by the AM. (Hitesh Shah and Robert Joseph Evans via sseth) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index e8588e5cd0e..81a5a75b503 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -87,7 +87,7 @@ public class RMContainerAllocator extends RMContainerRequestor } /* - Vocabulory Used: + Vocabulary Used: pending -> requests which are NOT yet sent to RM scheduled -> requests which are sent to RM but not yet assigned assigned -> requests which are assigned to a container @@ -565,6 +565,7 @@ public class RMContainerAllocator extends RMContainerRequestor if (event.getEarlierAttemptFailed()) { earlierFailedMaps.add(event.getAttemptID()); request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP); + LOG.info("Added "+event.getAttemptID()+" to list of failed maps"); } else { for (String host : event.getHosts()) { LinkedList list = mapsHostMapping.get(host); @@ -603,7 +604,9 @@ public class
RMContainerAllocator extends RMContainerRequestor containersAllocated += allocatedContainers.size(); while (it.hasNext()) { Container allocated = it.next(); - LOG.info("Assigning container " + allocated); + LOG.info("Assigning container " + allocated.getId() + + " with priority " + allocated.getPriority() + + " to NM " + allocated.getNodeId()); // check if allocated container meets memory requirements // and whether we have any scheduled tasks that need @@ -645,7 +648,8 @@ public class RMContainerAllocator extends RMContainerRequestor // we need to request for a new container // and release the current one LOG.info("Got allocated container on a blacklisted " - + " host. Releasing container " + allocated); + + " host "+allocated.getNodeId().getHost() + +". Releasing container " + allocated); // find the request matching this allocated container // and replace it with a new one @@ -727,10 +731,20 @@ public class RMContainerAllocator extends RMContainerRequestor } private ContainerRequest getContainerReqToReplace(Container allocated) { + LOG.info("Finding containerReq for allocated container: " + allocated); Priority priority = allocated.getPriority(); ContainerRequest toBeReplaced = null; - if (PRIORITY_FAST_FAIL_MAP.equals(priority) - || PRIORITY_MAP.equals(priority)) { + if (PRIORITY_FAST_FAIL_MAP.equals(priority)) { + LOG.info("Replacing FAST_FAIL_MAP container " + allocated.getId()); + Iterator iter = earlierFailedMaps.iterator(); + while (toBeReplaced == null && iter.hasNext()) { + toBeReplaced = maps.get(iter.next()); + } + LOG.info("Found replacement: " + toBeReplaced); + return toBeReplaced; + } + else if (PRIORITY_MAP.equals(priority)) { + LOG.info("Replacing MAP container " + allocated.getId()); // allocated container was for a map String host = allocated.getNodeId().getHost(); LinkedList list = mapsHostMapping.get(host); @@ -749,6 +763,7 @@ public class RMContainerAllocator extends RMContainerRequestor TaskAttemptId tId =
reduces.keySet().iterator().next(); toBeReplaced = reduces.remove(tId); } + LOG.info("Found replacement: " + toBeReplaced); return toBeReplaced; } @@ -758,7 +773,7 @@ public class RMContainerAllocator extends RMContainerRequestor //try to assign to earlierFailedMaps if present ContainerRequest assigned = null; while (assigned == null && earlierFailedMaps.size() > 0) { - TaskAttemptId tId = earlierFailedMaps.removeFirst(); + TaskAttemptId tId = earlierFailedMaps.removeFirst(); if (maps.containsKey(tId)) { assigned = maps.remove(tId); JobCounterUpdateEvent jce = diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index ba3c73219dd..6c03c6690cf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -105,6 +105,13 @@ public abstract class RMContainerRequestor extends RMCommunicator { this.priority = priority; } + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("AttemptId[").append(attemptID).append("]"); + sb.append("Capability[").append(capability).append("]"); + sb.append("Priority[").append(priority).append("]"); + return sb.toString(); + } } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index
9dd877b3301..812393c1b58 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -580,6 +580,135 @@ public class TestRMContainerAllocator { } } + @Test + public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { + LOG.info("Running testBlackListedNodesWithSchedulingToThatNode"); + + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); + conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); + + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null)); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + + // add resources to scheduler + MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); + MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); + dispatcher.await(); + + LOG.info("Requesting 1 Containers _1 on H1"); + // create the container request + ContainerRequestEvent event1 = createReq(jobId, 1, 1024, + new String[] { "h1" }); +
allocator.sendRequest(event1); + + LOG.info("RM Heartbeat (to send the container requests)"); + // this tells the scheduler about the requests + // as no node has heartbeated since the request, no allocations + List assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + LOG.info("h1 Heartbeat (To actually schedule the containers)"); + // update resources in scheduler + nodeManager1.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + LOG.info("RM Heartbeat (To process the scheduled containers)"); + assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); + + LOG.info("Failing container _1 on H1 (should blacklist the node)"); + // Send a failure event to blacklist node h1 + ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); + allocator.sendFailure(f1); + + //At this stage, a request should be created for a fast fail map + //Create a FAST_FAIL request for a previously failed map. + ContainerRequestEvent event1f = createReq(jobId, 1, 1024, + new String[] { "h1" }, true, false); + allocator.sendRequest(event1f); + + //Update the Scheduler with the new requests. + assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + // send another request with different resource and priority + ContainerRequestEvent event3 = createReq(jobId, 3, 1024, + new String[] { "h1", "h3" }); + allocator.sendRequest(event3); + + //Allocator is aware of prio:5 container, and prio:20 (h1+h3) container.
+ //RM is only aware of the prio:5 container + + LOG.info("h1 Heartbeat (To actually schedule the containers)"); + // update resources in scheduler + nodeManager1.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + LOG.info("RM Heartbeat (To process the scheduled containers)"); + assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + //RMContainerAllocator gets assigned a p:5 on a blacklisted node. + + //Send a release for the p:5 container + another request. + LOG.info("RM Heartbeat (To process the re-scheduled containers)"); + assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + //Heartbeat from H3 to schedule on this host. + LOG.info("h3 Heartbeat (To re-schedule the containers)"); + nodeManager3.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)"); + assigned = allocator.schedule(); + dispatcher.await(); + + // For debugging + for (TaskAttemptContainerAssignedEvent assig : assigned) { + LOG.info(assig.getTaskAttemptID() + + " assgined to " + assig.getContainer().getId() + + " with priority " + assig.getContainer().getPriority()); + } + + Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); + + // validate that all containers are assigned to h3 + for (TaskAttemptContainerAssignedEvent assig : assigned) { + Assert.assertEquals("Assigned container " + assig.getContainer().getId() + + " host not correct", "h3", assig.getContainer().getNodeId().getHost()); + } + } + private static class MyFifoScheduler extends FifoScheduler { public MyFifoScheduler(RMContext rmContext) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java index 39b15e0cefd..ef655100f00 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java @@ -340,6 +340,21 @@ public class ContainerPBImpl extends ProtoBase implements Contai return ((ContainerStatusPBImpl)t).getProto(); } + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Container: ["); + sb.append("ContainerId: ").append(getId()).append(", "); + sb.append("NodeId: ").append(getNodeId()).append(", "); + sb.append("NodeHttpAddress: ").append(getNodeHttpAddress()).append(", "); + sb.append("Resource: ").append(getResource()).append(", "); + sb.append("Priority: ").append(getPriority()).append(", "); + sb.append("State: ").append(getState()).append(", "); + sb.append("Token: ").append(getContainerToken()).append(", "); + sb.append("Status: ").append(getContainerStatus()); + sb.append("]"); + return sb.toString(); + } + //TODO Comparator @Override public int compareTo(Container other) {