MAPREDUCE-5844. Add a configurable delay to reducer-preemption. (Maysam Yabandeh via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1604032 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-06-19 21:13:14 +00:00
parent 16d769c864
commit ff2886e909
7 changed files with 277 additions and 59 deletions

View File

@ -71,6 +71,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5896. InputSplits should indicate which locations have the block MAPREDUCE-5896. InputSplits should indicate which locations have the block
cached in memory. (Sandy Ryza via kasha) cached in memory. (Sandy Ryza via kasha)
MAPREDUCE-5844. Add a configurable delay to reducer-preemption.
(Maysam Yabandeh via kasha)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -475,8 +475,8 @@
<Match> <Match>
<Class name="org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator" /> <Class name="org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator" />
<Or> <Or>
<Field name="mapResourceReqt" /> <Field name="mapResourceRequest" />
<Field name="reduceResourceReqt" /> <Field name="reduceResourceRequest" />
<Field name="maxReduceRampupLimit" /> <Field name="maxReduceRampupLimit" />
<Field name="reduceSlowStart" /> <Field name="reduceSlowStart" />
</Or> </Or>

View File

@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -141,15 +142,21 @@ public class RMContainerAllocator extends RMContainerRequestor
private int lastCompletedTasks = 0; private int lastCompletedTasks = 0;
private boolean recalculateReduceSchedule = false; private boolean recalculateReduceSchedule = false;
private int mapResourceReqt;//memory private int mapResourceRequest;//memory
private int reduceResourceReqt;//memory private int reduceResourceRequest;//memory
private boolean reduceStarted = false; private boolean reduceStarted = false;
private float maxReduceRampupLimit = 0; private float maxReduceRampupLimit = 0;
private float maxReducePreemptionLimit = 0; private float maxReducePreemptionLimit = 0;
/**
* after this threshold, if the container request is not allocated, it is
* considered delayed.
*/
private long allocationDelayThresholdMs = 0;
private float reduceSlowStart = 0; private float reduceSlowStart = 0;
private long retryInterval; private long retryInterval;
private long retrystartTime; private long retrystartTime;
private Clock clock;
@VisibleForTesting @VisibleForTesting
protected BlockingQueue<ContainerAllocatorEvent> eventQueue protected BlockingQueue<ContainerAllocatorEvent> eventQueue
@ -160,6 +167,7 @@ public class RMContainerAllocator extends RMContainerRequestor
public RMContainerAllocator(ClientService clientService, AppContext context) { public RMContainerAllocator(ClientService clientService, AppContext context) {
super(clientService, context); super(clientService, context);
this.stopped = new AtomicBoolean(false); this.stopped = new AtomicBoolean(false);
this.clock = context.getClock();
} }
@Override @Override
@ -174,6 +182,9 @@ public class RMContainerAllocator extends RMContainerRequestor
maxReducePreemptionLimit = conf.getFloat( maxReducePreemptionLimit = conf.getFloat(
MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT, MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT); MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
allocationDelayThresholdMs = conf.getInt(
MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
RackResolver.init(conf); RackResolver.init(conf);
retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS); MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
@ -240,7 +251,7 @@ public class RMContainerAllocator extends RMContainerRequestor
getJob().getTotalMaps(), completedMaps, getJob().getTotalMaps(), completedMaps,
scheduledRequests.maps.size(), scheduledRequests.reduces.size(), scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(), assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceReqt, reduceResourceReqt, mapResourceRequest, reduceResourceRequest,
pendingReduces.size(), pendingReduces.size(),
maxReduceRampupLimit, reduceSlowStart); maxReduceRampupLimit, reduceSlowStart);
recalculateReduceSchedule = false; recalculateReduceSchedule = false;
@ -262,6 +273,18 @@ public class RMContainerAllocator extends RMContainerRequestor
scheduleStats.log("Final Stats: "); scheduleStats.log("Final Stats: ");
} }
@Private
@VisibleForTesting
AssignedRequests getAssignedRequests() {
return assignedRequests;
}
@Private
@VisibleForTesting
ScheduledRequests getScheduledRequests() {
return scheduledRequests;
}
public boolean getIsReduceStarted() { public boolean getIsReduceStarted() {
return reduceStarted; return reduceStarted;
} }
@ -297,16 +320,16 @@ public class RMContainerAllocator extends RMContainerRequestor
int supportedMaxContainerCapability = int supportedMaxContainerCapability =
getMaxContainerCapability().getMemory(); getMaxContainerCapability().getMemory();
if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) { if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
if (mapResourceReqt == 0) { if (mapResourceRequest == 0) {
mapResourceReqt = reqEvent.getCapability().getMemory(); mapResourceRequest = reqEvent.getCapability().getMemory();
eventHandler.handle(new JobHistoryEvent(jobId, eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP, new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
mapResourceReqt))); mapResourceRequest)));
LOG.info("mapResourceReqt:"+mapResourceReqt); LOG.info("mapResourceRequest:"+ mapResourceRequest);
if (mapResourceReqt > supportedMaxContainerCapability) { if (mapResourceRequest > supportedMaxContainerCapability) {
String diagMsg = "MAP capability required is more than the supported " + String diagMsg = "MAP capability required is more than the supported " +
"max container capability in the cluster. Killing the Job. mapResourceReqt: " + "max container capability in the cluster. Killing the Job. mapResourceRequest: " +
mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability; mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg); LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent( eventHandler.handle(new JobDiagnosticsUpdateEvent(
jobId, diagMsg)); jobId, diagMsg));
@ -314,20 +337,20 @@ public class RMContainerAllocator extends RMContainerRequestor
} }
} }
//set the rounded off memory //set the rounded off memory
reqEvent.getCapability().setMemory(mapResourceReqt); reqEvent.getCapability().setMemory(mapResourceRequest);
scheduledRequests.addMap(reqEvent);//maps are immediately scheduled scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
} else { } else {
if (reduceResourceReqt == 0) { if (reduceResourceRequest == 0) {
reduceResourceReqt = reqEvent.getCapability().getMemory(); reduceResourceRequest = reqEvent.getCapability().getMemory();
eventHandler.handle(new JobHistoryEvent(jobId, eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent( new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE, org.apache.hadoop.mapreduce.TaskType.REDUCE,
reduceResourceReqt))); reduceResourceRequest)));
LOG.info("reduceResourceReqt:"+reduceResourceReqt); LOG.info("reduceResourceRequest:"+ reduceResourceRequest);
if (reduceResourceReqt > supportedMaxContainerCapability) { if (reduceResourceRequest > supportedMaxContainerCapability) {
String diagMsg = "REDUCE capability required is more than the " + String diagMsg = "REDUCE capability required is more than the " +
"supported max container capability in the cluster. Killing the " + "supported max container capability in the cluster. Killing the " +
"Job. reduceResourceReqt: " + reduceResourceReqt + "Job. reduceResourceRequest: " + reduceResourceRequest +
" maxContainerCapability:" + supportedMaxContainerCapability; " maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg); LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent( eventHandler.handle(new JobDiagnosticsUpdateEvent(
@ -336,7 +359,7 @@ public class RMContainerAllocator extends RMContainerRequestor
} }
} }
//set the rounded off memory //set the rounded off memory
reqEvent.getCapability().setMemory(reduceResourceReqt); reqEvent.getCapability().setMemory(reduceResourceRequest);
if (reqEvent.getEarlierAttemptFailed()) { if (reqEvent.getEarlierAttemptFailed()) {
//add to the front of queue for fail fast //add to the front of queue for fail fast
pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
@ -384,8 +407,22 @@ public class RMContainerAllocator extends RMContainerRequestor
return host; return host;
} }
private void preemptReducesIfNeeded() { @Private
if (reduceResourceReqt == 0) { @VisibleForTesting
synchronized void setReduceResourceRequest(int mem) {
this.reduceResourceRequest = mem;
}
@Private
@VisibleForTesting
synchronized void setMapResourceRequest(int mem) {
this.mapResourceRequest = mem;
}
@Private
@VisibleForTesting
void preemptReducesIfNeeded() {
if (reduceResourceRequest == 0) {
return; //no reduces return; //no reduces
} }
//check if reduces have taken over the whole cluster and there are //check if reduces have taken over the whole cluster and there are
@ -393,9 +430,9 @@ public class RMContainerAllocator extends RMContainerRequestor
if (scheduledRequests.maps.size() > 0) { if (scheduledRequests.maps.size() > 0) {
int memLimit = getMemLimit(); int memLimit = getMemLimit();
int availableMemForMap = memLimit - ((assignedRequests.reduces.size() - int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt); assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest);
//availableMemForMap must be sufficient to run atleast 1 map //availableMemForMap must be sufficient to run atleast 1 map
if (availableMemForMap < mapResourceReqt) { if (availableMemForMap < mapResourceRequest) {
//to make sure new containers are given to maps and not reduces //to make sure new containers are given to maps and not reduces
//ramp down all scheduled reduces if any //ramp down all scheduled reduces if any
//(since reduces are scheduled at higher priority than maps) //(since reduces are scheduled at higher priority than maps)
@ -405,22 +442,40 @@ public class RMContainerAllocator extends RMContainerRequestor
} }
scheduledRequests.reduces.clear(); scheduledRequests.reduces.clear();
//preempt for making space for atleast one map //do further checking to find the number of map requests that were
int premeptionLimit = Math.max(mapResourceReqt, //hanging around for a while
(int) (maxReducePreemptionLimit * memLimit)); int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
if (hangingMapRequests > 0) {
//preempt for making space for at least one map
int premeptionLimit = Math.max(mapResourceRequest,
(int) (maxReducePreemptionLimit * memLimit));
int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt, int preemptMem = Math.min(hangingMapRequests * mapResourceRequest,
premeptionLimit); premeptionLimit);
int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt); int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest);
toPreempt = Math.min(toPreempt, assignedRequests.reduces.size()); toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
LOG.info("Going to preempt " + toPreempt); LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
assignedRequests.preemptReduce(toPreempt); assignedRequests.preemptReduce(toPreempt);
}
} }
} }
} }
private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
if (allocationDelayThresholdMs <= 0)
return requestMap.size();
int hangingRequests = 0;
long currTime = clock.getTime();
for (ContainerRequest request: requestMap.values()) {
long delay = currTime - request.requestTimeMs;
if (delay > allocationDelayThresholdMs)
hangingRequests++;
}
return hangingRequests;
}
@Private @Private
public void scheduleReduces( public void scheduleReduces(
int totalMaps, int completedMaps, int totalMaps, int completedMaps,
@ -695,11 +750,13 @@ public class RMContainerAllocator extends RMContainerRequestor
@Private @Private
public int getMemLimit() { public int getMemLimit() {
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0; int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
return headRoom + assignedRequests.maps.size() * mapResourceReqt + return headRoom + assignedRequests.maps.size() * mapResourceRequest +
assignedRequests.reduces.size() * reduceResourceReqt; assignedRequests.reduces.size() * reduceResourceRequest;
} }
private class ScheduledRequests { @Private
@VisibleForTesting
class ScheduledRequests {
private final LinkedList<TaskAttemptId> earlierFailedMaps = private final LinkedList<TaskAttemptId> earlierFailedMaps =
new LinkedList<TaskAttemptId>(); new LinkedList<TaskAttemptId>();
@ -709,7 +766,8 @@ public class RMContainerAllocator extends RMContainerRequestor
new HashMap<String, LinkedList<TaskAttemptId>>(); new HashMap<String, LinkedList<TaskAttemptId>>();
private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping = private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping =
new HashMap<String, LinkedList<TaskAttemptId>>(); new HashMap<String, LinkedList<TaskAttemptId>>();
private final Map<TaskAttemptId, ContainerRequest> maps = @VisibleForTesting
final Map<TaskAttemptId, ContainerRequest> maps =
new LinkedHashMap<TaskAttemptId, ContainerRequest>(); new LinkedHashMap<TaskAttemptId, ContainerRequest>();
private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces = private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
@ -805,22 +863,22 @@ public class RMContainerAllocator extends RMContainerRequestor
int allocatedMemory = allocated.getResource().getMemory(); int allocatedMemory = allocated.getResource().getMemory();
if (PRIORITY_FAST_FAIL_MAP.equals(priority) if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|| PRIORITY_MAP.equals(priority)) { || PRIORITY_MAP.equals(priority)) {
if (allocatedMemory < mapResourceReqt if (allocatedMemory < mapResourceRequest
|| maps.isEmpty()) { || maps.isEmpty()) {
LOG.info("Cannot assign container " + allocated LOG.info("Cannot assign container " + allocated
+ " for a map as either " + " for a map as either "
+ " container memory less than required " + mapResourceReqt + " container memory less than required " + mapResourceRequest
+ " or no pending map tasks - maps.isEmpty=" + " or no pending map tasks - maps.isEmpty="
+ maps.isEmpty()); + maps.isEmpty());
isAssignable = false; isAssignable = false;
} }
} }
else if (PRIORITY_REDUCE.equals(priority)) { else if (PRIORITY_REDUCE.equals(priority)) {
if (allocatedMemory < reduceResourceReqt if (allocatedMemory < reduceResourceRequest
|| reduces.isEmpty()) { || reduces.isEmpty()) {
LOG.info("Cannot assign container " + allocated LOG.info("Cannot assign container " + allocated
+ " for a reduce as either " + " for a reduce as either "
+ " container memory less than required " + reduceResourceReqt + " container memory less than required " + reduceResourceRequest
+ " or no pending reduce tasks - reduces.isEmpty=" + " or no pending reduce tasks - reduces.isEmpty="
+ reduces.isEmpty()); + reduces.isEmpty());
isAssignable = false; isAssignable = false;
@ -1099,14 +1157,18 @@ public class RMContainerAllocator extends RMContainerRequestor
} }
} }
private class AssignedRequests { @Private
@VisibleForTesting
class AssignedRequests {
private final Map<ContainerId, TaskAttemptId> containerToAttemptMap = private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
new HashMap<ContainerId, TaskAttemptId>(); new HashMap<ContainerId, TaskAttemptId>();
private final LinkedHashMap<TaskAttemptId, Container> maps = private final LinkedHashMap<TaskAttemptId, Container> maps =
new LinkedHashMap<TaskAttemptId, Container>(); new LinkedHashMap<TaskAttemptId, Container>();
private final LinkedHashMap<TaskAttemptId, Container> reduces = @VisibleForTesting
final LinkedHashMap<TaskAttemptId, Container> reduces =
new LinkedHashMap<TaskAttemptId, Container>(); new LinkedHashMap<TaskAttemptId, Container>();
private final Set<TaskAttemptId> preemptionWaitingReduces = @VisibleForTesting
final Set<TaskAttemptId> preemptionWaitingReduces =
new HashSet<TaskAttemptId>(); new HashSet<TaskAttemptId>();
void add(Container container, TaskAttemptId tId) { void add(Container container, TaskAttemptId tId) {

View File

@ -29,8 +29,10 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@ -96,6 +98,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
super(clientService, context); super(clientService, context);
} }
@Private
@VisibleForTesting
static class ContainerRequest { static class ContainerRequest {
final TaskAttemptId attemptID; final TaskAttemptId attemptID;
final Resource capability; final Resource capability;
@ -103,20 +107,39 @@ public abstract class RMContainerRequestor extends RMCommunicator {
final String[] racks; final String[] racks;
//final boolean earlierAttemptFailed; //final boolean earlierAttemptFailed;
final Priority priority; final Priority priority;
/**
* the time when this request object was formed; can be used to avoid
* aggressive preemption for recently placed requests
*/
final long requestTimeMs;
public ContainerRequest(ContainerRequestEvent event, Priority priority) { public ContainerRequest(ContainerRequestEvent event, Priority priority) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(), this(event.getAttemptID(), event.getCapability(), event.getHosts(),
event.getRacks(), priority); event.getRacks(), priority);
} }
public ContainerRequest(ContainerRequestEvent event, Priority priority,
long requestTimeMs) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(),
event.getRacks(), priority, requestTimeMs);
}
public ContainerRequest(TaskAttemptId attemptID,
Resource capability, String[] hosts, String[] racks,
Priority priority) {
this(attemptID, capability, hosts, racks, priority,
System.currentTimeMillis());
}
public ContainerRequest(TaskAttemptId attemptID, public ContainerRequest(TaskAttemptId attemptID,
Resource capability, String[] hosts, String[] racks, Resource capability, String[] hosts, String[] racks,
Priority priority) { Priority priority, long requestTimeMs) {
this.attemptID = attemptID; this.attemptID = attemptID;
this.capability = capability; this.capability = capability;
this.hosts = hosts; this.hosts = hosts;
this.racks = racks; this.racks = racks;
this.priority = priority; this.priority = priority;
this.requestTimeMs = requestTimeMs;
} }
public String toString() { public String toString() {

View File

@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.mapreduce.v2.app; package org.apache.hadoop.mapreduce.v2.app.rm;
import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
@ -40,6 +40,10 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.junit.Assert; import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -65,10 +69,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -79,6 +79,7 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
@ -421,6 +422,115 @@ public class TestRMContainerAllocator {
killEventMessage.contains(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC)); killEventMessage.contains(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC));
} }
@Test(timeout = 30000)
public void testPreemptReducers() throws Exception {
LOG.info("Running testPreemptReducers");
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, new SystemClock());
allocator.setMapResourceRequest(1024);
allocator.setReduceResourceRequest(1024);
RMContainerAllocator.AssignedRequests assignedRequests =
allocator.getAssignedRequests();
RMContainerAllocator.ScheduledRequests scheduledRequests =
allocator.getScheduledRequests();
ContainerRequestEvent event1 =
createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class),
new RMContainerRequestor.ContainerRequest(event1, null));
assignedRequests.reduces.put(mock(TaskAttemptId.class),
mock(Container.class));
allocator.preemptReducesIfNeeded();
Assert.assertEquals("The reducer is not preempted",
1, assignedRequests.preemptionWaitingReduces.size());
}
@Test(timeout = 30000)
public void testNonAggressivelyPreemptReducers() throws Exception {
LOG.info("Running testPreemptReducers");
final int preemptThreshold = 2; //sec
Configuration conf = new Configuration();
conf.setInt(
MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
preemptThreshold);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
ControlledClock clock = new ControlledClock(null);
clock.setTime(1);
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, clock);
allocator.setMapResourceRequest(1024);
allocator.setReduceResourceRequest(1024);
RMContainerAllocator.AssignedRequests assignedRequests =
allocator.getAssignedRequests();
RMContainerAllocator.ScheduledRequests scheduledRequests =
allocator.getScheduledRequests();
ContainerRequestEvent event1 =
createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class),
new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
assignedRequests.reduces.put(mock(TaskAttemptId.class),
mock(Container.class));
clock.setTime(clock.getTime() + 1);
allocator.preemptReducesIfNeeded();
Assert.assertEquals("The reducer is aggressively preeempted", 0,
assignedRequests.preemptionWaitingReduces.size());
clock.setTime(clock.getTime() + (preemptThreshold) * 1000);
allocator.preemptReducesIfNeeded();
Assert.assertEquals("The reducer is not preeempted", 1,
assignedRequests.preemptionWaitingReduces.size());
}
@Test @Test
public void testMapReduceScheduling() throws Exception { public void testMapReduceScheduling() throws Exception {

View File

@ -569,6 +569,16 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD = public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
50; 50;
/**
* The threshold in terms of seconds after which an unsatisfied mapper request
* triggers reducer preemption to free space. Default 0 implies that the reduces
* should be preempted immediately after allocation if there is currently no
* room for newly allocated mappers.
*/
public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
"mapreduce.job.reducer.preempt.delay.sec";
public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0;
public static final String MR_AM_ENV = public static final String MR_AM_ENV =
MR_AM_PREFIX + "env"; MR_AM_PREFIX + "env";

View File

@ -285,6 +285,16 @@
</property> </property>
<property>
<name>mapreduce.job.reducer.preempt.delay.sec</name>
<value>0</value>
<description>The threshold in terms of seconds after which an unsatisfied mapper
request triggers reducer preemption to free space. Default 0 implies that the
reduces should be preempted immediately after allocation if there is currently no
room for newly allocated mappers.
</description>
</property>
<property> <property>
<name>mapreduce.job.max.split.locations</name> <name>mapreduce.job.max.split.locations</name>
<value>10</value> <value>10</value>