Merge r1603338 from trunk. MAPREDUCE-5924. Changed TaskAttemptImpl to ignore TA_COMMIT_PENDING event at COMMIT_PENDING state. Contributed by Zhijie Shen

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1603339 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-06-18 00:53:23 +00:00
parent 97d57ff1f4
commit d3462755bb
3 changed files with 21 additions and 0 deletions

View File

@ -119,6 +119,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5920. Add Xattr option in DistCp docs. (Yi Liu via cnauroth) MAPREDUCE-5920. Add Xattr option in DistCp docs. (Yi Liu via cnauroth)
MAPREDUCE-5924. Changed TaskAttemptImpl to ignore TA_COMMIT_PENDING event
at COMMIT_PENDING state. (Zhijie Shen via jianhe)
Release 2.4.1 - 2014-06-23 Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -332,6 +332,15 @@ public abstract class TaskAttemptImpl implements
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
// AM is likely to receive duplicate TA_COMMIT_PENDINGs as the task attempt
// will re-send the commit message until it doesn't encounter any
// IOException and succeeds in delivering the commit message.
// Ignoring the duplicate commit message is a short-term fix. In long term,
// we need to make use of retry cache to help this and other MR protocol
// APIs that can be considered as @AtMostOnce.
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_COMMIT_PENDING)
// Transitions from SUCCESS_CONTAINER_CLEANUP state // Transitions from SUCCESS_CONTAINER_CLEANUP state
// kill and cleanup the container // kill and cleanup the container

View File

@ -112,6 +112,15 @@ public class TestMRApp {
//wait for first attempt to commit pending //wait for first attempt to commit pending
app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING); app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING);
//re-send the commit pending signal to the task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
attempt.getID(),
TaskAttemptEventType.TA_COMMIT_PENDING));
//the task attempt should be still at COMMIT_PENDING
app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING);
//send the done signal to the task //send the done signal to the task
app.getContext().getEventHandler().handle( app.getContext().getEventHandler().handle(
new TaskAttemptEvent( new TaskAttemptEvent(