MAPREDUCE-7240. Fix Invalid event: TA_TOO_MANY_FETCH_FAILURE at SUCCESS_FINISHING_CONTAINER.

Contributed by Huachao and Peter Bacsko. Reviewed by Wilfred Spiegelenburg.
This commit is contained in:
prabhujoseph 2019-11-27 16:51:45 +05:30
parent 80a97e6ac2
commit 6a4966f336
2 changed files with 33 additions and 0 deletions

View File

@ -382,6 +382,10 @@ public abstract class TaskAttemptImpl implements
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
new TooManyFetchFailureTransition())
// ignore-able events // ignore-able events
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
@ -2148,6 +2152,10 @@ public abstract class TaskAttemptImpl implements
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
if (taskAttempt.getInternalState() ==
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER) {
sendContainerCleanup(taskAttempt, event);
}
TaskAttemptTooManyFetchFailureEvent fetchFailureEvent = TaskAttemptTooManyFetchFailureEvent fetchFailureEvent =
(TaskAttemptTooManyFetchFailureEvent) event; (TaskAttemptTooManyFetchFailureEvent) event;
// too many fetch failure can only happen for map tasks // too many fetch failure can only happen for map tasks

View File

@ -1782,6 +1782,31 @@ public class TestTaskAttempt{
ResourceUtils.resetResourceTypes(conf); ResourceUtils.resetResourceTypes(conf);
} }
@Test
public void testTooManyFetchFailureWhileSuccessFinishing() throws Exception {
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
TaskId reducetaskId = MRBuilderUtils.newTaskId(taImpl.getID().getTaskId()
.getJobId(), 1, TaskType.REDUCE);
TaskAttemptId reduceTAId =
MRBuilderUtils.newTaskAttemptId(reducetaskId, 0);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_DONE));
assertEquals("Task attempt's internal state is not " +
"SUCCESS_FINISHING_CONTAINER",
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
taImpl.getInternalState());
taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(taImpl.getID(),
reduceTAId, "Host"));
assertEquals("Task attempt is not in FAILED state",
TaskAttemptState.FAILED,
taImpl.getState());
assertFalse("InternalError occurred", eventHandler.internalError);
}
private void setupTaskAttemptFinishingMonitor( private void setupTaskAttemptFinishingMonitor(
EventHandler eventHandler, JobConf jobConf, AppContext appCtx) { EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
TaskAttemptFinishingMonitor taskAttemptFinishingMonitor = TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =