diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index a74a8858f3f..ce06003d8cd 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -72,6 +72,9 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-2793. Corrected AppIDs, JobIDs, TaskAttemptIDs to be of correct
     format on the web pages. (Bikas Saha via vinodkv)
 
+    MAPREDUCE-3614. Fixed MR AM to close history file quickly and send a correct
+    final state to the RM when it is killed. (Ravi Prakash via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-3901. Modified JobHistory records in YARN to lazily load job and
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 212c86cf77e..6e36760dd99 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -95,9 +96,12 @@ public class JobHistoryEventHandler extends AbstractService
   private static final Log LOG = LogFactory.getLog(
       JobHistoryEventHandler.class);
 
-  private static final Map<JobId, MetaInfo> fileMap =
+  protected static final Map<JobId, MetaInfo> fileMap =
     Collections.synchronizedMap(new HashMap<JobId, MetaInfo>());
 
+  // Has a signal (SIGTERM etc) been issued?
+  protected volatile boolean isSignalled = false;
+
   public JobHistoryEventHandler(AppContext context, int startCount) {
     super("JobHistoryEventHandler");
     this.context = context;
@@ -314,7 +318,30 @@ public class JobHistoryEventHandler extends AbstractService
       LOG.info("In stop, writing event " + ev.getType());
       handleEvent(ev);
     }
-    
+
+    // Process a JobUnsuccessfulCompletionEvent for jobIds which still haven't
+    // closed their event writers
+    Iterator<JobId> jobIt = fileMap.keySet().iterator();
+    if (isSignalled) {
+      while (jobIt.hasNext()) {
+        JobId toClose = jobIt.next();
+        MetaInfo mi = fileMap.get(toClose);
+        if (mi != null && mi.isWriterActive()) {
+          LOG.warn("Found jobId " + toClose
+              + " with an unclosed event writer. Closing it now.");
+          // Create a JobUnsuccessfulCompletionEvent so that it is written to the job history
+          JobUnsuccessfulCompletionEvent jucEvent =
+              new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
+                  System.currentTimeMillis(), context.getJob(toClose)
+                      .getCompletedMaps(), context.getJob(toClose).getCompletedReduces(),
+                  JobState.KILLED.toString());
+          JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
+          // Bypass the queue mechanism, which might wait. Call the method directly.
+          handleEvent(jfEvent);
+        }
+      }
+    }
+
     //close all file handles
     for (MetaInfo mi : fileMap.values()) {
       try {
@@ -710,7 +737,7 @@ public class JobHistoryEventHandler extends AbstractService
     }
   }
 
-  private class MetaInfo {
+  protected class MetaInfo {
    private Path historyFile;
    private Path confFile;
    private EventWriter writer;
@@ -880,4 +907,10 @@ public class JobHistoryEventHandler extends AbstractService
     //TODO. Some error checking here.
     return tmpFileName.substring(0, tmpFileName.length()-4);
   }
+
+  public void setSignalled(boolean isSignalled) {
+    this.isSignalled = isSignalled;
+    LOG.info("JobHistoryEventHandler notified that isSignalled was "
+        + isSignalled);
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 0dddd66d59f..f837252b061 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
 import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
@@ -155,6 +156,7 @@ public class MRAppMaster extends CompositeService {
   private boolean newApiCommitter;
   private OutputCommitter committer;
   private JobEventDispatcher jobEventDispatcher;
+  private JobHistoryEventHandler jobHistoryEventHandler;
   private boolean inRecovery = false;
   private SpeculatorEventDispatcher speculatorEventDispatcher;
 
@@ -502,9 +504,9 @@ public class MRAppMaster extends CompositeService {
 
   protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
       AppContext context) {
-    JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
-      getStartCount());
-    return eventHandler;
+    this.jobHistoryEventHandler = new JobHistoryEventHandler(context,
+      getStartCount());
+    return this.jobHistoryEventHandler;
   }
 
   protected Speculator createSpeculator(Configuration conf, AppContext context) {
@@ -659,6 +661,10 @@ public class MRAppMaster extends CompositeService {
     public void handle(ContainerAllocatorEvent event) {
       this.containerAllocator.handle(event);
     }
+
+    public void setSignalled(boolean isSignalled) {
+      ((RMCommunicator) containerAllocator).setSignalled(isSignalled);
+    }
   }
 
   /**
@@ -957,12 +963,16 @@ public class MRAppMaster extends CompositeService {
           Integer.parseInt(nodePortString),
           Integer.parseInt(nodeHttpPortString), appSubmitTime);
       Runtime.getRuntime().addShutdownHook(
-        new CompositeServiceShutdownHook(appMaster));
+        new MRAppMasterShutdownHook(appMaster));
       YarnConfiguration conf = new YarnConfiguration(new JobConf());
       conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
       String jobUserName = System
           .getenv(ApplicationConstants.Environment.USER.name());
       conf.set(MRJobConfig.USER_NAME, jobUserName);
+      // Do not automatically close FileSystem objects: after a SIGTERM the
+      // job history still has to be written out, so the shutdown hook below
+      // closes the FileSystem objects explicitly once that is done.
+      conf.setBoolean("fs.automatic.close", false);
       initAndStartAppMaster(appMaster, conf, jobUserName);
     } catch (Throwable t) {
       LOG.fatal("Error starting MRAppMaster", t);
@@ -970,6 +980,35 @@ public class MRAppMaster extends CompositeService {
     }
   }
 
+  // The shutdown hook that runs both when a signal is received and during a
+  // normal shutdown of the JVM.
+  static class MRAppMasterShutdownHook extends Thread {
+    MRAppMaster appMaster;
+    MRAppMasterShutdownHook(MRAppMaster appMaster) {
+      this.appMaster = appMaster;
+    }
+    public void run() {
+      LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
+          + "JobHistoryEventHandler.");
+      // Notify the JHEH and RMCommunicator that a SIGTERM has been received
+      // so that they do not take too long shutting down
+      if (appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
+        ((ContainerAllocatorRouter) appMaster.containerAllocator)
+            .setSignalled(true);
+      }
+      if (appMaster.jobHistoryEventHandler != null) {
+        appMaster.jobHistoryEventHandler.setSignalled(true);
+      }
+      appMaster.stop();
+      try {
+        // Close all the FileSystem objects
+        FileSystem.closeAll();
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close all FileSystem objects", ioe);
+      }
+    }
+  }
+
   protected static void initAndStartAppMaster(final MRAppMaster appMaster,
       final YarnConfiguration conf, String jobUserName) throws IOException,
       InterruptedException {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
index 5ff838c5257..6a51f2dbd57 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
@@ -81,6 +81,8 @@ public abstract class RMCommunicator extends AbstractService {
 
   private final AppContext context;
   private Job job;
+  // Has a signal (SIGTERM etc) been issued?
+  protected volatile boolean isSignalled = false;
 
   public RMCommunicator(ClientService clientService, AppContext context) {
     super("RMCommunicator");
@@ -158,7 +160,8 @@ public abstract class RMCommunicator extends AbstractService {
     FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
     if (job.getState() == JobState.SUCCEEDED) {
       finishState = FinalApplicationStatus.SUCCEEDED;
-    } else if (job.getState() == JobState.KILLED) {
+    } else if (job.getState() == JobState.KILLED
+        || (job.getState() == JobState.RUNNING && isSignalled)) {
       finishState = FinalApplicationStatus.KILLED;
     } else if (job.getState() == JobState.FAILED
         || job.getState() == JobState.ERROR) {
@@ -278,4 +281,9 @@ public abstract class RMCommunicator extends AbstractService {
   }
 
   protected abstract void heartbeat() throws Exception;
+
+  public void setSignalled(boolean isSignalled) {
+    this.isSignalled = isSignalled;
+    LOG.info("RMCommunicator notified that isSignalled was: " + isSignalled);
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 5e7af4629f5..fce41d6086c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -48,6 +50,8 @@
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.verification.VerificationMode;
 
 public class TestJobHistoryEventHandler {
 
@@ -277,6 +281,68 @@ public class TestJobHistoryEventHandler {
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
     AppContext mockAppContext = mockAppContext(jobId);
   }
+
+  private JobHistoryEvent getEventToEnqueue(JobId jobId) {
+    JobHistoryEvent toReturn = Mockito.mock(JobHistoryEvent.class);
+    HistoryEvent he = Mockito.mock(HistoryEvent.class);
+    Mockito.when(he.getEventType()).thenReturn(EventType.JOB_STATUS_CHANGED);
+    Mockito.when(toReturn.getHistoryEvent()).thenReturn(he);
+    Mockito.when(toReturn.getJobID()).thenReturn(jobId);
+    return toReturn;
+  }
+
+  /**
+   * Tests that in case of SIGTERM, the JHEH stops without processing its event
+   * queue (because it must stop quickly lest it get SIGKILLed) and processes a
+   * JobUnsuccessfulCompletionEvent for jobs which were still running (so that
+   * they show up in the JobHistoryServer).
+   */
+  @Test
+  public void testSigTermedFunctionality() throws IOException {
+    AppContext mockedContext = Mockito.mock(AppContext.class);
+    JHEventHandlerForSigtermTest jheh =
+        new JHEventHandlerForSigtermTest(mockedContext, 0);
+
+    JobId jobId = Mockito.mock(JobId.class);
+    jheh.addToFileMap(jobId);
+
+    // Submit 4 events and check that they're handled in the absence of a signal
+    final int numEvents = 4;
+    JobHistoryEvent events[] = new JobHistoryEvent[numEvents];
+    for (int i = 0; i < numEvents; ++i) {
+      events[i] = getEventToEnqueue(jobId);
+      jheh.handle(events[i]);
+    }
+    jheh.stop();
+    // Make sure events were handled
+    assertTrue("handleEvent should've been called only 4 times but was "
+        + jheh.eventsHandled, jheh.eventsHandled == 4);
+
+    // Create a new jheh because the last stop closed the eventWriter etc.
+    jheh = new JHEventHandlerForSigtermTest(mockedContext, 0);
+
+    // Make the constructor of JobUnsuccessfulCompletionEvent pass
+    Job job = Mockito.mock(Job.class);
+    Mockito.when(mockedContext.getJob(jobId)).thenReturn(job);
+    // Make TypeConverter.fromYarn(JobId) pass
+    ApplicationId mockAppId = Mockito.mock(ApplicationId.class);
+    Mockito.when(mockAppId.getClusterTimestamp()).thenReturn(1000L);
+    Mockito.when(jobId.getAppId()).thenReturn(mockAppId);
+
+    jheh.addToFileMap(jobId);
+    jheh.setSignalled(true);
+    for (int i = 0; i < numEvents; ++i) {
+      events[i] = getEventToEnqueue(jobId);
+      jheh.handle(events[i]);
+    }
+    jheh.stop();
+    // Make sure events were handled: 4 queued events + 1 finish event
+    assertTrue("handleEvent should've been called only 5 times but was "
+        + jheh.eventsHandled, jheh.eventsHandled == 5);
+    assertTrue("Last event handled wasn't JobUnsuccessfulCompletionEvent",
+        jheh.lastEventHandled.getHistoryEvent()
+            instanceof JobUnsuccessfulCompletionEvent);
+  }
 }
 
 class JHEvenHandlerForTest extends JobHistoryEventHandler {
@@ -307,4 +373,28 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
   public EventWriter getEventWriter() {
     return this.eventWriter;
   }
-}
\ No newline at end of file
+}
+
+/**
+ * Class to help with testSigTermedFunctionality
+ */
+class JHEventHandlerForSigtermTest extends JobHistoryEventHandler {
+  private MetaInfo metaInfo;
+  public JHEventHandlerForSigtermTest(AppContext context, int startCount) {
+    super(context, startCount);
+  }
+
+  public void addToFileMap(JobId jobId) {
+    metaInfo = Mockito.mock(MetaInfo.class);
+    Mockito.when(metaInfo.isWriterActive()).thenReturn(true);
+    fileMap.put(jobId, metaInfo);
+  }
+
+  JobHistoryEvent lastEventHandled;
+  int eventsHandled = 0;
+  @Override
+  protected void handleEvent(JobHistoryEvent event) {
+    this.lastEventHandled = event;
+    this.eventsHandled++;
+  }
+}
\ No newline at end of file
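
The change set above hinges on one pattern: disable the FileSystem cache's own shutdown hook ("fs.automatic.close" = false), then register an application shutdown hook that flips a volatile "signalled" flag on the long-running components, stops them, and finally closes the FileSystem handles itself. The following standalone sketch is illustrative only and not part of the patch; GracefulShutdownSketch and SignallableService are hypothetical names standing in for the AM and for JobHistoryEventHandler/RMCommunicator, while Configuration, FileSystem.get(Configuration), FileSystem.closeAll() and the "fs.automatic.close" key are the hadoop-common APIs the patch itself uses.

// GracefulShutdownSketch.java -- illustrative sketch, not part of the patch.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GracefulShutdownSketch {

  // Hypothetical stand-in for JobHistoryEventHandler / RMCommunicator.
  static class SignallableService {
    // Mirrors the volatile flag the patch adds to both classes.
    volatile boolean signalled = false;

    void setSignalled(boolean signalled) {
      this.signalled = signalled;
    }

    void stop() {
      if (signalled) {
        // Skip slow work (e.g. waiting for a long event queue to drain) and
        // write only what is needed to leave a consistent job history.
      } else {
        // Normal, orderly shutdown.
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Keep the FileSystem cache from closing handles in its own shutdown
    // hook; the history file must still be writable while our hook runs.
    conf.setBoolean("fs.automatic.close", false);
    FileSystem fs = FileSystem.get(conf);

    final SignallableService service = new SignallableService();

    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        service.setSignalled(true); // a SIGTERM (or normal exit) happened
        service.stop();             // fast shutdown path
        try {
          FileSystem.closeAll();    // close the handles ourselves, last
        } catch (IOException ioe) {
          System.err.println("Failed to close FileSystem objects: " + ioe);
        }
      }
    });

    // ... normal work against 'fs' would happen here ...
    fs.exists(new Path("/"));
  }
}

The patch applies the same split: MRAppMasterShutdownHook flips the isSignalled flags and stops the AM, JobHistoryEventHandler's stop() then writes a JobUnsuccessfulCompletionEvent for any job whose event writer is still open, and RMCommunicator reports KILLED to the RM for a job that was still RUNNING when the signal arrived.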