MAPREDUCE-5795. Fixed MRAppMaster to record the correct job-state after it recovers from a commit during a previous attempt. Contributed by Xuan Gong.

svn merge --ignore-ancestry -c 1581180 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1581181 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-25 02:01:13 +00:00
parent 5303f4a66d
commit e51fbb6a9e
6 changed files with 127 additions and 12 deletions

View File

@ -121,6 +121,9 @@ Release 2.4.0 - UNRELEASED
FadviseFileRegion::transferTo does not read disks efficiently. FadviseFileRegion::transferTo does not read disks efficiently.
(Nikola Vujic via cnauroth) (Nikola Vujic via cnauroth)
MAPREDUCE-5795. Fixed MRAppMaster to record the correct job-state after it
recovers from a commit during a previous attempt. (Xuan Gong via vinodkv)
Release 2.3.1 - UNRELEASED Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -31,6 +31,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -49,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
@ -348,7 +350,9 @@ public class JobHistoryEventHandler extends AbstractService
JobUnsuccessfulCompletionEvent jucEvent = JobUnsuccessfulCompletionEvent jucEvent =
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose), new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
System.currentTimeMillis(), job.getCompletedMaps(), System.currentTimeMillis(), job.getCompletedMaps(),
job.getCompletedReduces(), JobState.KILLED.toString(), job.getCompletedReduces(),
createJobStateForJobUnsuccessfulCompletionEvent(
mi.getForcedJobStateOnShutDown()),
job.getDiagnostics()); job.getDiagnostics());
JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent); JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
//Bypass the queue mechanism which might wait. Call the method directly //Bypass the queue mechanism which might wait. Call the method directly
@ -381,9 +385,10 @@ public class JobHistoryEventHandler extends AbstractService
* This should be the first call to history for a job * This should be the first call to history for a job
* *
* @param jobId the jobId. * @param jobId the jobId.
* @param forcedJobStateOnShutDown
* @throws IOException * @throws IOException
*/ */
protected void setupEventWriter(JobId jobId) protected void setupEventWriter(JobId jobId, String forcedJobStateOnShutDown)
throws IOException { throws IOException {
if (stagingDirPath == null) { if (stagingDirPath == null) {
LOG.error("Log Directory is null, returning"); LOG.error("Log Directory is null, returning");
@ -438,7 +443,7 @@ public class JobHistoryEventHandler extends AbstractService
} }
MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer, MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
user, jobName, jobId); user, jobName, jobId, forcedJobStateOnShutDown);
fi.getJobSummary().setJobId(jobId); fi.getJobSummary().setJobId(jobId);
fileMap.put(jobId, fi); fileMap.put(jobId, fi);
} }
@ -481,13 +486,17 @@ public class JobHistoryEventHandler extends AbstractService
return false; return false;
} }
protected void handleEvent(JobHistoryEvent event) { @Private
public void handleEvent(JobHistoryEvent event) {
synchronized (lock) { synchronized (lock) {
// If this is JobSubmitted Event, setup the writer // If this is JobSubmitted Event, setup the writer
if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) { if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) {
try { try {
setupEventWriter(event.getJobID()); AMStartedEvent amStartedEvent =
(AMStartedEvent) event.getHistoryEvent();
setupEventWriter(event.getJobID(),
amStartedEvent.getForcedJobStateOnShutDown());
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.error("Error JobHistoryEventHandler in handleEvent: " + event, LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
ioe); ioe);
@ -804,9 +813,10 @@ public class JobHistoryEventHandler extends AbstractService
Timer flushTimer; Timer flushTimer;
FlushTimerTask flushTimerTask; FlushTimerTask flushTimerTask;
private boolean isTimerShutDown = false; private boolean isTimerShutDown = false;
private String forcedJobStateOnShutDown;
MetaInfo(Path historyFile, Path conf, EventWriter writer, String user, MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
String jobName, JobId jobId) { String jobName, JobId jobId, String forcedJobStateOnShutDown) {
this.historyFile = historyFile; this.historyFile = historyFile;
this.confFile = conf; this.confFile = conf;
this.writer = writer; this.writer = writer;
@ -814,6 +824,7 @@ public class JobHistoryEventHandler extends AbstractService
new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null); new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
this.jobSummary = new JobSummary(); this.jobSummary = new JobSummary();
this.flushTimer = new Timer("FlushTimer", true); this.flushTimer = new Timer("FlushTimer", true);
this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
} }
Path getHistoryFile() { Path getHistoryFile() {
@ -840,6 +851,10 @@ public class JobHistoryEventHandler extends AbstractService
return isTimerShutDown; return isTimerShutDown;
} }
String getForcedJobStateOnShutDown() {
return forcedJobStateOnShutDown;
}
@Override @Override
public String toString() { public String toString() {
return "Job MetaInfo for "+ jobSummary.getJobId() return "Job MetaInfo for "+ jobSummary.getJobId()
@ -983,4 +998,20 @@ public class JobHistoryEventHandler extends AbstractService
LOG.info("JobHistoryEventHandler notified that forceJobCompletion is " LOG.info("JobHistoryEventHandler notified that forceJobCompletion is "
+ forceJobCompletion); + forceJobCompletion);
} }
private String createJobStateForJobUnsuccessfulCompletionEvent(
String forcedJobStateOnShutDown) {
if (forcedJobStateOnShutDown == null || forcedJobStateOnShutDown
.isEmpty()) {
return JobState.KILLED.toString();
} else if (forcedJobStateOnShutDown.equals(
JobStateInternal.ERROR.toString()) ||
forcedJobStateOnShutDown.equals(JobStateInternal.FAILED.toString())) {
return JobState.FAILED.toString();
} else if (forcedJobStateOnShutDown.equals(JobStateInternal.SUCCEEDED
.toString())) {
return JobState.SUCCEEDED.toString();
}
return JobState.KILLED.toString();
}
} }

View File

@ -1012,14 +1012,13 @@ public class MRAppMaster extends CompositeService {
AMInfo amInfo = AMInfo amInfo =
MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost, MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
nmPort, nmHttpPort); nmPort, nmHttpPort);
amInfos.add(amInfo);
// /////////////////// Create the job itself. // /////////////////// Create the job itself.
job = createJob(getConfig(), forcedState, shutDownMessage); job = createJob(getConfig(), forcedState, shutDownMessage);
// End of creating the job. // End of creating the job.
// Send out an MR AM inited event for this AM and all previous AMs. // Send out an MR AM inited event for all previous AMs.
for (AMInfo info : amInfos) { for (AMInfo info : amInfos) {
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new JobHistoryEvent(job.getID(), new AMStartedEvent(info new JobHistoryEvent(job.getID(), new AMStartedEvent(info
@ -1028,6 +1027,15 @@ public class MRAppMaster extends CompositeService {
.getNodeManagerHttpPort()))); .getNodeManagerHttpPort())));
} }
// Send out an MR AM inited event for this AM.
dispatcher.getEventHandler().handle(
new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo
.getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
.getNodeManagerHttpPort(), this.forcedState == null ? null
: this.forcedState.toString())));
amInfos.add(amInfo);
// metrics system init is really init & start. // metrics system init is really init & start.
// It's more test friendly to put it here. // It's more test friendly to put it here.
DefaultMetricsSystem.initialize("MRAppMaster"); DefaultMetricsSystem.initialize("MRAppMaster");

View File

@ -497,7 +497,7 @@ class JHEventHandlerForSigtermTest extends JobHistoryEventHandler {
JobHistoryEvent lastEventHandled; JobHistoryEvent lastEventHandled;
int eventsHandled = 0; int eventsHandled = 0;
@Override @Override
protected void handleEvent(JobHistoryEvent event) { public void handleEvent(JobHistoryEvent event) {
this.lastEventHandled = event; this.lastEventHandled = event;
this.eventsHandled++; this.eventsHandled++;
} }

View File

@ -21,7 +21,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
@ -44,6 +45,10 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent; import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
@ -70,6 +75,8 @@ import org.apache.log4j.Logger;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
public class TestMRAppMaster { public class TestMRAppMaster {
private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class); private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
@ -154,6 +161,9 @@ public class TestMRAppMaster {
assertTrue(appMaster.errorHappenedShutDown); assertTrue(appMaster.errorHappenedShutDown);
assertEquals(JobStateInternal.ERROR, appMaster.forcedState); assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
appMaster.stop(); appMaster.stop();
// verify the final status is FAILED
verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
} }
@Test @Test
@ -190,6 +200,9 @@ public class TestMRAppMaster {
assertTrue(appMaster.errorHappenedShutDown); assertTrue(appMaster.errorHappenedShutDown);
assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState); assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState);
appMaster.stop(); appMaster.stop();
// verify the final status is SUCCEEDED
verifyFailedStatus((MRAppMasterTest)appMaster, "SUCCEEDED");
} }
@Test @Test
@ -226,6 +239,9 @@ public class TestMRAppMaster {
assertTrue(appMaster.errorHappenedShutDown); assertTrue(appMaster.errorHappenedShutDown);
assertEquals(JobStateInternal.FAILED, appMaster.forcedState); assertEquals(JobStateInternal.FAILED, appMaster.forcedState);
appMaster.stop(); appMaster.stop();
// verify the final status is FAILED
verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
} }
@Test @Test
@ -423,8 +439,20 @@ public class TestMRAppMaster {
} }
}
private void verifyFailedStatus(MRAppMasterTest appMaster,
String expectedJobState) {
ArgumentCaptor<JobHistoryEvent> captor = ArgumentCaptor
.forClass(JobHistoryEvent.class);
// handle two events: AMStartedEvent and JobUnsuccessfulCompletionEvent
verify(appMaster.spyHistoryService, times(2))
.handleEvent(captor.capture());
HistoryEvent event = captor.getValue().getHistoryEvent();
assertTrue(event instanceof JobUnsuccessfulCompletionEvent);
assertEquals(((JobUnsuccessfulCompletionEvent) event).getStatus()
, expectedJobState);
}
}
class MRAppMasterTest extends MRAppMaster { class MRAppMasterTest extends MRAppMaster {
Path stagingDirPath; Path stagingDirPath;
@ -434,6 +462,7 @@ class MRAppMasterTest extends MRAppMaster {
ContainerAllocator mockContainerAllocator; ContainerAllocator mockContainerAllocator;
CommitterEventHandler mockCommitterEventHandler; CommitterEventHandler mockCommitterEventHandler;
RMHeartbeatHandler mockRMHeartbeatHandler; RMHeartbeatHandler mockRMHeartbeatHandler;
JobHistoryEventHandler spyHistoryService;
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId, public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort, ContainerId containerId, String host, int port, int httpPort,
@ -502,4 +531,14 @@ class MRAppMasterTest extends MRAppMaster {
public UserGroupInformation getUgi() { public UserGroupInformation getUgi() {
return currentUser; return currentUser;
} }
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
spyHistoryService =
Mockito.spy((JobHistoryEventHandler) super
.createJobHistoryHandler(context));
spyHistoryService.setForcejobCompletion(this.isLastAMRetry);
return spyHistoryService;
}
} }

View File

@ -34,6 +34,7 @@ import org.apache.avro.util.Utf8;
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class AMStartedEvent implements HistoryEvent { public class AMStartedEvent implements HistoryEvent {
private AMStarted datum = new AMStarted(); private AMStarted datum = new AMStarted();
private String forcedJobStateOnShutDown;
/** /**
* Create an event to record the start of an MR AppMaster * Create an event to record the start of an MR AppMaster
@ -54,12 +55,38 @@ public class AMStartedEvent implements HistoryEvent {
public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime, public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
ContainerId containerId, String nodeManagerHost, int nodeManagerPort, ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
int nodeManagerHttpPort) { int nodeManagerHttpPort) {
this(appAttemptId, startTime, containerId, nodeManagerHost,
nodeManagerPort, nodeManagerHttpPort, null);
}
/**
* Create an event to record the start of an MR AppMaster
*
* @param appAttemptId
* the application attempt id.
* @param startTime
* the start time of the AM.
* @param containerId
* the containerId of the AM.
* @param nodeManagerHost
* the node on which the AM is running.
* @param nodeManagerPort
* the port on which the AM is running.
* @param nodeManagerHttpPort
* the httpPort for the node running the AM.
* @param forcedJobStateOnShutDown
* the state to force the job into
*/
public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
int nodeManagerHttpPort, String forcedJobStateOnShutDown) {
datum.applicationAttemptId = new Utf8(appAttemptId.toString()); datum.applicationAttemptId = new Utf8(appAttemptId.toString());
datum.startTime = startTime; datum.startTime = startTime;
datum.containerId = new Utf8(containerId.toString()); datum.containerId = new Utf8(containerId.toString());
datum.nodeManagerHost = new Utf8(nodeManagerHost); datum.nodeManagerHost = new Utf8(nodeManagerHost);
datum.nodeManagerPort = nodeManagerPort; datum.nodeManagerPort = nodeManagerPort;
datum.nodeManagerHttpPort = nodeManagerHttpPort; datum.nodeManagerHttpPort = nodeManagerHttpPort;
this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
} }
AMStartedEvent() { AMStartedEvent() {
@ -116,6 +143,13 @@ public class AMStartedEvent implements HistoryEvent {
return datum.nodeManagerHttpPort; return datum.nodeManagerHttpPort;
} }
/**
* @return the state to force the job into
*/
public String getForcedJobStateOnShutDown() {
return this.forcedJobStateOnShutDown;
}
/** Get the attempt id */ /** Get the attempt id */
@Override @Override