MAPREDUCE-4099. ApplicationMaster may fail to remove staging directory (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1311926 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-04-10 18:57:34 +00:00
parent cbb5f61090
commit 793746870b
3 changed files with 81 additions and 7 deletions

View File

@ -292,6 +292,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4117. mapred job -status throws NullPointerException (Devaraj K
via bobby)
MAPREDUCE-4099. ApplicationMaster may fail to remove staging directory
(Jason Lowe via bobby)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -405,6 +405,14 @@ public void handle(JobFinishEvent event) {
} catch (InterruptedException e) {
e.printStackTrace();
}
// Cleanup staging directory
try {
cleanupStagingDir();
} catch(IOException io) {
LOG.warn("Failed to delete staging dir", io);
}
try {
// Stop all services
// This will also send the final report to the ResourceManager
@ -415,13 +423,6 @@ public void handle(JobFinishEvent event) {
LOG.warn("Graceful stop failed ", t);
}
// Cleanup staging directory
try {
cleanupStagingDir();
} catch(IOException io) {
LOG.warn("Failed to delete staging dir");
}
//Bring the process down by force.
//Not needed after HADOOP-7140
LOG.info("Exiting MR AppMaster..GoodBye!");

View File

@ -22,6 +22,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.Iterator;
import junit.framework.Assert;
@ -35,11 +36,14 @@
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.junit.Test;
/**
@ -233,6 +237,71 @@ public void checkTaskStateTypeConversion() {
}
}
private final class MRAppTestCleanup extends MRApp {
boolean hasStopped;
boolean cleanedBeforeStopped;
public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
hasStopped = false;
cleanedBeforeStopped = false;
}
@Override
protected Job createJob(Configuration conf) {
UserGroupInformation currentUser = null;
try {
currentUser = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new YarnException(e);
}
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
getCommitter(), isNewApiCommitter(),
currentUser.getUserName(), getContext());
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
}
@Override
public void cleanupStagingDir() throws IOException {
cleanedBeforeStopped = !hasStopped;
}
@Override
public synchronized void stop() {
hasStopped = true;
super.stop();
}
@Override
protected void sysexit() {
}
}
@Test
public void testStagingCleanupOrder() throws Exception {
MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
this.getClass().getName(), true);
JobImpl job = (JobImpl)app.submit(new Configuration());
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
int waitTime = 20 * 1000;
while (waitTime > 0 && !app.cleanedBeforeStopped) {
Thread.sleep(100);
waitTime -= 100;
}
Assert.assertTrue("Staging directory not cleaned before notifying RM",
app.cleanedBeforeStopped);
}
public static void main(String[] args) throws Exception {
TestMRApp t = new TestMRApp();
t.testMapReduce();
@ -241,5 +310,6 @@ public static void main(String[] args) throws Exception {
t.testCompletedMapsForReduceSlowstart();
t.testJobError();
t.testCountersOnJobFinish();
t.testStagingCleanupOrder();
}
}