MAPREDUCE-5505. Clients should be notified job finished only after job successfully unregistered (Zhijie Shen via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1526071 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-09-25 00:46:48 +00:00
parent 10a4289ebf
commit 524dad1109
10 changed files with 134 additions and 14 deletions

View File

@ -216,6 +216,9 @@ Release 2.1.2 - UNRELEASED
needs to set up its own certificates etc and not depend on clusters'. needs to set up its own certificates etc and not depend on clusters'.
(Omkar Vinit Joshi via vinodkv) (Omkar Vinit Joshi via vinodkv)
MAPREDUCE-5505. Clients should be notified job finished only after job
successfully unregistered (Zhijie Shen via bikas)
Release 2.1.1-beta - 2013-09-23 Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -63,4 +63,7 @@ public interface AppContext {
ClientToAMTokenSecretManager getClientToAMTokenSecretManager(); ClientToAMTokenSecretManager getClientToAMTokenSecretManager();
boolean isLastAMRetry(); boolean isLastAMRetry();
boolean safeToReportTerminationToUser();
} }

View File

@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -209,6 +210,10 @@ public class MRAppMaster extends CompositeService {
private long recoveredJobStartTime = 0; private long recoveredJobStartTime = 0;
@VisibleForTesting
protected AtomicBoolean safeToReportTerminationToUser =
new AtomicBoolean(false);
public MRAppMaster(ApplicationAttemptId applicationAttemptId, public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime, int maxAppAttempts) { long appSubmitTime, int maxAppAttempts) {
@ -554,8 +559,10 @@ public class MRAppMaster extends CompositeService {
LOG.info("Calling stop for all the services"); LOG.info("Calling stop for all the services");
MRAppMaster.this.stop(); MRAppMaster.this.stop();
// TODO: Stop ClientService last, since only ClientService should wait for // Except ClientService, other services are already stopped, it is safe to
// some time so clients can know the final states. Will be removed once RM come on. // let clients know the final states. ClientService should wait for some
// time so clients have enough time to know the final states.
safeToReportTerminationToUser.set(true);
try { try {
Thread.sleep(5000); Thread.sleep(5000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -964,6 +971,11 @@ public class MRAppMaster extends CompositeService {
public boolean isLastAMRetry(){ public boolean isLastAMRetry(){
return isLastAMRetry; return isLastAMRetry;
} }
/**
 * Reports whether final job states may be shown to clients yet.
 *
 * @return the current value of the {@code safeToReportTerminationToUser}
 *         flag; the shutdown path sets it to true once the call that stops
 *         the services has returned
 */
@Override
public boolean safeToReportTerminationToUser() {
return safeToReportTerminationToUser.get();
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -641,6 +641,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
private ScheduledFuture failWaitTriggerScheduledFuture; private ScheduledFuture failWaitTriggerScheduledFuture;
private JobState lastNonFinalState = JobState.NEW;
public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId, public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler, Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, TaskAttemptListener taskAttemptListener,
@ -928,7 +930,14 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
public JobState getState() { public JobState getState() {
readLock.lock(); readLock.lock();
try { try {
return getExternalState(getInternalState()); JobState state = getExternalState(getInternalState());
if (!appContext.safeToReportTerminationToUser()
&& (state == JobState.SUCCEEDED || state == JobState.FAILED
|| state == JobState.KILLED || state == JobState.ERROR)) {
return lastNonFinalState;
} else {
return state;
}
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -972,6 +981,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
if (oldState != getInternalState()) { if (oldState != getInternalState()) {
LOG.info(jobId + "Job Transitioned from " + oldState + " to " LOG.info(jobId + "Job Transitioned from " + oldState + " to "
+ getInternalState()); + getInternalState());
rememberLastNonFinalState(oldState);
} }
} }
@ -980,6 +990,15 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
} }
} }
/**
 * Remembers the external view of {@code stateInternal} as the most recent
 * non-final state. When the external state is one of the final states
 * (SUCCEEDED, FAILED, KILLED, ERROR) the remembered value is left untouched.
 *
 * @param stateInternal internal job state to translate and possibly remember
 */
private void rememberLastNonFinalState(JobStateInternal stateInternal) {
  final JobState external = getExternalState(stateInternal);
  final boolean isFinal = external == JobState.SUCCEEDED
      || external == JobState.FAILED
      || external == JobState.KILLED
      || external == JobState.ERROR;
  if (isFinal) {
    // Final states never overwrite the remembered value.
    return;
  }
  lastNonFinalState = external;
}
@Private @Private
public JobStateInternal getInternalState() { public JobStateInternal getInternalState() {
readLock.lock(); readLock.lock();

View File

@ -135,11 +135,22 @@ public class MRApp extends MRAppMaster {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock); this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock);
} }
/**
 * Convenience constructor: delegates to the overload that also takes a start
 * count, passing 1 for it.
 *
 * @param shutdown forwarded unchanged; the primary constructor uses it as the
 *                 initial value of {@code safeToReportTerminationToUser}
 */
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, Clock clock, boolean shutdown) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock,
shutdown);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart) { boolean cleanOnStart) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1); this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
} }
/**
 * Convenience constructor: delegates to the overload that also takes a start
 * count, passing 1 for it.
 *
 * @param shutdown forwarded unchanged; the primary constructor uses it as the
 *                 initial value of {@code safeToReportTerminationToUser}
 */
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, boolean shutdown) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, shutdown);
}
@Override @Override
protected void initJobCredentialsAndUGI(Configuration conf) { protected void initJobCredentialsAndUGI(Configuration conf) {
// Fake a shuffle secret that normally is provided by the job client. // Fake a shuffle secret that normally is provided by the job client.
@ -169,23 +180,43 @@ public class MRApp extends MRAppMaster {
new SystemClock()); new SystemClock());
} }
/**
 * Convenience constructor: delegates to the overload that also takes a
 * clock, supplying a new {@link SystemClock}.
 *
 * @param shutdown forwarded unchanged; the primary constructor uses it as the
 *                 initial value of {@code safeToReportTerminationToUser}
 */
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, boolean shutdown) {
this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
new SystemClock(), shutdown);
}
/**
 * Builds the application attempt id and the container id from
 * {@code applicationId} and {@code startCount}, then delegates to the
 * constructor that takes both ids explicitly.
 *
 * @param shutdown forwarded unchanged; the primary constructor uses it as the
 *                 initial value of {@code safeToReportTerminationToUser}
 */
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) {
this(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), maps, reduces, autoComplete, testName,
cleanOnStart, startCount, clock, shutdown);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock) { boolean cleanOnStart, int startCount, Clock clock) {
this(getApplicationAttemptId(applicationId, startCount), getContainerId( this(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), maps, reduces, autoComplete, testName, applicationId, startCount), maps, reduces, autoComplete, testName,
cleanOnStart, startCount, clock); cleanOnStart, startCount, clock, true);
}
/**
 * Variant taking explicit attempt and container ids: delegates to the
 * overload that also takes a clock, supplying a new {@link SystemClock}.
 *
 * @param shutdown forwarded unchanged; the primary constructor uses it as the
 *                 initial value of {@code safeToReportTerminationToUser}
 */
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, boolean shutdown) {
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
cleanOnStart, startCount, new SystemClock(), shutdown);
}
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName, int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) { boolean cleanOnStart, int startCount) {
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName, this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
cleanOnStart, startCount, new SystemClock()); cleanOnStart, startCount, new SystemClock(), true);
} }
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName, int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock) { boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) {
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); .currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
this.testWorkDir = new File("target", testName); this.testWorkDir = new File("target", testName);
@ -204,6 +235,9 @@ public class MRApp extends MRAppMaster {
this.maps = maps; this.maps = maps;
this.reduces = reduces; this.reduces = reduces;
this.autoComplete = autoComplete; this.autoComplete = autoComplete;
// If safeToReportTerminationToUser is set to true, we can verify whether
// the job can reach the final state when MRAppMaster shuts down.
this.safeToReportTerminationToUser.set(shutdown);
} }
@Override @Override

View File

@ -135,4 +135,11 @@ public class MockAppContext implements AppContext {
public boolean isLastAMRetry() { public boolean isLastAMRetry() {
return false; return false;
} }
/**
 * Placeholder implementation for this mock context.
 *
 * @return always {@code true}
 */
@Override
public boolean safeToReportTerminationToUser() {
// bogus - not required by this mock, so a constant is returned
return true;
}
} }

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@ -374,6 +375,19 @@ public class TestMRApp {
app.waitForState(job, JobState.ERROR); app.waitForState(job, JobState.ERROR);
} }
/**
 * Checks that a job that has succeeded internally is still reported as
 * RUNNING until the app's {@code safeToReportTerminationToUser} flag is set,
 * and is reported as SUCCEEDED once the flag is set.
 */
@SuppressWarnings("resource")
@Test
public void testJobSuccess() throws Exception {
// The final constructor argument (shutdown) is false, so the app starts
// with safeToReportTerminationToUser unset.
MRApp app = new MRApp(2, 2, true, this.getClass().getName(), true, false);
JobImpl job = (JobImpl) app.submit(new Configuration());
app.waitForInternalState(job, JobStateInternal.SUCCEEDED);
// The AM has not unregistered yet: the external state must still be the
// last non-final state even though the internal state is SUCCEEDED.
Assert.assertEquals(JobState.RUNNING, job.getState());
// Simulate the AM having unregistered by setting the flag directly.
app.safeToReportTerminationToUser.set(true);
app.waitForState(job, JobState.SUCCEEDED);
}
@Test @Test
public void testJobRebootNotLastRetry() throws Exception { public void testJobRebootNotLastRetry() throws Exception {
MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true); MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);

View File

@ -867,5 +867,12 @@ public class TestRuntimeEstimators {
public boolean isLastAMRetry() { public boolean isLastAMRetry() {
return false; return false;
} }
/**
 * Placeholder implementation for this test context.
 *
 * @return always {@code true}
 */
@Override
public boolean safeToReportTerminationToUser() {
// bogus - not required by this test context, so a constant is returned
return true;
}
} }
} }

View File

@ -275,6 +275,7 @@ public class TestJobImpl {
AppContext mockContext = mock(AppContext.class); AppContext mockContext = mock(AppContext.class);
when(mockContext.isLastAMRetry()).thenReturn(true); when(mockContext.isLastAMRetry()).thenReturn(true);
when(mockContext.safeToReportTerminationToUser()).thenReturn(false);
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext); JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
completeJobTasks(job); completeJobTasks(job);
assertJobState(job, JobStateInternal.COMMITTING); assertJobState(job, JobStateInternal.COMMITTING);
@ -282,7 +283,9 @@ public class TestJobImpl {
syncBarrier.await(); syncBarrier.await();
job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT)); job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
assertJobState(job, JobStateInternal.REBOOT); assertJobState(job, JobStateInternal.REBOOT);
// return the external state as FAILED since this is last retry. // return the external state as ERROR since this is last retry.
Assert.assertEquals(JobState.RUNNING, job.getState());
when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
Assert.assertEquals(JobState.ERROR, job.getState()); Assert.assertEquals(JobState.ERROR, job.getState());
dispatcher.stop(); dispatcher.stop();
@ -590,12 +593,14 @@ public class TestJobImpl {
final JobDiagnosticsUpdateEvent diagUpdateEvent = final JobDiagnosticsUpdateEvent diagUpdateEvent =
new JobDiagnosticsUpdateEvent(jobId, diagMsg); new JobDiagnosticsUpdateEvent(jobId, diagMsg);
MRAppMetrics mrAppMetrics = MRAppMetrics.create(); MRAppMetrics mrAppMetrics = MRAppMetrics.create();
AppContext mockContext = mock(AppContext.class);
when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
JobImpl job = new JobImpl(jobId, Records JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), new Configuration(), .newRecord(ApplicationAttemptId.class), new Configuration(),
mock(EventHandler.class), mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null, null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null, new SystemClock(), null,
mrAppMetrics, null, true, null, 0, null, null, null, null); mrAppMetrics, null, true, null, 0, null, mockContext, null, null);
job.handle(diagUpdateEvent); job.handle(diagUpdateEvent);
String diagnostics = job.getReport().getDiagnostics(); String diagnostics = job.getReport().getDiagnostics();
Assert.assertNotNull(diagnostics); Assert.assertNotNull(diagnostics);
@ -606,7 +611,7 @@ public class TestJobImpl {
mock(EventHandler.class), mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null, null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null, new SystemClock(), null,
mrAppMetrics, null, true, null, 0, null, null, null, null); mrAppMetrics, null, true, null, 0, null, mockContext, null, null);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
job.handle(diagUpdateEvent); job.handle(diagUpdateEvent);
diagnostics = job.getReport().getDiagnostics(); diagnostics = job.getReport().getDiagnostics();
@ -699,7 +704,9 @@ public class TestJobImpl {
commitHandler.init(conf); commitHandler.init(conf);
commitHandler.start(); commitHandler.start();
JobImpl job = createStubbedJob(conf, dispatcher, 2, null); AppContext mockContext = mock(AppContext.class);
when(mockContext.safeToReportTerminationToUser()).thenReturn(false);
JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
JobId jobId = job.getID(); JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED); assertJobState(job, JobStateInternal.INITED);
@ -707,12 +714,15 @@ public class TestJobImpl {
assertJobState(job, JobStateInternal.FAILED); assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED)); job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
Assert.assertEquals(JobState.FAILED, job.getState()); assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED)); job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
Assert.assertEquals(JobState.FAILED, job.getState()); assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED)); job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
Assert.assertEquals(JobState.FAILED, job.getState()); assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)); job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
assertJobState(job, JobStateInternal.FAILED);
Assert.assertEquals(JobState.RUNNING, job.getState());
when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
Assert.assertEquals(JobState.FAILED, job.getState()); Assert.assertEquals(JobState.FAILED, job.getState());
dispatcher.stop(); dispatcher.stop();
@ -750,6 +760,10 @@ public class TestJobImpl {
Dispatcher dispatcher, int numSplits, AppContext appContext) { Dispatcher dispatcher, int numSplits, AppContext appContext) {
JobID jobID = JobID.forName("job_1234567890000_0001"); JobID jobID = JobID.forName("job_1234567890000_0001");
JobId jobId = TypeConverter.toYarn(jobID); JobId jobId = TypeConverter.toYarn(jobID);
if (appContext == null) {
appContext = mock(AppContext.class);
when(appContext.safeToReportTerminationToUser()).thenReturn(true);
}
StubbedJob job = new StubbedJob(jobId, StubbedJob job = new StubbedJob(jobId,
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0), ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),
conf,dispatcher.getEventHandler(), true, "somebody", numSplits, appContext); conf,dispatcher.getEventHandler(), true, "somebody", numSplits, appContext);

View File

@ -387,4 +387,11 @@ public class JobHistory extends AbstractService implements HistoryContext {
// bogus - Not Required // bogus - Not Required
return false; return false;
} }
/**
 * Placeholder implementation for this context.
 *
 * @return always {@code true}
 */
@Override
public boolean safeToReportTerminationToUser() {
// bogus - not required here, so a constant is returned
return true;
}
} }