MAPREDUCE-6514. Fixed MapReduce ApplicationMaster to properly updated resources ask after ramping down of all reducers avoiding job hangs. Contributed by Varun Saxena and Wangda Tan.

(cherry picked from commit 8d48266720)
This commit is contained in:
Vinod Kumar Vavilapalli 2016-05-05 19:01:52 -07:00
parent 59eeee7501
commit 921562720e
3 changed files with 145 additions and 10 deletions

View File

@ -517,12 +517,7 @@ public class RMContainerAllocator extends RMContainerRequestor
}
private void clearAllPendingReduceRequests() {
LOG.info("Ramping down all scheduled reduces:"
+ scheduledRequests.reduces.size());
for (ContainerRequest req : scheduledRequests.reduces.values()) {
pendingReduces.add(req);
}
scheduledRequests.reduces.clear();
rampDownReduces(Integer.MAX_VALUE);
}
private void preemptReducer(int hangingMapRequests) {
@ -694,9 +689,13 @@ public class RMContainerAllocator extends RMContainerRequestor
@Private
public void rampDownReduces(int rampDown) {
//remove from the scheduled and move back to pending
for (int i = 0; i < rampDown; i++) {
while (rampDown > 0) {
ContainerRequest request = scheduledRequests.removeReduce();
if (request == null) {
return;
}
pendingReduces.add(request);
rampDown--;
}
}
@ -936,6 +935,11 @@ public class RMContainerAllocator extends RMContainerRequestor
Resources.add(assignedMapResource, assignedReduceResource));
}
@VisibleForTesting
public int getNumOfPendingReduces() {
return pendingReduces.size();
}
@Private
@VisibleForTesting
class ScheduledRequests {
@ -952,7 +956,8 @@ public class RMContainerAllocator extends RMContainerRequestor
final Map<TaskAttemptId, ContainerRequest> maps =
new LinkedHashMap<TaskAttemptId, ContainerRequest>();
private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
@VisibleForTesting
final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
new LinkedHashMap<TaskAttemptId, ContainerRequest>();
boolean remove(TaskAttemptId tId) {
@ -1352,7 +1357,8 @@ public class RMContainerAllocator extends RMContainerRequestor
class AssignedRequests {
private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
new HashMap<ContainerId, TaskAttemptId>();
private final LinkedHashMap<TaskAttemptId, Container> maps =
@VisibleForTesting
final LinkedHashMap<TaskAttemptId, Container> maps =
new LinkedHashMap<TaskAttemptId, Container>();
@VisibleForTesting
final LinkedHashMap<TaskAttemptId, Container> reduces =

View File

@ -562,4 +562,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
public Set<String> getBlacklistedNodes() {
return blacklistedNodes;
}
@Private
@VisibleForTesting
Set<ResourceRequest> getAsk() {
return ask;
}
}

View File

@ -1898,6 +1898,7 @@ public class TestRMContainerAllocator {
when(context.getApplicationID()).thenReturn(appId);
when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
when(context.getJob(isA(JobId.class))).thenReturn(job);
when(context.getClock()).thenReturn(new ControlledClock());
when(context.getClusterInfo()).thenReturn(
new ClusterInfo(Resource.newInstance(10240, 1)));
when(context.getEventHandler()).thenReturn(new EventHandler() {
@ -2888,6 +2889,128 @@ public class TestRMContainerAllocator {
allocator.schedule();
}
@Test
public void testUpdateAskOnRampDownAllReduces() throws Exception {
LOG.info("Running testUpdateAskOnRampDownAllReduces");
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher =
(DrainDispatcher) rm.getRMContext().getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
amNodeManager.nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
// Use a controlled clock to advance time for test.
ControlledClock clock = (ControlledClock)allocator.getContext().getClock();
clock.setTime(System.currentTimeMillis());
// Register nodes to RM.
MockNM nodeManager = rm.registerNode("h1:1234", 1024);
dispatcher.await();
// Request 2 maps and 1 reducer(sone on nodes which are not registered).
ContainerRequestEvent event1 =
createReq(jobId, 1, 1024, new String[] { "h1" });
allocator.sendRequest(event1);
ContainerRequestEvent event2 =
createReq(jobId, 2, 1024, new String[] { "h2" });
allocator.sendRequest(event2);
ContainerRequestEvent event3 =
createReq(jobId, 3, 1024, new String[] { "h2" }, false, true);
allocator.sendRequest(event3);
// This will tell the scheduler about the requests but there will be no
// allocations as nodes are not added.
allocator.schedule();
dispatcher.await();
// Advance clock so that maps can be considered as hanging.
clock.setTime(System.currentTimeMillis() + 500000L);
// Request for another reducer on h3 which has not registered.
ContainerRequestEvent event4 =
createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
allocator.sendRequest(event4);
allocator.schedule();
dispatcher.await();
// Update resources in scheduler through node heartbeat from h1.
nodeManager.nodeHeartbeat(true);
dispatcher.await();
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
allocator.schedule();
dispatcher.await();
// One map is assigned.
Assert.assertEquals(1, allocator.getAssignedRequests().maps.size());
// Send deallocate request for map so that no maps are assigned after this.
ContainerAllocatorEvent deallocate = createDeallocateEvent(jobId, 1, false);
allocator.sendDeallocate(deallocate);
// Now one reducer should be scheduled and one should be pending.
Assert.assertEquals(1, allocator.getScheduledRequests().reduces.size());
Assert.assertEquals(1, allocator.getNumOfPendingReduces());
// No map should be assigned and one should be scheduled.
Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
Assert.assertEquals(6, allocator.getAsk().size());
for (ResourceRequest req : allocator.getAsk()) {
boolean isReduce =
req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE);
if (isReduce) {
// 1 reducer each asked on h2, * and default-rack
Assert.assertTrue((req.getResourceName().equals("*") ||
req.getResourceName().equals("/default-rack") ||
req.getResourceName().equals("h2")) && req.getNumContainers() == 1);
} else { //map
// 0 mappers asked on h1 and 1 each on * and default-rack
Assert.assertTrue(((req.getResourceName().equals("*") ||
req.getResourceName().equals("/default-rack")) &&
req.getNumContainers() == 1) || (req.getResourceName().equals("h1")
&& req.getNumContainers() == 0));
}
}
// On next allocate request to scheduler, headroom reported will be 0.
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(0, 0));
allocator.schedule();
dispatcher.await();
// After allocate response from scheduler, all scheduled reduces are ramped
// down and move to pending. 3 asks are also updated with 0 containers to
// indicate ramping down of reduces to scheduler.
Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size());
Assert.assertEquals(2, allocator.getNumOfPendingReduces());
Assert.assertEquals(3, allocator.getAsk().size());
for (ResourceRequest req : allocator.getAsk()) {
Assert.assertEquals(
RMContainerAllocator.PRIORITY_REDUCE, req.getPriority());
Assert.assertTrue(req.getResourceName().equals("*") ||
req.getResourceName().equals("/default-rack") ||
req.getResourceName().equals("h2"));
Assert.assertEquals(Resource.newInstance(1024, 1), req.getCapability());
Assert.assertEquals(0, req.getNumContainers());
}
}
private static class MockScheduler implements ApplicationMasterProtocol {
ApplicationAttemptId attemptId;
long nextContainerId = 10;