diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 33d0d8ca6a4..b5290243fa6 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1641,6 +1641,8 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2788. Normalize resource requests in FifoScheduler
     appropriately. (Ahmed Radwan via acmurthy)
 
+    MAPREDUCE-2693. Fix NPE in job-blacklisting. (Hitesh Shah via acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 7b75cd1fbd7..fc45b8f11b2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -509,18 +509,6 @@ public class RMContainerAllocator extends RMContainerRequestor
       request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
     } else {
       for (String host : event.getHosts()) {
-        //host comes from data splitLocations which are hostnames. Containers
-        // use IP addresses.
-        //TODO Temporary fix for locality. Use resolvers from h-common.
-        // Cache to make this more efficient ?
-        InetAddress addr = null;
-        try {
-          addr = InetAddress.getByName(host);
-        } catch (UnknownHostException e) {
-          LOG.warn("Unable to resolve host to IP for host [: " + host + "]");
-        }
-        if (addr != null) //Fallback to host if resolve fails.
-          host = addr.getHostAddress();
         LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
         if (list == null) {
           list = new LinkedList<TaskAttemptId>();
@@ -557,26 +545,101 @@ public class RMContainerAllocator extends RMContainerRequestor
     while (it.hasNext()) {
       Container allocated = it.next();
       LOG.info("Assigning container " + allocated);
-      ContainerRequest assigned = assign(allocated);
-
-      if (assigned != null) {
-        // Update resource requests
-        decContainerReq(assigned);
+
+      // check if allocated container meets memory requirements
+      // and whether we have any scheduled tasks that need
+      // a container to be assigned
+      boolean isAssignable = true;
+      Priority priority = allocated.getPriority();
+      if (PRIORITY_FAST_FAIL_MAP.equals(priority)
+          || PRIORITY_MAP.equals(priority)) {
+        if (allocated.getResource().getMemory() < mapResourceReqt
+            || maps.isEmpty()) {
+          LOG.info("Cannot assign container " + allocated
+              + " for a map as either "
+              + " container memory less than required " + mapResourceReqt
+              + " or no pending map tasks - maps.isEmpty="
+              + maps.isEmpty());
+          isAssignable = false;
+        }
+      }
+      else if (PRIORITY_REDUCE.equals(priority)) {
+        if (allocated.getResource().getMemory() < reduceResourceReqt
+            || reduces.isEmpty()) {
+          LOG.info("Cannot assign container " + allocated
+              + " for a reduce as either "
+              + " container memory less than required " + reduceResourceReqt
+              + " or no pending reduce tasks - reduces.isEmpty="
+              + reduces.isEmpty());
+          isAssignable = false;
+        }
+      }
+
+      boolean blackListed = false;
+      ContainerRequest assigned = null;
+
+      if (isAssignable) {
+        // do not assign if allocated container is on a
+        // blacklisted host
+        blackListed = isNodeBlacklisted(allocated.getNodeId().getHost());
+        if (blackListed) {
+          // we need to request for a new container
+          // and release the current one
+          LOG.info("Got allocated container on a blacklisted "
+              + " host. Releasing container " + allocated);
-        // send the container-assigned event to task attempt
-        eventHandler.handle(new TaskAttemptContainerAssignedEvent(
-            assigned.attemptID, allocated));
+          // find the request matching this allocated container
+          // and replace it with a new one
+          ContainerRequest toBeReplacedReq =
+              getContainerReqToReplace(allocated);
+          if (toBeReplacedReq != null) {
+            LOG.info("Placing a new container request for task attempt "
+                + toBeReplacedReq.attemptID);
+            ContainerRequest newReq =
+                getFilteredContainerRequest(toBeReplacedReq);
+            decContainerReq(toBeReplacedReq);
+            if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
+                TaskType.MAP) {
+              maps.put(newReq.attemptID, newReq);
+            }
+            else {
+              reduces.put(newReq.attemptID, newReq);
+            }
+            addContainerReq(newReq);
+          }
+          else {
+            LOG.info("Could not map allocated container to a valid request."
+                + " Releasing allocated container " + allocated);
+          }
+        }
+        else {
+          assigned = assign(allocated);
+          if (assigned != null) {
+            // Update resource requests
+            decContainerReq(assigned);
-        assignedRequests.add(allocated.getId(), assigned.attemptID);
-
-        LOG.info("Assigned container (" + allocated + ") " +
-            " to task " + assigned.attemptID +
-            " on node " + allocated.getNodeId().toString());
-      } else {
-        //not assigned to any request, release the container
-        LOG.info("Releasing unassigned and invalid container " + allocated
-            + ". RM has gone crazy, someone go look!"
-            + " Hey RM, if you are so rich, go donate to non-profits!");
+            // send the container-assigned event to task attempt
+            eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+                assigned.attemptID, allocated));
+
+            assignedRequests.add(allocated.getId(), assigned.attemptID);
+
+            LOG.info("Assigned container (" + allocated + ") "
+                + " to task " + assigned.attemptID
+                + " on node " + allocated.getNodeId().toString());
+          }
+          else {
+            //not assigned to any request, release the container
+            LOG.info("Releasing unassigned and invalid container "
+                + allocated + ". RM has gone crazy, someone go look!"
+                + " Hey RM, if you are so rich, go donate to non-profits!");
+          }
+        }
+      }
+
+      // release container if it was blacklisted
+      // or if we could not assign it
+      if (blackListed || assigned == null) {
         containersReleased++;
         release(allocated.getId());
       }
@@ -604,12 +667,37 @@ public class RMContainerAllocator extends RMContainerRequestor
     return assigned;
   }
 
+  private ContainerRequest getContainerReqToReplace(Container allocated) {
+    Priority priority = allocated.getPriority();
+    ContainerRequest toBeReplaced = null;
+    if (PRIORITY_FAST_FAIL_MAP.equals(priority)
+        || PRIORITY_MAP.equals(priority)) {
+      // allocated container was for a map
+      String host = allocated.getNodeId().getHost();
+      LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
+      if (list != null && list.size() > 0) {
+        TaskAttemptId tId = list.removeLast();
+        if (maps.containsKey(tId)) {
+          toBeReplaced = maps.remove(tId);
+        }
+      }
+      else {
+        TaskAttemptId tId = maps.keySet().iterator().next();
+        toBeReplaced = maps.remove(tId);
+      }
+    }
+    else if (PRIORITY_REDUCE.equals(priority)) {
+      TaskAttemptId tId = reduces.keySet().iterator().next();
+      toBeReplaced = reduces.remove(tId);
+    }
+    return toBeReplaced;
+  }
+
   private ContainerRequest assignToFailedMap(Container allocated) {
     //try to assign to earlierFailedMaps if present
     ContainerRequest assigned = null;
-    while (assigned == null && earlierFailedMaps.size() > 0 &&
-        allocated.getResource().getMemory() >= mapResourceReqt) {
+    while (assigned == null && earlierFailedMaps.size() > 0) {
       TaskAttemptId tId = earlierFailedMaps.removeFirst();
       if (maps.containsKey(tId)) {
         assigned = maps.remove(tId);
@@ -627,8 +715,7 @@ public class RMContainerAllocator extends RMContainerRequestor
   private ContainerRequest assignToReduce(Container allocated) {
     ContainerRequest assigned = null;
     //try to assign to reduces if present
-    if (assigned == null && reduces.size() > 0
-        && allocated.getResource().getMemory() >= reduceResourceReqt) {
+    if (assigned == null && reduces.size() > 0) {
       TaskAttemptId tId = reduces.keySet().iterator().next();
       assigned = reduces.remove(tId);
       LOG.info("Assigned to reduce");
@@ -640,9 +727,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     //try to assign to maps if present
     //first by host, then by rack, followed by *
     ContainerRequest assigned = null;
-    while (assigned == null && maps.size() > 0
-        && allocated.getResource().getMemory() >= mapResourceReqt) {
-      String host = getHost(allocated.getNodeId().toString());
+    while (assigned == null && maps.size() > 0) {
+      String host = allocated.getNodeId().getHost();
       LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
       while (list != null && list.size() > 0) {
         LOG.info("Host matched to the request list " + host);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index cda2ed678af..cfedde2229a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -63,7 +65,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   //Key->ResourceName (e.g., hostname, rackname, *)
   //Value->Map
   //Key->Resource Capability
-  //Value->ResourceReqeust
+  //Value->ResourceRequest
   private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
   remoteRequestsTable =
       new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
@@ -87,14 +89,22 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     final String[] racks;
     //final boolean earlierAttemptFailed;
     final Priority priority;
+
     public ContainerRequest(ContainerRequestEvent event, Priority priority) {
-      this.attemptID = event.getAttemptID();
-      this.capability = event.getCapability();
-      this.hosts = event.getHosts();
-      this.racks = event.getRacks();
-      //this.earlierAttemptFailed = event.getEarlierAttemptFailed();
+      this(event.getAttemptID(), event.getCapability(), event.getHosts(),
+          event.getRacks(), priority);
+    }
+
+    public ContainerRequest(TaskAttemptId attemptID,
+        Resource capability, String[] hosts, String[] racks,
+        Priority priority) {
+      this.attemptID = attemptID;
+      this.capability = capability;
+      this.hosts = hosts;
+      this.racks = racks;
       this.priority = priority;
     }
+
   }
 
   @Override
@@ -149,14 +159,37 @@ public abstract class RMContainerRequestor extends RMCommunicator {
         //remove all the requests corresponding to this hostname
         for (Map<String, Map<Resource, ResourceRequest>> remoteRequests
             : remoteRequestsTable.values()){
-          //remove from host
-          Map<Resource, ResourceRequest> reqMap = remoteRequests.remove(hostName);
+          //remove from host if no pending allocations
+          boolean foundAll = true;
+          Map<Resource, ResourceRequest> reqMap = remoteRequests.get(hostName);
           if (reqMap != null) {
             for (ResourceRequest req : reqMap.values()) {
-              ask.remove(req);
+              if (!ask.remove(req)) {
+                foundAll = false;
+              }
+              else {
+                // if ask already sent to RM, we can try and overwrite it if possible.
+                // send a new ask to RM with numContainers
+                // specified for the blacklisted host to be 0.
+                ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req);
+                zeroedRequest.setNumContainers(0);
+                // to be sent to RM on next heartbeat
+                ask.add(zeroedRequest);
+              }
             }
+            // if all requests were still in ask queue
+            // we can remove this request
+            if (foundAll) {
+              remoteRequests.remove(hostName);
+            }
           }
-          //TODO: remove from rack
+          // TODO handling of rack blacklisting
+          // Removing from rack should be dependent on no. of failures within the rack
+          // Blacklisting a rack on the basis of a single node's blacklisting
+          // may be overly aggressive.
+          // Node failures could be correlated with other failures on the same rack
+          // but we probably need a better approach at trying to decide how and when
+          // to blacklist a rack
         }
       } else {
         nodeFailures.put(hostName, failures);
@@ -171,7 +204,9 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     // Create resource requests
     for (String host : req.hosts) {
       // Data-local
-      addResourceRequest(req.priority, host, req.capability);
+      if (!isNodeBlacklisted(host)) {
+        addResourceRequest(req.priority, host, req.capability);
+      }
     }
 
     // Nothing Rack-local for now
@@ -234,6 +269,14 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     Map<String, Map<Resource, ResourceRequest>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      // as we modify the resource requests by filtering out blacklisted hosts
+      // when they are added, this value may be null when being
+      // decremented
+      LOG.debug("Not decrementing resource as " + resourceName
+          + " is not present in request table");
+      return;
+    }
     ResourceRequest remoteRequest = reqMap.get(capability);
     LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
@@ -267,4 +310,23 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     release.add(containerId);
   }
 
+  protected boolean isNodeBlacklisted(String hostname) {
+    if (!nodeBlacklistingEnabled) {
+      return false;
+    }
+    return blacklistedNodes.contains(hostname);
+  }
+
+  protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) {
+    ArrayList<String> newHosts = new ArrayList<String>();
+    for (String host : orig.hosts) {
+      if (!isNodeBlacklisted(host)) {
+        newHosts.add(host);
+      }
+    }
+    String[] hosts = newHosts.toArray(new String[newHosts.size()]);
+    ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability,
+        hosts, orig.racks, orig.priority);
+    return newReq;
+  }
 }
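Two pieces of RMContainerRequestor above do the bookkeeping: containerFailedOnHost() counts per-host failures and promotes a host into blacklistedNodes once it reaches the configured threshold, and getFilteredContainerRequest() drops blacklisted hosts from any request that gets re-placed. The following is a minimal, self-contained model of that flow, a sketch rather than the committed code: the enable flag and threshold are hard-coded here, while the real class reads them from the configuration keys exercised by the test below (MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE and MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER), and the zeroed-out ResourceRequest sent on the next heartbeat is omitted.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class BlacklistModel {
      private final boolean nodeBlacklistingEnabled = true; // cf. MR_AM_JOB_NODE_BLACKLISTING_ENABLE
      private final int maxTaskFailuresPerNode = 1;         // cf. MAX_TASK_FAILURES_PER_TRACKER
      private final Map<String, Integer> nodeFailures = new HashMap<String, Integer>();
      private final Set<String> blacklistedNodes = new HashSet<String>();

      // Called when a task attempt fails on a host; mirrors the shape of
      // containerFailedOnHost() minus the remoteRequestsTable/ask rewriting.
      public void containerFailedOnHost(String hostName) {
        if (!nodeBlacklistingEnabled || blacklistedNodes.contains(hostName)) {
          return;
        }
        Integer failures = nodeFailures.remove(hostName);
        failures = failures == null ? 1 : failures + 1;
        if (failures >= maxTaskFailuresPerNode) {
          blacklistedNodes.add(hostName); // future requests skip this host
        } else {
          nodeFailures.put(hostName, failures);
        }
      }

      public boolean isNodeBlacklisted(String hostname) {
        return nodeBlacklistingEnabled && blacklistedNodes.contains(hostname);
      }

      // Mirrors getFilteredContainerRequest(): keep only non-blacklisted hosts.
      public String[] filterHosts(String[] hosts) {
        List<String> good = new ArrayList<String>();
        for (String host : hosts) {
          if (!isNodeBlacklisted(host)) {
            good.add(host);
          }
        }
        return good.toArray(new String[good.size()]);
      }
    }

This filtering is also why decResourceRequest() above now tolerates a null reqMap: once blacklisted hosts are dropped at add time, a later decrement for such a host finds no entry in the request table.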
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
index 4c09a83ae74..dfbae8092c0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
@@ -34,6 +34,7 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -44,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -478,6 +480,105 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(100.0f, app.getProgress(), 0.0);
   }
 
+  @Test
+  public void testBlackListedNodes() throws Exception {
+
+    LOG.info("Running testBlackListedNodes");
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
+            0, 0, 0, 0, 0, 0, "jobfile"));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // add resources to scheduler
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+    MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+    dispatcher.await();
+
+    // create the container request
+    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event1);
+
+    // send 1 more request for a different host
+    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
+        new String[] { "h2" });
+    allocator.sendRequest(event2);
+
+    // send another request for a third host
+    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+        new String[] { "h3" });
+    allocator.sendRequest(event3);
+
+    // this tells the scheduler about the requests
+    // no allocations until the nodes heartbeat in
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // Send events to blacklist nodes h1 and h2
+    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
+    allocator.sendFailure(f1);
+    ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false);
+    allocator.sendFailure(f2);
+
+    // update resources in scheduler
+    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+    nodeManager2.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // mark h1/h2 as bad nodes
+    nodeManager1.nodeHeartbeat(false);
+    nodeManager2.nodeHeartbeat(false);
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+    assigned = allocator.schedule();
+    dispatcher.await();
+
+    Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
+
+    // validate that all containers are assigned to h3
+    for (TaskAttemptContainerAssignedEvent assig : assigned) {
+      Assert.assertTrue("Assigned container host not correct", "h3".equals(assig
+          .getContainer().getNodeId().getHost()));
+    }
+  }
+
   private static class MyFifoScheduler extends FifoScheduler {
 
     public MyFifoScheduler(RMContext rmContext) {
@@ -534,6 +635,19 @@ public class TestRMContainerAllocator {
         new String[] { NetworkTopology.DEFAULT_RACK });
   }
 
+  private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId,
+      String host, boolean reduce) {
+    TaskId taskId;
+    if (reduce) {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+    } else {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    }
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+        taskAttemptId);
+    return new ContainerFailedEvent(attemptId, host);
+  }
+
   private void checkAssignments(ContainerRequestEvent[] requests,
       List<TaskAttemptContainerAssignedEvent> assignments,
       boolean checkHostMatch) {
@@ -653,6 +767,10 @@ public class TestRMContainerAllocator {
       }
     }
 
+    public void sendFailure(ContainerFailedEvent f) {
+      super.handle(f);
+    }
+
     // API to be used by tests
     public List<TaskAttemptContainerAssignedEvent> schedule() {
       // run the scheduler
@@ -672,6 +790,7 @@ public class TestRMContainerAllocator {
     protected void startAllocatorThread() {
      // override to NOT start thread
     }
+
   }
 
   public static void main(String[] args) throws Exception {
@@ -681,5 +800,7 @@ public class TestRMContainerAllocator {
     t.testMapReduceScheduling();
     t.testReportedAppProgress();
     t.testReportedAppProgressWithOnlyMaps();
+    t.testBlackListedNodes();
   }
+
 }
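For completeness, the two knobs the new test flips are the same ones a job would set to opt in to AM-side blacklisting. A minimal client-side sketch follows; the values mirror testBlackListedNodes above and are deliberately aggressive rather than defaults, and it assumes the hadoop-mapreduce-client artifacts are on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class EnableBlacklisting {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // enable node blacklisting in the MR application master
        conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
        // blacklist a node after a single task-attempt failure on it
        conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
        // submit the job with this Configuration as usual
      }
    }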