MAPREDUCE-5562. Fixed MR App Master to perform pending tasks like staging-dir cleanup, sending job-end notification correctly when unregister with RM fails. Contributed by Zhijie Shen.
svn merge --ignore-ancestry -c 1529682 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1529683 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4b32d6ad9a
commit
ad9515fa9b
|
@ -145,6 +145,10 @@ Release 2.1.2 - UNRELEASED
|
|||
aren't heart-beating for a while, so that we can aggressively speculate
|
||||
instead of waiting for task-timeout (Xuan Gong via vinodkv)
|
||||
|
||||
MAPREDUCE-5562. Fixed MR App Master to perform pending tasks like staging-dir
|
||||
cleanup, sending job-end notification correctly when unregister with RM
|
||||
fails. (Zhijie Shen via vinodkv)
|
||||
|
||||
Release 2.1.1-beta - 2013-09-23
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -64,6 +64,6 @@ public interface AppContext {
|
|||
|
||||
boolean isLastAMRetry();
|
||||
|
||||
boolean safeToReportTerminationToUser();
|
||||
boolean hasSuccessfullyUnregistered();
|
||||
|
||||
}
|
||||
|
|
|
@ -18,7 +18,21 @@
|
|||
|
||||
package org.apache.hadoop.mapreduce.v2.app;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -27,20 +41,37 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.mapred.*;
|
||||
import org.apache.hadoop.mapreduce.*;
|
||||
import org.apache.hadoop.mapred.FileOutputCommitter;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.LocalContainerLauncher;
|
||||
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
|
||||
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.*;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.EventType;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.security.TokenCache;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
||||
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.*;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
|
||||
|
@ -51,14 +82,26 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.*;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.*;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
|
||||
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
|
||||
|
@ -95,14 +138,7 @@ import org.apache.hadoop.yarn.util.Clock;
|
|||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.*;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* The Map-Reduce Application Master.
|
||||
|
@ -166,7 +202,8 @@ public class MRAppMaster extends CompositeService {
|
|||
private Credentials jobCredentials = new Credentials(); // Filled during init
|
||||
protected UserGroupInformation currentUser; // Will be setup during init
|
||||
|
||||
private volatile boolean isLastAMRetry = false;
|
||||
@VisibleForTesting
|
||||
protected volatile boolean isLastAMRetry = false;
|
||||
//Something happened and we should shut down right after we start up.
|
||||
boolean errorHappenedShutDown = false;
|
||||
private String shutDownMessage = null;
|
||||
|
@ -175,7 +212,7 @@ public class MRAppMaster extends CompositeService {
|
|||
private long recoveredJobStartTime = 0;
|
||||
|
||||
@VisibleForTesting
|
||||
protected AtomicBoolean safeToReportTerminationToUser =
|
||||
protected AtomicBoolean successfullyUnregistered =
|
||||
new AtomicBoolean(false);
|
||||
|
||||
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
|
||||
|
@ -208,14 +245,14 @@ public class MRAppMaster extends CompositeService {
|
|||
|
||||
initJobCredentialsAndUGI(conf);
|
||||
|
||||
isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
|
||||
context = new RunningAppContext(conf);
|
||||
|
||||
((RunningAppContext)context).computeIsLastAMRetry();
|
||||
LOG.info("The specific max attempts: " + maxAppAttempts +
|
||||
" for application: " + appAttemptID.getApplicationId().getId() +
|
||||
". Attempt num: " + appAttemptID.getAttemptId() +
|
||||
" is last retry: " + isLastAMRetry);
|
||||
|
||||
context = new RunningAppContext(conf);
|
||||
|
||||
// Job name is the same as the app name util we support DAG of jobs
|
||||
// for an app later
|
||||
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
|
||||
|
@ -511,11 +548,6 @@ public class MRAppMaster extends CompositeService {
|
|||
MRAppMaster.this.stop();
|
||||
|
||||
if (isLastAMRetry) {
|
||||
// Except ClientService, other services are already stopped, it is safe to
|
||||
// let clients know the final states. ClientService should wait for some
|
||||
// time so clients have enough time to know the final states.
|
||||
safeToReportTerminationToUser.set(true);
|
||||
|
||||
// Send job-end notification when it is safe to report termination to
|
||||
// users and it is the last AM retry
|
||||
if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
|
||||
|
@ -524,7 +556,14 @@ public class MRAppMaster extends CompositeService {
|
|||
+ job.getReport().getJobId());
|
||||
JobEndNotifier notifier = new JobEndNotifier();
|
||||
notifier.setConf(getConfig());
|
||||
notifier.notify(job.getReport());
|
||||
JobReport report = job.getReport();
|
||||
// If unregistration fails, the final state is unavailable. However,
|
||||
// at the last AM Retry, the client will finally be notified FAILED
|
||||
// from RM, so we should let users know FAILED via notifier as well
|
||||
if (!context.hasSuccessfullyUnregistered()) {
|
||||
report.setJobState(JobState.FAILED);
|
||||
}
|
||||
notifier.notify(report);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Job end notification interrupted for jobID : "
|
||||
+ job.getReport().getJobId(), ie);
|
||||
|
@ -863,7 +902,7 @@ public class MRAppMaster extends CompositeService {
|
|||
}
|
||||
}
|
||||
|
||||
private class RunningAppContext implements AppContext {
|
||||
public class RunningAppContext implements AppContext {
|
||||
|
||||
private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
|
||||
private final Configuration conf;
|
||||
|
@ -942,8 +981,16 @@ public class MRAppMaster extends CompositeService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean safeToReportTerminationToUser() {
|
||||
return safeToReportTerminationToUser.get();
|
||||
public boolean hasSuccessfullyUnregistered() {
|
||||
return successfullyUnregistered.get();
|
||||
}
|
||||
|
||||
public void markSuccessfulUnregistration() {
|
||||
successfullyUnregistered.set(true);
|
||||
}
|
||||
|
||||
public void computeIsLastAMRetry() {
|
||||
isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -128,8 +128,6 @@ import org.apache.hadoop.yarn.state.StateMachine;
|
|||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/** Implementation of Job interface. Maintains the state machines of Job.
|
||||
* The read and write calls use ReadWriteLock for concurrency.
|
||||
*/
|
||||
|
@ -933,7 +931,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
readLock.lock();
|
||||
try {
|
||||
JobState state = getExternalState(getInternalState());
|
||||
if (!appContext.safeToReportTerminationToUser()
|
||||
if (!appContext.hasSuccessfullyUnregistered()
|
||||
&& (state == JobState.SUCCEEDED || state == JobState.FAILED
|
||||
|| state == JobState.KILLED || state == JobState.ERROR)) {
|
||||
return lastNonFinalState;
|
||||
|
|
|
@ -29,11 +29,11 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.JobID;
|
||||
import org.apache.hadoop.mapreduce.MRConfig;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
||||
|
@ -52,10 +52,13 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Registers/unregisters to RM and sends heartbeats to RM.
|
||||
*/
|
||||
|
@ -171,41 +174,57 @@ public abstract class RMCommunicator extends AbstractService
|
|||
|
||||
protected void unregister() {
|
||||
try {
|
||||
FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
|
||||
JobImpl jobImpl = (JobImpl)job;
|
||||
if (jobImpl.getInternalState() == JobStateInternal.SUCCEEDED) {
|
||||
finishState = FinalApplicationStatus.SUCCEEDED;
|
||||
} else if (jobImpl.getInternalState() == JobStateInternal.KILLED
|
||||
|| (jobImpl.getInternalState() == JobStateInternal.RUNNING && isSignalled)) {
|
||||
finishState = FinalApplicationStatus.KILLED;
|
||||
} else if (jobImpl.getInternalState() == JobStateInternal.FAILED
|
||||
|| jobImpl.getInternalState() == JobStateInternal.ERROR) {
|
||||
finishState = FinalApplicationStatus.FAILED;
|
||||
}
|
||||
StringBuffer sb = new StringBuffer();
|
||||
for (String s : job.getDiagnostics()) {
|
||||
sb.append(s).append("\n");
|
||||
}
|
||||
LOG.info("Setting job diagnostics to " + sb.toString());
|
||||
|
||||
String historyUrl =
|
||||
MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(getConfig(),
|
||||
context.getApplicationID());
|
||||
LOG.info("History url is " + historyUrl);
|
||||
FinishApplicationMasterRequest request =
|
||||
FinishApplicationMasterRequest.newInstance(finishState,
|
||||
sb.toString(), historyUrl);
|
||||
while (true) {
|
||||
FinishApplicationMasterResponse response =
|
||||
scheduler.finishApplicationMaster(request);
|
||||
if (response.getIsUnregistered()) {
|
||||
break;
|
||||
}
|
||||
LOG.info("Waiting for application to be successfully unregistered.");
|
||||
Thread.sleep(rmPollInterval);
|
||||
}
|
||||
doUnregistration();
|
||||
} catch(Exception are) {
|
||||
LOG.error("Exception while unregistering ", are);
|
||||
// if unregistration failed, isLastAMRetry needs to be recalculated
|
||||
// to see whether AM really has the chance to retry
|
||||
RunningAppContext raContext = (RunningAppContext) context;
|
||||
raContext.computeIsLastAMRetry();
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected void doUnregistration()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
|
||||
JobImpl jobImpl = (JobImpl)job;
|
||||
if (jobImpl.getInternalState() == JobStateInternal.SUCCEEDED) {
|
||||
finishState = FinalApplicationStatus.SUCCEEDED;
|
||||
} else if (jobImpl.getInternalState() == JobStateInternal.KILLED
|
||||
|| (jobImpl.getInternalState() == JobStateInternal.RUNNING && isSignalled)) {
|
||||
finishState = FinalApplicationStatus.KILLED;
|
||||
} else if (jobImpl.getInternalState() == JobStateInternal.FAILED
|
||||
|| jobImpl.getInternalState() == JobStateInternal.ERROR) {
|
||||
finishState = FinalApplicationStatus.FAILED;
|
||||
}
|
||||
StringBuffer sb = new StringBuffer();
|
||||
for (String s : job.getDiagnostics()) {
|
||||
sb.append(s).append("\n");
|
||||
}
|
||||
LOG.info("Setting job diagnostics to " + sb.toString());
|
||||
|
||||
String historyUrl =
|
||||
MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(getConfig(),
|
||||
context.getApplicationID());
|
||||
LOG.info("History url is " + historyUrl);
|
||||
FinishApplicationMasterRequest request =
|
||||
FinishApplicationMasterRequest.newInstance(finishState,
|
||||
sb.toString(), historyUrl);
|
||||
while (true) {
|
||||
FinishApplicationMasterResponse response =
|
||||
scheduler.finishApplicationMaster(request);
|
||||
if (response.getIsUnregistered()) {
|
||||
// When excepting ClientService, other services are already stopped,
|
||||
// it is safe to let clients know the final states. ClientService
|
||||
// should wait for some time so clients have enough time to know the
|
||||
// final states.
|
||||
RunningAppContext raContext = (RunningAppContext) context;
|
||||
raContext.markSuccessfulUnregistration();
|
||||
break;
|
||||
}
|
||||
LOG.info("Waiting for application to be successfully unregistered.");
|
||||
Thread.sleep(rmPollInterval);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -235,7 +254,6 @@ public abstract class RMCommunicator extends AbstractService
|
|||
|
||||
protected void startAllocatorThread() {
|
||||
allocatorThread = new Thread(new Runnable() {
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void run() {
|
||||
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
|
||||
|
|
|
@ -136,9 +136,9 @@ public class MRApp extends MRAppMaster {
|
|||
}
|
||||
|
||||
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
|
||||
boolean cleanOnStart, Clock clock, boolean shutdown) {
|
||||
boolean cleanOnStart, Clock clock, boolean unregistered) {
|
||||
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock,
|
||||
shutdown);
|
||||
unregistered);
|
||||
}
|
||||
|
||||
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
|
||||
|
@ -147,8 +147,8 @@ public class MRApp extends MRAppMaster {
|
|||
}
|
||||
|
||||
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
|
||||
boolean cleanOnStart, boolean shutdown) {
|
||||
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, shutdown);
|
||||
boolean cleanOnStart, boolean unregistered) {
|
||||
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, unregistered);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -181,16 +181,16 @@ public class MRApp extends MRAppMaster {
|
|||
}
|
||||
|
||||
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
|
||||
boolean cleanOnStart, int startCount, boolean shutdown) {
|
||||
boolean cleanOnStart, int startCount, boolean unregistered) {
|
||||
this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
|
||||
new SystemClock(), shutdown);
|
||||
new SystemClock(), unregistered);
|
||||
}
|
||||
|
||||
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
|
||||
boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) {
|
||||
boolean cleanOnStart, int startCount, Clock clock, boolean unregistered) {
|
||||
this(getApplicationAttemptId(applicationId, startCount), getContainerId(
|
||||
applicationId, startCount), maps, reduces, autoComplete, testName,
|
||||
cleanOnStart, startCount, clock, shutdown);
|
||||
cleanOnStart, startCount, clock, unregistered);
|
||||
}
|
||||
|
||||
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
|
||||
|
@ -202,9 +202,9 @@ public class MRApp extends MRAppMaster {
|
|||
|
||||
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
|
||||
int maps, int reduces, boolean autoComplete, String testName,
|
||||
boolean cleanOnStart, int startCount, boolean shutdown) {
|
||||
boolean cleanOnStart, int startCount, boolean unregistered) {
|
||||
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
|
||||
cleanOnStart, startCount, new SystemClock(), shutdown);
|
||||
cleanOnStart, startCount, new SystemClock(), unregistered);
|
||||
}
|
||||
|
||||
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
|
||||
|
@ -216,7 +216,7 @@ public class MRApp extends MRAppMaster {
|
|||
|
||||
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
|
||||
int maps, int reduces, boolean autoComplete, String testName,
|
||||
boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) {
|
||||
boolean cleanOnStart, int startCount, Clock clock, boolean unregistered) {
|
||||
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
|
||||
.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
|
||||
this.testWorkDir = new File("target", testName);
|
||||
|
@ -237,7 +237,7 @@ public class MRApp extends MRAppMaster {
|
|||
this.autoComplete = autoComplete;
|
||||
// If safeToReportTerminationToUser is set to true, we can verify whether
|
||||
// the job can reaches the final state when MRAppMaster shuts down.
|
||||
this.safeToReportTerminationToUser.set(shutdown);
|
||||
this.successfullyUnregistered.set(unregistered);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -137,7 +137,7 @@ public class MockAppContext implements AppContext {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean safeToReportTerminationToUser() {
|
||||
public boolean hasSuccessfullyUnregistered() {
|
||||
// bogus - Not Required
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app;
|
|||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -41,10 +42,16 @@ import org.apache.hadoop.mapred.JobContext;
|
|||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -185,25 +192,19 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testNotificationOnNormalShutdown() throws Exception {
|
||||
public void testNotificationOnLastRetryNormalShutdown() throws Exception {
|
||||
HttpServer server = startHttpServer();
|
||||
// Act like it is the second attempt. Default max attempts is 2
|
||||
MRApp app = spy(new MRApp(2, 2, true, this.getClass().getName(), true, 2));
|
||||
// Make use of safeToReportflag so that we can look at final job-state as
|
||||
// seen by real users.
|
||||
app.safeToReportTerminationToUser.set(false);
|
||||
MRApp app = spy(new MRAppWithCustomContainerAllocator(
|
||||
2, 2, true, this.getClass().getName(), true, 2, true));
|
||||
doNothing().when(app).sysexit();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
|
||||
JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
|
||||
JobImpl job = (JobImpl)app.submit(conf);
|
||||
// Even though auto-complete is true, because app is not shut-down yet, user
|
||||
// will only see RUNNING state.
|
||||
app.waitForInternalState(job, JobStateInternal.SUCCEEDED);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
// Now shutdown. User should see SUCCEEDED state.
|
||||
// Unregistration succeeds: successfullyUnregistered is set
|
||||
app.shutDownJob();
|
||||
app.waitForState(job, JobState.SUCCEEDED);
|
||||
Assert.assertEquals(true, app.isLastAMRetry());
|
||||
Assert.assertEquals(1, JobEndServlet.calledTimes);
|
||||
Assert.assertEquals("jobid=" + job.getID() + "&status=SUCCEEDED",
|
||||
|
@ -214,24 +215,25 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testNotificationOnNonLastRetryShutdown() throws Exception {
|
||||
public void testAbsentNotificationOnNotLastRetryUnregistrationFailure()
|
||||
throws Exception {
|
||||
HttpServer server = startHttpServer();
|
||||
MRApp app = spy(new MRApp(2, 2, false, this.getClass().getName(), true));
|
||||
MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false,
|
||||
this.getClass().getName(), true, 1, false));
|
||||
doNothing().when(app).sysexit();
|
||||
// Make use of safeToReportflag so that we can look at final job-state as
|
||||
// seen by real users.
|
||||
app.safeToReportTerminationToUser.set(false);
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
|
||||
JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
|
||||
JobImpl job = (JobImpl)app.submit(new Configuration());
|
||||
JobImpl job = (JobImpl)app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
app.getContext().getEventHandler()
|
||||
.handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT));
|
||||
app.waitForInternalState(job, JobStateInternal.REBOOT);
|
||||
// Now shutdown.
|
||||
// Unregistration fails: isLastAMRetry is recalculated, this is not
|
||||
app.shutDownJob();
|
||||
// Not the last AM attempt. So user should that the job is still running.
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
app.shutDownJob();
|
||||
Assert.assertEquals(false, app.isLastAMRetry());
|
||||
Assert.assertEquals(0, JobEndServlet.calledTimes);
|
||||
Assert.assertEquals(null, JobEndServlet.requestUri);
|
||||
|
@ -239,6 +241,33 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
|||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotificationOnLastRetryUnregistrationFailure()
|
||||
throws Exception {
|
||||
HttpServer server = startHttpServer();
|
||||
MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false,
|
||||
this.getClass().getName(), true, 2, false));
|
||||
doNothing().when(app).sysexit();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
|
||||
JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
|
||||
JobImpl job = (JobImpl)app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
app.getContext().getEventHandler()
|
||||
.handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT));
|
||||
app.waitForInternalState(job, JobStateInternal.REBOOT);
|
||||
// Now shutdown. User should see FAILED state.
|
||||
// Unregistration fails: isLastAMRetry is recalculated, this is
|
||||
app.shutDownJob();
|
||||
Assert.assertEquals(true, app.isLastAMRetry());
|
||||
Assert.assertEquals(1, JobEndServlet.calledTimes);
|
||||
Assert.assertEquals("jobid=" + job.getID() + "&status=FAILED",
|
||||
JobEndServlet.requestUri.getQuery());
|
||||
Assert.assertEquals(JobState.FAILED.toString(),
|
||||
JobEndServlet.foundJobState);
|
||||
server.stop();
|
||||
}
|
||||
|
||||
private static HttpServer startHttpServer() throws Exception {
|
||||
new File(System.getProperty(
|
||||
"build.webapps", "build/webapps") + "/test").mkdirs();
|
||||
|
@ -280,4 +309,83 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
|||
}
|
||||
}
|
||||
|
||||
private class MRAppWithCustomContainerAllocator extends MRApp {
|
||||
|
||||
private boolean crushUnregistration;
|
||||
|
||||
public MRAppWithCustomContainerAllocator(int maps, int reduces,
|
||||
boolean autoComplete, String testName, boolean cleanOnStart,
|
||||
int startCount, boolean crushUnregistration) {
|
||||
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
|
||||
false);
|
||||
this.crushUnregistration = crushUnregistration;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ContainerAllocator createContainerAllocator(
|
||||
ClientService clientService, AppContext context) {
|
||||
context = spy(context);
|
||||
when(context.getEventHandler()).thenReturn(null);
|
||||
when(context.getApplicationID()).thenReturn(null);
|
||||
return new CustomContainerAllocator(this, context);
|
||||
}
|
||||
|
||||
private class CustomContainerAllocator
|
||||
extends RMCommunicator
|
||||
implements ContainerAllocator, RMHeartbeatHandler {
|
||||
private MRAppWithCustomContainerAllocator app;
|
||||
private MRAppContainerAllocator allocator =
|
||||
new MRAppContainerAllocator();
|
||||
|
||||
public CustomContainerAllocator(
|
||||
MRAppWithCustomContainerAllocator app, AppContext context) {
|
||||
super(null, context);
|
||||
this.app = app;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceStart() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceStop() {
|
||||
unregister();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doUnregistration()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
if (crushUnregistration) {
|
||||
app.successfullyUnregistered.set(true);
|
||||
} else {
|
||||
throw new YarnException("test exception");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ContainerAllocatorEvent event) {
|
||||
allocator.handle(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastHeartbeatTime() {
|
||||
return allocator.getLastHeartbeatTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runOnNextHeartbeat(Runnable callback) {
|
||||
allocator.runOnNextHeartbeat(callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void heartbeat() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.util.Iterator;
|
|||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.MRConfig;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
|
@ -44,7 +43,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
|
@ -55,15 +53,12 @@ import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
|
|||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -384,12 +379,13 @@ public class TestMRApp {
|
|||
// AM is not unregistered
|
||||
Assert.assertEquals(JobState.RUNNING, job.getState());
|
||||
// imitate that AM is unregistered
|
||||
app.safeToReportTerminationToUser.set(true);
|
||||
app.successfullyUnregistered.set(true);
|
||||
app.waitForState(job, JobState.SUCCEEDED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJobRebootNotLastRetry() throws Exception {
|
||||
public void testJobRebootNotLastRetryOnUnregistrationFailure()
|
||||
throws Exception {
|
||||
MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
|
||||
Job job = app.submit(new Configuration());
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
|
@ -408,10 +404,12 @@ public class TestMRApp {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testJobRebootOnLastRetry() throws Exception {
|
||||
public void testJobRebootOnLastRetryOnUnregistrationFailure()
|
||||
throws Exception {
|
||||
// make startCount as 2 since this is last retry which equals to
|
||||
// DEFAULT_MAX_AM_RETRY
|
||||
MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2);
|
||||
// The last param mocks the unregistration failure
|
||||
MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2, false);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
Job job = app.submit(conf);
|
||||
|
@ -425,8 +423,10 @@ public class TestMRApp {
|
|||
app.getContext().getEventHandler().handle(new JobEvent(job.getID(),
|
||||
JobEventType.JOB_AM_REBOOT));
|
||||
|
||||
// return exteranl state as ERROR if this is the last retry
|
||||
app.waitForState(job, JobState.ERROR);
|
||||
app.waitForInternalState((JobImpl) job, JobStateInternal.REBOOT);
|
||||
// return exteranl state as RUNNING if this is the last retry while
|
||||
// unregistration fails
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
}
|
||||
|
||||
private final class MRAppWithSpiedJob extends MRApp {
|
||||
|
|
|
@ -869,7 +869,7 @@ public class TestRuntimeEstimators {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean safeToReportTerminationToUser() {
|
||||
public boolean hasSuccessfullyUnregistered() {
|
||||
// bogus - Not Required
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app;
|
|||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -36,18 +37,17 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.mapreduce.JobID;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
|
@ -57,7 +57,7 @@ import org.apache.hadoop.service.Service;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
|
@ -75,7 +75,44 @@ import org.junit.Test;
|
|||
private Path stagingJobPath = new Path(stagingJobDir);
|
||||
private final static RecordFactory recordFactory = RecordFactoryProvider.
|
||||
getRecordFactory(null);
|
||||
|
||||
|
||||
@Test
|
||||
public void testDeletionofStagingOnUnregistrationFailure()
|
||||
throws IOException {
|
||||
testDeletionofStagingOnUnregistrationFailure(2, false);
|
||||
testDeletionofStagingOnUnregistrationFailure(1, true);
|
||||
}
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
private void testDeletionofStagingOnUnregistrationFailure(
|
||||
int maxAttempts, boolean shouldHaveDeleted) throws IOException {
|
||||
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
|
||||
fs = mock(FileSystem.class);
|
||||
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
|
||||
//Staging Dir exists
|
||||
String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
|
||||
when(fs.exists(stagingDir)).thenReturn(true);
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||
JobId jobid = recordFactory.newRecordInstance(JobId.class);
|
||||
jobid.setAppId(appId);
|
||||
TestMRApp appMaster = new TestMRApp(attemptId, null,
|
||||
JobStateInternal.RUNNING, maxAttempts);
|
||||
appMaster.crushUnregistration = true;
|
||||
appMaster.init(conf);
|
||||
appMaster.start();
|
||||
appMaster.shutDownJob();
|
||||
((RunningAppContext) appMaster.getContext()).computeIsLastAMRetry();
|
||||
if (shouldHaveDeleted) {
|
||||
Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry());
|
||||
verify(fs).delete(stagingJobPath, true);
|
||||
} else {
|
||||
Assert.assertEquals(new Boolean(false), appMaster.isLastAMRetry());
|
||||
verify(fs, never()).delete(stagingJobPath, true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeletionofStaging() throws IOException {
|
||||
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
|
||||
|
@ -204,6 +241,7 @@ import org.junit.Test;
|
|||
ContainerAllocator allocator;
|
||||
boolean testIsLastAMRetry = false;
|
||||
JobStateInternal jobStateInternal;
|
||||
boolean crushUnregistration = false;
|
||||
|
||||
public TestMRApp(ApplicationAttemptId applicationAttemptId,
|
||||
ContainerAllocator allocator, int maxAppAttempts) {
|
||||
|
@ -211,6 +249,7 @@ import org.junit.Test;
|
|||
applicationAttemptId, 1), "testhost", 2222, 3333,
|
||||
System.currentTimeMillis(), maxAppAttempts);
|
||||
this.allocator = allocator;
|
||||
this.successfullyUnregistered.set(true);
|
||||
}
|
||||
|
||||
public TestMRApp(ApplicationAttemptId applicationAttemptId,
|
||||
|
@ -229,7 +268,11 @@ import org.junit.Test;
|
|||
protected ContainerAllocator createContainerAllocator(
|
||||
final ClientService clientService, final AppContext context) {
|
||||
if(allocator == null) {
|
||||
return super.createContainerAllocator(clientService, context);
|
||||
if (crushUnregistration) {
|
||||
return new CustomContainerAllocator(context);
|
||||
} else {
|
||||
return super.createContainerAllocator(clientService, context);
|
||||
}
|
||||
}
|
||||
return allocator;
|
||||
}
|
||||
|
@ -280,6 +323,41 @@ import org.junit.Test;
|
|||
public boolean getTestIsLastAMRetry(){
|
||||
return testIsLastAMRetry;
|
||||
}
|
||||
|
||||
private class CustomContainerAllocator extends RMCommunicator
|
||||
implements ContainerAllocator {
|
||||
|
||||
public CustomContainerAllocator(AppContext context) {
|
||||
super(null, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceStart() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceStop() {
|
||||
unregister();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doUnregistration()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
throw new YarnException("test exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void heartbeat() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ContainerAllocatorEvent event) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class MRAppTestCleanup extends MRApp {
|
||||
|
|
|
@ -275,7 +275,7 @@ public class TestJobImpl {
|
|||
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
when(mockContext.isLastAMRetry()).thenReturn(true);
|
||||
when(mockContext.safeToReportTerminationToUser()).thenReturn(false);
|
||||
when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
|
||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
|
||||
completeJobTasks(job);
|
||||
assertJobState(job, JobStateInternal.COMMITTING);
|
||||
|
@ -285,7 +285,7 @@ public class TestJobImpl {
|
|||
assertJobState(job, JobStateInternal.REBOOT);
|
||||
// return the external state as ERROR since this is last retry.
|
||||
Assert.assertEquals(JobState.RUNNING, job.getState());
|
||||
when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
|
||||
when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
|
||||
Assert.assertEquals(JobState.ERROR, job.getState());
|
||||
|
||||
dispatcher.stop();
|
||||
|
@ -594,7 +594,7 @@ public class TestJobImpl {
|
|||
new JobDiagnosticsUpdateEvent(jobId, diagMsg);
|
||||
MRAppMetrics mrAppMetrics = MRAppMetrics.create();
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
|
||||
when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
|
||||
JobImpl job = new JobImpl(jobId, Records
|
||||
.newRecord(ApplicationAttemptId.class), new Configuration(),
|
||||
mock(EventHandler.class),
|
||||
|
@ -705,7 +705,7 @@ public class TestJobImpl {
|
|||
commitHandler.start();
|
||||
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
when(mockContext.safeToReportTerminationToUser()).thenReturn(false);
|
||||
when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
|
||||
JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
|
||||
JobId jobId = job.getID();
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
||||
|
@ -722,7 +722,7 @@ public class TestJobImpl {
|
|||
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
|
||||
assertJobState(job, JobStateInternal.FAILED);
|
||||
Assert.assertEquals(JobState.RUNNING, job.getState());
|
||||
when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
|
||||
when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
|
||||
Assert.assertEquals(JobState.FAILED, job.getState());
|
||||
|
||||
dispatcher.stop();
|
||||
|
@ -762,7 +762,7 @@ public class TestJobImpl {
|
|||
JobId jobId = TypeConverter.toYarn(jobID);
|
||||
if (appContext == null) {
|
||||
appContext = mock(AppContext.class);
|
||||
when(appContext.safeToReportTerminationToUser()).thenReturn(true);
|
||||
when(appContext.hasSuccessfullyUnregistered()).thenReturn(true);
|
||||
}
|
||||
StubbedJob job = new StubbedJob(jobId,
|
||||
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),
|
||||
|
|
|
@ -88,6 +88,10 @@ public class TestLocalContainerAllocator {
|
|||
protected void register() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void unregister() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void startAllocatorThread() {
|
||||
allocatorThread = new Thread();
|
||||
|
|
|
@ -389,7 +389,7 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean safeToReportTerminationToUser() {
|
||||
public boolean hasSuccessfullyUnregistered() {
|
||||
// bogus - Not Required
|
||||
return true;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue