MAPREDUCE-6771. RMContainerAllocator sends container diagnostics event after corresponding completion event. Contributed by Haibo Chen

This commit is contained in:
Jason Lowe 2016-09-29 15:27:17 +00:00
parent 2ae5a3a5bf
commit a1b8251bf7
2 changed files with 77 additions and 20 deletions

View File

@ -150,7 +150,7 @@ public class RMContainerAllocator extends RMContainerRequestor
new LinkedList<ContainerRequest>(); new LinkedList<ContainerRequest>();
//holds information about the assigned containers to task attempts //holds information about the assigned containers to task attempts
private final AssignedRequests assignedRequests = new AssignedRequests(); private final AssignedRequests assignedRequests;
//holds scheduled requests to be fulfilled by RM //holds scheduled requests to be fulfilled by RM
private final ScheduledRequests scheduledRequests = new ScheduledRequests(); private final ScheduledRequests scheduledRequests = new ScheduledRequests();
@ -200,6 +200,11 @@ public class RMContainerAllocator extends RMContainerRequestor
this.preemptionPolicy = preemptionPolicy; this.preemptionPolicy = preemptionPolicy;
this.stopped = new AtomicBoolean(false); this.stopped = new AtomicBoolean(false);
this.clock = context.getClock(); this.clock = context.getClock();
this.assignedRequests = createAssignedRequests();
}
protected AssignedRequests createAssignedRequests() {
return new AssignedRequests();
} }
@Override @Override
@ -833,28 +838,34 @@ public class RMContainerAllocator extends RMContainerRequestor
} }
for (ContainerStatus cont : finishedContainers) { for (ContainerStatus cont : finishedContainers) {
LOG.info("Received completed container " + cont.getContainerId()); processFinishedContainer(cont);
TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId()); }
return newContainers;
}
@SuppressWarnings("unchecked")
@VisibleForTesting
void processFinishedContainer(ContainerStatus container) {
LOG.info("Received completed container " + container.getContainerId());
TaskAttemptId attemptID = assignedRequests.get(container.getContainerId());
if (attemptID == null) { if (attemptID == null) {
LOG.error("Container complete event for unknown container id " LOG.error("Container complete event for unknown container "
+ cont.getContainerId()); + container.getContainerId());
} else { } else {
pendingRelease.remove(cont.getContainerId()); pendingRelease.remove(container.getContainerId());
assignedRequests.remove(attemptID); assignedRequests.remove(attemptID);
// send the container completed event to Task attempt
eventHandler.handle(createContainerFinishedEvent(cont, attemptID));
// Send the diagnostics // Send the diagnostics
String diagnostics = StringInterner.weakIntern(cont.getDiagnostics()); String diagnostic = StringInterner.weakIntern(container.getDiagnostics());
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID, eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
diagnostics)); diagnostic));
// send the container completed event to Task attempt
eventHandler.handle(createContainerFinishedEvent(container, attemptID));
preemptionPolicy.handleCompletedContainer(attemptID); preemptionPolicy.handleCompletedContainer(attemptID);
} }
} }
return newContainers;
}
private void applyConcurrentTaskLimits() { private void applyConcurrentTaskLimits() {
int numScheduledMaps = scheduledRequests.maps.size(); int numScheduledMaps = scheduledRequests.maps.size();

View File

@ -24,6 +24,7 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.isA; import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -70,11 +71,13 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -144,6 +147,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import org.mockito.InOrder;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class TestRMContainerAllocator { public class TestRMContainerAllocator {
@ -3017,6 +3021,48 @@ public class TestRMContainerAllocator {
} }
} }
/**
* MAPREDUCE-6771. Test if RMContainerAllocator generates the events in the
* right order while processing finished containers.
*/
@Test
public void testHandlingFinishedContainers() {
EventHandler eventHandler = mock(EventHandler.class);
AppContext context = mock(RunningAppContext.class);
when(context.getClock()).thenReturn(new ControlledClock());
when(context.getClusterInfo()).thenReturn(
new ClusterInfo(Resource.newInstance(10240, 1)));
when(context.getEventHandler()).thenReturn(eventHandler);
RMContainerAllocator containerAllocator =
new RMContainerAllocatorForFinishedContainer(null, context,
mock(AMPreemptionPolicy.class));
ContainerStatus finishedContainer = ContainerStatus.newInstance(
mock(ContainerId.class), ContainerState.COMPLETE, "", 0);
containerAllocator.processFinishedContainer(finishedContainer);
InOrder inOrder = inOrder(eventHandler);
inOrder.verify(eventHandler).handle(
isA(TaskAttemptDiagnosticsUpdateEvent.class));
inOrder.verify(eventHandler).handle(isA(TaskAttemptEvent.class));
inOrder.verifyNoMoreInteractions();
}
private static class RMContainerAllocatorForFinishedContainer
extends RMContainerAllocator {
public RMContainerAllocatorForFinishedContainer(ClientService clientService,
AppContext context, AMPreemptionPolicy preemptionPolicy) {
super(clientService, context, preemptionPolicy);
}
@Override
protected AssignedRequests createAssignedRequests() {
AssignedRequests assignedReqs = mock(AssignedRequests.class);
TaskAttemptId taskAttempt = mock(TaskAttemptId.class);
when(assignedReqs.get(any(ContainerId.class))).thenReturn(taskAttempt);
return assignedReqs;
}
}
@Test @Test
public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired()