From 7b9c074b7635e3dcdc38d4e7fb1afbff7145e698 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Thu, 19 Jun 2014 17:22:56 +0000 Subject: [PATCH] MAPREDUCE-5844. Add a configurable delay to reducer-preemption. (Maysam Yabandeh via kasha) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603957 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../dev-support/findbugs-exclude.xml | 4 +- .../v2/app/rm/RMContainerAllocator.java | 156 ++++++++++++------ .../v2/app/rm/RMContainerRequestor.java | 31 +++- .../{ => rm}/TestRMContainerAllocator.java | 120 +++++++++++++- .../apache/hadoop/mapreduce/MRJobConfig.java | 12 +- .../src/main/resources/mapred-default.xml | 10 ++ 7 files changed, 277 insertions(+), 59 deletions(-) rename hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/{ => rm}/TestRMContainerAllocator.java (93%) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ae327063310..8feab76d100 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -216,6 +216,9 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5896. InputSplits should indicate which locations have the block cached in memory. (Sandy Ryza via kasha) + MAPREDUCE-5844. Add a configurable delay to reducer-preemption. + (Maysam Yabandeh via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml index 5bdd618eed5..dd4892b1e28 100644 --- a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml +++ b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml @@ -475,8 +475,8 @@ - - + + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index b9d283fe80e..11bc4063fff 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.RackResolver; import com.google.common.annotations.VisibleForTesting; @@ -143,15 +144,21 @@ public class RMContainerAllocator extends RMContainerRequestor private int lastCompletedTasks = 0; private boolean recalculateReduceSchedule = false; - private int mapResourceReqt;//memory - private int reduceResourceReqt;//memory + private int mapResourceRequest;//memory + private int reduceResourceRequest;//memory private boolean reduceStarted = false; private float maxReduceRampupLimit = 0; private float maxReducePreemptionLimit = 0; + /** + * after this threshold, if the container request is not allocated, it is + * considered delayed. + */ + private long allocationDelayThresholdMs = 0; private float reduceSlowStart = 0; private long retryInterval; private long retrystartTime; + private Clock clock; private final AMPreemptionPolicy preemptionPolicy; @@ -166,6 +173,7 @@ public class RMContainerAllocator extends RMContainerRequestor super(clientService, context); this.preemptionPolicy = preemptionPolicy; this.stopped = new AtomicBoolean(false); + this.clock = context.getClock(); } @Override @@ -180,6 +188,9 @@ public class RMContainerAllocator extends RMContainerRequestor maxReducePreemptionLimit = conf.getFloat( MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT, MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT); + allocationDelayThresholdMs = conf.getInt( + MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, + MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms RackResolver.init(conf); retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS); @@ -246,7 +257,7 @@ public class RMContainerAllocator extends RMContainerRequestor getJob().getTotalMaps(), completedMaps, scheduledRequests.maps.size(), scheduledRequests.reduces.size(), assignedRequests.maps.size(), assignedRequests.reduces.size(), - mapResourceReqt, reduceResourceReqt, + mapResourceRequest, reduceResourceRequest, pendingReduces.size(), maxReduceRampupLimit, reduceSlowStart); recalculateReduceSchedule = false; @@ -268,6 +279,18 @@ public class RMContainerAllocator extends RMContainerRequestor scheduleStats.log("Final Stats: "); } + @Private + @VisibleForTesting + AssignedRequests getAssignedRequests() { + return assignedRequests; + } + + @Private + @VisibleForTesting + ScheduledRequests getScheduledRequests() { + return scheduledRequests; + } + public boolean getIsReduceStarted() { return reduceStarted; } @@ -303,16 +326,16 @@ public class RMContainerAllocator extends RMContainerRequestor int supportedMaxContainerCapability = getMaxContainerCapability().getMemory(); if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) { - if (mapResourceReqt == 0) { - mapResourceReqt = reqEvent.getCapability().getMemory(); + if (mapResourceRequest == 0) { + mapResourceRequest = reqEvent.getCapability().getMemory(); eventHandler.handle(new JobHistoryEvent(jobId, new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP, - mapResourceReqt))); - LOG.info("mapResourceReqt:"+mapResourceReqt); - if (mapResourceReqt > supportedMaxContainerCapability) { + mapResourceRequest))); + LOG.info("mapResourceRequest:"+ mapResourceRequest); + if (mapResourceRequest > supportedMaxContainerCapability) { String diagMsg = "MAP capability required is more than the supported " + - "max container capability in the cluster. Killing the Job. mapResourceReqt: " + - mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability; + "max container capability in the cluster. Killing the Job. mapResourceRequest: " + + mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability; LOG.info(diagMsg); eventHandler.handle(new JobDiagnosticsUpdateEvent( jobId, diagMsg)); @@ -320,20 +343,20 @@ public class RMContainerAllocator extends RMContainerRequestor } } //set the rounded off memory - reqEvent.getCapability().setMemory(mapResourceReqt); + reqEvent.getCapability().setMemory(mapResourceRequest); scheduledRequests.addMap(reqEvent);//maps are immediately scheduled } else { - if (reduceResourceReqt == 0) { - reduceResourceReqt = reqEvent.getCapability().getMemory(); + if (reduceResourceRequest == 0) { + reduceResourceRequest = reqEvent.getCapability().getMemory(); eventHandler.handle(new JobHistoryEvent(jobId, new NormalizedResourceEvent( org.apache.hadoop.mapreduce.TaskType.REDUCE, - reduceResourceReqt))); - LOG.info("reduceResourceReqt:"+reduceResourceReqt); - if (reduceResourceReqt > supportedMaxContainerCapability) { + reduceResourceRequest))); + LOG.info("reduceResourceRequest:"+ reduceResourceRequest); + if (reduceResourceRequest > supportedMaxContainerCapability) { String diagMsg = "REDUCE capability required is more than the " + "supported max container capability in the cluster. Killing the " + - "Job. reduceResourceReqt: " + reduceResourceReqt + + "Job. reduceResourceRequest: " + reduceResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability; LOG.info(diagMsg); eventHandler.handle(new JobDiagnosticsUpdateEvent( @@ -342,7 +365,7 @@ public class RMContainerAllocator extends RMContainerRequestor } } //set the rounded off memory - reqEvent.getCapability().setMemory(reduceResourceReqt); + reqEvent.getCapability().setMemory(reduceResourceRequest); if (reqEvent.getEarlierAttemptFailed()) { //add to the front of queue for fail fast pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); @@ -394,8 +417,22 @@ public class RMContainerAllocator extends RMContainerRequestor return host; } - private void preemptReducesIfNeeded() { - if (reduceResourceReqt == 0) { + @Private + @VisibleForTesting + synchronized void setReduceResourceRequest(int mem) { + this.reduceResourceRequest = mem; + } + + @Private + @VisibleForTesting + synchronized void setMapResourceRequest(int mem) { + this.mapResourceRequest = mem; + } + + @Private + @VisibleForTesting + void preemptReducesIfNeeded() { + if (reduceResourceRequest == 0) { return; //no reduces } //check if reduces have taken over the whole cluster and there are @@ -403,9 +440,9 @@ public class RMContainerAllocator extends RMContainerRequestor if (scheduledRequests.maps.size() > 0) { int memLimit = getMemLimit(); int availableMemForMap = memLimit - ((assignedRequests.reduces.size() - - assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt); + assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest); //availableMemForMap must be sufficient to run atleast 1 map - if (availableMemForMap < mapResourceReqt) { + if (availableMemForMap < mapResourceRequest) { //to make sure new containers are given to maps and not reduces //ramp down all scheduled reduces if any //(since reduces are scheduled at higher priority than maps) @@ -414,22 +451,40 @@ public class RMContainerAllocator extends RMContainerRequestor pendingReduces.add(req); } scheduledRequests.reduces.clear(); - - //preempt for making space for at least one map - int premeptionLimit = Math.max(mapResourceReqt, - (int) (maxReducePreemptionLimit * memLimit)); - - int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt, - premeptionLimit); - - int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt); - toPreempt = Math.min(toPreempt, assignedRequests.reduces.size()); - - LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps"); - assignedRequests.preemptReduce(toPreempt); + + //do further checking to find the number of map requests that were + //hanging around for a while + int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps); + if (hangingMapRequests > 0) { + //preempt for making space for at least one map + int premeptionLimit = Math.max(mapResourceRequest, + (int) (maxReducePreemptionLimit * memLimit)); + + int preemptMem = Math.min(hangingMapRequests * mapResourceRequest, + premeptionLimit); + + int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest); + toPreempt = Math.min(toPreempt, assignedRequests.reduces.size()); + + LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps"); + assignedRequests.preemptReduce(toPreempt); + } } } } + + private int getNumOfHangingRequests(Map requestMap) { + if (allocationDelayThresholdMs <= 0) + return requestMap.size(); + int hangingRequests = 0; + long currTime = clock.getTime(); + for (ContainerRequest request: requestMap.values()) { + long delay = currTime - request.requestTimeMs; + if (delay > allocationDelayThresholdMs) + hangingRequests++; + } + return hangingRequests; + } @Private public void scheduleReduces( @@ -715,11 +770,13 @@ public class RMContainerAllocator extends RMContainerRequestor @Private public int getMemLimit() { int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0; - return headRoom + assignedRequests.maps.size() * mapResourceReqt + - assignedRequests.reduces.size() * reduceResourceReqt; + return headRoom + assignedRequests.maps.size() * mapResourceRequest + + assignedRequests.reduces.size() * reduceResourceRequest; } - - private class ScheduledRequests { + + @Private + @VisibleForTesting + class ScheduledRequests { private final LinkedList earlierFailedMaps = new LinkedList(); @@ -729,7 +786,8 @@ public class RMContainerAllocator extends RMContainerRequestor new HashMap>(); private final Map> mapsRackMapping = new HashMap>(); - private final Map maps = + @VisibleForTesting + final Map maps = new LinkedHashMap(); private final LinkedHashMap reduces = @@ -825,22 +883,22 @@ public class RMContainerAllocator extends RMContainerRequestor int allocatedMemory = allocated.getResource().getMemory(); if (PRIORITY_FAST_FAIL_MAP.equals(priority) || PRIORITY_MAP.equals(priority)) { - if (allocatedMemory < mapResourceReqt + if (allocatedMemory < mapResourceRequest || maps.isEmpty()) { LOG.info("Cannot assign container " + allocated + " for a map as either " - + " container memory less than required " + mapResourceReqt + + " container memory less than required " + mapResourceRequest + " or no pending map tasks - maps.isEmpty=" + maps.isEmpty()); isAssignable = false; } } else if (PRIORITY_REDUCE.equals(priority)) { - if (allocatedMemory < reduceResourceReqt + if (allocatedMemory < reduceResourceRequest || reduces.isEmpty()) { LOG.info("Cannot assign container " + allocated + " for a reduce as either " - + " container memory less than required " + reduceResourceReqt + + " container memory less than required " + reduceResourceRequest + " or no pending reduce tasks - reduces.isEmpty=" + reduces.isEmpty()); isAssignable = false; @@ -1119,14 +1177,18 @@ public class RMContainerAllocator extends RMContainerRequestor } } - private class AssignedRequests { + @Private + @VisibleForTesting + class AssignedRequests { private final Map containerToAttemptMap = new HashMap(); private final LinkedHashMap maps = new LinkedHashMap(); - private final LinkedHashMap reduces = + @VisibleForTesting + final LinkedHashMap reduces = new LinkedHashMap(); - private final Set preemptionWaitingReduces = + @VisibleForTesting + final Set preemptionWaitingReduces = new HashSet(); void add(Container container, TaskAttemptId tId) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index a9b5ce58479..18242119451 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -29,8 +29,10 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; @@ -96,6 +98,8 @@ public abstract class RMContainerRequestor extends RMCommunicator { super(clientService, context); } + @Private + @VisibleForTesting static class ContainerRequest { final TaskAttemptId attemptID; final Resource capability; @@ -103,20 +107,39 @@ public abstract class RMContainerRequestor extends RMCommunicator { final String[] racks; //final boolean earlierAttemptFailed; final Priority priority; - + /** + * the time when this request object was formed; can be used to avoid + * aggressive preemption for recently placed requests + */ + final long requestTimeMs; + public ContainerRequest(ContainerRequestEvent event, Priority priority) { this(event.getAttemptID(), event.getCapability(), event.getHosts(), event.getRacks(), priority); } - + + public ContainerRequest(ContainerRequestEvent event, Priority priority, + long requestTimeMs) { + this(event.getAttemptID(), event.getCapability(), event.getHosts(), + event.getRacks(), priority, requestTimeMs); + } + public ContainerRequest(TaskAttemptId attemptID, - Resource capability, String[] hosts, String[] racks, - Priority priority) { + Resource capability, String[] hosts, String[] racks, + Priority priority) { + this(attemptID, capability, hosts, racks, priority, + System.currentTimeMillis()); + } + + public ContainerRequest(TaskAttemptId attemptID, + Resource capability, String[] hosts, String[] racks, + Priority priority, long requestTimeMs) { this.attemptID = attemptID; this.capability = capability; this.hosts = hosts; this.racks = racks; this.priority = priority; + this.requestTimeMs = requestTimeMs; } public String toString() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java similarity index 93% rename from hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 9c041870758..4c74d7b5c52 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.mapreduce.v2.app; +package org.apache.hadoop.mapreduce.v2.app.rm; import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; @@ -40,6 +40,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; +import org.apache.hadoop.mapreduce.v2.app.ControlledClock; +import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -65,10 +69,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; -import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; -import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; -import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; -import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -80,6 +80,7 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -422,6 +423,115 @@ public class TestRMContainerAllocator { killEventMessage.contains(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC)); } + @Test(timeout = 30000) + public void testPreemptReducers() throws Exception { + LOG.info("Running testPreemptReducers"); + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob, new SystemClock()); + allocator.setMapResourceRequest(1024); + allocator.setReduceResourceRequest(1024); + RMContainerAllocator.AssignedRequests assignedRequests = + allocator.getAssignedRequests(); + RMContainerAllocator.ScheduledRequests scheduledRequests = + allocator.getScheduledRequests(); + ContainerRequestEvent event1 = + createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + scheduledRequests.maps.put(mock(TaskAttemptId.class), + new RMContainerRequestor.ContainerRequest(event1, null)); + assignedRequests.reduces.put(mock(TaskAttemptId.class), + mock(Container.class)); + + allocator.preemptReducesIfNeeded(); + Assert.assertEquals("The reducer is not preempted", + 1, assignedRequests.preemptionWaitingReduces.size()); + } + + @Test(timeout = 30000) + public void testNonAggressivelyPreemptReducers() throws Exception { + LOG.info("Running testPreemptReducers"); + + final int preemptThreshold = 2; //sec + Configuration conf = new Configuration(); + conf.setInt( + MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, + preemptThreshold); + + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + ControlledClock clock = new ControlledClock(null); + clock.setTime(1); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob, clock); + allocator.setMapResourceRequest(1024); + allocator.setReduceResourceRequest(1024); + RMContainerAllocator.AssignedRequests assignedRequests = + allocator.getAssignedRequests(); + RMContainerAllocator.ScheduledRequests scheduledRequests = + allocator.getScheduledRequests(); + ContainerRequestEvent event1 = + createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + scheduledRequests.maps.put(mock(TaskAttemptId.class), + new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime())); + assignedRequests.reduces.put(mock(TaskAttemptId.class), + mock(Container.class)); + + clock.setTime(clock.getTime() + 1); + allocator.preemptReducesIfNeeded(); + Assert.assertEquals("The reducer is aggressively preeempted", 0, + assignedRequests.preemptionWaitingReduces.size()); + + clock.setTime(clock.getTime() + (preemptThreshold) * 1000); + allocator.preemptReducesIfNeeded(); + Assert.assertEquals("The reducer is not preeempted", 1, + assignedRequests.preemptionWaitingReduces.size()); + } + @Test public void testMapReduceScheduling() throws Exception { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 33d79ee4118..4795af78d2a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -579,7 +579,17 @@ public interface MRJobConfig { MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold"; public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD = 50; - + + /** + * The threshold in terms of seconds after which an unsatisfied mapper request + * triggers reducer preemption to free space. Default 0 implies that the reduces + * should be preempted immediately after allocation if there is currently no + * room for newly allocated mappers. + */ + public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC = + "mapreduce.job.reducer.preempt.delay.sec"; + public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0; + public static final String MR_AM_ENV = MR_AM_PREFIX + "env"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 9034064cf39..508b0331024 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -82,6 +82,16 @@ + + mapreduce.job.reducer.preempt.delay.sec + 0 + The threshold in terms of seconds after which an unsatisfied mapper + request triggers reducer preemption to free space. Default 0 implies that the + reduces should be preempted immediately after allocation if there is currently no + room for newly allocated mappers. + + + mapreduce.job.max.split.locations 10