From 293d42edf0889939547c202ce825b7c927e2bb2d Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Mon, 15 Jul 2013 22:30:39 +0000 Subject: [PATCH] svn merge -c 1503499 FIXES: MAPREDUCE-5317. Stale files left behind for failed jobs. Contributed by Ravi Prakash git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1503505 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 6 + .../v2/app/job/JobStateInternal.java | 1 + .../v2/app/job/event/JobEventType.java | 1 + .../mapreduce/v2/app/job/impl/JobImpl.java | 129 ++++++++++++++++-- .../v2/app/job/impl/TestJobImpl.java | 35 +++++ 5 files changed, 164 insertions(+), 8 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 48f15b6b7be..e2b3e27d467 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -21,6 +21,9 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-5358. MRAppMaster throws invalid transitions for JobImpl (Devaraj K via jlowe) + MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via + jlowe) + Release 2.2.0 - UNRELEASED INCOMPATIBLE CHANGES @@ -1095,6 +1098,9 @@ Release 0.23.10 - UNRELEASED MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the input path dir (Devaraj K via jlowe) + MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via + jlowe) + Release 0.23.9 - 2013-07-08 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java index bdb627b2354..aec91a3828d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java @@ -25,6 +25,7 @@ public enum JobStateInternal { RUNNING, COMMITTING, SUCCEEDED, + FAIL_WAIT, FAIL_ABORT, FAILED, KILL_WAIT, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java index f6c38d30bc3..ab4d7f5d11c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java @@ -44,6 +44,7 @@ public enum JobEventType { //Producer:Job JOB_COMPLETED, + JOB_FAIL_WAIT_TIMEDOUT, //Producer:Any component JOB_DIAGNOSTIC_UPDATE, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 2c33650de41..c6eb8bcea9d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -30,6 +30,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -313,7 +316,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, .addTransition (JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.RUNNING, - JobStateInternal.COMMITTING, JobStateInternal.FAIL_ABORT), + JobStateInternal.COMMITTING, JobStateInternal.FAIL_WAIT, + JobStateInternal.FAIL_ABORT), JobEventType.JOB_TASK_COMPLETED, new TaskCompletedTransition()) .addTransition @@ -424,7 +428,37 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, JobEventType.JOB_TASK_ATTEMPT_COMPLETED, JobEventType.JOB_MAP_TASK_RESCHEDULED)) - // Transitions from FAIL_ABORT state + // Transitions from FAIL_WAIT state + .addTransition(JobStateInternal.FAIL_WAIT, + JobStateInternal.FAIL_WAIT, + JobEventType.JOB_DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(JobStateInternal.FAIL_WAIT, + JobStateInternal.FAIL_WAIT, + JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) + .addTransition(JobStateInternal.FAIL_WAIT, + EnumSet.of(JobStateInternal.FAIL_WAIT, JobStateInternal.FAIL_ABORT), + JobEventType.JOB_TASK_COMPLETED, + new JobFailWaitTransition()) + .addTransition(JobStateInternal.FAIL_WAIT, + JobStateInternal.FAIL_ABORT, JobEventType.JOB_FAIL_WAIT_TIMEDOUT, + new JobFailWaitTimedOutTransition()) + .addTransition(JobStateInternal.FAIL_WAIT, JobStateInternal.KILLED, + JobEventType.JOB_KILL, + new KilledDuringAbortTransition()) + .addTransition(JobStateInternal.FAIL_WAIT, + JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + // Ignore-able events + .addTransition(JobStateInternal.FAIL_WAIT, + JobStateInternal.FAIL_WAIT, + EnumSet.of(JobEventType.JOB_UPDATED_NODES, + JobEventType.JOB_TASK_ATTEMPT_COMPLETED, + JobEventType.JOB_MAP_TASK_RESCHEDULED, + JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, + JobEventType.JOB_AM_REBOOT)) + + //Transitions from FAIL_ABORT state .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAIL_ABORT, JobEventType.JOB_DIAGNOSTIC_UPDATE, @@ -451,7 +485,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.JOB_COMMIT_COMPLETED, JobEventType.JOB_COMMIT_FAILED, - JobEventType.JOB_AM_REBOOT)) + JobEventType.JOB_AM_REBOOT, + JobEventType.JOB_FAIL_WAIT_TIMEDOUT)) // Transitions from KILL_ABORT state .addTransition(JobStateInternal.KILL_ABORT, @@ -602,6 +637,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private JobStateInternal forcedState = null; + //Executor used for running future tasks. Setting thread pool size to 1 + private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + private ScheduledFuture failWaitTriggerScheduledFuture; + public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId, Configuration conf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, @@ -962,6 +1001,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, case SETUP: case COMMITTING: return JobState.RUNNING; + case FAIL_WAIT: case FAIL_ABORT: return JobState.FAILED; case REBOOT: @@ -1565,7 +1605,43 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, job.unsuccessfulFinish(finalState); } } - + + //This transition happens when a job is to be failed. It waits for all the + //tasks to finish / be killed. + private static class JobFailWaitTransition + implements MultipleArcTransition { + @Override + public JobStateInternal transition(JobImpl job, JobEvent event) { + if(!job.failWaitTriggerScheduledFuture.isCancelled()) { + for(Task task: job.tasks.values()) { + if(!task.isFinished()) { + return JobStateInternal.FAIL_WAIT; + } + } + } + //Finished waiting. All tasks finished / were killed + job.failWaitTriggerScheduledFuture.cancel(false); + job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, + job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); + return JobStateInternal.FAIL_ABORT; + } + } + + //This transition happens when a job to be failed times out while waiting on + //tasks that had been sent the KILL signal. It is triggered by a + //ScheduledFuture task queued in the executor. + private static class JobFailWaitTimedOutTransition + implements SingleArcTransition { + @Override + public void transition(JobImpl job, JobEvent event) { + LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed." + + " Going to fail job anyway"); + job.failWaitTriggerScheduledFuture.cancel(false); + job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, + job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); + } + } + // JobFinishedEvent triggers the move of the history file out of the staging // area. May need to create a new event type for this if JobFinished should // not be generated for KilledJobs, etc. @@ -1798,6 +1874,23 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, return checkJobAfterTaskCompletion(job); } + //This class is used to queue a ScheduledFuture to send an event to a job + //after some delay. This can be used to wait for maximum amount of time + //before proceeding anyway. e.g. When a job is waiting in FAIL_WAIT for + //all tasks to be killed. + static class TriggerScheduledFuture implements Runnable { + JobEvent toSend; + JobImpl job; + TriggerScheduledFuture(JobImpl job, JobEvent toSend) { + this.toSend = toSend; + this.job = job; + } + public void run() { + LOG.info("Sending event " + toSend + " to " + job.getID()); + job.getEventHandler().handle(toSend); + } + } + protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) { //check for Job failure if (job.failedMapTaskCount*100 > @@ -1811,10 +1904,30 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, " failedReduces:" + job.failedReduceTaskCount; LOG.info(diagnosticMsg); job.addDiagnostic(diagnosticMsg); - job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, - job.jobContext, - org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); - return JobStateInternal.FAIL_ABORT; + + //Send kill signal to all unfinished tasks here. + boolean allDone = true; + for (Task task : job.tasks.values()) { + if(!task.isFinished()) { + allDone = false; + job.eventHandler.handle( + new TaskEvent(task.getID(), TaskEventType.T_KILL)); + } + } + + //If all tasks are already done, we should go directly to FAIL_ABORT + if(allDone) { + return JobStateInternal.FAIL_ABORT; + } + + //Set max timeout to wait for the tasks to get killed + job.failWaitTriggerScheduledFuture = job.executor.schedule( + new TriggerScheduledFuture(job, new JobEvent(job.getID(), + JobEventType.JOB_FAIL_WAIT_TIMEDOUT)), job.conf.getInt( + MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, + MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS), + TimeUnit.MILLISECONDS); + return JobStateInternal.FAIL_WAIT; } return job.checkReadyForCommit(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 6da75a91bbf..0048b733ca5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -84,6 +85,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; /** @@ -332,6 +334,39 @@ public class TestJobImpl { commitHandler.stop(); } + @Test + public void testAbortJobCalledAfterKillingTasks() throws IOException, + InterruptedException { + Configuration conf = new Configuration(); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000"); + InlineDispatcher dispatcher = new InlineDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + OutputCommitter committer = Mockito.mock(OutputCommitter.class); + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); + + //Fail one task. This should land the JobImpl in the FAIL_WAIT state + job.handle(new JobTaskEvent( + MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP), + TaskState.FAILED)); + //Verify abort job hasn't been called + Mockito.verify(committer, Mockito.never()) + .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); + assertJobState(job, JobStateInternal.FAIL_WAIT); + + //Verify abortJob is called once and the job failed + Mockito.verify(committer, Mockito.timeout(2000).times(1)) + .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); + assertJobState(job, JobStateInternal.FAILED); + + dispatcher.stop(); + } + @Test(timeout=20000) public void testKilledDuringFailAbort() throws Exception { Configuration conf = new Configuration();