MAPREDUCE-6514. Fixed the MapReduce ApplicationMaster to properly update the resource ask after ramping down all reducers, avoiding job hangs. Contributed by Varun Saxena and Wangda Tan.
(cherry picked from commit 8d48266720)

parent 35859872b7
commit e9c7a17bc8
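In brief: when the ApplicationMaster decided to ramp down every reducer, the old code moved the scheduled reduce requests back to pending and then called scheduledRequests.reduces.clear() directly, which never decremented the outstanding resource requests (the "ask") already sent to the ResourceManager, so the scheduler kept trying to satisfy reducers the AM no longer wanted and the job could hang. The fix routes the ramp-down through rampDownReduces(Integer.MAX_VALUE), whose loop is changed from a counted for-loop to a while-loop that stops as soon as scheduledRequests.removeReduce() returns null (a counted loop driven by Integer.MAX_VALUE would keep iterating and add nulls to pendingReduces); every request withdrawn through removeReduce() also goes through the request-decrement path, keeping the ask consistent. The new getAsk() and getNumOfPendingReduces() accessors exist so the added test can verify this. Below is a minimal, self-contained sketch of the idea using hypothetical simplified types, not the actual Hadoop classes:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplified types -- NOT the Hadoop API. Illustrates why the
// ramp-down must go through removeReduce(): only that path also shrinks the
// outstanding ask reported to the ResourceManager.
class RampDownSketch {
  // What the AM has asked the RM for, keyed by resource name ("h2", "*", ...).
  private final Map<String, Integer> ask = new HashMap<>();
  // Reduce requests currently scheduled with the RM, keyed by attempt id.
  private final LinkedHashMap<String, String> scheduledReduces =
      new LinkedHashMap<>();
  // Reduce requests held back until enough maps have finished.
  private final List<String> pendingReduces = new ArrayList<>();

  void scheduleReduce(String attempt, String resourceName) {
    scheduledReduces.put(attempt, resourceName);
    ask.merge(resourceName, 1, Integer::sum);        // ask grows with the request
  }

  // Analogue of ScheduledRequests.removeReduce(): withdraw one scheduled
  // reduce and decrement the corresponding ask entry.
  String removeReduce() {
    Iterator<Map.Entry<String, String>> it =
        scheduledReduces.entrySet().iterator();
    if (!it.hasNext()) {
      return null;
    }
    Map.Entry<String, String> e = it.next();
    it.remove();
    ask.merge(e.getValue(), -1, Integer::sum);       // ask shrinks as well
    return e.getKey();
  }

  // Pre-fix behaviour: clears the schedule but leaves the ask untouched, so
  // the scheduler still believes the reduces are wanted.
  void rampDownAllReducesBuggy() {
    pendingReduces.addAll(scheduledReduces.keySet());
    scheduledReduces.clear();
  }

  // Fixed behaviour, mirroring rampDownReduces(Integer.MAX_VALUE): drain
  // removeReduce() until it runs dry, keeping the ask consistent.
  void rampDownAllReducesFixed() {
    String attempt;
    while ((attempt = removeReduce()) != null) {
      pendingReduces.add(attempt);
    }
  }
}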
RMContainerAllocator.java

@@ -517,12 +517,7 @@ public class RMContainerAllocator extends RMContainerRequestor
   }
 
   private void clearAllPendingReduceRequests() {
-    LOG.info("Ramping down all scheduled reduces:"
-        + scheduledRequests.reduces.size());
-    for (ContainerRequest req : scheduledRequests.reduces.values()) {
-      pendingReduces.add(req);
-    }
-    scheduledRequests.reduces.clear();
+    rampDownReduces(Integer.MAX_VALUE);
   }
 
   private void preemptReducer(int hangingMapRequests) {
@@ -694,9 +689,13 @@ public class RMContainerAllocator extends RMContainerRequestor
   @Private
   public void rampDownReduces(int rampDown) {
     //remove from the scheduled and move back to pending
-    for (int i = 0; i < rampDown; i++) {
+    while (rampDown > 0) {
       ContainerRequest request = scheduledRequests.removeReduce();
+      if (request == null) {
+        return;
+      }
       pendingReduces.add(request);
+      rampDown--;
     }
   }
 
@@ -936,6 +935,11 @@ public class RMContainerAllocator extends RMContainerRequestor
         Resources.add(assignedMapResource, assignedReduceResource));
   }
 
+  @VisibleForTesting
+  public int getNumOfPendingReduces() {
+    return pendingReduces.size();
+  }
+
   @Private
   @VisibleForTesting
   class ScheduledRequests {
@@ -951,8 +955,9 @@ public class RMContainerAllocator extends RMContainerRequestor
     @VisibleForTesting
     final Map<TaskAttemptId, ContainerRequest> maps =
         new LinkedHashMap<TaskAttemptId, ContainerRequest>();
 
-    private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
+    @VisibleForTesting
+    final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
         new LinkedHashMap<TaskAttemptId, ContainerRequest>();
 
     boolean remove(TaskAttemptId tId) {
@@ -1352,7 +1357,8 @@ public class RMContainerAllocator extends RMContainerRequestor
   class AssignedRequests {
     private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
        new HashMap<ContainerId, TaskAttemptId>();
-    private final LinkedHashMap<TaskAttemptId, Container> maps =
+    @VisibleForTesting
+    final LinkedHashMap<TaskAttemptId, Container> maps =
        new LinkedHashMap<TaskAttemptId, Container>();
     @VisibleForTesting
     final LinkedHashMap<TaskAttemptId, Container> reduces =
RMContainerRequestor.java

@@ -562,4 +562,10 @@ public abstract class RMContainerRequestor extends RMCommunicator
   public Set<String> getBlacklistedNodes() {
     return blacklistedNodes;
   }
+
+  @Private
+  @VisibleForTesting
+  Set<ResourceRequest> getAsk() {
+    return ask;
+  }
 }
TestRMContainerAllocator.java

@@ -1898,6 +1898,7 @@ public class TestRMContainerAllocator
     when(context.getApplicationID()).thenReturn(appId);
     when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
     when(context.getJob(isA(JobId.class))).thenReturn(job);
+    when(context.getClock()).thenReturn(new ControlledClock());
     when(context.getClusterInfo()).thenReturn(
         new ClusterInfo(Resource.newInstance(10240, 1)));
     when(context.getEventHandler()).thenReturn(new EventHandler() {
@@ -2888,6 +2889,128 @@ public class TestRMContainerAllocator
     allocator.schedule();
   }
 
+  @Test
+  public void testUpdateAskOnRampDownAllReduces() throws Exception {
+    LOG.info("Running testUpdateAskOnRampDownAllReduces");
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+    // Use a controlled clock to advance time for test.
+    ControlledClock clock = (ControlledClock)allocator.getContext().getClock();
+    clock.setTime(System.currentTimeMillis());
+
+    // Register nodes to RM.
+    MockNM nodeManager = rm.registerNode("h1:1234", 1024);
+    dispatcher.await();
+
+    // Request 2 maps and 1 reducer (some on nodes which are not registered).
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 1024, new String[] { "h1" });
+    allocator.sendRequest(event1);
+    ContainerRequestEvent event2 =
+        createReq(jobId, 2, 1024, new String[] { "h2" });
+    allocator.sendRequest(event2);
+    ContainerRequestEvent event3 =
+        createReq(jobId, 3, 1024, new String[] { "h2" }, false, true);
+    allocator.sendRequest(event3);
+
+    // This will tell the scheduler about the requests but there will be no
+    // allocations as nodes are not added.
+    allocator.schedule();
+    dispatcher.await();
+
+    // Advance clock so that maps can be considered as hanging.
+    clock.setTime(System.currentTimeMillis() + 500000L);
+
+    // Request for another reducer on h3 which has not registered.
+    ContainerRequestEvent event4 =
+        createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+    allocator.sendRequest(event4);
+
+    allocator.schedule();
+    dispatcher.await();
+
+    // Update resources in scheduler through node heartbeat from h1.
+    nodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
+    allocator.schedule();
+    dispatcher.await();
+
+    // One map is assigned.
+    Assert.assertEquals(1, allocator.getAssignedRequests().maps.size());
+    // Send deallocate request for map so that no maps are assigned after this.
+    ContainerAllocatorEvent deallocate = createDeallocateEvent(jobId, 1, false);
+    allocator.sendDeallocate(deallocate);
+    // Now one reducer should be scheduled and one should be pending.
+    Assert.assertEquals(1, allocator.getScheduledRequests().reduces.size());
+    Assert.assertEquals(1, allocator.getNumOfPendingReduces());
+    // No map should be assigned and one should be scheduled.
+    Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
+    Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
+
+    Assert.assertEquals(6, allocator.getAsk().size());
+    for (ResourceRequest req : allocator.getAsk()) {
+      boolean isReduce =
+          req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE);
+      if (isReduce) {
+        // 1 reducer each asked on h2, * and default-rack
+        Assert.assertTrue((req.getResourceName().equals("*") ||
+            req.getResourceName().equals("/default-rack") ||
+            req.getResourceName().equals("h2")) && req.getNumContainers() == 1);
+      } else { //map
+        // 0 mappers asked on h1 and 1 each on * and default-rack
+        Assert.assertTrue(((req.getResourceName().equals("*") ||
+            req.getResourceName().equals("/default-rack")) &&
+            req.getNumContainers() == 1) || (req.getResourceName().equals("h1")
+            && req.getNumContainers() == 0));
+      }
+    }
+    // On next allocate request to scheduler, headroom reported will be 0.
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(0, 0));
+    allocator.schedule();
+    dispatcher.await();
+    // After allocate response from scheduler, all scheduled reduces are ramped
+    // down and moved to pending. 3 asks are also updated with 0 containers to
+    // indicate ramping down of reduces to the scheduler.
+    Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size());
+    Assert.assertEquals(2, allocator.getNumOfPendingReduces());
+    Assert.assertEquals(3, allocator.getAsk().size());
+    for (ResourceRequest req : allocator.getAsk()) {
+      Assert.assertEquals(
+          RMContainerAllocator.PRIORITY_REDUCE, req.getPriority());
+      Assert.assertTrue(req.getResourceName().equals("*") ||
+          req.getResourceName().equals("/default-rack") ||
+          req.getResourceName().equals("h2"));
+      Assert.assertEquals(Resource.newInstance(1024, 1), req.getCapability());
+      Assert.assertEquals(0, req.getNumContainers());
+    }
+  }
+
   private static class MockScheduler implements ApplicationMasterProtocol {
     ApplicationAttemptId attemptId;
     long nextContainerId = 10;
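To run just the new test locally, the standard Surefire single-test selection should work from the MapReduce AM module (module name assumed from the classes touched above, not stated in this commit):

mvn test -Dtest=TestRMContainerAllocator#testUpdateAskOnRampDownAllReduces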