MAPREDUCE-5086. MR app master deletes staging dir when sent a reboot command from the RM. Contributed by Jian He

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1464255 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-04-04 01:56:49 +00:00
parent da0e779e4b
commit fc75d3f3dc
9 changed files with 274 additions and 31 deletions

View File

@ -175,6 +175,9 @@ Release 2.0.5-alpha - UNRELEASED
MAPREDUCE-5098. Fix findbugs warnings in gridmix. (kkambatl via tucu)
MAPREDUCE-5086. MR app master deletes staging dir when sent a reboot
command from the RM. (Jian He via jlowe)
Release 2.0.4-beta - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -549,8 +549,14 @@ public void shutDownJob() {
}
try {
//We are finishing cleanly so this is the last retry
isLastAMRetry = true;
//if isLastAMRetry comes as true, should never set it to false
if ( !isLastAMRetry){
if (((JobImpl)job).getInternalState() != JobStateInternal.REBOOT) {
LOG.info("We are finishing cleanly so this is the last retry");
isLastAMRetry = true;
}
}
notifyIsLastAMRetry(isLastAMRetry);
// Stop all services
// This will also send the final report to the ResourceManager
LOG.info("Calling stop for all the services");
@ -1272,19 +1278,25 @@ public void run() {
// that they don't take too long in shutting down
if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
((ContainerAllocatorRouter) appMaster.containerAllocator)
.setSignalled(true);
((ContainerAllocatorRouter) appMaster.containerAllocator)
.setShouldUnregister(appMaster.isLastAMRetry);
}
if(appMaster.jobHistoryEventHandler != null) {
appMaster.jobHistoryEventHandler
.setForcejobCompletion(appMaster.isLastAMRetry);
.setSignalled(true);
}
appMaster.notifyIsLastAMRetry(appMaster.isLastAMRetry);
appMaster.stop();
}
}
/**
 * Pushes the "is this the last AM retry" decision to the components that
 * act on it during shutdown: the container allocator (only when it is a
 * {@link ContainerAllocatorRouter}) and the job history event handler
 * (only when one has been created).
 *
 * @param isLastAMRetry value forwarded to
 *        {@code setShouldUnregister} and {@code setForcejobCompletion}
 */
public void notifyIsLastAMRetry(boolean isLastAMRetry){
  if (containerAllocator instanceof ContainerAllocatorRouter) {
    LOG.info("Notify RMCommunicator isAMLastRetry: " + isLastAMRetry);
    ContainerAllocatorRouter router =
        (ContainerAllocatorRouter) containerAllocator;
    router.setShouldUnregister(isLastAMRetry);
  }

  // Nothing more to notify when no history handler exists.
  if (jobHistoryEventHandler == null) {
    return;
  }
  LOG.info("Notify JHEH isAMLastRetry: " + isLastAMRetry);
  jobHistoryEventHandler.setForcejobCompletion(isLastAMRetry);
}
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
final YarnConfiguration conf, String jobUserName) throws IOException,
InterruptedException {

View File

@ -30,5 +30,6 @@ public enum JobStateInternal {
KILL_WAIT,
KILL_ABORT,
KILLED,
ERROR
ERROR,
REBOOT
}

View File

@ -54,6 +54,6 @@ public enum JobEventType {
JOB_TASK_ATTEMPT_FETCH_FAILURE,
//Producer:RMContainerAllocator
JOB_UPDATED_NODES
JOB_UPDATED_NODES,
JOB_AM_REBOOT
}

View File

@ -215,6 +215,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
private static final InternalErrorTransition
INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
private static final InternalRebootTransition
INTERNAL_REBOOT_TRANSITION = new InternalRebootTransition();
private static final TaskAttemptCompletedEventTransition
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
new TaskAttemptCompletedEventTransition();
@ -246,6 +248,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
.addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
.addTransition(JobStateInternal.NEW, JobStateInternal.REBOOT,
JobEventType.JOB_AM_REBOOT,
INTERNAL_REBOOT_TRANSITION)
// Ignore-able events
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_UPDATED_NODES)
@ -265,6 +270,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
.addTransition(JobStateInternal.INITED, JobStateInternal.ERROR,
JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
.addTransition(JobStateInternal.INITED, JobStateInternal.REBOOT,
JobEventType.JOB_AM_REBOOT,
INTERNAL_REBOOT_TRANSITION)
// Ignore-able events
.addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
JobEventType.JOB_UPDATED_NODES)
@ -287,6 +295,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
.addTransition(JobStateInternal.SETUP, JobStateInternal.ERROR,
JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
.addTransition(JobStateInternal.SETUP, JobStateInternal.REBOOT,
JobEventType.JOB_AM_REBOOT,
INTERNAL_REBOOT_TRANSITION)
// Ignore-able events
.addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP,
JobEventType.JOB_UPDATED_NODES)
@ -327,6 +338,9 @@ JobEventType.JOB_KILL, new KillTasksTransition())
JobStateInternal.RUNNING,
JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
.addTransition(JobStateInternal.RUNNING, JobStateInternal.REBOOT,
JobEventType.JOB_AM_REBOOT,
INTERNAL_REBOOT_TRANSITION)
// Transitions from KILL_WAIT state.
.addTransition
@ -352,7 +366,8 @@ JobEventType.JOB_KILL, new KillTasksTransition())
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_AM_REBOOT))
// Transitions from COMMITTING state
.addTransition(JobStateInternal.COMMITTING,
@ -377,7 +392,10 @@ JobEventType.JOB_KILL, new KillTasksTransition())
.addTransition(JobStateInternal.COMMITTING,
JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobStateInternal.COMMITTING, JobStateInternal.REBOOT,
JobEventType.JOB_AM_REBOOT,
INTERNAL_REBOOT_TRANSITION)
// Ignore-able events
.addTransition(JobStateInternal.COMMITTING,
JobStateInternal.COMMITTING,
EnumSet.of(JobEventType.JOB_UPDATED_NODES,
@ -397,7 +415,8 @@ JobEventType.JOB_KILL, new KillTasksTransition())
.addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_AM_REBOOT))
// Transitions from FAIL_ABORT state
.addTransition(JobStateInternal.FAIL_ABORT,
@ -425,7 +444,8 @@ JobEventType.JOB_KILL, new KillTasksTransition())
JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED))
JobEventType.JOB_COMMIT_FAILED,
JobEventType.JOB_AM_REBOOT))
// Transitions from KILL_ABORT state
.addTransition(JobStateInternal.KILL_ABORT,
@ -452,7 +472,8 @@ JobEventType.JOB_KILL, new KillTasksTransition())
JobEventType.JOB_SETUP_COMPLETED,
JobEventType.JOB_SETUP_FAILED,
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED))
JobEventType.JOB_COMMIT_FAILED,
JobEventType.JOB_AM_REBOOT))
// Transitions from FAILED state
.addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
@ -476,7 +497,8 @@ JobEventType.JOB_KILL, new KillTasksTransition())
JobEventType.JOB_SETUP_FAILED,
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED,
JobEventType.JOB_ABORT_COMPLETED))
JobEventType.JOB_ABORT_COMPLETED,
JobEventType.JOB_AM_REBOOT))
// Transitions from KILLED state
.addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
@ -498,7 +520,8 @@ JobEventType.JOB_KILL, new KillTasksTransition())
JobEventType.JOB_SETUP_FAILED,
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED,
JobEventType.JOB_ABORT_COMPLETED))
JobEventType.JOB_ABORT_COMPLETED,
JobEventType.JOB_AM_REBOOT))
// No transitions from INTERNAL_ERROR state. Ignore all.
.addTransition(
@ -517,9 +540,33 @@ JobEventType.JOB_KILL, new KillTasksTransition())
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED,
JobEventType.JOB_ABORT_COMPLETED,
JobEventType.INTERNAL_ERROR))
JobEventType.INTERNAL_ERROR,
JobEventType.JOB_AM_REBOOT))
.addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
// No transitions from REBOOT state. Ignore all.
.addTransition(
JobStateInternal.REBOOT,
JobStateInternal.REBOOT,
EnumSet.of(JobEventType.JOB_INIT,
JobEventType.JOB_KILL,
JobEventType.JOB_TASK_COMPLETED,
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_SETUP_COMPLETED,
JobEventType.JOB_SETUP_FAILED,
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED,
JobEventType.JOB_ABORT_COMPLETED,
JobEventType.INTERNAL_ERROR,
JobEventType.JOB_AM_REBOOT))
.addTransition(JobStateInternal.REBOOT, JobStateInternal.REBOOT,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
// create the topology tables
.installTopology();
@ -904,6 +951,8 @@ private static JobState getExternalState(JobStateInternal smState) {
return JobState.RUNNING;
case FAIL_ABORT:
return JobState.FAILED;
case REBOOT:
return JobState.ERROR;
default:
return JobState.valueOf(smState.name());
}
@ -972,6 +1021,7 @@ JobStateInternal finished(JobStateInternal finalState) {
case KILLED:
metrics.killedJob(this);
break;
case REBOOT:
case ERROR:
case FAILED:
metrics.failedJob(this);
@ -1898,8 +1948,17 @@ public void transition(JobImpl job, JobEvent event) {
}
}
private static class InternalErrorTransition implements
private static class InternalTerminationTransition implements
SingleArcTransition<JobImpl, JobEvent> {
JobStateInternal terminationState = null;
String jobHistoryString = null;
public InternalTerminationTransition(JobStateInternal stateInternal,
String jobHistoryString) {
this.terminationState = stateInternal;
//mostly a hack for the job history server
this.jobHistoryString = jobHistoryString;
}
@Override
public void transition(JobImpl job, JobEvent event) {
//TODO Is this JH event required.
@ -1907,9 +1966,21 @@ public void transition(JobImpl job, JobEvent event) {
JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId,
job.finishTime, 0, 0,
JobStateInternal.ERROR.toString());
jobHistoryString);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
job.finished(JobStateInternal.ERROR);
job.finished(terminationState);
}
}
/**
 * Termination transition that finishes the job in
 * {@code JobStateInternal.ERROR} and passes the ERROR state name as the
 * job-history string.
 */
private static class InternalErrorTransition extends InternalTerminationTransition {
public InternalErrorTransition(){
super(JobStateInternal.ERROR, JobStateInternal.ERROR.toString());
}
}
/**
 * Termination transition that finishes the job in
 * {@code JobStateInternal.REBOOT}.
 */
private static class InternalRebootTransition extends InternalTerminationTransition {
public InternalRebootTransition(){
// The job-history string is deliberately the ERROR name, not REBOOT: only the
// internal termination state differs from InternalErrorTransition.
super(JobStateInternal.REBOOT, JobStateInternal.ERROR.toString());
}
}

View File

@ -123,7 +123,7 @@ protected synchronized void heartbeat() throws Exception {
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.INTERNAL_ERROR));
JobEventType.JOB_AM_REBOOT));
throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
}

View File

@ -574,7 +574,7 @@ private List<Container> getResources() throws Exception {
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.INTERNAL_ERROR));
JobEventType.JOB_AM_REBOOT));
throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
}

View File

@ -33,7 +33,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@ -45,6 +47,7 @@
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -86,9 +89,68 @@ public void testDeletionofStaging() throws IOException {
attemptId.setApplicationId(appId);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
MRAppMaster appMaster = new TestMRApp(attemptId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
appMaster.init(conf);
appMaster.start();
appMaster.shutDownJob();
//test whether notifyIsLastAMRetry called
Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
verify(fs).delete(stagingJobPath, true);
}
/**
 * A job that ends in the REBOOT state on attempt 0, with 4 attempts allowed,
 * must not be reported as the last AM retry and must leave the staging
 * directory in place.
 */
@Test (timeout = 30000)
public void testNoDeletionofStagingOnReboot() throws IOException {
  // Stubbed file system: the staging area exists and every delete succeeds.
  conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
  fs = mock(FileSystem.class);
  when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true);
  final String userName =
      UserGroupInformation.getCurrentUser().getShortUserName();
  final Path stagingArea = MRApps.getStagingAreaDir(conf, userName);
  when(fs.exists(stagingArea)).thenReturn(true);

  // Attempt 0 of application 0.
  ApplicationId applicationId =
      recordFactory.newRecordInstance(ApplicationId.class);
  applicationId.setClusterTimestamp(System.currentTimeMillis());
  applicationId.setId(0);
  ApplicationAttemptId attempt =
      recordFactory.newRecordInstance(ApplicationAttemptId.class);
  attempt.setAttemptId(0);
  attempt.setApplicationId(applicationId);

  ContainerAllocator allocatorMock = mock(ContainerAllocator.class);
  TestMRApp app =
      new TestMRApp(attempt, allocatorMock, JobStateInternal.REBOOT, 4);
  app.init(conf);
  app.start();

  // Shut the job down; this is not the last retry.
  app.shutDownJob();

  // notifyIsLastAMRetry must have been told "false" ...
  boolean reportedLastRetry = app.getTestIsLastAMRetry();
  Assert.assertEquals(false, reportedLastRetry);
  // ... and the staging directory must be untouched.
  verify(fs, times(0)).delete(stagingJobPath, true);
}
/**
 * A job that ends in the REBOOT state on attempt 1, with the default maximum
 * number of AM attempts, is treated as the last retry: the flag is reported
 * as true and the staging directory is deleted.
 */
@Test (timeout = 30000)
public void testDeletionofStagingOnReboot() throws IOException {
  // Stubbed file system: the staging area exists and every delete succeeds.
  conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
  fs = mock(FileSystem.class);
  when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true);
  final String userName =
      UserGroupInformation.getCurrentUser().getShortUserName();
  final Path stagingArea = MRApps.getStagingAreaDir(conf, userName);
  when(fs.exists(stagingArea)).thenReturn(true);

  // Attempt 1 of application 0.
  ApplicationId applicationId =
      recordFactory.newRecordInstance(ApplicationId.class);
  applicationId.setClusterTimestamp(System.currentTimeMillis());
  applicationId.setId(0);
  ApplicationAttemptId attempt =
      recordFactory.newRecordInstance(ApplicationAttemptId.class);
  attempt.setAttemptId(1);
  attempt.setApplicationId(applicationId);

  ContainerAllocator allocatorMock = mock(ContainerAllocator.class);
  TestMRApp app = new TestMRApp(attempt, allocatorMock,
      JobStateInternal.REBOOT, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
  app.init(conf);
  app.start();

  // Shut the job down; this is the last retry.
  app.shutDownJob();

  // notifyIsLastAMRetry must have been told "true" ...
  boolean reportedLastRetry = app.getTestIsLastAMRetry();
  Assert.assertEquals(true, reportedLastRetry);
  // ... and the staging directory must have been removed recursively.
  verify(fs).delete(stagingJobPath, true);
}
@ -151,6 +213,8 @@ public void testDeletionofStagingOnKillLastTry() throws IOException {
private class TestMRApp extends MRAppMaster {
ContainerAllocator allocator;
boolean testIsLastAMRetry = false;
JobStateInternal jobStateInternal;
public TestMRApp(ApplicationAttemptId applicationAttemptId,
ContainerAllocator allocator, int maxAppAttempts) {
@ -160,9 +224,11 @@ public TestMRApp(ApplicationAttemptId applicationAttemptId,
this.allocator = allocator;
}
public TestMRApp(ApplicationAttemptId applicationAttemptId) {
this(applicationAttemptId, null,
MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
/**
 * Creates a test app master via the three-argument constructor and also
 * records the internal job state to use for this instance.
 *
 * @param applicationAttemptId attempt id forwarded to the three-argument constructor
 * @param allocator container allocator forwarded to the three-argument constructor
 * @param jobStateInternal internal job state stored on this instance
 * @param maxAppAttempts maximum attempts forwarded to the three-argument constructor
 */
public TestMRApp(ApplicationAttemptId applicationAttemptId,
ContainerAllocator allocator, JobStateInternal jobStateInternal,
int maxAppAttempts) {
this(applicationAttemptId, allocator, maxAppAttempts);
this.jobStateInternal = jobStateInternal;
}
@Override
@ -179,6 +245,31 @@ protected ContainerAllocator createContainerAllocator(
return allocator;
}
/**
 * Supplies a mocked {@link JobImpl} instead of a real job. The mock reports
 * the internal state configured on this test app and a fixed job id, and is
 * registered in the app context's job map under that id.
 * The {@code conf}, {@code forcedState} and {@code diagnostic} arguments are
 * not used.
 */
@Override
protected Job createJob(Configuration conf, JobStateInternal forcedState,
    String diagnostic) {
  final JobId yarnJobId =
      TypeConverter.toYarn(JobID.forName("job_1234567890000_0001"));

  final JobImpl mockedJob = mock(JobImpl.class);
  when(mockedJob.getInternalState()).thenReturn(this.jobStateInternal);
  when(mockedJob.getID()).thenReturn(yarnJobId);

  // Register the mock so lookups through the context find it.
  AppContext appContext = (AppContext) getContext();
  appContext.getAllJobs().put(mockedJob.getID(), mockedJob);
  return mockedJob;
}
/**
 * Starts the app master normally, then shuts down the default metrics system.
 */
@Override
public void start() {
super.start();
// NOTE(review): presumably this keeps metrics state from one test AM from
// leaking into the next test - confirm.
DefaultMetricsSystem.shutdown();
}
/**
 * Captures the value passed in so tests can assert on it, then delegates to
 * the parent implementation.
 */
@Override
public void notifyIsLastAMRetry(boolean isLastAMRetry){
testIsLastAMRetry = isLastAMRetry;
super.notifyIsLastAMRetry(isLastAMRetry);
}
@Override
public RMHeartbeatHandler getRMHeartbeatHandler() {
return getStubbedHeartbeatHandler(getContext());
@ -197,6 +288,9 @@ public Configuration getConfig() {
protected void downloadTokensAndSetupUGI(Configuration conf) {
}
/**
 * @return the value most recently passed to {@code notifyIsLastAMRetry};
 *         {@code false} if it has not been called yet
 */
public boolean getTestIsLastAMRetry(){
return testIsLastAMRetry;
}
}
private final class MRAppTestCleanup extends MRApp {
@ -288,7 +382,7 @@ public void runOnNextHeartbeat(Runnable callback) {
};
}
@Test
@Test(timeout=20000)
public void testStagingCleanupOrder() throws Exception {
MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
this.getClass().getName(), true);

View File

@ -192,6 +192,68 @@ public void testCheckJobCompleteSuccess() throws Exception {
commitHandler.stop();
}
/**
 * Verifies that a job receiving JOB_AM_REBOOT while it is still in the SETUP
 * state moves to the REBOOT internal state. The committer's setupJob blocks
 * until its thread is interrupted, which keeps the job in SETUP long enough
 * for the reboot event to be delivered.
 */
@Test(timeout=20000)
public void testRebootedDuringSetup() throws Exception{
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  OutputCommitter committer = new StubbedOutputCommitter() {
    @Override
    public synchronized void setupJob(JobContext jobContext)
        throws IOException {
      // Park here until the thread is interrupted.
      while (!Thread.interrupted()) {
        try {
          wait();
        } catch (InterruptedException e) {
          // wait() clears the interrupt status when it throws. Restore it so
          // the loop condition sees the interrupt and the thread can exit,
          // rather than waiting forever.
          Thread.currentThread().interrupt();
        }
      }
    }
  };
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  JobImpl job = createStubbedJob(conf, dispatcher, 2);
  JobId jobId = job.getID();
  job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
  assertJobState(job, JobStateInternal.INITED);
  job.handle(new JobEvent(jobId, JobEventType.JOB_START));
  assertJobState(job, JobStateInternal.SETUP);

  // Reboot arrives while setup is still blocked.
  job.handle(new JobEvent(jobId, JobEventType.JOB_AM_REBOOT));
  assertJobState(job, JobStateInternal.REBOOT);

  dispatcher.stop();
  commitHandler.stop();
}
/**
 * Verifies that a job receiving JOB_AM_REBOOT while in the COMMITTING state
 * moves to the REBOOT internal state. A two-party barrier shared with the
 * waiting committer is awaited before the reboot event is sent.
 */
@Test(timeout=20000)
public void testRebootedDuringCommit() throws Exception {
  final Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);

  final AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();

  final CyclicBarrier barrier = new CyclicBarrier(2);
  final OutputCommitter waitingCommitter =
      new WaitingOutputCommitter(barrier, true);
  final CommitterEventHandler committerHandler =
      createCommitterEventHandler(dispatcher, waitingCommitter);
  committerHandler.init(conf);
  committerHandler.start();

  // Drive a running job through task completion into COMMITTING.
  final JobImpl runningJob = createRunningStubbedJob(conf, dispatcher, 2);
  completeJobTasks(runningJob);
  assertJobState(runningJob, JobStateInternal.COMMITTING);

  // Rendezvous with the committer, then deliver the reboot.
  barrier.await();
  JobEvent rebootEvent =
      new JobEvent(runningJob.getID(), JobEventType.JOB_AM_REBOOT);
  runningJob.handle(rebootEvent);
  assertJobState(runningJob, JobStateInternal.REBOOT);

  dispatcher.stop();
  committerHandler.stop();
}
@Test(timeout=20000)
public void testKilledDuringSetup() throws Exception {
Configuration conf = new Configuration();