From 07321a677cc91ff4fbe03be1b84075fcbc090d7e Mon Sep 17 00:00:00 2001 From: Robert Joseph Evans Date: Tue, 10 Apr 2012 18:59:03 +0000 Subject: [PATCH] svn merge -c 1311926 from trunk. FIXES: MAPREDUCE-4099. ApplicationMaster may fail to remove staging directory (Jason Lowe via bobby) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1311927 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 15 ++-- .../hadoop/mapreduce/v2/app/TestMRApp.java | 70 +++++++++++++++++++ 3 files changed, 81 insertions(+), 7 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 594737a4ca5..652c9f6c065 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -193,6 +193,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4117. mapred job -status throws NullPointerException (Devaraj K via bobby) + MAPREDUCE-4099. ApplicationMaster may fail to remove staging directory + (Jason Lowe via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 6fc8fab10fc..680872c9866 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -405,6 +405,14 @@ public class MRAppMaster extends CompositeService { } catch (InterruptedException e) { e.printStackTrace(); } + + // Cleanup staging directory + try { + cleanupStagingDir(); + } catch(IOException io) { + LOG.warn("Failed to delete staging dir", io); + } + try { // Stop all services // This will also send the final report to the ResourceManager @@ -415,13 +423,6 @@ public class MRAppMaster extends CompositeService { LOG.warn("Graceful stop failed ", t); } - // Cleanup staging directory - try { - cleanupStagingDir(); - } catch(IOException io) { - LOG.warn("Failed to delete staging dir"); - } - //Bring the process down by force. //Not needed after HADOOP-7140 LOG.info("Exiting MR AppMaster..GoodBye!"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index 9d1b8cc30bc..68d07a7ef2e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.spy; +import java.io.IOException; import java.util.Iterator; import junit.framework.Assert; @@ -35,11 +36,14 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.YarnException; import org.junit.Test; /** @@ -233,6 +237,71 @@ public class TestMRApp { } } + private final class MRAppTestCleanup extends MRApp { + boolean hasStopped; + boolean cleanedBeforeStopped; + + public MRAppTestCleanup(int maps, int reduces, boolean autoComplete, + String testName, boolean cleanOnStart) { + super(maps, reduces, autoComplete, testName, cleanOnStart); + hasStopped = false; + cleanedBeforeStopped = false; + } + + @Override + protected Job createJob(Configuration conf) { + UserGroupInformation currentUser = null; + try { + currentUser = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new YarnException(e); + } + Job newJob = new TestJob(getJobId(), getAttemptID(), conf, + getDispatcher().getEventHandler(), + getTaskAttemptListener(), getContext().getClock(), + getCommitter(), isNewApiCommitter(), + currentUser.getUserName(), getContext()); + ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob); + + getDispatcher().register(JobFinishEvent.Type.class, + createJobFinishEventHandler()); + + return newJob; + } + + @Override + public void cleanupStagingDir() throws IOException { + cleanedBeforeStopped = !hasStopped; + } + + @Override + public synchronized void stop() { + hasStopped = true; + super.stop(); + } + + @Override + protected void sysexit() { + } + } + + @Test + public void testStagingCleanupOrder() throws Exception { + MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true, + this.getClass().getName(), true); + JobImpl job = (JobImpl)app.submit(new Configuration()); + app.waitForState(job, JobState.SUCCEEDED); + app.verifyCompleted(); + + int waitTime = 20 * 1000; + while (waitTime > 0 && !app.cleanedBeforeStopped) { + Thread.sleep(100); + waitTime -= 100; + } + Assert.assertTrue("Staging directory not cleaned before notifying RM", + app.cleanedBeforeStopped); + } + public static void main(String[] args) throws Exception { TestMRApp t = new TestMRApp(); t.testMapReduce(); @@ -241,5 +310,6 @@ public class TestMRApp { t.testCompletedMapsForReduceSlowstart(); t.testJobError(); t.testCountersOnJobFinish(); + t.testStagingCleanupOrder(); } }