MAPREDUCE-6771. RMContainerAllocator sends container diagnostics event after corresponding completion event. Contributed by Haibo Chen

(cherry picked from commit c52ad9ee86)
This commit is contained in:
Jason Lowe 2016-09-29 15:50:50 +00:00
parent 1c8e388181
commit db93a64eca
2 changed files with 74 additions and 18 deletions

View File

@ -142,7 +142,7 @@ public class RMContainerAllocator extends RMContainerRequestor
new LinkedList<ContainerRequest>(); new LinkedList<ContainerRequest>();
//holds information about the assigned containers to task attempts //holds information about the assigned containers to task attempts
private final AssignedRequests assignedRequests = new AssignedRequests(); private final AssignedRequests assignedRequests;
//holds scheduled requests to be fulfilled by RM //holds scheduled requests to be fulfilled by RM
private final ScheduledRequests scheduledRequests = new ScheduledRequests(); private final ScheduledRequests scheduledRequests = new ScheduledRequests();
@ -188,6 +188,11 @@ public class RMContainerAllocator extends RMContainerRequestor
super(clientService, context); super(clientService, context);
this.stopped = new AtomicBoolean(false); this.stopped = new AtomicBoolean(false);
this.clock = context.getClock(); this.clock = context.getClock();
this.assignedRequests = createAssignedRequests();
}
protected AssignedRequests createAssignedRequests() {
return new AssignedRequests();
} }
@Override @Override
@ -797,27 +802,33 @@ public class RMContainerAllocator extends RMContainerRequestor
handleJobPriorityChange(response); handleJobPriorityChange(response);
for (ContainerStatus cont : finishedContainers) { for (ContainerStatus cont : finishedContainers) {
LOG.info("Received completed container " + cont.getContainerId()); processFinishedContainer(cont);
TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
if (attemptID == null) {
LOG.error("Container complete event for unknown container id "
+ cont.getContainerId());
} else {
pendingRelease.remove(cont.getContainerId());
assignedRequests.remove(attemptID);
// send the container completed event to Task attempt
eventHandler.handle(createContainerFinishedEvent(cont, attemptID));
// Send the diagnostics
String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
diagnostics));
}
} }
return newContainers; return newContainers;
} }
@SuppressWarnings("unchecked")
@VisibleForTesting
void processFinishedContainer(ContainerStatus container) {
LOG.info("Received completed container " + container.getContainerId());
TaskAttemptId attemptID = assignedRequests.get(container.getContainerId());
if (attemptID == null) {
LOG.error("Container complete event for unknown container "
+ container.getContainerId());
} else {
pendingRelease.remove(container.getContainerId());
assignedRequests.remove(attemptID);
// Send the diagnostics
String diagnostic = StringInterner.weakIntern(container.getDiagnostics());
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
diagnostic));
// send the container completed event to Task attempt
eventHandler.handle(createContainerFinishedEvent(container, attemptID));
}
}
private void applyConcurrentTaskLimits() { private void applyConcurrentTaskLimits() {
int numScheduledMaps = scheduledRequests.maps.size(); int numScheduledMaps = scheduledRequests.maps.size();
if (maxRunningMaps > 0 && numScheduledMaps > 0) { if (maxRunningMaps > 0 && numScheduledMaps > 0) {

View File

@ -24,6 +24,7 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.isA; import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -59,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
@ -69,6 +71,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
@ -142,6 +145,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import org.mockito.InOrder;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class TestRMContainerAllocator { public class TestRMContainerAllocator {
@ -3011,6 +3015,47 @@ public class TestRMContainerAllocator {
} }
} }
/**
* MAPREDUCE-6771. Test if RMContainerAllocator generates the events in the
* right order while processing finished containers.
*/
@Test
public void testHandlingFinishedContainers() {
EventHandler eventHandler = mock(EventHandler.class);
AppContext context = mock(MRAppMaster.RunningAppContext.class);
when(context.getClock()).thenReturn(new ControlledClock());
when(context.getClusterInfo()).thenReturn(
new ClusterInfo(Resource.newInstance(10240, 1)));
when(context.getEventHandler()).thenReturn(eventHandler);
RMContainerAllocator containerAllocator =
new RMContainerAllocatorForFinishedContainer(null, context);
ContainerStatus finishedContainer = ContainerStatus.newInstance(
mock(ContainerId.class), ContainerState.COMPLETE, "", 0);
containerAllocator.processFinishedContainer(finishedContainer);
InOrder inOrder = inOrder(eventHandler);
inOrder.verify(eventHandler).handle(
isA(TaskAttemptDiagnosticsUpdateEvent.class));
inOrder.verify(eventHandler).handle(isA(TaskAttemptEvent.class));
inOrder.verifyNoMoreInteractions();
}
private static class RMContainerAllocatorForFinishedContainer
extends RMContainerAllocator {
public RMContainerAllocatorForFinishedContainer(ClientService clientService,
AppContext context) {
super(clientService, context);
}
@Override
protected AssignedRequests createAssignedRequests() {
AssignedRequests assignedReqs = mock(AssignedRequests.class);
TaskAttemptId taskAttempt = mock(TaskAttemptId.class);
when(assignedReqs.get(any(ContainerId.class))).thenReturn(taskAttempt);
return assignedReqs;
}
}
@Test @Test
public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired()