YARN-2074. Changed ResourceManager to not count AM preemptions towards app failures. Contributed by Jian He.

svn merge --ignore-ancestry -c 1605106 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1605107 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-06-24 15:16:36 +00:00
parent 7741cc62b4
commit c8dfed19ba
16 changed files with 282 additions and 57 deletions

View File

@ -160,6 +160,9 @@ Release 2.5.0 - UNRELEASED
YARN-2195. Clean a piece of code in ResourceRequest. (Wei Yan via devaraj) YARN-2195. Clean a piece of code in ResourceRequest. (Wei Yan via devaraj)
YARN-2074. Changed ResourceManager to not count AM preemptions towards app
failures. (Jian He via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -127,6 +127,7 @@ message ApplicationAttemptStateDataProto {
optional string diagnostics = 6 [default = "N/A"]; optional string diagnostics = 6 [default = "N/A"];
optional int64 start_time = 7; optional int64 start_time = 7;
optional FinalApplicationStatusProto final_application_status = 8; optional FinalApplicationStatusProto final_application_status = 8;
optional int32 am_container_exit_status = 9 [default = -1000];
} }
message RMStateVersionProto { message RMStateVersionProto {

View File

@ -216,7 +216,8 @@ public class FileSystemRMStateStore extends RMStateStore {
attemptStateData.getState(), attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(), attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(), attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus()); attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus());
// assert child node name is same as application attempt id // assert child node name is same as application attempt id
assert attemptId.equals(attemptState.getAttemptId()); assert attemptId.equals(attemptState.getAttemptId());

View File

@ -152,7 +152,8 @@ public class MemoryRMStateStore extends RMStateStore {
attemptStateData.getStartTime(), attemptStateData.getState(), attemptStateData.getStartTime(), attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(), attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(), attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus()); attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus());
ApplicationState appState = ApplicationState appState =
state.getApplicationState().get( state.getApplicationState().get(

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
@ -258,19 +259,21 @@ public abstract class RMStateStore extends AbstractService {
RMAppAttemptState state; RMAppAttemptState state;
String finalTrackingUrl = "N/A"; String finalTrackingUrl = "N/A";
String diagnostics; String diagnostics;
int exitStatus = ContainerExitStatus.INVALID;
FinalApplicationStatus amUnregisteredFinalStatus; FinalApplicationStatus amUnregisteredFinalStatus;
public ApplicationAttemptState(ApplicationAttemptId attemptId, public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials, Container masterContainer, Credentials appAttemptCredentials,
long startTime) { long startTime) {
this(attemptId, masterContainer, appAttemptCredentials, startTime, null, this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
null, "", null); null, "", null, ContainerExitStatus.INVALID);
} }
public ApplicationAttemptState(ApplicationAttemptId attemptId, public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials, Container masterContainer, Credentials appAttemptCredentials,
long startTime, RMAppAttemptState state, String finalTrackingUrl, long startTime, RMAppAttemptState state, String finalTrackingUrl,
String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus) { String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
int exitStatus) {
this.attemptId = attemptId; this.attemptId = attemptId;
this.masterContainer = masterContainer; this.masterContainer = masterContainer;
this.appAttemptCredentials = appAttemptCredentials; this.appAttemptCredentials = appAttemptCredentials;
@ -279,6 +282,7 @@ public abstract class RMStateStore extends AbstractService {
this.finalTrackingUrl = finalTrackingUrl; this.finalTrackingUrl = finalTrackingUrl;
this.diagnostics = diagnostics == null ? "" : diagnostics; this.diagnostics = diagnostics == null ? "" : diagnostics;
this.amUnregisteredFinalStatus = amUnregisteredFinalStatus; this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
this.exitStatus = exitStatus;
} }
public Container getMasterContainer() { public Container getMasterContainer() {
@ -305,6 +309,9 @@ public abstract class RMStateStore extends AbstractService {
public FinalApplicationStatus getFinalApplicationStatus() { public FinalApplicationStatus getFinalApplicationStatus() {
return amUnregisteredFinalStatus; return amUnregisteredFinalStatus;
} }
public int getAMContainerExitStatus(){
return this.exitStatus;
}
} }
/** /**

View File

@ -539,11 +539,11 @@ public class ZKRMStateStore extends RMStateStore {
ApplicationAttemptState attemptState = ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId, new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials, attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(), attemptStateData.getStartTime(), attemptStateData.getState(),
attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(), attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(), attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus()); attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus());
appState.attempts.put(attemptState.getAttemptId(), attemptState); appState.attempts.put(attemptState.getAttemptId(), attemptState);
} }

View File

@ -43,7 +43,7 @@ public abstract class ApplicationAttemptStateData {
ApplicationAttemptId attemptId, Container container, ApplicationAttemptId attemptId, Container container,
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
String finalTrackingUrl, String diagnostics, String finalTrackingUrl, String diagnostics,
FinalApplicationStatus amUnregisteredFinalStatus) { FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) {
ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData attemptStateData =
Records.newRecord(ApplicationAttemptStateData.class); Records.newRecord(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId); attemptStateData.setAttemptId(attemptId);
@ -54,6 +54,7 @@ public abstract class ApplicationAttemptStateData {
attemptStateData.setDiagnostics(diagnostics); attemptStateData.setDiagnostics(diagnostics);
attemptStateData.setStartTime(startTime); attemptStateData.setStartTime(startTime);
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
attemptStateData.setAMContainerExitStatus(exitStatus);
return attemptStateData; return attemptStateData;
} }
@ -69,9 +70,9 @@ public abstract class ApplicationAttemptStateData {
return newInstance(attemptState.getAttemptId(), return newInstance(attemptState.getAttemptId(),
attemptState.getMasterContainer(), appAttemptTokens, attemptState.getMasterContainer(), appAttemptTokens,
attemptState.getStartTime(), attemptState.getState(), attemptState.getStartTime(), attemptState.getState(),
attemptState.getFinalTrackingUrl(), attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
attemptState.getDiagnostics(), attemptState.getFinalApplicationStatus(),
attemptState.getFinalApplicationStatus()); attemptState.getAMContainerExitStatus());
} }
public abstract ApplicationAttemptStateDataProto getProto(); public abstract ApplicationAttemptStateDataProto getProto();
@ -150,5 +151,10 @@ public abstract class ApplicationAttemptStateData {
*/ */
public abstract FinalApplicationStatus getFinalApplicationStatus(); public abstract FinalApplicationStatus getFinalApplicationStatus();
public abstract void setFinalApplicationStatus(FinalApplicationStatus finishState); public abstract void setFinalApplicationStatus(
FinalApplicationStatus finishState);
public abstract int getAMContainerExitStatus();
public abstract void setAMContainerExitStatus(int exitStatus);
} }

View File

@ -252,6 +252,19 @@ public class ApplicationAttemptStateDataPBImpl extends
return getProto().hashCode(); return getProto().hashCode();
} }
@Override
public int getAMContainerExitStatus() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
return p.getAmContainerExitStatus();
}
@Override
public void setAMContainerExitStatus(int exitStatus) {
maybeInitBuilder();
builder.setAmContainerExitStatus(exitStatus);
}
@Override @Override
public boolean equals(Object other) { public boolean equals(Object other) {
if (other == null) if (other == null)
@ -281,5 +294,4 @@ public class ApplicationAttemptStateDataPBImpl extends
private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) { private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) {
return ProtoUtils.convertFromProtoFormat(s); return ProtoUtils.convertFromProtoFormat(s);
} }
} }

View File

@ -78,7 +78,6 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.resource.Resources;
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings({ "rawtypes", "unchecked" })
public class RMAppImpl implements RMApp, Recoverable { public class RMAppImpl implements RMApp, Recoverable {
@ -686,7 +685,11 @@ public class RMAppImpl implements RMApp, Recoverable {
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1); ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
RMAppAttempt attempt = RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
submissionContext, conf, maxAppAttempts == attempts.size()); submissionContext, conf,
// The newly created attempt maybe last attempt if (number of
// previously NonPreempted attempts + 1) equal to the max-attempt
// limit.
maxAppAttempts == (getNumNonPreemptedAppAttempts() + 1));
attempts.put(appAttemptId, attempt); attempts.put(appAttemptId, attempt);
currentAttempt = attempt; currentAttempt = attempt;
} }
@ -794,7 +797,7 @@ public class RMAppImpl implements RMApp, Recoverable {
&& (app.currentAttempt.getState() == RMAppAttemptState.KILLED && (app.currentAttempt.getState() == RMAppAttemptState.KILLED
|| app.currentAttempt.getState() == RMAppAttemptState.FINISHED || app.currentAttempt.getState() == RMAppAttemptState.FINISHED
|| (app.currentAttempt.getState() == RMAppAttemptState.FAILED || (app.currentAttempt.getState() == RMAppAttemptState.FAILED
&& app.attempts.size() == app.maxAppAttempts))) { && app.getNumNonPreemptedAppAttempts() == app.maxAppAttempts))) {
return RMAppState.ACCEPTED; return RMAppState.ACCEPTED;
} }
@ -885,7 +888,7 @@ public class RMAppImpl implements RMApp, Recoverable {
msg = "Unmanaged application " + this.getApplicationId() msg = "Unmanaged application " + this.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics() + " failed due to " + failedEvent.getDiagnostics()
+ ". Failing the application."; + ". Failing the application.";
} else if (this.attempts.size() >= this.maxAppAttempts) { } else if (getNumNonPreemptedAppAttempts() >= this.maxAppAttempts) {
msg = "Application " + this.getApplicationId() + " failed " msg = "Application " + this.getApplicationId() + " failed "
+ this.maxAppAttempts + " times due to " + this.maxAppAttempts + " times due to "
+ failedEvent.getDiagnostics() + ". Failing the application."; + failedEvent.getDiagnostics() + ". Failing the application.";
@ -1102,6 +1105,17 @@ public class RMAppImpl implements RMApp, Recoverable {
}; };
} }
private int getNumNonPreemptedAppAttempts() {
int completedAttempts = 0;
// Do not count AM preemption as attempt failure.
for (RMAppAttempt attempt : attempts.values()) {
if (!attempt.isPreempted()) {
completedAttempts++;
}
}
return completedAttempts;
}
private static final class AttemptFailedTransition implements private static final class AttemptFailedTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> { MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
@ -1113,8 +1127,9 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override @Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) { public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (!app.submissionContext.getUnmanagedAM() if (!app.submissionContext.getUnmanagedAM()
&& app.attempts.size() < app.maxAppAttempts) { && app.getNumNonPreemptedAppAttempts() < app.maxAppAttempts) {
boolean transferStateFromPreviousAttempt = false; boolean transferStateFromPreviousAttempt = false;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
transferStateFromPreviousAttempt = transferStateFromPreviousAttempt =

View File

@ -196,4 +196,9 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
*/ */
ApplicationAttemptReport createApplicationAttemptReport(); ApplicationAttemptReport createApplicationAttemptReport();
/**
* Return the flag which indicates whether the attempt is preempted by the
* scheduler.
*/
boolean isPreempted();
} }

View File

@ -48,11 +48,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -146,9 +146,14 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// if an RMAppAttemptUnregistrationEvent occurs // if an RMAppAttemptUnregistrationEvent occurs
private FinalApplicationStatus finalStatus = null; private FinalApplicationStatus finalStatus = null;
private final StringBuilder diagnostics = new StringBuilder(); private final StringBuilder diagnostics = new StringBuilder();
private int amContainerExitStatus = ContainerExitStatus.INVALID;
private Configuration conf; private Configuration conf;
private final boolean isLastAttempt; // Since AM preemption is not counted towards AM failure count,
// even if this flag is true, a new attempt can still be re-created if this
// attempt is eventually preempted. So this flag indicates that this may be
// last attempt.
private final boolean maybeLastAttempt;
private static final ExpiredTransition EXPIRED_TRANSITION = private static final ExpiredTransition EXPIRED_TRANSITION =
new ExpiredTransition(); new ExpiredTransition();
@ -389,7 +394,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMContext rmContext, YarnScheduler scheduler, RMContext rmContext, YarnScheduler scheduler,
ApplicationMasterService masterService, ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext, ApplicationSubmissionContext submissionContext,
Configuration conf, boolean isLastAttempt) { Configuration conf, boolean maybeLastAttempt) {
this.conf = conf; this.conf = conf;
this.applicationAttemptId = appAttemptId; this.applicationAttemptId = appAttemptId;
this.rmContext = rmContext; this.rmContext = rmContext;
@ -403,7 +408,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.writeLock = lock.writeLock(); this.writeLock = lock.writeLock();
this.proxiedTrackingUrl = generateProxyUriWithScheme(null); this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
this.isLastAttempt = isLastAttempt; this.maybeLastAttempt = maybeLastAttempt;
this.stateMachine = stateMachineFactory.make(this); this.stateMachine = stateMachineFactory.make(this);
} }
@ -565,6 +570,15 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
} }
} }
public int getAMContainerExitStatus() {
this.readLock.lock();
try {
return this.amContainerExitStatus;
} finally {
this.readLock.unlock();
}
}
@Override @Override
public float getProgress() { public float getProgress() {
this.readLock.lock(); this.readLock.lock();
@ -671,6 +685,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
+ attemptState.getState()); + attemptState.getState());
diagnostics.append("Attempt recovered after RM restart"); diagnostics.append("Attempt recovered after RM restart");
diagnostics.append(attemptState.getDiagnostics()); diagnostics.append(attemptState.getDiagnostics());
this.amContainerExitStatus = attemptState.getAMContainerExitStatus();
setMasterContainer(attemptState.getMasterContainer()); setMasterContainer(attemptState.getMasterContainer());
recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials()); recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
this.recoveredFinalState = attemptState.getState(); this.recoveredFinalState = attemptState.getState();
@ -931,7 +946,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
String diags = null; String diags = null;
String finalTrackingUrl = null; String finalTrackingUrl = null;
FinalApplicationStatus finalStatus = null; FinalApplicationStatus finalStatus = null;
int exitStatus = ContainerExitStatus.INVALID;
switch (event.getType()) { switch (event.getType()) {
case LAUNCH_FAILED: case LAUNCH_FAILED:
RMAppAttemptLaunchFailedEvent launchFaileEvent = RMAppAttemptLaunchFailedEvent launchFaileEvent =
@ -952,6 +967,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptContainerFinishedEvent finishEvent = RMAppAttemptContainerFinishedEvent finishEvent =
(RMAppAttemptContainerFinishedEvent) event; (RMAppAttemptContainerFinishedEvent) event;
diags = getAMContainerCrashedDiagnostics(finishEvent); diags = getAMContainerCrashedDiagnostics(finishEvent);
exitStatus = finishEvent.getContainerStatus().getExitStatus();
break; break;
case KILL: case KILL:
break; break;
@ -966,9 +982,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
ApplicationAttemptState attemptState = ApplicationAttemptState attemptState =
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(), new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this), startTime, rmStore.getCredentialsFromAppAttempt(this), startTime,
stateToBeStored, finalTrackingUrl, diags, finalStatus); stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus);
LOG.info("Updating application attempt " + applicationAttemptId LOG.info("Updating application attempt " + applicationAttemptId
+ " with final state: " + targetedFinalState); + " with final state: " + targetedFinalState + ", and exit status: "
+ exitStatus);
rmStore.updateApplicationAttemptState(attemptState); rmStore.updateApplicationAttemptState(attemptState);
} }
@ -1061,11 +1078,19 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// don't leave the tracking URL pointing to a non-existent AM // don't leave the tracking URL pointing to a non-existent AM
appAttempt.setTrackingUrlToRMAppPage(); appAttempt.setTrackingUrlToRMAppPage();
appAttempt.invalidateAMHostAndPort(); appAttempt.invalidateAMHostAndPort();
if (appAttempt.submissionContext if (appAttempt.submissionContext
.getKeepContainersAcrossApplicationAttempts() .getKeepContainersAcrossApplicationAttempts()
&& !appAttempt.isLastAttempt
&& !appAttempt.submissionContext.getUnmanagedAM()) { && !appAttempt.submissionContext.getUnmanagedAM()) {
// See if we should retain containers for non-unmanaged applications
if (appAttempt.isPreempted()) {
// Premption doesn't count towards app-failures and so we should
// retain containers.
keepContainersAcrossAppAttempts = true; keepContainersAcrossAppAttempts = true;
} else if (!appAttempt.maybeLastAttempt) {
// Not preemption. Not last-attempt too - keep containers.
keepContainersAcrossAppAttempts = true;
}
} }
appEvent = appEvent =
new RMAppFailedAttemptEvent(applicationId, new RMAppFailedAttemptEvent(applicationId,
@ -1106,6 +1131,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
} }
} }
@Override
public boolean isPreempted() {
return getAMContainerExitStatus() == ContainerExitStatus.PREEMPTED;
}
private static final class UnmanagedAMAttemptSavedTransition private static final class UnmanagedAMAttemptSavedTransition
extends AMLaunchedTransition { extends AMLaunchedTransition {
@Override @Override
@ -1208,14 +1238,22 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.rmContext.getAMLivelinessMonitor().unregister( appAttempt.rmContext.getAMLivelinessMonitor().unregister(
appAttempt.getAppAttemptId()); appAttempt.getAppAttemptId());
// Setup diagnostic message // Setup diagnostic message and exit status
appAttempt.diagnostics appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent);
.append(getAMContainerCrashedDiagnostics(finishEvent));
// Tell the app, scheduler // Tell the app, scheduler
super.transition(appAttempt, finishEvent); super.transition(appAttempt, finishEvent);
} }
} }
private void setAMContainerCrashedDiagnosticsAndExitStatus(
RMAppAttemptContainerFinishedEvent finishEvent) {
ContainerStatus status = finishEvent.getContainerStatus();
String diagnostics = getAMContainerCrashedDiagnostics(finishEvent);
this.diagnostics.append(diagnostics);
this.amContainerExitStatus = status.getExitStatus();
}
private static String getAMContainerCrashedDiagnostics( private static String getAMContainerCrashedDiagnostics(
RMAppAttemptContainerFinishedEvent finishEvent) { RMAppAttemptContainerFinishedEvent finishEvent) {
ContainerStatus status = finishEvent.getContainerStatus(); ContainerStatus status = finishEvent.getContainerStatus();
@ -1437,13 +1475,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@Override @Override
public void public void
transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
RMAppAttemptContainerFinishedEvent containerFinishedEvent = RMAppAttemptContainerFinishedEvent finishEvent =
(RMAppAttemptContainerFinishedEvent) event; (RMAppAttemptContainerFinishedEvent) event;
// container associated with AM. must not be unmanaged // container associated with AM. must not be unmanaged
assert appAttempt.submissionContext.getUnmanagedAM() == false; assert appAttempt.submissionContext.getUnmanagedAM() == false;
// Setup diagnostic message // Setup diagnostic message and exit status
appAttempt.diagnostics appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent);
.append(getAMContainerCrashedDiagnostics(containerFinishedEvent));
new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt, new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt,
event); event);
} }
@ -1644,4 +1681,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
} }
return attemptReport; return attemptReport;
} }
// for testing
public boolean mayBeLastAttempt() {
return maybeLastAttempt;
}
} }

View File

@ -1079,10 +1079,8 @@ public class CapacityScheduler extends
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("KILL_CONTAINER: container" + cont.toString()); LOG.debug("KILL_CONTAINER: container" + cont.toString());
} }
completedContainer(cont, completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
SchedulerUtils.createPreemptedContainerStatus( cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
cont.getContainerId(),"Container being forcibly preempted:"
+ cont.getContainerId()),
RMContainerEventType.KILL); RMContainerEventType.KILL);
} }

View File

@ -87,7 +87,7 @@ public class MockNM {
return httpPort; return httpPort;
} }
void setResourceTrackerService(ResourceTrackerService resourceTracker) { public void setResourceTrackerService(ResourceTrackerService resourceTracker) {
this.resourceTracker = resourceTracker; this.resourceTracker = resourceTracker;
} }

View File

@ -166,6 +166,19 @@ public class MockRM extends ResourceManager {
} }
} }
public MockAM waitForNewAMToLaunchAndRegister(ApplicationId appId, int attemptSize,
MockNM nm) throws Exception {
RMApp app = getRMContext().getRMApps().get(appId);
Assert.assertNotNull(app);
while (app.getAppAttempts().size() != attemptSize) {
System.out.println("Application " + appId
+ " is waiting for AM to restart. Current has "
+ app.getAppAttempts().size() + " attempts.");
Thread.sleep(200);
}
return launchAndRegisterAM(app, this, nm);
}
public void waitForState(MockNM nm, ContainerId containerId, public void waitForState(MockNM nm, ContainerId containerId,
RMContainerState containerState) throws Exception { RMContainerState containerState) throws Exception {
RMContainer container = getResourceScheduler().getRMContainer(containerId); RMContainer container = getResourceScheduler().getRMContainer(containerId);
@ -551,6 +564,7 @@ public class MockRM extends ResourceManager {
throws Exception { throws Exception {
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt = app.getCurrentAppAttempt(); RMAppAttempt attempt = app.getCurrentAppAttempt();
System.out.println("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);

View File

@ -22,13 +22,12 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import org.junit.Assert;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -38,22 +37,23 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
/**
* Test to restart the AM on failure.
*
*/
public class TestAMRestart { public class TestAMRestart {
@Test @Test(timeout = 30000)
public void testAMRestartWithExistingContainers() throws Exception { public void testAMRestartWithExistingContainers() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@ -245,7 +245,7 @@ public class TestAMRestart {
} }
} }
@Test @Test(timeout = 30000)
public void testNMTokensRebindOnAMRestart() throws Exception { public void testNMTokensRebindOnAMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
@ -345,4 +345,122 @@ public class TestAMRestart {
Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens)); Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens));
rm1.stop(); rm1.stop();
} }
// AM container preempted should not be counted towards AM max retry count.
@Test(timeout = 20000)
public void testAMPreemptedNotCountedForAMFailures() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
// explicitly set max-am-retry count as 1.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MockRM rm1 = new MockRM(conf);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
nm1.registerNode();
RMApp app1 = rm1.submitApp(200);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
CapacityScheduler scheduler =
(CapacityScheduler) rm1.getResourceScheduler();
ContainerId amContainer =
ContainerId.newInstance(am1.getApplicationAttemptId(), 1);
// Preempt the first attempt;
scheduler.killContainer(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(attempt1.isPreempted());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// AM should be restarted even though max-am-attempt is 1.
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
// Preempt the second attempt.
ContainerId amContainer2 =
ContainerId.newInstance(am2.getApplicationAttemptId(), 1);
scheduler.killContainer(scheduler.getRMContainer(amContainer2));
am2.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(attempt2.isPreempted());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
// fail the AM normally
nm1.nodeHeartbeat(am3.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am3.waitForState(RMAppAttemptState.FAILED);
Assert.assertFalse(attempt3.isPreempted());
// AM should not be restarted.
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
Assert.assertEquals(3, app1.getAppAttempts().size());
rm1.stop();
}
// Test RM restarts after AM container is preempted, new RM should not count
// AM preemption failure towards the max-retry-account and should be able to
// re-launch the AM.
@Test(timeout = 20000)
public void testPreemptedAMRestartOnRMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
// explicitly set max-am-retry count as 1.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
nm1.registerNode();
RMApp app1 = rm1.submitApp(200);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
CapacityScheduler scheduler =
(CapacityScheduler) rm1.getResourceScheduler();
ContainerId amContainer =
ContainerId.newInstance(am1.getApplicationAttemptId(), 1);
// Forcibly preempt the am container;
scheduler.killContainer(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(attempt1.isPreempted());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// state store has 1 attempt stored.
ApplicationState appState =
memStore.getState().getApplicationState().get(app1.getApplicationId());
Assert.assertEquals(1, appState.getAttemptCount());
// attempt stored has the preempted container exit status.
Assert.assertEquals(ContainerExitStatus.PREEMPTED,
appState.getAttempt(am1.getApplicationAttemptId())
.getAMContainerExitStatus());
// Restart rm.
MockRM rm2 = new MockRM(conf, memStore);
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
nm1.registerNode();
rm2.start();
// Restarted RM should re-launch the am.
MockAM am2 =
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2);
RMAppAttempt attempt2 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
.getCurrentAppAttempt();
Assert.assertFalse(attempt2.isPreempted());
Assert.assertEquals(ContainerExitStatus.INVALID,
appState.getAttempt(am2.getApplicationAttemptId())
.getAMContainerExitStatus());
rm1.stop();
rm2.stop();
}
} }

View File

@ -267,6 +267,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
// attempt1 is loaded correctly // attempt1 is loaded correctly
assertNotNull(attemptState); assertNotNull(attemptState);
assertEquals(attemptId1, attemptState.getAttemptId()); assertEquals(attemptId1, attemptState.getAttemptId());
assertEquals(-1000, attemptState.getAMContainerExitStatus());
// attempt1 container is loaded correctly // attempt1 container is loaded correctly
assertEquals(containerId1, attemptState.getMasterContainer().getId()); assertEquals(containerId1, attemptState.getMasterContainer().getId());
// attempt1 applicationToken is loaded correctly // attempt1 applicationToken is loaded correctly
@ -308,7 +309,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics", "myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED); FinalApplicationStatus.SUCCEEDED, 100);
store.updateApplicationAttemptState(newAttemptState); store.updateApplicationAttemptState(newAttemptState);
// test updating the state of an app/attempt whose initial state was not // test updating the state of an app/attempt whose initial state was not
@ -331,7 +332,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics", "myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED); FinalApplicationStatus.SUCCEEDED, 111);
store.updateApplicationAttemptState(dummyAttempt); store.updateApplicationAttemptState(dummyAttempt);
// let things settle down // let things settle down
@ -370,6 +371,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState()); assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState());
assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl()); assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl());
assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics()); assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics());
assertEquals(100, updatedAttemptState.getAMContainerExitStatus());
assertEquals(FinalApplicationStatus.SUCCEEDED, assertEquals(FinalApplicationStatus.SUCCEEDED,
updatedAttemptState.getFinalApplicationStatus()); updatedAttemptState.getFinalApplicationStatus());