MAPREDUCE-4099 amendment. ApplicationMaster will remove staging directory after the history service is stopped. (Contributed by Jason Lowe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1324866 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2012-04-11 17:09:54 +00:00
parent bc2340e8b5
commit cfafd8c29d
5 changed files with 131 additions and 82 deletions

View File

@ -310,6 +310,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4040. History links should use hostname rather than IP address.
(Bhallamudi Venkata Siva Kamesh via sseth)
MAPREDUCE-4099 amendment. ApplicationMaster will remove staging directory
after the history service is stopped. (Jason Lowe via sseth)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -285,6 +285,11 @@ public class MRAppMaster extends CompositeService {
addIfService(containerLauncher);
dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
// Add the staging directory cleaner before the history server but after
// the container allocator so the staging directory is cleaned after
// the history has been flushed but before unregistering with the RM.
addService(createStagingDirCleaningService());
// Add the JobHistoryEventHandler last so that it is properly stopped first.
// This will guarantee that all history-events are flushed before AM goes
// ahead with shutdown.
@ -406,13 +411,6 @@ public class MRAppMaster extends CompositeService {
e.printStackTrace();
}
// Cleanup staging directory
try {
cleanupStagingDir();
} catch(IOException io) {
LOG.warn("Failed to delete staging dir", io);
}
try {
// Stop all services
// This will also send the final report to the ResourceManager
@ -512,6 +510,10 @@ public class MRAppMaster extends CompositeService {
return this.jobHistoryEventHandler;
}
protected AbstractService createStagingDirCleaningService() {
return new StagingDirCleaningService();
}
protected Speculator createSpeculator(Configuration conf, AppContext context) {
Class<? extends Speculator> speculatorClass;
@ -710,6 +712,22 @@ public class MRAppMaster extends CompositeService {
}
}
private final class StagingDirCleaningService extends AbstractService {
StagingDirCleaningService() {
super(StagingDirCleaningService.class.getName());
}
@Override
public synchronized void stop() {
try {
cleanupStagingDir();
} catch (IOException io) {
LOG.error("Failed to cleanup staging dir: ", io);
}
super.stop();
}
}
private class RunningAppContext implements AppContext {
private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();

View File

@ -428,9 +428,13 @@ public class MRApp extends MRAppMaster {
@Override
protected ContainerAllocator createContainerAllocator(
ClientService clientService, final AppContext context) {
return new ContainerAllocator(){
private int containerCount;
@Override
return new MRAppContainerAllocator();
}
protected class MRAppContainerAllocator implements ContainerAllocator {
private int containerCount;
@Override
public void handle(ContainerAllocatorEvent event) {
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
@ -452,7 +456,6 @@ public class MRApp extends MRAppMaster {
new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
container, null));
}
};
}
@Override

View File

@ -18,11 +18,10 @@
package org.apache.hadoop.mapreduce.v2.app;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.Iterator;
import junit.framework.Assert;
@ -36,14 +35,11 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.junit.Test;
/**
@ -237,71 +233,6 @@ public class TestMRApp {
}
}
private final class MRAppTestCleanup extends MRApp {
boolean hasStopped;
boolean cleanedBeforeStopped;
public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
hasStopped = false;
cleanedBeforeStopped = false;
}
@Override
protected Job createJob(Configuration conf) {
UserGroupInformation currentUser = null;
try {
currentUser = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new YarnException(e);
}
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
getCommitter(), isNewApiCommitter(),
currentUser.getUserName(), getContext());
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
}
@Override
public void cleanupStagingDir() throws IOException {
cleanedBeforeStopped = !hasStopped;
}
@Override
public synchronized void stop() {
hasStopped = true;
super.stop();
}
@Override
protected void sysexit() {
}
}
@Test
public void testStagingCleanupOrder() throws Exception {
MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
this.getClass().getName(), true);
JobImpl job = (JobImpl)app.submit(new Configuration());
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
int waitTime = 20 * 1000;
while (waitTime > 0 && !app.cleanedBeforeStopped) {
Thread.sleep(100);
waitTime -= 100;
}
Assert.assertTrue("Staging directory not cleaned before notifying RM",
app.cleanedBeforeStopped);
}
public static void main(String[] args) throws Exception {
TestMRApp t = new TestMRApp();
t.testMapReduce();
@ -310,6 +241,5 @@ public class TestMRApp {
t.testCompletedMapsForReduceSlowstart();
t.testJobError();
t.testCountersOnJobFinish();
t.testStagingCleanupOrder();
}
}

View File

@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
@ -35,12 +36,21 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
@ -103,4 +113,89 @@ import org.junit.Test;
}
}
private final class MRAppTestCleanup extends MRApp {
boolean stoppedContainerAllocator;
boolean cleanedBeforeContainerAllocatorStopped;
public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
stoppedContainerAllocator = false;
cleanedBeforeContainerAllocatorStopped = false;
}
@Override
protected Job createJob(Configuration conf) {
UserGroupInformation currentUser = null;
try {
currentUser = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new YarnException(e);
}
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
getCommitter(), isNewApiCommitter(),
currentUser.getUserName(), getContext());
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
}
@Override
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) {
return new TestCleanupContainerAllocator();
}
private class TestCleanupContainerAllocator extends AbstractService
implements ContainerAllocator {
private MRAppContainerAllocator allocator;
TestCleanupContainerAllocator() {
super(TestCleanupContainerAllocator.class.getName());
allocator = new MRAppContainerAllocator();
}
@Override
public void handle(ContainerAllocatorEvent event) {
allocator.handle(event);
}
@Override
public synchronized void stop() {
stoppedContainerAllocator = true;
super.stop();
}
}
@Override
public void cleanupStagingDir() throws IOException {
cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
}
@Override
protected void sysexit() {
}
}
@Test
public void testStagingCleanupOrder() throws Exception {
MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
this.getClass().getName(), true);
JobImpl job = (JobImpl)app.submit(new Configuration());
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
int waitTime = 20 * 1000;
while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) {
Thread.sleep(100);
waitTime -= 100;
}
Assert.assertTrue("Staging directory not cleaned before notifying RM",
app.cleanedBeforeContainerAllocatorStopped);
}
}