MAPREDUCE-6302. Incorrect headroom can lead to a deadlock between map and reduce allocations. (kasha)

(cherry picked from commit 4aa9b3e75c)

Conflicts:
	hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
parent 0139517811
commit 2d8f9e3fd9
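Overview, before the diff: the core of the patch is a rewritten preemptReducesIfNeeded() in RMContainerAllocator. Instead of trusting only the headroom reported by YARN (which can be stale and leave map and reduce allocations deadlocked), the AM now preempts reducers unconditionally once pending map requests have waited longer than a configurable delay, and still preempts after a separate, shorter delay when the reported headroom cannot fit a mapper. The following is a condensed, self-contained sketch of that decision flow; the class, field, and helper names are simplified hypothetical stand-ins for illustration, not the real MapReduce/YARN types.

// Condensed sketch of the new preemption decision flow. All names here
// (PreemptionFlowSketch, its fields, preemptReducers) are hypothetical
// stand-ins, not the real MapReduce/YARN classes.
import java.util.concurrent.TimeUnit;

public class PreemptionFlowSketch {
  // The two delays controlled by this patch, in milliseconds.
  long unconditionalDelayMs = TimeUnit.SECONDS.toMillis(300); // -1 disables
  long noHeadroomDelayMs = 0;

  // State the real allocator tracks; plain numbers here for illustration.
  int assignedMaps, pendingMaps, assignedReduces;
  long oldestPendingMapAgeMs;
  long headroomMb, mapRequestMb;

  void preemptReducesIfNeeded() {
    if (assignedReduces == 0) {
      return; // nothing to preempt
    }
    if (assignedMaps > 0 || pendingMaps == 0) {
      return; // maps are already running, or no map is waiting
    }
    // Pending mappers exist and reducers hold all assigned resources.
    if (unconditionalDelayMs >= 0
        && oldestPendingMapAgeMs >= unconditionalDelayMs) {
      preemptReducers("mappers pending longer than the unconditional delay");
      return;
    }
    if (headroomMb >= mapRequestMb) {
      return; // reported headroom can still fit a mapper
    }
    if (oldestPendingMapAgeMs >= noHeadroomDelayMs) {
      preemptReducers("no headroom for a mapper");
    }
  }

  void preemptReducers(String reason) {
    System.out.println("Preempting reducers: " + reason);
  }

  public static void main(String[] args) {
    PreemptionFlowSketch s = new PreemptionFlowSketch();
    s.assignedReduces = 4;
    s.pendingMaps = 2;
    s.oldestPendingMapAgeMs = TimeUnit.MINUTES.toMillis(6);
    s.headroomMb = 0;      // stale headroom claims nothing is free
    s.mapRequestMb = 1024;
    s.preemptReducesIfNeeded(); // takes the unconditional-preemption branch
  }
}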
@@ -328,6 +328,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6503. archive-logs tool should use HADOOP_PREFIX instead
     of HADOOP_HOME (rkanter)
 
+    MAPREDUCE-6302. Preempt reducers after a configurable timeout irrespective
+    of headroom. (kasha)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -158,11 +158,13 @@ public class RMContainerAllocator extends RMContainerRequestor
   private boolean reduceStarted = false;
   private float maxReduceRampupLimit = 0;
   private float maxReducePreemptionLimit = 0;
-  /**
-   * after this threshold, if the container request is not allocated, it is
-   * considered delayed.
-   */
-  private long allocationDelayThresholdMs = 0;
+
+  // Mapper allocation timeout, after which a reducer is forcibly preempted
+  private long reducerUnconditionalPreemptionDelayMs;
+
+  // Duration to wait before preempting a reducer when there is NO room
+  private long reducerNoHeadroomPreemptionDelayMs = 0;
+
   private float reduceSlowStart = 0;
   private int maxRunningMaps = 0;
   private int maxRunningReduces = 0;
@@ -198,7 +200,10 @@ public class RMContainerAllocator extends RMContainerRequestor
     maxReducePreemptionLimit = conf.getFloat(
         MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
-    allocationDelayThresholdMs = conf.getInt(
+    reducerUnconditionalPreemptionDelayMs = 1000 * conf.getInt(
+        MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC,
+        MRJobConfig.DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC);
+    reducerNoHeadroomPreemptionDelayMs = conf.getInt(
         MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
         MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
     maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT,
@@ -462,59 +467,89 @@ public class RMContainerAllocator extends RMContainerRequestor
     if (reduceResourceRequest.equals(Resources.none())) {
       return; // no reduces
     }
-    //check if reduces have taken over the whole cluster and there are
-    //unassigned maps
-    if (scheduledRequests.maps.size() > 0) {
-      Resource resourceLimit = getResourceLimit();
-      Resource availableResourceForMap =
-          Resources.subtract(
-              resourceLimit,
-              Resources.multiply(reduceResourceRequest,
-                  assignedRequests.reduces.size()
-                      - assignedRequests.preemptionWaitingReduces.size()));
-      // availableMemForMap must be sufficient to run at least 1 map
-      if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
-          mapResourceRequest, getSchedulerResourceTypes()) <= 0) {
-        // to make sure new containers are given to maps and not reduces
-        // ramp down all scheduled reduces if any
-        // (since reduces are scheduled at higher priority than maps)
-        LOG.info("Ramping down all scheduled reduces:"
-            + scheduledRequests.reduces.size());
-        for (ContainerRequest req : scheduledRequests.reduces.values()) {
-          pendingReduces.add(req);
-        }
-        scheduledRequests.reduces.clear();
-
-        //do further checking to find the number of map requests that were
-        //hanging around for a while
-        int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
-        if (hangingMapRequests > 0) {
-          // preempt for making space for at least one map
-          int preemptionReduceNumForOneMap =
-              ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest,
-                  reduceResourceRequest, getSchedulerResourceTypes());
-          int preemptionReduceNumForPreemptionLimit =
-              ResourceCalculatorUtils.divideAndCeilContainers(
-                  Resources.multiply(resourceLimit, maxReducePreemptionLimit),
-                  reduceResourceRequest, getSchedulerResourceTypes());
-          int preemptionReduceNumForAllMaps =
-              ResourceCalculatorUtils.divideAndCeilContainers(
-                  Resources.multiply(mapResourceRequest, hangingMapRequests),
-                  reduceResourceRequest, getSchedulerResourceTypes());
-          int toPreempt =
-              Math.min(Math.max(preemptionReduceNumForOneMap,
-                  preemptionReduceNumForPreemptionLimit),
-                  preemptionReduceNumForAllMaps);
-
-          LOG.info("Going to preempt " + toPreempt
-              + " due to lack of space for maps");
-          assignedRequests.preemptReduce(toPreempt);
-        }
-      }
-    }
-  }
-
-  private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
+
+    if (assignedRequests.maps.size() > 0) {
+      // there are assigned mappers
+      return;
+    }
+
+    if (scheduledRequests.maps.size() <= 0) {
+      // there are no pending requests for mappers
+      return;
+    }
+    // At this point:
+    // we have pending mappers and all assigned resources are taken by reducers
+
+    if (reducerUnconditionalPreemptionDelayMs >= 0) {
+      // Unconditional preemption is enabled.
+      // If mappers are pending for longer than the configured threshold,
+      // preempt reducers irrespective of what the headroom is.
+      if (preemptReducersForHangingMapRequests(
+          reducerUnconditionalPreemptionDelayMs)) {
+        return;
+      }
+    }
+
+    // The pending mappers haven't been waiting for too long. Let us see if
+    // the headroom can fit a mapper.
+    Resource availableResourceForMap = getAvailableResources();
+    if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
+        mapResourceRequest, getSchedulerResourceTypes()) > 0) {
+      // the available headroom is enough to run a mapper
+      return;
+    }
+
+    // Available headroom is not enough to run mapper. See if we should hold
+    // off before preempting reducers and preempt if okay.
+    preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs);
+  }
+
+  private boolean preemptReducersForHangingMapRequests(long pendingThreshold) {
+    int hangingMapRequests = getNumHangingRequests(
+        pendingThreshold, scheduledRequests.maps);
+    if (hangingMapRequests > 0) {
+      preemptReducer(hangingMapRequests);
+      return true;
+    }
+    return false;
+  }
+
+  private void clearAllPendingReduceRequests() {
+    LOG.info("Ramping down all scheduled reduces:"
+        + scheduledRequests.reduces.size());
+    for (ContainerRequest req : scheduledRequests.reduces.values()) {
+      pendingReduces.add(req);
+    }
+    scheduledRequests.reduces.clear();
+  }
+
+  private void preemptReducer(int hangingMapRequests) {
+    clearAllPendingReduceRequests();
+
+    // preempt for making space for at least one map
+    int preemptionReduceNumForOneMap =
+        ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest,
+            reduceResourceRequest, getSchedulerResourceTypes());
+    int preemptionReduceNumForPreemptionLimit =
+        ResourceCalculatorUtils.divideAndCeilContainers(
+            Resources.multiply(getResourceLimit(), maxReducePreemptionLimit),
+            reduceResourceRequest, getSchedulerResourceTypes());
+    int preemptionReduceNumForAllMaps =
+        ResourceCalculatorUtils.divideAndCeilContainers(
+            Resources.multiply(mapResourceRequest, hangingMapRequests),
+            reduceResourceRequest, getSchedulerResourceTypes());
+    int toPreempt =
+        Math.min(Math.max(preemptionReduceNumForOneMap,
+            preemptionReduceNumForPreemptionLimit),
+            preemptionReduceNumForAllMaps);
+
+    LOG.info("Going to preempt " + toPreempt
+        + " due to lack of space for maps");
+    assignedRequests.preemptReduce(toPreempt);
+  }
+
+  private int getNumHangingRequests(long allocationDelayThresholdMs,
+      Map<TaskAttemptId, ContainerRequest> requestMap) {
     if (allocationDelayThresholdMs <= 0)
       return requestMap.size();
     int hangingRequests = 0;
@@ -542,9 +577,6 @@ public class RMContainerAllocator extends RMContainerRequestor
 
     // get available resources for this job
     Resource headRoom = getAvailableResources();
-    if (headRoom == null) {
-      headRoom = Resources.none();
-    }
 
     LOG.info("Recalculating schedule, headroom=" + headRoom);
 
@@ -671,9 +703,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     applyConcurrentTaskLimits();
 
     // will be null the first time
-    Resource headRoom =
-        getAvailableResources() == null ? Resources.none() :
-            Resources.clone(getAvailableResources());
+    Resource headRoom = Resources.clone(getAvailableResources());
     AllocateResponse response;
     /*
      * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
@@ -714,9 +744,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       // continue to attempt to contact the RM.
       throw e;
     }
-    Resource newHeadRoom =
-        getAvailableResources() == null ? Resources.none()
-            : getAvailableResources();
+    Resource newHeadRoom = getAvailableResources();
     List<Container> newContainers = response.getAllocatedContainers();
     // Setting NMTokens
     if (response.getNMTokens() != null) {
@@ -876,9 +904,6 @@ public class RMContainerAllocator extends RMContainerRequestor
   @Private
   public Resource getResourceLimit() {
     Resource headRoom = getAvailableResources();
-    if (headRoom == null) {
-      headRoom = Resources.none();
-    }
     Resource assignedMapResource =
         Resources.multiply(mapResourceRequest, assignedRequests.maps.size());
     Resource assignedReduceResource =
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 
 /**
@@ -386,7 +387,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   }
 
   protected Resource getAvailableResources() {
-    return availableResources;
+    return availableResources == null ? Resources.none() : availableResources;
   }
 
   protected void addContainerReq(ContainerRequest req) {
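The RMContainerRequestor change above is the null-object half of the fix: before the first allocate response the headroom is unknown, so getAvailableResources() now returns Resources.none() instead of null, which is what lets the per-call-site null guards in RMContainerAllocator be removed. Below is a tiny illustrative sketch of the same pattern, using a hypothetical Res class rather than YARN's Resource type.

// Illustrative null-object accessor. "Res" and "HeadroomHolder" are
// hypothetical stand-ins; only the null-handling pattern mirrors the patch.
public class HeadroomHolder {
  static final class Res {
    static final Res NONE = new Res(0, 0);
    final int memoryMb;
    final int vcores;
    Res(int memoryMb, int vcores) { this.memoryMb = memoryMb; this.vcores = vcores; }
    @Override public String toString() { return "<memory:" + memoryMb + ", vCores:" + vcores + ">"; }
  }

  private Res availableResources; // stays null until the first allocate response

  Res getAvailableResources() {
    // Callers never need a null check; "unknown headroom" reads as zero resources.
    return availableResources == null ? Res.NONE : availableResources;
  }

  void onAllocateResponse(Res headroom) {
    availableResources = headroom;
  }

  public static void main(String[] args) {
    HeadroomHolder h = new HeadroomHolder();
    System.out.println(h.getAvailableResources()); // <memory:0, vCores:0>
    h.onAllocateResponse(new Res(8192, 8));
    System.out.println(h.getAvailableResources()); // <memory:8192, vCores:8>
  }
}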
@@ -430,7 +430,7 @@ public class TestRMContainerAllocator {
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
-        appAttemptId, mockJob);
+        appAttemptId, mockJob, new SystemClock());
     // add resources to scheduler
     dispatcher.await();
 
@@ -564,6 +564,69 @@ public class TestRMContainerAllocator {
         assignedRequests.preemptionWaitingReduces.size());
   }
 
+  @Test(timeout = 30000)
+  public void testUnconditionalPreemptReducers() throws Exception {
+    LOG.info("Running testForcePreemptReducers");
+
+    int forcePreemptThresholdSecs = 2;
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
+        2 * forcePreemptThresholdSecs);
+    conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC,
+        forcePreemptThresholdSecs);
+
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(8192, 8));
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    ControlledClock clock = new ControlledClock(null);
+    clock.setTime(1);
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob, clock);
+    allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1));
+    allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1));
+    RMContainerAllocator.AssignedRequests assignedRequests =
+        allocator.getAssignedRequests();
+    RMContainerAllocator.ScheduledRequests scheduledRequests =
+        allocator.getScheduledRequests();
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+    scheduledRequests.maps.put(mock(TaskAttemptId.class),
+        new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
+    assignedRequests.reduces.put(mock(TaskAttemptId.class),
+        mock(Container.class));
+
+    clock.setTime(clock.getTime() + 1);
+    allocator.preemptReducesIfNeeded();
+    Assert.assertEquals("The reducer is preeempted too soon", 0,
+        assignedRequests.preemptionWaitingReduces.size());
+
+    clock.setTime(clock.getTime() + 1000 * forcePreemptThresholdSecs);
+    allocator.preemptReducesIfNeeded();
+    Assert.assertEquals("The reducer is not preeempted", 1,
+        assignedRequests.preemptionWaitingReduces.size());
+  }
+
   @Test(timeout = 30000)
   public void testExcessReduceContainerAssign() throws Exception {
     final Configuration conf = new Configuration();
@@ -589,7 +652,7 @@ public class TestRMContainerAllocator {
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
-        appAttemptId, mockJob);
+        appAttemptId, mockJob, new SystemClock());
 
     // request to allocate two reduce priority containers
     final String[] locations = new String[] { host };
@@ -633,7 +696,8 @@ public class TestRMContainerAllocator {
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
     MyContainerAllocator allocator =
-        new MyContainerAllocator(null, conf, appAttemptId, mockJob) {
+        new MyContainerAllocator(null, conf, appAttemptId, mockJob,
+            new SystemClock()) {
           @Override
           protected void register() {
           }
@@ -725,7 +789,7 @@ public class TestRMContainerAllocator {
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
-        appAttemptId, mockJob);
+        appAttemptId, mockJob, new SystemClock());
 
     // add resources to scheduler
     MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
@@ -1628,6 +1692,7 @@ public class TestRMContainerAllocator {
     List<ContainerId> lastRelease = null;
     List<String> lastBlacklistAdditions;
     List<String> lastBlacklistRemovals;
+    Resource forceResourceLimit = null;
 
     // override this to copy the objects otherwise FifoScheduler updates the
     // numContainers in same objects as kept by RMContainerAllocator
@@ -1650,9 +1715,18 @@ public class TestRMContainerAllocator {
       lastRelease = release;
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
-      return super.allocate(
+      Allocation allocation = super.allocate(
           applicationAttemptId, askCopy, release, blacklistAdditions,
           blacklistRemovals, increaseRequests, decreaseRequests);
+      if (forceResourceLimit != null) {
+        // Test wants to force the non-default resource limit
+        allocation.setResourceLimit(forceResourceLimit);
+      }
+      return allocation;
+    }
+
+    public void forceResourceLimit(Resource resource) {
+      this.forceResourceLimit = resource;
     }
   }
 
@@ -2672,7 +2746,7 @@ public class TestRMContainerAllocator {
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
     MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
-        appAttemptId, mockJob) {
+        appAttemptId, mockJob, new SystemClock()) {
       @Override
       protected void register() {
       }
@@ -703,10 +703,18 @@ public interface MRJobConfig {
       10 * 1000l;
 
   /**
-   * The threshold in terms of seconds after which an unsatisfied mapper request
-   * triggers reducer preemption to free space. Default 0 implies that the reduces
-   * should be preempted immediately after allocation if there is currently no
-   * room for newly allocated mappers.
+   * Duration to wait before forcibly preempting a reducer to allow
+   * allocating new mappers, even when YARN reports positive headroom.
+   */
+  public static final String MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC =
+      "mapreduce.job.reducer.unconditional-preempt.delay.sec";
+
+  public static final int
+      DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC = 5 * 60;
+
+  /**
+   * Duration to wait before preempting a reducer, when there is no headroom
+   * to allocate new mappers.
    */
   public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
       "mapreduce.job.reducer.preempt.delay.sec";
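The two MRJobConfig keys above are the knobs a job can tune. A minimal usage sketch follows (assuming the MapReduce client libraries are on the classpath; the values chosen here are only examples, not recommendations).

// Minimal sketch: setting the two reducer-preemption delays on a job's
// Configuration. The keys and constants come from this patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class PreemptionConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Preempt reducers 30s after the headroom is found to be insufficient.
    conf.setInt(MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, 30);
    // Force preemption after 10 minutes regardless of reported headroom;
    // set to -1 to disable unconditional preemption entirely.
    conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, 600);
    System.out.println(
        conf.get("mapreduce.job.reducer.unconditional-preempt.delay.sec"));
  }
}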
@@ -98,17 +98,28 @@
 </description>
 </property>
 
 <property>
   <name>mapreduce.job.reducer.preempt.delay.sec</name>
   <value>0</value>
-  <description>The threshold in terms of seconds after which an unsatisfied mapper
-  request triggers reducer preemption to free space. Default 0 implies that the
-  reduces should be preempted immediately after allocation if there is currently no
-  room for newly allocated mappers.
+  <description>The threshold (in seconds) after which an unsatisfied
+  mapper request triggers reducer preemption when there is no anticipated
+  headroom. If set to 0 or a negative value, the reducer is preempted as
+  soon as lack of headroom is detected. Default is 0.
   </description>
 </property>
 
+<property>
+  <name>mapreduce.job.reducer.unconditional-preempt.delay.sec</name>
+  <value>300</value>
+  <description>The threshold (in seconds) after which an unsatisfied
+  mapper request triggers a forced reducer preemption irrespective of the
+  anticipated headroom. By default, it is set to 5 mins. Setting it to 0
+  leads to immediate reducer preemption. Setting to -1 disables this
+  preemption altogether.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.job.max.split.locations</name>
   <value>10</value>
   <description>The max number of block locations to store for each split for
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -30,13 +31,13 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 public class Allocation {
 
   final List<Container> containers;
-  final Resource resourceLimit;
   final Set<ContainerId> strictContainers;
   final Set<ContainerId> fungibleContainers;
   final List<ResourceRequest> fungibleResources;
   final List<NMToken> nmTokens;
   final List<Container> increasedContainers;
   final List<Container> decreasedContainers;
+  private Resource resourceLimit;
 
 
   public Allocation(List<Container> containers, Resource resourceLimit,
@@ -98,4 +99,9 @@ public class Allocation {
   public List<Container> getDecreasedContainers() {
     return decreasedContainers;
   }
+
+  @VisibleForTesting
+  public void setResourceLimit(Resource resource) {
+    this.resourceLimit = resource;
+  }
 }