From 30c95ddaff2619d00bd232e52086ea57ed3664a1 Mon Sep 17 00:00:00 2001 From: prabhujoseph Date: Thu, 28 Nov 2019 14:34:50 +0530 Subject: [PATCH] MAPREDUCE-7249. Fix Invalid event TA_TOO_MANY_FETCH_FAILURE at SUCCESS_CONTAINER_CLEANUP causes job failure. Contributed by Wilfred Spiegelenburg. (cherry picked from commit a97f7776bd05f957a998b4883141855cad5afb31) --- .../v2/app/job/impl/TaskAttemptImpl.java | 16 ++++++---- .../v2/app/job/impl/TestTaskAttempt.java | 32 +++++++++++++++++++ 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 7dcdff754ab..d4947d17c79 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -469,12 +469,16 @@ public abstract class TaskAttemptImpl implements TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING) - // Transitions from SUCCESS_CONTAINER_CLEANUP state - // kill and cleanup the container - .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, - TaskAttemptStateInternal.SUCCEEDED, - TaskAttemptEventType.TA_CONTAINER_CLEANED) - .addTransition( + // Transitions from SUCCESS_CONTAINER_CLEANUP state + // kill and cleanup the container + .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, + TaskAttemptStateInternal.SUCCEEDED, + TaskAttemptEventType.TA_CONTAINER_CLEANED) + .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, + TaskAttemptStateInternal.FAILED, + TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE, + new TooManyFetchFailureTransition()) + .addTransition( TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 9fab43f5113..4c739552973 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -1773,6 +1773,38 @@ public class TestTaskAttempt{ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); } + @Test + public void testTooManyFetchFailureWhileContainerCleanup() { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + TaskId reducetaskId = MRBuilderUtils.newTaskId(taImpl.getID().getTaskId() + .getJobId(), 1, TaskType.REDUCE); + TaskAttemptId reduceTAId = + MRBuilderUtils.newTaskAttemptId(reducetaskId, 0); + + // move in two steps to the desired state (cannot get there directly) + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_DONE)); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_FINISHING_CONTAINER", + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + taImpl.getInternalState()); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_TIMED_OUT)); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_CONTAINER_CLEANUP", + TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, + taImpl.getInternalState()); + + taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(taImpl.getID(), + reduceTAId, "Host")); + assertEquals("Task attempt is not in FAILED state", + TaskAttemptState.FAILED, + taImpl.getState()); + assertFalse("InternalError occurred", eventHandler.internalError); + } + private void initResourceTypes() { Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,