YARN-2074. Changed ResourceManager to not count AM preemptions towards app failures. Contributed by Jian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1605106 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cb2b34b01c
commit
d16470025a
|
@ -175,6 +175,9 @@ Release 2.5.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-2195. Clean a piece of code in ResourceRequest. (Wei Yan via devaraj)
|
YARN-2195. Clean a piece of code in ResourceRequest. (Wei Yan via devaraj)
|
||||||
|
|
||||||
|
YARN-2074. Changed ResourceManager to not count AM preemptions towards app
|
||||||
|
failures. (Jian He via vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -127,6 +127,7 @@ message ApplicationAttemptStateDataProto {
|
||||||
optional string diagnostics = 6 [default = "N/A"];
|
optional string diagnostics = 6 [default = "N/A"];
|
||||||
optional int64 start_time = 7;
|
optional int64 start_time = 7;
|
||||||
optional FinalApplicationStatusProto final_application_status = 8;
|
optional FinalApplicationStatusProto final_application_status = 8;
|
||||||
|
optional int32 am_container_exit_status = 9 [default = -1000];
|
||||||
}
|
}
|
||||||
|
|
||||||
message RMStateVersionProto {
|
message RMStateVersionProto {
|
||||||
|
|
|
@ -216,7 +216,8 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
attemptStateData.getState(),
|
attemptStateData.getState(),
|
||||||
attemptStateData.getFinalTrackingUrl(),
|
attemptStateData.getFinalTrackingUrl(),
|
||||||
attemptStateData.getDiagnostics(),
|
attemptStateData.getDiagnostics(),
|
||||||
attemptStateData.getFinalApplicationStatus());
|
attemptStateData.getFinalApplicationStatus(),
|
||||||
|
attemptStateData.getAMContainerExitStatus());
|
||||||
|
|
||||||
// assert child node name is same as application attempt id
|
// assert child node name is same as application attempt id
|
||||||
assert attemptId.equals(attemptState.getAttemptId());
|
assert attemptId.equals(attemptState.getAttemptId());
|
||||||
|
|
|
@ -152,7 +152,8 @@ public class MemoryRMStateStore extends RMStateStore {
|
||||||
attemptStateData.getStartTime(), attemptStateData.getState(),
|
attemptStateData.getStartTime(), attemptStateData.getState(),
|
||||||
attemptStateData.getFinalTrackingUrl(),
|
attemptStateData.getFinalTrackingUrl(),
|
||||||
attemptStateData.getDiagnostics(),
|
attemptStateData.getDiagnostics(),
|
||||||
attemptStateData.getFinalApplicationStatus());
|
attemptStateData.getFinalApplicationStatus(),
|
||||||
|
attemptStateData.getAMContainerExitStatus());
|
||||||
|
|
||||||
ApplicationState appState =
|
ApplicationState appState =
|
||||||
state.getApplicationState().get(
|
state.getApplicationState().get(
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
|
@ -258,19 +259,21 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
RMAppAttemptState state;
|
RMAppAttemptState state;
|
||||||
String finalTrackingUrl = "N/A";
|
String finalTrackingUrl = "N/A";
|
||||||
String diagnostics;
|
String diagnostics;
|
||||||
|
int exitStatus = ContainerExitStatus.INVALID;
|
||||||
FinalApplicationStatus amUnregisteredFinalStatus;
|
FinalApplicationStatus amUnregisteredFinalStatus;
|
||||||
|
|
||||||
public ApplicationAttemptState(ApplicationAttemptId attemptId,
|
public ApplicationAttemptState(ApplicationAttemptId attemptId,
|
||||||
Container masterContainer, Credentials appAttemptCredentials,
|
Container masterContainer, Credentials appAttemptCredentials,
|
||||||
long startTime) {
|
long startTime) {
|
||||||
this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
|
this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
|
||||||
null, "", null);
|
null, "", null, ContainerExitStatus.INVALID);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ApplicationAttemptState(ApplicationAttemptId attemptId,
|
public ApplicationAttemptState(ApplicationAttemptId attemptId,
|
||||||
Container masterContainer, Credentials appAttemptCredentials,
|
Container masterContainer, Credentials appAttemptCredentials,
|
||||||
long startTime, RMAppAttemptState state, String finalTrackingUrl,
|
long startTime, RMAppAttemptState state, String finalTrackingUrl,
|
||||||
String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus) {
|
String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
|
||||||
|
int exitStatus) {
|
||||||
this.attemptId = attemptId;
|
this.attemptId = attemptId;
|
||||||
this.masterContainer = masterContainer;
|
this.masterContainer = masterContainer;
|
||||||
this.appAttemptCredentials = appAttemptCredentials;
|
this.appAttemptCredentials = appAttemptCredentials;
|
||||||
|
@ -279,6 +282,7 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
this.finalTrackingUrl = finalTrackingUrl;
|
this.finalTrackingUrl = finalTrackingUrl;
|
||||||
this.diagnostics = diagnostics == null ? "" : diagnostics;
|
this.diagnostics = diagnostics == null ? "" : diagnostics;
|
||||||
this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
|
this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
|
||||||
|
this.exitStatus = exitStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Container getMasterContainer() {
|
public Container getMasterContainer() {
|
||||||
|
@ -305,6 +309,9 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
public FinalApplicationStatus getFinalApplicationStatus() {
|
public FinalApplicationStatus getFinalApplicationStatus() {
|
||||||
return amUnregisteredFinalStatus;
|
return amUnregisteredFinalStatus;
|
||||||
}
|
}
|
||||||
|
public int getAMContainerExitStatus(){
|
||||||
|
return this.exitStatus;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -539,11 +539,11 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
ApplicationAttemptState attemptState =
|
ApplicationAttemptState attemptState =
|
||||||
new ApplicationAttemptState(attemptId,
|
new ApplicationAttemptState(attemptId,
|
||||||
attemptStateData.getMasterContainer(), credentials,
|
attemptStateData.getMasterContainer(), credentials,
|
||||||
attemptStateData.getStartTime(),
|
attemptStateData.getStartTime(), attemptStateData.getState(),
|
||||||
attemptStateData.getState(),
|
|
||||||
attemptStateData.getFinalTrackingUrl(),
|
attemptStateData.getFinalTrackingUrl(),
|
||||||
attemptStateData.getDiagnostics(),
|
attemptStateData.getDiagnostics(),
|
||||||
attemptStateData.getFinalApplicationStatus());
|
attemptStateData.getFinalApplicationStatus(),
|
||||||
|
attemptStateData.getAMContainerExitStatus());
|
||||||
|
|
||||||
appState.attempts.put(attemptState.getAttemptId(), attemptState);
|
appState.attempts.put(attemptState.getAttemptId(), attemptState);
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,7 @@ public abstract class ApplicationAttemptStateData {
|
||||||
ApplicationAttemptId attemptId, Container container,
|
ApplicationAttemptId attemptId, Container container,
|
||||||
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
|
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
|
||||||
String finalTrackingUrl, String diagnostics,
|
String finalTrackingUrl, String diagnostics,
|
||||||
FinalApplicationStatus amUnregisteredFinalStatus) {
|
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) {
|
||||||
ApplicationAttemptStateData attemptStateData =
|
ApplicationAttemptStateData attemptStateData =
|
||||||
Records.newRecord(ApplicationAttemptStateData.class);
|
Records.newRecord(ApplicationAttemptStateData.class);
|
||||||
attemptStateData.setAttemptId(attemptId);
|
attemptStateData.setAttemptId(attemptId);
|
||||||
|
@ -54,6 +54,7 @@ public abstract class ApplicationAttemptStateData {
|
||||||
attemptStateData.setDiagnostics(diagnostics);
|
attemptStateData.setDiagnostics(diagnostics);
|
||||||
attemptStateData.setStartTime(startTime);
|
attemptStateData.setStartTime(startTime);
|
||||||
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
|
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
|
||||||
|
attemptStateData.setAMContainerExitStatus(exitStatus);
|
||||||
return attemptStateData;
|
return attemptStateData;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,9 +70,9 @@ public abstract class ApplicationAttemptStateData {
|
||||||
return newInstance(attemptState.getAttemptId(),
|
return newInstance(attemptState.getAttemptId(),
|
||||||
attemptState.getMasterContainer(), appAttemptTokens,
|
attemptState.getMasterContainer(), appAttemptTokens,
|
||||||
attemptState.getStartTime(), attemptState.getState(),
|
attemptState.getStartTime(), attemptState.getState(),
|
||||||
attemptState.getFinalTrackingUrl(),
|
attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
|
||||||
attemptState.getDiagnostics(),
|
attemptState.getFinalApplicationStatus(),
|
||||||
attemptState.getFinalApplicationStatus());
|
attemptState.getAMContainerExitStatus());
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract ApplicationAttemptStateDataProto getProto();
|
public abstract ApplicationAttemptStateDataProto getProto();
|
||||||
|
@ -150,5 +151,10 @@ public abstract class ApplicationAttemptStateData {
|
||||||
*/
|
*/
|
||||||
public abstract FinalApplicationStatus getFinalApplicationStatus();
|
public abstract FinalApplicationStatus getFinalApplicationStatus();
|
||||||
|
|
||||||
public abstract void setFinalApplicationStatus(FinalApplicationStatus finishState);
|
public abstract void setFinalApplicationStatus(
|
||||||
|
FinalApplicationStatus finishState);
|
||||||
|
|
||||||
|
public abstract int getAMContainerExitStatus();
|
||||||
|
|
||||||
|
public abstract void setAMContainerExitStatus(int exitStatus);
|
||||||
}
|
}
|
||||||
|
|
|
@ -252,6 +252,19 @@ public class ApplicationAttemptStateDataPBImpl extends
|
||||||
return getProto().hashCode();
|
return getProto().hashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getAMContainerExitStatus() {
|
||||||
|
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
return p.getAmContainerExitStatus();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setAMContainerExitStatus(int exitStatus) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.setAmContainerExitStatus(exitStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object other) {
|
public boolean equals(Object other) {
|
||||||
if (other == null)
|
if (other == null)
|
||||||
|
@ -281,5 +294,4 @@ public class ApplicationAttemptStateDataPBImpl extends
|
||||||
private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) {
|
private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) {
|
||||||
return ProtoUtils.convertFromProtoFormat(s);
|
return ProtoUtils.convertFromProtoFormat(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,7 +78,6 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
||||||
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
||||||
import org.apache.hadoop.yarn.state.StateMachine;
|
import org.apache.hadoop.yarn.state.StateMachine;
|
||||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
||||||
|
|
||||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||||
public class RMAppImpl implements RMApp, Recoverable {
|
public class RMAppImpl implements RMApp, Recoverable {
|
||||||
|
@ -686,7 +685,11 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
|
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
|
||||||
RMAppAttempt attempt =
|
RMAppAttempt attempt =
|
||||||
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
|
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
|
||||||
submissionContext, conf, maxAppAttempts == attempts.size());
|
submissionContext, conf,
|
||||||
|
// The newly created attempt maybe last attempt if (number of
|
||||||
|
// previously NonPreempted attempts + 1) equal to the max-attempt
|
||||||
|
// limit.
|
||||||
|
maxAppAttempts == (getNumNonPreemptedAppAttempts() + 1));
|
||||||
attempts.put(appAttemptId, attempt);
|
attempts.put(appAttemptId, attempt);
|
||||||
currentAttempt = attempt;
|
currentAttempt = attempt;
|
||||||
}
|
}
|
||||||
|
@ -794,7 +797,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
&& (app.currentAttempt.getState() == RMAppAttemptState.KILLED
|
&& (app.currentAttempt.getState() == RMAppAttemptState.KILLED
|
||||||
|| app.currentAttempt.getState() == RMAppAttemptState.FINISHED
|
|| app.currentAttempt.getState() == RMAppAttemptState.FINISHED
|
||||||
|| (app.currentAttempt.getState() == RMAppAttemptState.FAILED
|
|| (app.currentAttempt.getState() == RMAppAttemptState.FAILED
|
||||||
&& app.attempts.size() == app.maxAppAttempts))) {
|
&& app.getNumNonPreemptedAppAttempts() == app.maxAppAttempts))) {
|
||||||
return RMAppState.ACCEPTED;
|
return RMAppState.ACCEPTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -885,7 +888,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
msg = "Unmanaged application " + this.getApplicationId()
|
msg = "Unmanaged application " + this.getApplicationId()
|
||||||
+ " failed due to " + failedEvent.getDiagnostics()
|
+ " failed due to " + failedEvent.getDiagnostics()
|
||||||
+ ". Failing the application.";
|
+ ". Failing the application.";
|
||||||
} else if (this.attempts.size() >= this.maxAppAttempts) {
|
} else if (getNumNonPreemptedAppAttempts() >= this.maxAppAttempts) {
|
||||||
msg = "Application " + this.getApplicationId() + " failed "
|
msg = "Application " + this.getApplicationId() + " failed "
|
||||||
+ this.maxAppAttempts + " times due to "
|
+ this.maxAppAttempts + " times due to "
|
||||||
+ failedEvent.getDiagnostics() + ". Failing the application.";
|
+ failedEvent.getDiagnostics() + ". Failing the application.";
|
||||||
|
@ -1102,6 +1105,17 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int getNumNonPreemptedAppAttempts() {
|
||||||
|
int completedAttempts = 0;
|
||||||
|
// Do not count AM preemption as attempt failure.
|
||||||
|
for (RMAppAttempt attempt : attempts.values()) {
|
||||||
|
if (!attempt.isPreempted()) {
|
||||||
|
completedAttempts++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return completedAttempts;
|
||||||
|
}
|
||||||
|
|
||||||
private static final class AttemptFailedTransition implements
|
private static final class AttemptFailedTransition implements
|
||||||
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
|
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
|
||||||
|
|
||||||
|
@ -1113,8 +1127,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||||
|
|
||||||
if (!app.submissionContext.getUnmanagedAM()
|
if (!app.submissionContext.getUnmanagedAM()
|
||||||
&& app.attempts.size() < app.maxAppAttempts) {
|
&& app.getNumNonPreemptedAppAttempts() < app.maxAppAttempts) {
|
||||||
boolean transferStateFromPreviousAttempt = false;
|
boolean transferStateFromPreviousAttempt = false;
|
||||||
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
|
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
|
||||||
transferStateFromPreviousAttempt =
|
transferStateFromPreviousAttempt =
|
||||||
|
|
|
@ -196,4 +196,9 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
|
||||||
*/
|
*/
|
||||||
ApplicationAttemptReport createApplicationAttemptReport();
|
ApplicationAttemptReport createApplicationAttemptReport();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the flag which indicates whether the attempt is preempted by the
|
||||||
|
* scheduler.
|
||||||
|
*/
|
||||||
|
boolean isPreempted();
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,11 +48,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
@ -146,9 +146,14 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
// if an RMAppAttemptUnregistrationEvent occurs
|
// if an RMAppAttemptUnregistrationEvent occurs
|
||||||
private FinalApplicationStatus finalStatus = null;
|
private FinalApplicationStatus finalStatus = null;
|
||||||
private final StringBuilder diagnostics = new StringBuilder();
|
private final StringBuilder diagnostics = new StringBuilder();
|
||||||
|
private int amContainerExitStatus = ContainerExitStatus.INVALID;
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private final boolean isLastAttempt;
|
// Since AM preemption is not counted towards AM failure count,
|
||||||
|
// even if this flag is true, a new attempt can still be re-created if this
|
||||||
|
// attempt is eventually preempted. So this flag indicates that this may be
|
||||||
|
// last attempt.
|
||||||
|
private final boolean maybeLastAttempt;
|
||||||
private static final ExpiredTransition EXPIRED_TRANSITION =
|
private static final ExpiredTransition EXPIRED_TRANSITION =
|
||||||
new ExpiredTransition();
|
new ExpiredTransition();
|
||||||
|
|
||||||
|
@ -389,7 +394,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
RMContext rmContext, YarnScheduler scheduler,
|
RMContext rmContext, YarnScheduler scheduler,
|
||||||
ApplicationMasterService masterService,
|
ApplicationMasterService masterService,
|
||||||
ApplicationSubmissionContext submissionContext,
|
ApplicationSubmissionContext submissionContext,
|
||||||
Configuration conf, boolean isLastAttempt) {
|
Configuration conf, boolean maybeLastAttempt) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.applicationAttemptId = appAttemptId;
|
this.applicationAttemptId = appAttemptId;
|
||||||
this.rmContext = rmContext;
|
this.rmContext = rmContext;
|
||||||
|
@ -403,7 +408,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
this.writeLock = lock.writeLock();
|
this.writeLock = lock.writeLock();
|
||||||
|
|
||||||
this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
|
this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
|
||||||
this.isLastAttempt = isLastAttempt;
|
this.maybeLastAttempt = maybeLastAttempt;
|
||||||
this.stateMachine = stateMachineFactory.make(this);
|
this.stateMachine = stateMachineFactory.make(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -565,6 +570,15 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getAMContainerExitStatus() {
|
||||||
|
this.readLock.lock();
|
||||||
|
try {
|
||||||
|
return this.amContainerExitStatus;
|
||||||
|
} finally {
|
||||||
|
this.readLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public float getProgress() {
|
public float getProgress() {
|
||||||
this.readLock.lock();
|
this.readLock.lock();
|
||||||
|
@ -671,6 +685,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
+ attemptState.getState());
|
+ attemptState.getState());
|
||||||
diagnostics.append("Attempt recovered after RM restart");
|
diagnostics.append("Attempt recovered after RM restart");
|
||||||
diagnostics.append(attemptState.getDiagnostics());
|
diagnostics.append(attemptState.getDiagnostics());
|
||||||
|
this.amContainerExitStatus = attemptState.getAMContainerExitStatus();
|
||||||
setMasterContainer(attemptState.getMasterContainer());
|
setMasterContainer(attemptState.getMasterContainer());
|
||||||
recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
|
recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
|
||||||
this.recoveredFinalState = attemptState.getState();
|
this.recoveredFinalState = attemptState.getState();
|
||||||
|
@ -931,7 +946,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
String diags = null;
|
String diags = null;
|
||||||
String finalTrackingUrl = null;
|
String finalTrackingUrl = null;
|
||||||
FinalApplicationStatus finalStatus = null;
|
FinalApplicationStatus finalStatus = null;
|
||||||
|
int exitStatus = ContainerExitStatus.INVALID;
|
||||||
switch (event.getType()) {
|
switch (event.getType()) {
|
||||||
case LAUNCH_FAILED:
|
case LAUNCH_FAILED:
|
||||||
RMAppAttemptLaunchFailedEvent launchFaileEvent =
|
RMAppAttemptLaunchFailedEvent launchFaileEvent =
|
||||||
|
@ -952,6 +967,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
RMAppAttemptContainerFinishedEvent finishEvent =
|
RMAppAttemptContainerFinishedEvent finishEvent =
|
||||||
(RMAppAttemptContainerFinishedEvent) event;
|
(RMAppAttemptContainerFinishedEvent) event;
|
||||||
diags = getAMContainerCrashedDiagnostics(finishEvent);
|
diags = getAMContainerCrashedDiagnostics(finishEvent);
|
||||||
|
exitStatus = finishEvent.getContainerStatus().getExitStatus();
|
||||||
break;
|
break;
|
||||||
case KILL:
|
case KILL:
|
||||||
break;
|
break;
|
||||||
|
@ -966,9 +982,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
ApplicationAttemptState attemptState =
|
ApplicationAttemptState attemptState =
|
||||||
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
|
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
|
||||||
rmStore.getCredentialsFromAppAttempt(this), startTime,
|
rmStore.getCredentialsFromAppAttempt(this), startTime,
|
||||||
stateToBeStored, finalTrackingUrl, diags, finalStatus);
|
stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus);
|
||||||
LOG.info("Updating application attempt " + applicationAttemptId
|
LOG.info("Updating application attempt " + applicationAttemptId
|
||||||
+ " with final state: " + targetedFinalState);
|
+ " with final state: " + targetedFinalState + ", and exit status: "
|
||||||
|
+ exitStatus);
|
||||||
rmStore.updateApplicationAttemptState(attemptState);
|
rmStore.updateApplicationAttemptState(attemptState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1061,11 +1078,19 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
// don't leave the tracking URL pointing to a non-existent AM
|
// don't leave the tracking URL pointing to a non-existent AM
|
||||||
appAttempt.setTrackingUrlToRMAppPage();
|
appAttempt.setTrackingUrlToRMAppPage();
|
||||||
appAttempt.invalidateAMHostAndPort();
|
appAttempt.invalidateAMHostAndPort();
|
||||||
|
|
||||||
if (appAttempt.submissionContext
|
if (appAttempt.submissionContext
|
||||||
.getKeepContainersAcrossApplicationAttempts()
|
.getKeepContainersAcrossApplicationAttempts()
|
||||||
&& !appAttempt.isLastAttempt
|
|
||||||
&& !appAttempt.submissionContext.getUnmanagedAM()) {
|
&& !appAttempt.submissionContext.getUnmanagedAM()) {
|
||||||
|
// See if we should retain containers for non-unmanaged applications
|
||||||
|
if (appAttempt.isPreempted()) {
|
||||||
|
// Premption doesn't count towards app-failures and so we should
|
||||||
|
// retain containers.
|
||||||
keepContainersAcrossAppAttempts = true;
|
keepContainersAcrossAppAttempts = true;
|
||||||
|
} else if (!appAttempt.maybeLastAttempt) {
|
||||||
|
// Not preemption. Not last-attempt too - keep containers.
|
||||||
|
keepContainersAcrossAppAttempts = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
appEvent =
|
appEvent =
|
||||||
new RMAppFailedAttemptEvent(applicationId,
|
new RMAppFailedAttemptEvent(applicationId,
|
||||||
|
@ -1106,6 +1131,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isPreempted() {
|
||||||
|
return getAMContainerExitStatus() == ContainerExitStatus.PREEMPTED;
|
||||||
|
}
|
||||||
|
|
||||||
private static final class UnmanagedAMAttemptSavedTransition
|
private static final class UnmanagedAMAttemptSavedTransition
|
||||||
extends AMLaunchedTransition {
|
extends AMLaunchedTransition {
|
||||||
@Override
|
@Override
|
||||||
|
@ -1208,14 +1238,22 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
appAttempt.rmContext.getAMLivelinessMonitor().unregister(
|
appAttempt.rmContext.getAMLivelinessMonitor().unregister(
|
||||||
appAttempt.getAppAttemptId());
|
appAttempt.getAppAttemptId());
|
||||||
|
|
||||||
// Setup diagnostic message
|
// Setup diagnostic message and exit status
|
||||||
appAttempt.diagnostics
|
appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent);
|
||||||
.append(getAMContainerCrashedDiagnostics(finishEvent));
|
|
||||||
// Tell the app, scheduler
|
// Tell the app, scheduler
|
||||||
super.transition(appAttempt, finishEvent);
|
super.transition(appAttempt, finishEvent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setAMContainerCrashedDiagnosticsAndExitStatus(
|
||||||
|
RMAppAttemptContainerFinishedEvent finishEvent) {
|
||||||
|
ContainerStatus status = finishEvent.getContainerStatus();
|
||||||
|
String diagnostics = getAMContainerCrashedDiagnostics(finishEvent);
|
||||||
|
this.diagnostics.append(diagnostics);
|
||||||
|
this.amContainerExitStatus = status.getExitStatus();
|
||||||
|
}
|
||||||
|
|
||||||
private static String getAMContainerCrashedDiagnostics(
|
private static String getAMContainerCrashedDiagnostics(
|
||||||
RMAppAttemptContainerFinishedEvent finishEvent) {
|
RMAppAttemptContainerFinishedEvent finishEvent) {
|
||||||
ContainerStatus status = finishEvent.getContainerStatus();
|
ContainerStatus status = finishEvent.getContainerStatus();
|
||||||
|
@ -1437,13 +1475,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
@Override
|
@Override
|
||||||
public void
|
public void
|
||||||
transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
|
transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
|
||||||
RMAppAttemptContainerFinishedEvent containerFinishedEvent =
|
RMAppAttemptContainerFinishedEvent finishEvent =
|
||||||
(RMAppAttemptContainerFinishedEvent) event;
|
(RMAppAttemptContainerFinishedEvent) event;
|
||||||
// container associated with AM. must not be unmanaged
|
// container associated with AM. must not be unmanaged
|
||||||
assert appAttempt.submissionContext.getUnmanagedAM() == false;
|
assert appAttempt.submissionContext.getUnmanagedAM() == false;
|
||||||
// Setup diagnostic message
|
// Setup diagnostic message and exit status
|
||||||
appAttempt.diagnostics
|
appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent);
|
||||||
.append(getAMContainerCrashedDiagnostics(containerFinishedEvent));
|
|
||||||
new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt,
|
new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt,
|
||||||
event);
|
event);
|
||||||
}
|
}
|
||||||
|
@ -1644,4 +1681,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
}
|
}
|
||||||
return attemptReport;
|
return attemptReport;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// for testing
|
||||||
|
public boolean mayBeLastAttempt() {
|
||||||
|
return maybeLastAttempt;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1076,13 +1076,11 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void killContainer(RMContainer cont) {
|
public void killContainer(RMContainer cont) {
|
||||||
if(LOG.isDebugEnabled()){
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("KILL_CONTAINER: container" + cont.toString());
|
LOG.debug("KILL_CONTAINER: container" + cont.toString());
|
||||||
}
|
}
|
||||||
completedContainer(cont,
|
completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
|
||||||
SchedulerUtils.createPreemptedContainerStatus(
|
cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
|
||||||
cont.getContainerId(),"Container being forcibly preempted:"
|
|
||||||
+ cont.getContainerId()),
|
|
||||||
RMContainerEventType.KILL);
|
RMContainerEventType.KILL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -87,7 +87,7 @@ public class MockNM {
|
||||||
return httpPort;
|
return httpPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setResourceTrackerService(ResourceTrackerService resourceTracker) {
|
public void setResourceTrackerService(ResourceTrackerService resourceTracker) {
|
||||||
this.resourceTracker = resourceTracker;
|
this.resourceTracker = resourceTracker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -166,6 +166,19 @@ public class MockRM extends ResourceManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MockAM waitForNewAMToLaunchAndRegister(ApplicationId appId, int attemptSize,
|
||||||
|
MockNM nm) throws Exception {
|
||||||
|
RMApp app = getRMContext().getRMApps().get(appId);
|
||||||
|
Assert.assertNotNull(app);
|
||||||
|
while (app.getAppAttempts().size() != attemptSize) {
|
||||||
|
System.out.println("Application " + appId
|
||||||
|
+ " is waiting for AM to restart. Current has "
|
||||||
|
+ app.getAppAttempts().size() + " attempts.");
|
||||||
|
Thread.sleep(200);
|
||||||
|
}
|
||||||
|
return launchAndRegisterAM(app, this, nm);
|
||||||
|
}
|
||||||
|
|
||||||
public void waitForState(MockNM nm, ContainerId containerId,
|
public void waitForState(MockNM nm, ContainerId containerId,
|
||||||
RMContainerState containerState) throws Exception {
|
RMContainerState containerState) throws Exception {
|
||||||
RMContainer container = getResourceScheduler().getRMContainer(containerId);
|
RMContainer container = getResourceScheduler().getRMContainer(containerId);
|
||||||
|
@ -551,6 +564,7 @@ public class MockRM extends ResourceManager {
|
||||||
throws Exception {
|
throws Exception {
|
||||||
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||||
|
System.out.println("Launch AM " + attempt.getAppAttemptId());
|
||||||
nm.nodeHeartbeat(true);
|
nm.nodeHeartbeat(true);
|
||||||
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
||||||
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
|
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
|
||||||
|
|
|
@ -22,13 +22,12 @@ import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
@ -38,22 +37,23 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
|
||||||
* Test to restart the AM on failure.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class TestAMRestart {
|
public class TestAMRestart {
|
||||||
|
|
||||||
@Test
|
@Test(timeout = 30000)
|
||||||
public void testAMRestartWithExistingContainers() throws Exception {
|
public void testAMRestartWithExistingContainers() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
|
@ -245,7 +245,7 @@ public class TestAMRestart {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout = 30000)
|
||||||
public void testNMTokensRebindOnAMRestart() throws Exception {
|
public void testNMTokensRebindOnAMRestart() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
|
||||||
|
@ -345,4 +345,122 @@ public class TestAMRestart {
|
||||||
Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens));
|
Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens));
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AM container preempted should not be counted towards AM max retry count.
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testAMPreemptedNotCountedForAMFailures() throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
// explicitly set max-am-retry count as 1.
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
RMApp app1 = rm1.submitApp(200);
|
||||||
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
CapacityScheduler scheduler =
|
||||||
|
(CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
ContainerId amContainer =
|
||||||
|
ContainerId.newInstance(am1.getApplicationAttemptId(), 1);
|
||||||
|
// Preempt the first attempt;
|
||||||
|
scheduler.killContainer(scheduler.getRMContainer(amContainer));
|
||||||
|
|
||||||
|
am1.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
Assert.assertTrue(attempt1.isPreempted());
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
// AM should be restarted even though max-am-attempt is 1.
|
||||||
|
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
||||||
|
Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
|
||||||
|
|
||||||
|
// Preempt the second attempt.
|
||||||
|
ContainerId amContainer2 =
|
||||||
|
ContainerId.newInstance(am2.getApplicationAttemptId(), 1);
|
||||||
|
scheduler.killContainer(scheduler.getRMContainer(amContainer2));
|
||||||
|
|
||||||
|
am2.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
Assert.assertTrue(attempt2.isPreempted());
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
|
||||||
|
Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
|
||||||
|
|
||||||
|
// fail the AM normally
|
||||||
|
nm1.nodeHeartbeat(am3.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
|
am3.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
Assert.assertFalse(attempt3.isPreempted());
|
||||||
|
|
||||||
|
// AM should not be restarted.
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||||
|
Assert.assertEquals(3, app1.getAppAttempts().size());
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test RM restarts after AM container is preempted, new RM should not count
|
||||||
|
// AM preemption failure towards the max-retry-account and should be able to
|
||||||
|
// re-launch the AM.
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testPreemptedAMRestartOnRMRestart() throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
|
// explicitly set max-am-retry count as 1.
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
RMApp app1 = rm1.submitApp(200);
|
||||||
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
CapacityScheduler scheduler =
|
||||||
|
(CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
ContainerId amContainer =
|
||||||
|
ContainerId.newInstance(am1.getApplicationAttemptId(), 1);
|
||||||
|
|
||||||
|
// Forcibly preempt the am container;
|
||||||
|
scheduler.killContainer(scheduler.getRMContainer(amContainer));
|
||||||
|
|
||||||
|
am1.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
Assert.assertTrue(attempt1.isPreempted());
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
|
||||||
|
// state store has 1 attempt stored.
|
||||||
|
ApplicationState appState =
|
||||||
|
memStore.getState().getApplicationState().get(app1.getApplicationId());
|
||||||
|
Assert.assertEquals(1, appState.getAttemptCount());
|
||||||
|
// attempt stored has the preempted container exit status.
|
||||||
|
Assert.assertEquals(ContainerExitStatus.PREEMPTED,
|
||||||
|
appState.getAttempt(am1.getApplicationAttemptId())
|
||||||
|
.getAMContainerExitStatus());
|
||||||
|
// Restart rm.
|
||||||
|
MockRM rm2 = new MockRM(conf, memStore);
|
||||||
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
rm2.start();
|
||||||
|
|
||||||
|
// Restarted RM should re-launch the am.
|
||||||
|
MockAM am2 =
|
||||||
|
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
|
||||||
|
MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2);
|
||||||
|
RMAppAttempt attempt2 =
|
||||||
|
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
|
||||||
|
.getCurrentAppAttempt();
|
||||||
|
Assert.assertFalse(attempt2.isPreempted());
|
||||||
|
Assert.assertEquals(ContainerExitStatus.INVALID,
|
||||||
|
appState.getAttempt(am2.getApplicationAttemptId())
|
||||||
|
.getAMContainerExitStatus());
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -267,6 +267,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
// attempt1 is loaded correctly
|
// attempt1 is loaded correctly
|
||||||
assertNotNull(attemptState);
|
assertNotNull(attemptState);
|
||||||
assertEquals(attemptId1, attemptState.getAttemptId());
|
assertEquals(attemptId1, attemptState.getAttemptId());
|
||||||
|
assertEquals(-1000, attemptState.getAMContainerExitStatus());
|
||||||
// attempt1 container is loaded correctly
|
// attempt1 container is loaded correctly
|
||||||
assertEquals(containerId1, attemptState.getMasterContainer().getId());
|
assertEquals(containerId1, attemptState.getMasterContainer().getId());
|
||||||
// attempt1 applicationToken is loaded correctly
|
// attempt1 applicationToken is loaded correctly
|
||||||
|
@ -308,7 +309,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
oldAttemptState.getAppAttemptCredentials(),
|
oldAttemptState.getAppAttemptCredentials(),
|
||||||
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
||||||
"myTrackingUrl", "attemptDiagnostics",
|
"myTrackingUrl", "attemptDiagnostics",
|
||||||
FinalApplicationStatus.SUCCEEDED);
|
FinalApplicationStatus.SUCCEEDED, 100);
|
||||||
store.updateApplicationAttemptState(newAttemptState);
|
store.updateApplicationAttemptState(newAttemptState);
|
||||||
|
|
||||||
// test updating the state of an app/attempt whose initial state was not
|
// test updating the state of an app/attempt whose initial state was not
|
||||||
|
@ -331,7 +332,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
oldAttemptState.getAppAttemptCredentials(),
|
oldAttemptState.getAppAttemptCredentials(),
|
||||||
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
||||||
"myTrackingUrl", "attemptDiagnostics",
|
"myTrackingUrl", "attemptDiagnostics",
|
||||||
FinalApplicationStatus.SUCCEEDED);
|
FinalApplicationStatus.SUCCEEDED, 111);
|
||||||
store.updateApplicationAttemptState(dummyAttempt);
|
store.updateApplicationAttemptState(dummyAttempt);
|
||||||
|
|
||||||
// let things settle down
|
// let things settle down
|
||||||
|
@ -370,6 +371,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState());
|
assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState());
|
||||||
assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl());
|
assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl());
|
||||||
assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics());
|
assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics());
|
||||||
|
assertEquals(100, updatedAttemptState.getAMContainerExitStatus());
|
||||||
assertEquals(FinalApplicationStatus.SUCCEEDED,
|
assertEquals(FinalApplicationStatus.SUCCEEDED,
|
||||||
updatedAttemptState.getFinalApplicationStatus());
|
updatedAttemptState.getFinalApplicationStatus());
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue