svn merge -c 1311926 from trunk. FIXES: MAPREDUCE-4099. ApplicationMaster may fail to remove staging directory (Jason Lowe via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1311927 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ce9bdceac7
commit
07321a677c
|
@ -193,6 +193,9 @@ Release 0.23.3 - UNRELEASED
|
|||
MAPREDUCE-4117. mapred job -status throws NullPointerException (Devaraj K
|
||||
via bobby)
|
||||
|
||||
MAPREDUCE-4099. ApplicationMaster may fail to remove staging directory
|
||||
(Jason Lowe via bobby)
|
||||
|
||||
Release 0.23.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -405,6 +405,14 @@ public class MRAppMaster extends CompositeService {
|
|||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
// Cleanup staging directory
|
||||
try {
|
||||
cleanupStagingDir();
|
||||
} catch(IOException io) {
|
||||
LOG.warn("Failed to delete staging dir", io);
|
||||
}
|
||||
|
||||
try {
|
||||
// Stop all services
|
||||
// This will also send the final report to the ResourceManager
|
||||
|
@ -415,13 +423,6 @@ public class MRAppMaster extends CompositeService {
|
|||
LOG.warn("Graceful stop failed ", t);
|
||||
}
|
||||
|
||||
// Cleanup staging directory
|
||||
try {
|
||||
cleanupStagingDir();
|
||||
} catch(IOException io) {
|
||||
LOG.warn("Failed to delete staging dir");
|
||||
}
|
||||
|
||||
//Bring the process down by force.
|
||||
//Not needed after HADOOP-7140
|
||||
LOG.info("Exiting MR AppMaster..GoodBye!");
|
||||
|
|
|
@ -22,6 +22,7 @@ import static org.mockito.Mockito.times;
|
|||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
@ -35,11 +36,14 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -233,6 +237,71 @@ public class TestMRApp {
|
|||
}
|
||||
}
|
||||
|
||||
private final class MRAppTestCleanup extends MRApp {
|
||||
boolean hasStopped;
|
||||
boolean cleanedBeforeStopped;
|
||||
|
||||
public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
|
||||
String testName, boolean cleanOnStart) {
|
||||
super(maps, reduces, autoComplete, testName, cleanOnStart);
|
||||
hasStopped = false;
|
||||
cleanedBeforeStopped = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Job createJob(Configuration conf) {
|
||||
UserGroupInformation currentUser = null;
|
||||
try {
|
||||
currentUser = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException e) {
|
||||
throw new YarnException(e);
|
||||
}
|
||||
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
|
||||
getDispatcher().getEventHandler(),
|
||||
getTaskAttemptListener(), getContext().getClock(),
|
||||
getCommitter(), isNewApiCommitter(),
|
||||
currentUser.getUserName(), getContext());
|
||||
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
|
||||
|
||||
getDispatcher().register(JobFinishEvent.Type.class,
|
||||
createJobFinishEventHandler());
|
||||
|
||||
return newJob;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanupStagingDir() throws IOException {
|
||||
cleanedBeforeStopped = !hasStopped;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
hasStopped = true;
|
||||
super.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sysexit() {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStagingCleanupOrder() throws Exception {
|
||||
MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
|
||||
this.getClass().getName(), true);
|
||||
JobImpl job = (JobImpl)app.submit(new Configuration());
|
||||
app.waitForState(job, JobState.SUCCEEDED);
|
||||
app.verifyCompleted();
|
||||
|
||||
int waitTime = 20 * 1000;
|
||||
while (waitTime > 0 && !app.cleanedBeforeStopped) {
|
||||
Thread.sleep(100);
|
||||
waitTime -= 100;
|
||||
}
|
||||
Assert.assertTrue("Staging directory not cleaned before notifying RM",
|
||||
app.cleanedBeforeStopped);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
TestMRApp t = new TestMRApp();
|
||||
t.testMapReduce();
|
||||
|
@ -241,5 +310,6 @@ public class TestMRApp {
|
|||
t.testCompletedMapsForReduceSlowstart();
|
||||
t.testJobError();
|
||||
t.testCountersOnJobFinish();
|
||||
t.testStagingCleanupOrder();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue