From 44ea6e3ad5656d6edbaa8f3a510ab800e42694e3 Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Tue, 16 Jul 2013 15:05:16 +0000 Subject: [PATCH] Back out revision 1503499 for MAPREDUCE-5317. Stale files left behind for failed jobs git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1503744 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 6 - .../v2/app/job/JobStateInternal.java | 1 - .../v2/app/job/event/JobEventType.java | 1 - .../mapreduce/v2/app/job/impl/JobImpl.java | 129 ++---------------- .../v2/app/job/impl/TestJobImpl.java | 35 ----- 5 files changed, 8 insertions(+), 164 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d1c0593c27f..bb634cd49a4 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -158,9 +158,6 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-5358. MRAppMaster throws invalid transitions for JobImpl (Devaraj K via jlowe) - MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via - jlowe) - Release 2.2.0 - UNRELEASED INCOMPATIBLE CHANGES @@ -1223,9 +1220,6 @@ Release 0.23.10 - UNRELEASED MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the input path dir (Devaraj K via jlowe) - MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via - jlowe) - Release 0.23.9 - 2013-07-08 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java index aec91a3828d..bdb627b2354 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java @@ -25,7 +25,6 @@ public enum JobStateInternal { RUNNING, COMMITTING, SUCCEEDED, - FAIL_WAIT, FAIL_ABORT, FAILED, KILL_WAIT, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java index ab4d7f5d11c..f6c38d30bc3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java @@ -44,7 +44,6 @@ public enum JobEventType { //Producer:Job JOB_COMPLETED, - JOB_FAIL_WAIT_TIMEDOUT, //Producer:Any component JOB_DIAGNOSTIC_UPDATE, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index c6eb8bcea9d..2c33650de41 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -30,9 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -316,8 +313,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, .addTransition (JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.RUNNING, - JobStateInternal.COMMITTING, JobStateInternal.FAIL_WAIT, - JobStateInternal.FAIL_ABORT), + JobStateInternal.COMMITTING, JobStateInternal.FAIL_ABORT), JobEventType.JOB_TASK_COMPLETED, new TaskCompletedTransition()) .addTransition @@ -428,37 +424,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, JobEventType.JOB_TASK_ATTEMPT_COMPLETED, JobEventType.JOB_MAP_TASK_RESCHEDULED)) - // Transitions from FAIL_WAIT state - .addTransition(JobStateInternal.FAIL_WAIT, - JobStateInternal.FAIL_WAIT, - JobEventType.JOB_DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(JobStateInternal.FAIL_WAIT, - JobStateInternal.FAIL_WAIT, - JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) - .addTransition(JobStateInternal.FAIL_WAIT, - EnumSet.of(JobStateInternal.FAIL_WAIT, JobStateInternal.FAIL_ABORT), - JobEventType.JOB_TASK_COMPLETED, - new JobFailWaitTransition()) - .addTransition(JobStateInternal.FAIL_WAIT, - JobStateInternal.FAIL_ABORT, JobEventType.JOB_FAIL_WAIT_TIMEDOUT, - new JobFailWaitTimedOutTransition()) - .addTransition(JobStateInternal.FAIL_WAIT, JobStateInternal.KILLED, - JobEventType.JOB_KILL, - new KilledDuringAbortTransition()) - .addTransition(JobStateInternal.FAIL_WAIT, - JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - // Ignore-able events - .addTransition(JobStateInternal.FAIL_WAIT, - JobStateInternal.FAIL_WAIT, - EnumSet.of(JobEventType.JOB_UPDATED_NODES, - JobEventType.JOB_TASK_ATTEMPT_COMPLETED, - JobEventType.JOB_MAP_TASK_RESCHEDULED, - JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, - JobEventType.JOB_AM_REBOOT)) - - //Transitions from FAIL_ABORT state + // Transitions from FAIL_ABORT state .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAIL_ABORT, JobEventType.JOB_DIAGNOSTIC_UPDATE, @@ -485,8 +451,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.JOB_COMMIT_COMPLETED, JobEventType.JOB_COMMIT_FAILED, - JobEventType.JOB_AM_REBOOT, - JobEventType.JOB_FAIL_WAIT_TIMEDOUT)) + JobEventType.JOB_AM_REBOOT)) // Transitions from KILL_ABORT state .addTransition(JobStateInternal.KILL_ABORT, @@ -637,10 +602,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private JobStateInternal forcedState = null; - //Executor used for running future tasks. Setting thread pool size to 1 - private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); - private ScheduledFuture failWaitTriggerScheduledFuture; - public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId, Configuration conf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, @@ -1001,7 +962,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, case SETUP: case COMMITTING: return JobState.RUNNING; - case FAIL_WAIT: case FAIL_ABORT: return JobState.FAILED; case REBOOT: @@ -1605,43 +1565,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, job.unsuccessfulFinish(finalState); } } - - //This transition happens when a job is to be failed. It waits for all the - //tasks to finish / be killed. - private static class JobFailWaitTransition - implements MultipleArcTransition { - @Override - public JobStateInternal transition(JobImpl job, JobEvent event) { - if(!job.failWaitTriggerScheduledFuture.isCancelled()) { - for(Task task: job.tasks.values()) { - if(!task.isFinished()) { - return JobStateInternal.FAIL_WAIT; - } - } - } - //Finished waiting. All tasks finished / were killed - job.failWaitTriggerScheduledFuture.cancel(false); - job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, - job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); - return JobStateInternal.FAIL_ABORT; - } - } - - //This transition happens when a job to be failed times out while waiting on - //tasks that had been sent the KILL signal. It is triggered by a - //ScheduledFuture task queued in the executor. - private static class JobFailWaitTimedOutTransition - implements SingleArcTransition { - @Override - public void transition(JobImpl job, JobEvent event) { - LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed." - + " Going to fail job anyway"); - job.failWaitTriggerScheduledFuture.cancel(false); - job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, - job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); - } - } - + // JobFinishedEvent triggers the move of the history file out of the staging // area. May need to create a new event type for this if JobFinished should // not be generated for KilledJobs, etc. @@ -1874,23 +1798,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, return checkJobAfterTaskCompletion(job); } - //This class is used to queue a ScheduledFuture to send an event to a job - //after some delay. This can be used to wait for maximum amount of time - //before proceeding anyway. e.g. When a job is waiting in FAIL_WAIT for - //all tasks to be killed. - static class TriggerScheduledFuture implements Runnable { - JobEvent toSend; - JobImpl job; - TriggerScheduledFuture(JobImpl job, JobEvent toSend) { - this.toSend = toSend; - this.job = job; - } - public void run() { - LOG.info("Sending event " + toSend + " to " + job.getID()); - job.getEventHandler().handle(toSend); - } - } - protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) { //check for Job failure if (job.failedMapTaskCount*100 > @@ -1904,30 +1811,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, " failedReduces:" + job.failedReduceTaskCount; LOG.info(diagnosticMsg); job.addDiagnostic(diagnosticMsg); - - //Send kill signal to all unfinished tasks here. - boolean allDone = true; - for (Task task : job.tasks.values()) { - if(!task.isFinished()) { - allDone = false; - job.eventHandler.handle( - new TaskEvent(task.getID(), TaskEventType.T_KILL)); - } - } - - //If all tasks are already done, we should go directly to FAIL_ABORT - if(allDone) { - return JobStateInternal.FAIL_ABORT; - } - - //Set max timeout to wait for the tasks to get killed - job.failWaitTriggerScheduledFuture = job.executor.schedule( - new TriggerScheduledFuture(job, new JobEvent(job.getID(), - JobEventType.JOB_FAIL_WAIT_TIMEDOUT)), job.conf.getInt( - MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, - MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS), - TimeUnit.MILLISECONDS); - return JobStateInternal.FAIL_WAIT; + job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, + job.jobContext, + org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); + return JobStateInternal.FAIL_ABORT; } return job.checkReadyForCommit(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 0048b733ca5..6da75a91bbf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -85,7 +84,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; /** @@ -334,39 +332,6 @@ public class TestJobImpl { commitHandler.stop(); } - @Test - public void testAbortJobCalledAfterKillingTasks() throws IOException, - InterruptedException { - Configuration conf = new Configuration(); - conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); - conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000"); - InlineDispatcher dispatcher = new InlineDispatcher(); - dispatcher.init(conf); - dispatcher.start(); - OutputCommitter committer = Mockito.mock(OutputCommitter.class); - CommitterEventHandler commitHandler = - createCommitterEventHandler(dispatcher, committer); - commitHandler.init(conf); - commitHandler.start(); - JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); - - //Fail one task. This should land the JobImpl in the FAIL_WAIT state - job.handle(new JobTaskEvent( - MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP), - TaskState.FAILED)); - //Verify abort job hasn't been called - Mockito.verify(committer, Mockito.never()) - .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); - assertJobState(job, JobStateInternal.FAIL_WAIT); - - //Verify abortJob is called once and the job failed - Mockito.verify(committer, Mockito.timeout(2000).times(1)) - .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); - assertJobState(job, JobStateInternal.FAILED); - - dispatcher.stop(); - } - @Test(timeout=20000) public void testKilledDuringFailAbort() throws Exception { Configuration conf = new Configuration();