From 25e96e455b3473387df865fbc1c3ad7ebf9ff1e4 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 31 Aug 2012 20:43:46 +0000 Subject: [PATCH] MAPREDUCE-4611. MR AM dies badly when Node is decommissioned (Robert Evans via tgraves) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1379599 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../jobhistory/JobHistoryEventHandler.java | 14 +-- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 38 ++++++- .../mapreduce/v2/app/rm/RMCommunicator.java | 14 ++- .../TestJobHistoryEventHandler.java | 2 +- .../mapreduce/v2/app/TestStagingCleanup.java | 106 ++++++++++++++---- 6 files changed, 143 insertions(+), 34 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e4593d0e9dc..83273845610 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -858,6 +858,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4614. Simplify debugging a job's tokens (daryn via bobby) + MAPREDUCE-4611. MR AM dies badly when Node is decommissioned (Robert + Evans via tgraves) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index d94f074af99..7ba3ccd03a3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -99,8 +99,8 @@ public class JobHistoryEventHandler extends AbstractService protected static final Map fileMap = Collections.synchronizedMap(new HashMap()); - // Has a signal (SIGTERM etc) been issued? - protected volatile boolean isSignalled = false; + // should job completion be force when the AM shuts down? + protected volatile boolean forceJobCompletion = false; public JobHistoryEventHandler(AppContext context, int startCount) { super("JobHistoryEventHandler"); @@ -322,7 +322,7 @@ public void stop() { // Process JobUnsuccessfulCompletionEvent for jobIds which still haven't // closed their event writers Iterator jobIt = fileMap.keySet().iterator(); - if(isSignalled) { + if(forceJobCompletion) { while (jobIt.hasNext()) { JobId toClose = jobIt.next(); MetaInfo mi = fileMap.get(toClose); @@ -911,9 +911,9 @@ private String getFileNameFromTmpFN(String tmpFileName) { return tmpFileName.substring(0, tmpFileName.length()-4); } - public void setSignalled(boolean isSignalled) { - this.isSignalled = isSignalled; - LOG.info("JobHistoryEventHandler notified that isSignalled was " - + isSignalled); + public void setForcejobCompletion(boolean forceJobCompletion) { + this.forceJobCompletion = forceJobCompletion; + LOG.info("JobHistoryEventHandler notified that forceJobCompletion is " + + forceJobCompletion); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index d80653767e4..64d8bb82253 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -170,6 +170,8 @@ public class MRAppMaster extends CompositeService { private Credentials fsTokens = new Credentials(); // Filled during init private UserGroupInformation currentUser; // Will be setup during init + private volatile boolean isLastAMRetry = false; + public MRAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, long appSubmitTime) { @@ -195,11 +197,21 @@ public MRAppMaster(ApplicationAttemptId applicationAttemptId, @Override public void init(final Configuration conf) { - conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); downloadTokensAndSetupUGI(conf); - + + //TODO this is a hack, we really need the RM to inform us when we + // are the last one. This would allow us to configure retries on + // a per application basis. + int numAMRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, + YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES); + isLastAMRetry = appAttemptID.getAttemptId() >= numAMRetries; + LOG.info("AM Retries: " + numAMRetries + + " attempt num: " + appAttemptID.getAttemptId() + + " is last retry: " + isLastAMRetry); + + context = new RunningAppContext(conf); // Job name is the same as the app name util we support DAG of jobs @@ -417,6 +429,8 @@ public void handle(JobFinishEvent event) { } try { + //We are finishing cleanly so this is the last retry + isLastAMRetry = true; // Stop all services // This will also send the final report to the ResourceManager LOG.info("Calling stop for all the services"); @@ -666,7 +680,11 @@ public void handle(ContainerAllocatorEvent event) { } public void setSignalled(boolean isSignalled) { - ((RMCommunicator) containerAllocator).setSignalled(true); + ((RMCommunicator) containerAllocator).setSignalled(isSignalled); + } + + public void setShouldUnregister(boolean shouldUnregister) { + ((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister); } } @@ -717,7 +735,12 @@ private final class StagingDirCleaningService extends AbstractService { @Override public synchronized void stop() { try { - cleanupStagingDir(); + if(isLastAMRetry) { + cleanupStagingDir(); + } else { + LOG.info("Skipping cleaning up the staging dir. " + + "assuming AM will be retried."); + } } catch (IOException io) { LOG.error("Failed to cleanup staging dir: ", io); } @@ -1016,14 +1039,19 @@ static class MRAppMasterShutdownHook implements Runnable { public void run() { LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and " + "JobHistoryEventHandler."); + // Notify the JHEH and RMCommunicator that a SIGTERM has been received so // that they don't take too long in shutting down if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) { ((ContainerAllocatorRouter) appMaster.containerAllocator) .setSignalled(true); + ((ContainerAllocatorRouter) appMaster.containerAllocator) + .setShouldUnregister(appMaster.isLastAMRetry); } + if(appMaster.jobHistoryEventHandler != null) { - appMaster.jobHistoryEventHandler.setSignalled(true); + appMaster.jobHistoryEventHandler + .setForcejobCompletion(appMaster.isLastAMRetry); } appMaster.stop(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index b0471e68ca0..e587ba852e6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -84,6 +84,7 @@ public abstract class RMCommunicator extends AbstractService { private Job job; // Has a signal (SIGTERM etc) been issued? protected volatile boolean isSignalled = false; + private volatile boolean shouldUnregister = true; public RMCommunicator(ClientService clientService, AppContext context) { super("RMCommunicator"); @@ -213,7 +214,9 @@ public void stop() { } catch (InterruptedException ie) { LOG.warn("InterruptedException while stopping", ie); } - unregister(); + if(shouldUnregister) { + unregister(); + } super.stop(); } @@ -288,8 +291,15 @@ public AMRMProtocol run() { protected abstract void heartbeat() throws Exception; + public void setShouldUnregister(boolean shouldUnregister) { + this.shouldUnregister = shouldUnregister; + LOG.info("RMCommunicator notified that shouldUnregistered is: " + + shouldUnregister); + } + public void setSignalled(boolean isSignalled) { this.isSignalled = isSignalled; - LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled); + LOG.info("RMCommunicator notified that iSignalled is: " + + isSignalled); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java index c1c227064eb..4f86f91432a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java @@ -330,7 +330,7 @@ public void testSigTermedFunctionality() throws IOException { Mockito.when(jobId.getAppId()).thenReturn(mockAppId); jheh.addToFileMap(jobId); - jheh.setSignalled(true); + jheh.setForcejobCompletion(true); for(int i=0; i < numEvents; ++i) { events[i] = getEventToEnqueue(jobId); jheh.handle(events[i]); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index 063fcfa2cfd..67c9cf5b539 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; import java.io.IOException; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -89,28 +91,94 @@ public void testDeletionofStaging() throws IOException { handler.handle(new JobFinishEvent(jobid)); verify(fs).delete(stagingJobPath, true); } + + @Test + public void testDeletionofStagingOnKill() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); + ApplicationAttemptId attemptId = recordFactory.newRecordInstance( + ApplicationAttemptId.class); + attemptId.setAttemptId(0); + ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); + appId.setClusterTimestamp(System.currentTimeMillis()); + appId.setId(0); + attemptId.setApplicationId(appId); + JobId jobid = recordFactory.newRecordInstance(JobId.class); + jobid.setAppId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc); + appMaster.init(conf); + //simulate the process being killed + MRAppMaster.MRAppMasterShutdownHook hook = + new MRAppMaster.MRAppMasterShutdownHook(appMaster); + hook.run(); + verify(fs, times(0)).delete(stagingJobPath, true); + } + + @Test + public void testDeletionofStagingOnKillLastTry() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); + ApplicationAttemptId attemptId = recordFactory.newRecordInstance( + ApplicationAttemptId.class); + attemptId.setAttemptId(1); + ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); + appId.setClusterTimestamp(System.currentTimeMillis()); + appId.setId(0); + attemptId.setApplicationId(appId); + JobId jobid = recordFactory.newRecordInstance(JobId.class); + jobid.setAppId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc); + appMaster.init(conf); + //simulate the process being killed + MRAppMaster.MRAppMasterShutdownHook hook = + new MRAppMaster.MRAppMasterShutdownHook(appMaster); + hook.run(); + verify(fs).delete(stagingJobPath, true); + } private class TestMRApp extends MRAppMaster { + ContainerAllocator allocator; - public TestMRApp(ApplicationAttemptId applicationAttemptId) { - super(applicationAttemptId, BuilderUtils.newContainerId( - applicationAttemptId, 1), "testhost", 2222, 3333, System - .currentTimeMillis()); - } - - @Override - protected FileSystem getFileSystem(Configuration conf) { - return fs; - } - - @Override - protected void sysexit() { - } - - @Override - public Configuration getConfig() { - return conf; - } + public TestMRApp(ApplicationAttemptId applicationAttemptId, + ContainerAllocator allocator) { + super(applicationAttemptId, BuilderUtils.newContainerId( + applicationAttemptId, 1), "testhost", 2222, 3333, System + .currentTimeMillis()); + this.allocator = allocator; + } + + public TestMRApp(ApplicationAttemptId applicationAttemptId) { + this(applicationAttemptId, null); + } + + @Override + protected FileSystem getFileSystem(Configuration conf) { + return fs; + } + + @Override + protected ContainerAllocator createContainerAllocator( + final ClientService clientService, final AppContext context) { + if(allocator == null) { + return super.createContainerAllocator(clientService, context); + } + return allocator; + } + + @Override + protected void sysexit() { + } + + @Override + public Configuration getConfig() { + return conf; + } } private final class MRAppTestCleanup extends MRApp {