MAPREDUCE-6689. MapReduce job can infinitely increase number of reducer resource requests. Contributed by Wangda Tan
(cherry picked from commit c9bb96fa81
)
This commit is contained in:
parent
b0bcb4e728
commit
5f698df901
|
@ -78,6 +78,9 @@ Release 2.7.3 - UNRELEASED
|
|||
MAPREDUCE-5817. Mappers get rescheduled on node transition even after all
|
||||
reducers are completed. (Sangjin Lee via kasha)
|
||||
|
||||
MAPREDUCE-6689. MapReduce job can infinitely increase number of reducer
|
||||
resource requests (Wangda Tan via jlowe)
|
||||
|
||||
Release 2.7.2 - 2016-01-25
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -267,14 +267,18 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
}
|
||||
|
||||
if (recalculateReduceSchedule) {
|
||||
preemptReducesIfNeeded();
|
||||
scheduleReduces(
|
||||
getJob().getTotalMaps(), completedMaps,
|
||||
boolean reducerPreempted = preemptReducesIfNeeded();
|
||||
|
||||
if (!reducerPreempted) {
|
||||
// Only schedule new reducers if no reducer preemption happens for
|
||||
// this heartbeat
|
||||
scheduleReduces(getJob().getTotalMaps(), completedMaps,
|
||||
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
|
||||
assignedRequests.maps.size(), assignedRequests.reduces.size(),
|
||||
mapResourceRequest, reduceResourceRequest,
|
||||
pendingReduces.size(),
|
||||
mapResourceRequest, reduceResourceRequest, pendingReduces.size(),
|
||||
maxReduceRampupLimit, reduceSlowStart);
|
||||
}
|
||||
|
||||
recalculateReduceSchedule = false;
|
||||
}
|
||||
|
||||
|
@ -455,30 +459,30 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
void preemptReducesIfNeeded() {
|
||||
boolean preemptReducesIfNeeded() {
|
||||
if (reduceResourceRequest.equals(Resources.none())) {
|
||||
return; // no reduces
|
||||
return false; // no reduces
|
||||
}
|
||||
|
||||
if (assignedRequests.maps.size() > 0) {
|
||||
// there are assigned mappers
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (scheduledRequests.maps.size() <= 0) {
|
||||
// there are no pending requests for mappers
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
// At this point:
|
||||
// we have pending mappers and all assigned resources are taken by reducers
|
||||
|
||||
if (reducerUnconditionalPreemptionDelayMs >= 0) {
|
||||
// Unconditional preemption is enabled.
|
||||
// If mappers are pending for longer than the configured threshold,
|
||||
// preempt reducers irrespective of what the headroom is.
|
||||
if (preemptReducersForHangingMapRequests(
|
||||
reducerUnconditionalPreemptionDelayMs)) {
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -488,12 +492,12 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
|
||||
mapResourceRequest, getSchedulerResourceTypes()) > 0) {
|
||||
// the available headroom is enough to run a mapper
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Available headroom is not enough to run mapper. See if we should hold
|
||||
// off before preempting reducers and preempt if okay.
|
||||
preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs);
|
||||
return preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs);
|
||||
}
|
||||
|
||||
private boolean preemptReducersForHangingMapRequests(long pendingThreshold) {
|
||||
|
|
|
@ -2809,6 +2809,133 @@ public class TestRMContainerAllocator {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired()
|
||||
throws Exception {
|
||||
LOG.info("Running testAvoidAskMoreReducersWhenReducerPreemptionIsRequired");
|
||||
Configuration conf = new Configuration();
|
||||
MyResourceManager rm = new MyResourceManager(conf);
|
||||
rm.start();
|
||||
DrainDispatcher dispatcher =
|
||||
(DrainDispatcher) rm.getRMContext().getDispatcher();
|
||||
|
||||
// Submit the application
|
||||
RMApp app = rm.submitApp(1024);
|
||||
dispatcher.await();
|
||||
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
|
||||
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
|
||||
.getAppAttemptId();
|
||||
rm.sendAMLaunched(appAttemptId);
|
||||
dispatcher.await();
|
||||
|
||||
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
|
||||
Job mockJob = mock(Job.class);
|
||||
when(mockJob.getReport()).thenReturn(
|
||||
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
|
||||
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
|
||||
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
|
||||
appAttemptId, mockJob);
|
||||
// Use a controlled clock to advance time for test.
|
||||
ControlledClock clock = (ControlledClock)allocator.getContext().getClock();
|
||||
clock.setTime(System.currentTimeMillis());
|
||||
|
||||
// Register nodes to RM.
|
||||
MockNM nodeManager = rm.registerNode("h1:1234", 1024);
|
||||
dispatcher.await();
|
||||
|
||||
// Request 2 maps and 1 reducer(sone on nodes which are not registered).
|
||||
ContainerRequestEvent event1 =
|
||||
createReq(jobId, 1, 1024, new String[] { "h1" });
|
||||
allocator.sendRequest(event1);
|
||||
ContainerRequestEvent event2 =
|
||||
createReq(jobId, 2, 1024, new String[] { "h2" });
|
||||
allocator.sendRequest(event2);
|
||||
ContainerRequestEvent event3 =
|
||||
createReq(jobId, 3, 1024, new String[] { "h2" }, false, true);
|
||||
allocator.sendRequest(event3);
|
||||
|
||||
// This will tell the scheduler about the requests but there will be no
|
||||
// allocations as nodes are not added.
|
||||
allocator.schedule();
|
||||
dispatcher.await();
|
||||
|
||||
// Advance clock so that maps can be considered as hanging.
|
||||
clock.setTime(System.currentTimeMillis() + 500000L);
|
||||
|
||||
// Request for another reducer on h3 which has not registered.
|
||||
ContainerRequestEvent event4 =
|
||||
createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
|
||||
allocator.sendRequest(event4);
|
||||
|
||||
allocator.schedule();
|
||||
dispatcher.await();
|
||||
|
||||
// Update resources in scheduler through node heartbeat from h1.
|
||||
nodeManager.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
|
||||
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
|
||||
allocator.schedule();
|
||||
dispatcher.await();
|
||||
|
||||
// One map is assigned.
|
||||
Assert.assertEquals(1, allocator.getAssignedRequests().maps.size());
|
||||
// Send deallocate request for map so that no maps are assigned after this.
|
||||
ContainerAllocatorEvent deallocate = createDeallocateEvent(jobId, 1, false);
|
||||
allocator.sendDeallocate(deallocate);
|
||||
// Now one reducer should be scheduled and one should be pending.
|
||||
Assert.assertEquals(1, allocator.getScheduledRequests().reduces.size());
|
||||
Assert.assertEquals(1, allocator.getNumOfPendingReduces());
|
||||
// No map should be assigned and one should be scheduled.
|
||||
Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
|
||||
Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
|
||||
|
||||
Assert.assertEquals(6, allocator.getAsk().size());
|
||||
for (ResourceRequest req : allocator.getAsk()) {
|
||||
boolean isReduce =
|
||||
req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE);
|
||||
if (isReduce) {
|
||||
// 1 reducer each asked on h2, * and default-rack
|
||||
Assert.assertTrue((req.getResourceName().equals("*") ||
|
||||
req.getResourceName().equals("/default-rack") ||
|
||||
req.getResourceName().equals("h2")) && req.getNumContainers() == 1);
|
||||
} else { //map
|
||||
// 0 mappers asked on h1 and 1 each on * and default-rack
|
||||
Assert.assertTrue(((req.getResourceName().equals("*") ||
|
||||
req.getResourceName().equals("/default-rack")) &&
|
||||
req.getNumContainers() == 1) || (req.getResourceName().equals("h1")
|
||||
&& req.getNumContainers() == 0));
|
||||
}
|
||||
}
|
||||
|
||||
clock.setTime(System.currentTimeMillis() + 500000L + 10 * 60 * 1000);
|
||||
|
||||
// On next allocate request to scheduler, headroom reported will be 2048.
|
||||
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 0));
|
||||
allocator.schedule();
|
||||
dispatcher.await();
|
||||
// After allocate response from scheduler, all scheduled reduces are ramped
|
||||
// down and move to pending. 3 asks are also updated with 0 containers to
|
||||
// indicate ramping down of reduces to scheduler.
|
||||
Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size());
|
||||
Assert.assertEquals(2, allocator.getNumOfPendingReduces());
|
||||
Assert.assertEquals(3, allocator.getAsk().size());
|
||||
for (ResourceRequest req : allocator.getAsk()) {
|
||||
Assert.assertEquals(
|
||||
RMContainerAllocator.PRIORITY_REDUCE, req.getPriority());
|
||||
Assert.assertTrue(req.getResourceName().equals("*") ||
|
||||
req.getResourceName().equals("/default-rack") ||
|
||||
req.getResourceName().equals("h2"));
|
||||
Assert.assertEquals(Resource.newInstance(1024, 1), req.getCapability());
|
||||
Assert.assertEquals(0, req.getNumContainers());
|
||||
}
|
||||
}
|
||||
|
||||
private static class MockScheduler implements ApplicationMasterProtocol {
|
||||
ApplicationAttemptId attemptId;
|
||||
long nextContainerId = 10;
|
||||
|
|
Loading…
Reference in New Issue