From 740f4cb97a4d5ec498f6e91d91ee7e75ad1c52c2 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 22 Aug 2013 23:17:12 +0000 Subject: [PATCH] MAPREDUCE-5476. Changed MR AM recovery code to cleanup staging-directory only after unregistering from the RM. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1516660 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapreduce/v2/app/MRAppMaster.java | 31 ++++++----- .../mapreduce/v2/app/TestStagingCleanup.java | 52 +++++++++++++++---- 3 files changed, 64 insertions(+), 22 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 85650bd504a..6de12d2e9c1 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -237,6 +237,9 @@ Release 2.1.1-beta - UNRELEASED MAPREDUCE-5470. LocalJobRunner does not work on Windows. (Sandy Ryza via cnauroth) + MAPREDUCE-5476. Changed MR AM recovery code to cleanup staging-directory + only after unregistering from the RM. (Jian He via vinodkv) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index ab1c4feadf0..e6df1fcad38 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -325,18 +325,23 @@ public class MRAppMaster extends CompositeService { dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class, eater); } - + + if (copyHistory) { + // Now that there's a FINISHING state for application on RM to give AMs + // plenty of time to clean up after unregister it's safe to clean staging + // directory after unregistering with RM. So, we start the staging-dir + // cleaner BEFORE the ContainerAllocator so that on shut-down, + // ContainerAllocator unregisters first and then the staging-dir cleaner + // deletes staging directory. + addService(createStagingDirCleaningService()); + } + // service to allocate containers from RM (if non-uber) or to fake it (uber) containerAllocator = createContainerAllocator(null, context); addIfService(containerAllocator); dispatcher.register(ContainerAllocator.EventType.class, containerAllocator); if (copyHistory) { - // Add the staging directory cleaner before the history server but after - // the container allocator so the staging directory is cleaned after - // the history has been flushed but before unregistering with the RM. - addService(createStagingDirCleaningService()); - // Add the JobHistoryEventHandler last so that it is properly stopped first. // This will guarantee that all history-events are flushed before AM goes // ahead with shutdown. @@ -344,7 +349,6 @@ public class MRAppMaster extends CompositeService { // component creates a JobHistoryEvent in the meanwhile, it will be just be // queued inside the JobHistoryEventHandler addIfService(historyService); - JobHistoryCopyService cpHist = new JobHistoryCopyService(appAttemptID, dispatcher.getEventHandler()); @@ -396,6 +400,14 @@ public class MRAppMaster extends CompositeService { dispatcher.register(Speculator.EventType.class, speculatorEventDispatcher); + // Now that there's a FINISHING state for application on RM to give AMs + // plenty of time to clean up after unregister it's safe to clean staging + // directory after unregistering with RM. So, we start the staging-dir + // cleaner BEFORE the ContainerAllocator so that on shut-down, + // ContainerAllocator unregisters first and then the staging-dir cleaner + // deletes staging directory. + addService(createStagingDirCleaningService()); + // service to allocate containers from RM (if non-uber) or to fake it (uber) addIfService(containerAllocator); dispatcher.register(ContainerAllocator.EventType.class, containerAllocator); @@ -405,11 +417,6 @@ public class MRAppMaster extends CompositeService { addIfService(containerLauncher); dispatcher.register(ContainerLauncher.EventType.class, containerLauncher); - // Add the staging directory cleaner before the history server but after - // the container allocator so the staging directory is cleaned after - // the history has been flushed but before unregistering with the RM. - addService(createStagingDirCleaningService()); - // Add the JobHistoryEventHandler last so that it is properly stopped first. // This will guarantee that all history-events are flushed before AM goes // ahead with shutdown. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index 46aa8984b90..a0c0cb6c35f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -36,6 +36,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; @@ -54,6 +56,7 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -279,14 +282,17 @@ import org.junit.Test; } private final class MRAppTestCleanup extends MRApp { - boolean stoppedContainerAllocator; - boolean cleanedBeforeContainerAllocatorStopped; - + int stagingDirCleanedup; + int ContainerAllocatorStopped; + int JobHistoryEventHandlerStopped; + int numStops; public MRAppTestCleanup(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) { super(maps, reduces, autoComplete, testName, cleanOnStart); - stoppedContainerAllocator = false; - cleanedBeforeContainerAllocatorStopped = false; + stagingDirCleanedup = 0; + ContainerAllocatorStopped = 0; + JobHistoryEventHandlerStopped = 0; + numStops = 0; } @Override @@ -312,6 +318,26 @@ import org.junit.Test; return newJob; } + @Override + protected EventHandler createJobHistoryHandler( + AppContext context) { + return new TestJobHistoryEventHandler(context, getStartCount()); + } + + private class TestJobHistoryEventHandler extends JobHistoryEventHandler { + + public TestJobHistoryEventHandler(AppContext context, int startCount) { + super(context, startCount); + } + + @Override + public void serviceStop() throws Exception { + numStops++; + JobHistoryEventHandlerStopped = numStops; + super.serviceStop(); + } + } + @Override protected ContainerAllocator createContainerAllocator( ClientService clientService, AppContext context) { @@ -334,7 +360,8 @@ import org.junit.Test; @Override protected void serviceStop() throws Exception { - stoppedContainerAllocator = true; + numStops++; + ContainerAllocatorStopped = numStops; super.serviceStop(); } } @@ -346,7 +373,8 @@ import org.junit.Test; @Override public void cleanupStagingDir() throws IOException { - cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator; + numStops++; + stagingDirCleanedup = numStops; } @Override @@ -377,11 +405,15 @@ import org.junit.Test; app.verifyCompleted(); int waitTime = 20 * 1000; - while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) { + while (waitTime > 0 && app.numStops < 3 ) { Thread.sleep(100); waitTime -= 100; } - Assert.assertTrue("Staging directory not cleaned before notifying RM", - app.cleanedBeforeContainerAllocatorStopped); + + // assert JobHistoryEventHandlerStopped first, then + // ContainerAllocatorStopped, and then stagingDirCleanedup + Assert.assertEquals(1, app.JobHistoryEventHandlerStopped); + Assert.assertEquals(2, app.ContainerAllocatorStopped); + Assert.assertEquals(3, app.stagingDirCleanedup); } }