MAPREDUCE-5476. Changed MR AM recovery code to cleanup staging-directory only after unregistering from the RM. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1516660 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-08-22 23:17:12 +00:00
parent 200220e8f3
commit 740f4cb97a
3 changed files with 64 additions and 22 deletions

View File

@ -237,6 +237,9 @@ Release 2.1.1-beta - UNRELEASED
MAPREDUCE-5470. LocalJobRunner does not work on Windows. (Sandy Ryza via
cnauroth)
MAPREDUCE-5476. Changed MR AM recovery code to cleanup staging-directory
only after unregistering from the RM. (Jian He via vinodkv)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -326,17 +326,22 @@ public class MRAppMaster extends CompositeService {
eater);
}
if (copyHistory) {
// Now that there's a FINISHING state for application on RM to give AMs
// plenty of time to clean up after unregister it's safe to clean staging
// directory after unregistering with RM. So, we start the staging-dir
// cleaner BEFORE the ContainerAllocator so that on shut-down,
// ContainerAllocator unregisters first and then the staging-dir cleaner
// deletes staging directory.
addService(createStagingDirCleaningService());
}
// service to allocate containers from RM (if non-uber) or to fake it (uber)
containerAllocator = createContainerAllocator(null, context);
addIfService(containerAllocator);
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
if (copyHistory) {
// Add the staging directory cleaner before the history server but after
// the container allocator so the staging directory is cleaned after
// the history has been flushed but before unregistering with the RM.
addService(createStagingDirCleaningService());
// Add the JobHistoryEventHandler last so that it is properly stopped first.
// This will guarantee that all history-events are flushed before AM goes
// ahead with shutdown.
@ -345,7 +350,6 @@ public class MRAppMaster extends CompositeService {
// queued inside the JobHistoryEventHandler
addIfService(historyService);
JobHistoryCopyService cpHist = new JobHistoryCopyService(appAttemptID,
dispatcher.getEventHandler());
addIfService(cpHist);
@ -396,6 +400,14 @@ public class MRAppMaster extends CompositeService {
dispatcher.register(Speculator.EventType.class,
speculatorEventDispatcher);
// Now that there's a FINISHING state for application on RM to give AMs
// plenty of time to clean up after unregister it's safe to clean staging
// directory after unregistering with RM. So, we start the staging-dir
// cleaner BEFORE the ContainerAllocator so that on shut-down,
// ContainerAllocator unregisters first and then the staging-dir cleaner
// deletes staging directory.
addService(createStagingDirCleaningService());
// service to allocate containers from RM (if non-uber) or to fake it (uber)
addIfService(containerAllocator);
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
@ -405,11 +417,6 @@ public class MRAppMaster extends CompositeService {
addIfService(containerLauncher);
dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
// Add the staging directory cleaner before the history server but after
// the container allocator so the staging directory is cleaned after
// the history has been flushed but before unregistering with the RM.
addService(createStagingDirCleaningService());
// Add the JobHistoryEventHandler last so that it is properly stopped first.
// This will guarantee that all history-events are flushed before AM goes
// ahead with shutdown.

View File

@ -36,6 +36,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@ -54,6 +56,7 @@ import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -279,14 +282,17 @@ import org.junit.Test;
}
private final class MRAppTestCleanup extends MRApp {
boolean stoppedContainerAllocator;
boolean cleanedBeforeContainerAllocatorStopped;
int stagingDirCleanedup;
int ContainerAllocatorStopped;
int JobHistoryEventHandlerStopped;
int numStops;
public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
stoppedContainerAllocator = false;
cleanedBeforeContainerAllocatorStopped = false;
stagingDirCleanedup = 0;
ContainerAllocatorStopped = 0;
JobHistoryEventHandlerStopped = 0;
numStops = 0;
}
@Override
@ -312,6 +318,26 @@ import org.junit.Test;
return newJob;
}
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
return new TestJobHistoryEventHandler(context, getStartCount());
}
private class TestJobHistoryEventHandler extends JobHistoryEventHandler {
public TestJobHistoryEventHandler(AppContext context, int startCount) {
super(context, startCount);
}
@Override
public void serviceStop() throws Exception {
numStops++;
JobHistoryEventHandlerStopped = numStops;
super.serviceStop();
}
}
@Override
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) {
@ -334,7 +360,8 @@ import org.junit.Test;
@Override
protected void serviceStop() throws Exception {
stoppedContainerAllocator = true;
numStops++;
ContainerAllocatorStopped = numStops;
super.serviceStop();
}
}
@ -346,7 +373,8 @@ import org.junit.Test;
@Override
public void cleanupStagingDir() throws IOException {
cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
numStops++;
stagingDirCleanedup = numStops;
}
@Override
@ -377,11 +405,15 @@ import org.junit.Test;
app.verifyCompleted();
int waitTime = 20 * 1000;
while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) {
while (waitTime > 0 && app.numStops < 3 ) {
Thread.sleep(100);
waitTime -= 100;
}
Assert.assertTrue("Staging directory not cleaned before notifying RM",
app.cleanedBeforeContainerAllocatorStopped);
// assert JobHistoryEventHandlerStopped first, then
// ContainerAllocatorStopped, and then stagingDirCleanedup
Assert.assertEquals(1, app.JobHistoryEventHandlerStopped);
Assert.assertEquals(2, app.ContainerAllocatorStopped);
Assert.assertEquals(3, app.stagingDirCleanedup);
}
}