MAPREDUCE-3614. Fixed MR AM to close history file quickly and send a correct final state to the RM when it is killed. Contributed by Ravi Prakash.

svn merge --ignore-ancestry -c 1296747 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1296748 13f79535-47bb-0310-9956-ffa450edef68
Author: Vinod Kumar Vavilapalli
Date:   2012-03-04 05:25:07 +00:00
Commit: 526f466d0c (parent f942f395c1)

5 changed files with 181 additions and 8 deletions
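Note: the whole patch builds on the JVM shutdown-hook mechanism, so it is worth restating before reading the diffs. The JVM runs registered hooks when it receives SIGTERM, and anything that must survive a kill has to be flushed from inside a hook, quickly, before a follow-up SIGKILL arrives. A minimal standalone sketch of that pattern (illustrative only, not part of the patch; the class and field names are hypothetical stand-ins for the MR AM's components):

    // ShutdownHookSketch.java -- illustrative only, not part of the patch.
    public class ShutdownHookSketch {

      // mirrors the `isSignalled` flags this patch adds to the AM's components
      static volatile boolean signalled = false;

      public static void main(String[] args) throws InterruptedException {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            // runs on SIGTERM and on normal JVM exit
            signalled = true; // tell long-running components to finish fast
            System.out.println("hook: state flushed, resources closed");
          }
        });
        Thread.sleep(60_000); // stands in for the AM's normal work; send SIGTERM to test
      }
    }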

hadoop-mapreduce-project/CHANGES.txt

@@ -72,6 +72,9 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-2793. Corrected AppIDs, JobIDs, TaskAttemptIDs to be of correct
format on the web pages. (Bikas Saha via vinodkv)
MAPREDUCE-3614. Fixed MR AM to close history file quickly and send a correct
final state to the RM when it is killed. (Ravi Prakash via vinodkv)
OPTIMIZATIONS
MAPREDUCE-3901. Modified JobHistory records in YARN to lazily load job and

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java

@@ -44,6 +44,7 @@
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -95,9 +96,12 @@ public class JobHistoryEventHandler extends AbstractService
private static final Log LOG = LogFactory.getLog(
JobHistoryEventHandler.class);
- private static final Map<JobId, MetaInfo> fileMap =
+ protected static final Map<JobId, MetaInfo> fileMap =
Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
// Has a signal (SIGTERM etc) been issued?
protected volatile boolean isSignalled = false;
public JobHistoryEventHandler(AppContext context, int startCount) {
super("JobHistoryEventHandler");
this.context = context;
@@ -314,7 +318,30 @@ public void stop() {
LOG.info("In stop, writing event " + ev.getType());
handleEvent(ev);
}
// Process JobUnsuccessfulCompletionEvent for jobIds which still haven't
// closed their event writers
Iterator<JobId> jobIt = fileMap.keySet().iterator();
if(isSignalled) {
while (jobIt.hasNext()) {
JobId toClose = jobIt.next();
MetaInfo mi = fileMap.get(toClose);
if(mi != null && mi.isWriterActive()) {
LOG.warn("Found jobId " + toClose
+ " to have not been closed. Will close");
//Create a JobFinishEvent so that it is written to the job history
JobUnsuccessfulCompletionEvent jucEvent =
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
System.currentTimeMillis(), context.getJob(toClose)
.getCompletedMaps(), context.getJob(toClose).getCompletedReduces(),
JobState.KILLED.toString());
JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
//Bypass the queue mechanism which might wait. Call the method directly
handleEvent(jfEvent);
}
}
}
//close all file handles
for (MetaInfo mi : fileMap.values()) {
try {
@@ -710,7 +737,7 @@ public void stop() {
}
}
- private class MetaInfo {
+ protected class MetaInfo {
private Path historyFile;
private Path confFile;
private EventWriter writer;
@@ -880,4 +907,10 @@ private String getFileNameFromTmpFN(String tmpFileName) {
//TODO. Some error checking here.
return tmpFileName.substring(0, tmpFileName.length()-4);
}
public void setSignalled(boolean isSignalled) {
this.isSignalled = isSignalled;
LOG.info("JobHistoryEventHandler notified that isSignalled was "
+ isSignalled);
}
}
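
Note: to make the intended calling sequence for the handler above concrete, here is a minimal hedged sketch (the method name is illustrative; `jheh` stands for the AM's live JobHistoryEventHandler):

    // Illustrative fragment, not part of the patch.
    void onSigTerm(JobHistoryEventHandler jheh) {
      jheh.setSignalled(true); // record that a SIGTERM arrived
      jheh.stop();             // normal flush, plus a synthetic
                               // JobUnsuccessfulCompletionEvent (state KILLED) for
                               // each job whose history writer is still open, so
                               // the job shows up in the JobHistoryServer
    }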

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -76,6 +76,7 @@
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
@@ -155,6 +156,7 @@ public class MRAppMaster extends CompositeService {
private boolean newApiCommitter;
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private JobHistoryEventHandler jobHistoryEventHandler;
private boolean inRecovery = false;
private SpeculatorEventDispatcher speculatorEventDispatcher;
@@ -502,9 +504,9 @@ protected void addIfService(Object object) {
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
- JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
-     getStartCount());
- return eventHandler;
+ this.jobHistoryEventHandler = new JobHistoryEventHandler(context,
+     getStartCount());
+ return this.jobHistoryEventHandler;
}
protected Speculator createSpeculator(Configuration conf, AppContext context) {
@@ -659,6 +661,10 @@ public synchronized void stop() {
public void handle(ContainerAllocatorEvent event) {
this.containerAllocator.handle(event);
}
public void setSignalled(boolean isSignalled) {
((RMCommunicator) containerAllocator).setSignalled(true);
}
}
/**
@@ -957,12 +963,16 @@ public static void main(String[] args) {
Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString), appSubmitTime);
Runtime.getRuntime().addShutdownHook(
-     new CompositeServiceShutdownHook(appMaster));
+     new MRAppMasterShutdownHook(appMaster));
YarnConfiguration conf = new YarnConfiguration(new JobConf());
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
String jobUserName = System
.getenv(ApplicationConstants.Environment.USER.name());
conf.set(MRJobConfig.USER_NAME, jobUserName);
// Do not automatically close FileSystem objects so that in case of
// SIGTERM I have a chance to write out the job history. I'll be closing
// the objects myself.
conf.setBoolean("fs.automatic.close", false);
initAndStartAppMaster(appMaster, conf, jobUserName);
} catch (Throwable t) {
LOG.fatal("Error starting MRAppMaster", t);
@@ -970,6 +980,35 @@ public static void main(String[] args) {
}
}
// The shutdown hook that runs when a signal is received AND during normal
// close of the JVM.
static class MRAppMasterShutdownHook extends Thread {
MRAppMaster appMaster;
MRAppMasterShutdownHook(MRAppMaster appMaster) {
this.appMaster = appMaster;
}
public void run() {
LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
+ "JobHistoryEventHandler.");
// Notify the JHEH and RMCommunicator that a SIGTERM has been received so
// that they don't take too long in shutting down
if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
((ContainerAllocatorRouter) appMaster.containerAllocator)
.setSignalled(true);
}
if(appMaster.jobHistoryEventHandler != null) {
appMaster.jobHistoryEventHandler.setSignalled(true);
}
appMaster.stop();
try {
//Close all the FileSystem objects
FileSystem.closeAll();
} catch (IOException ioe) {
LOG.warn("Failed to close all FileSystem objects", ioe);
}
}
}
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
final YarnConfiguration conf, String jobUserName) throws IOException,
InterruptedException {
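
Note: the fs.automatic.close comment in main() above deserves unpacking. By default Hadoop's FileSystem cache installs its own JVM shutdown hook that closes every cached instance; if that hook fires before the AM's hook, the history file can no longer be written. A minimal sketch of the resulting pattern (illustrative only; fs.automatic.close and FileSystem.closeAll() are the real Hadoop config key and API):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class FsAutomaticCloseSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Disable the FileSystem cache's own shutdown hook so it cannot run
        // before a hook that still needs the FileSystem.
        conf.setBoolean("fs.automatic.close", false);
        FileSystem fs = FileSystem.get(conf); // cached instance, no longer auto-closed
        // ... a shutdown hook would flush job history through `fs` here ...
        FileSystem.closeAll(); // explicit cleanup, as MRAppMasterShutdownHook does
      }
    }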

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java

@@ -81,6 +81,8 @@ public abstract class RMCommunicator extends AbstractService {
private final AppContext context;
private Job job;
// Has a signal (SIGTERM etc) been issued?
protected volatile boolean isSignalled = false;
public RMCommunicator(ClientService clientService, AppContext context) {
super("RMCommunicator");
@@ -158,7 +160,8 @@ protected void unregister() {
FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
if (job.getState() == JobState.SUCCEEDED) {
finishState = FinalApplicationStatus.SUCCEEDED;
- } else if (job.getState() == JobState.KILLED) {
+ } else if (job.getState() == JobState.KILLED
+     || (job.getState() == JobState.RUNNING && isSignalled)) {
finishState = FinalApplicationStatus.KILLED;
} else if (job.getState() == JobState.FAILED
|| job.getState() == JobState.ERROR) {
@@ -278,4 +281,9 @@ public AMRMProtocol run() {
}
protected abstract void heartbeat() throws Exception;
public void setSignalled(boolean isSignalled) {
this.isSignalled = isSignalled;
LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
}
}
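
Note: the change to unregister() is the heart of the "correct final state" half of the fix; a job still RUNNING when SIGTERM arrives is now reported to the RM as KILLED rather than UNDEFINED. The mapping, restated as a standalone sketch (the method and class names are illustrative; the enums are the real types):

    import org.apache.hadoop.mapreduce.v2.api.records.JobState;
    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;

    class FinishStateSketch {
      // Restatement of the post-patch logic in RMCommunicator.unregister()
      static FinalApplicationStatus finishState(JobState state, boolean isSignalled) {
        if (state == JobState.SUCCEEDED) {
          return FinalApplicationStatus.SUCCEEDED;
        }
        if (state == JobState.KILLED
            || (state == JobState.RUNNING && isSignalled)) { // the new clause
          return FinalApplicationStatus.KILLED;
        }
        if (state == JobState.FAILED || state == JobState.ERROR) {
          return FinalApplicationStatus.FAILED;
        }
        return FinalApplicationStatus.UNDEFINED;
      }
    }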

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java

@@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.jobhistory;
import static junit.framework.Assert.assertFalse;
import static junit.framework.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -48,6 +50,8 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
public class TestJobHistoryEventHandler {
@@ -277,6 +281,68 @@ private class TestParams {
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
AppContext mockAppContext = mockAppContext(jobId);
}
private JobHistoryEvent getEventToEnqueue(JobId jobId) {
JobHistoryEvent toReturn = Mockito.mock(JobHistoryEvent.class);
HistoryEvent he = Mockito.mock(HistoryEvent.class);
Mockito.when(he.getEventType()).thenReturn(EventType.JOB_STATUS_CHANGED);
Mockito.when(toReturn.getHistoryEvent()).thenReturn(he);
Mockito.when(toReturn.getJobID()).thenReturn(jobId);
return toReturn;
}
@Test
/**
* Tests that in case of SIGTERM, the JHEH stops without processing its event
* queue (because we must stop quickly lest we get SIGKILLed) and processes
* a JobUnsuccessfulEvent for jobs which were still running (so that they may
* show up in the JobHistoryServer)
*/
public void testSigTermedFunctionality() throws IOException {
AppContext mockedContext = Mockito.mock(AppContext.class);
JHEventHandlerForSigtermTest jheh =
new JHEventHandlerForSigtermTest(mockedContext, 0);
JobId jobId = Mockito.mock(JobId.class);
jheh.addToFileMap(jobId);
//Submit 4 events and check that they're handled in the absence of a signal
final int numEvents = 4;
JobHistoryEvent events[] = new JobHistoryEvent[numEvents];
for(int i=0; i < numEvents; ++i) {
events[i] = getEventToEnqueue(jobId);
jheh.handle(events[i]);
}
jheh.stop();
//Make sure events were handled
assertTrue("handleEvent should've been called only 4 times but was "
+ jheh.eventsHandled, jheh.eventsHandled == 4);
//Create a new jheh because the last stop closed the eventWriter etc.
jheh = new JHEventHandlerForSigtermTest(mockedContext, 0);
// Make constructor of JobUnsuccessfulCompletionEvent pass
Job job = Mockito.mock(Job.class);
Mockito.when(mockedContext.getJob(jobId)).thenReturn(job);
// Make TypeConverter(JobID) pass
ApplicationId mockAppId = Mockito.mock(ApplicationId.class);
Mockito.when(mockAppId.getClusterTimestamp()).thenReturn(1000l);
Mockito.when(jobId.getAppId()).thenReturn(mockAppId);
jheh.addToFileMap(jobId);
jheh.setSignalled(true);
for(int i=0; i < numEvents; ++i) {
events[i] = getEventToEnqueue(jobId);
jheh.handle(events[i]);
}
jheh.stop();
//Make sure events were handled, 4 + 1 finish event
assertTrue("handleEvent should've been called only 5 times but was "
+ jheh.eventsHandled, jheh.eventsHandled == 5);
assertTrue("Last event handled wasn't JobUnsuccessfulCompletionEvent",
jheh.lastEventHandled.getHistoryEvent()
instanceof JobUnsuccessfulCompletionEvent);
}
}
class JHEvenHandlerForTest extends JobHistoryEventHandler {
@@ -307,4 +373,28 @@ protected void closeEventWriter(JobId jobId) {
public EventWriter getEventWriter() {
return this.eventWriter;
}
}
/**
* Class to help with testSigTermedFunctionality
*/
class JHEventHandlerForSigtermTest extends JobHistoryEventHandler {
private MetaInfo metaInfo;
public JHEventHandlerForSigtermTest(AppContext context, int startCount) {
super(context, startCount);
}
public void addToFileMap(JobId jobId) {
metaInfo = Mockito.mock(MetaInfo.class);
Mockito.when(metaInfo.isWriterActive()).thenReturn(true);
fileMap.put(jobId, metaInfo);
}
JobHistoryEvent lastEventHandled;
int eventsHandled = 0;
@Override
protected void handleEvent(JobHistoryEvent event) {
this.lastEventHandled = event;
this.eventsHandled++;
}
}
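
Note: the test's key trick is addToFileMap(): it plants a mocked MetaInfo whose isWriterActive() returns true, so the signalled path in stop() believes a history writer is still open. The same stubbing pattern in isolation (the interface here is a hypothetical stand-in for the real MetaInfo):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    public class MockitoStubSketch {
      // hypothetical stand-in for JobHistoryEventHandler.MetaInfo
      interface Writerish {
        boolean isWriterActive();
      }

      public static void main(String[] args) {
        Writerish mi = mock(Writerish.class);
        when(mi.isWriterActive()).thenReturn(true); // same stubbing as addToFileMap()
        System.out.println(mi.isWriterActive());    // prints: true
      }
    }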