From c8dfed19babe5b0574f3e114591a888708c86daf Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 24 Jun 2014 15:16:36 +0000 Subject: [PATCH] YARN-2074. Changed ResourceManager to not count AM preemptions towards app failures. Contributed by Jian He. svn merge --ignore-ancestry -c 1605106 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1605107 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + ...erver_resourcemanager_service_protos.proto | 1 + .../recovery/FileSystemRMStateStore.java | 3 +- .../recovery/MemoryRMStateStore.java | 3 +- .../recovery/RMStateStore.java | 11 +- .../recovery/ZKRMStateStore.java | 12 +- .../records/ApplicationAttemptStateData.java | 20 ++- .../pb/ApplicationAttemptStateDataPBImpl.java | 14 +- .../resourcemanager/rmapp/RMAppImpl.java | 25 +++- .../rmapp/attempt/RMAppAttempt.java | 5 + .../rmapp/attempt/RMAppAttemptImpl.java | 76 +++++++--- .../scheduler/capacity/CapacityScheduler.java | 10 +- .../yarn/server/resourcemanager/MockNM.java | 2 +- .../yarn/server/resourcemanager/MockRM.java | 14 ++ .../applicationsmanager/TestAMRestart.java | 134 ++++++++++++++++-- .../recovery/RMStateStoreTestBase.java | 6 +- 16 files changed, 282 insertions(+), 57 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3f13cc8564e..cdd561bf8a9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -160,6 +160,9 @@ Release 2.5.0 - UNRELEASED YARN-2195. Clean a piece of code in ResourceRequest. (Wei Yan via devaraj) + YARN-2074. Changed ResourceManager to not count AM preemptions towards app + failures. (Jian He via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index aa9e0c625d0..db86d394e9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -127,6 +127,7 @@ message ApplicationAttemptStateDataProto { optional string diagnostics = 6 [default = "N/A"]; optional int64 start_time = 7; optional FinalApplicationStatusProto final_application_status = 8; + optional int32 am_container_exit_status = 9 [default = -1000]; } message RMStateVersionProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 7f4dad83fe9..37f08cf17b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -216,7 +216,8 @@ public class FileSystemRMStateStore extends RMStateStore { attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus()); + attemptStateData.getFinalApplicationStatus(), + attemptStateData.getAMContainerExitStatus()); // assert child node name is same as application attempt id assert attemptId.equals(attemptState.getAttemptId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index a43b20da392..fb0ce1a5b4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -152,7 +152,8 @@ public class MemoryRMStateStore extends RMStateStore { attemptStateData.getStartTime(), attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus()); + attemptStateData.getFinalApplicationStatus(), + attemptStateData.getAMContainerExitStatus()); ApplicationState appState = state.getApplicationState().get( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index affc6f9d865..b18a8748b92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -258,19 +259,21 @@ public abstract class RMStateStore extends AbstractService { RMAppAttemptState state; String finalTrackingUrl = "N/A"; String diagnostics; + int exitStatus = ContainerExitStatus.INVALID; FinalApplicationStatus amUnregisteredFinalStatus; public ApplicationAttemptState(ApplicationAttemptId attemptId, Container masterContainer, Credentials appAttemptCredentials, long startTime) { this(attemptId, masterContainer, appAttemptCredentials, startTime, null, - null, "", null); + null, "", null, ContainerExitStatus.INVALID); } public ApplicationAttemptState(ApplicationAttemptId attemptId, Container masterContainer, Credentials appAttemptCredentials, long startTime, RMAppAttemptState state, String finalTrackingUrl, - String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus) { + String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, + int exitStatus) { this.attemptId = attemptId; this.masterContainer = masterContainer; this.appAttemptCredentials = appAttemptCredentials; @@ -279,6 +282,7 @@ public abstract class RMStateStore extends AbstractService { this.finalTrackingUrl = finalTrackingUrl; this.diagnostics = diagnostics == null ? "" : diagnostics; this.amUnregisteredFinalStatus = amUnregisteredFinalStatus; + this.exitStatus = exitStatus; } public Container getMasterContainer() { @@ -305,6 +309,9 @@ public abstract class RMStateStore extends AbstractService { public FinalApplicationStatus getFinalApplicationStatus() { return amUnregisteredFinalStatus; } + public int getAMContainerExitStatus(){ + return this.exitStatus; + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 63ae990732c..9ff128d790b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -538,12 +538,12 @@ public class ZKRMStateStore extends RMStateStore { ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), - attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), - attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus()); + attemptStateData.getMasterContainer(), credentials, + attemptStateData.getStartTime(), attemptStateData.getState(), + attemptStateData.getFinalTrackingUrl(), + attemptStateData.getDiagnostics(), + attemptStateData.getFinalApplicationStatus(), + attemptStateData.getAMContainerExitStatus()); appState.attempts.put(attemptState.getAttemptId(), attemptState); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 6af048b2e3d..90fb3ec0d2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -43,7 +43,7 @@ public abstract class ApplicationAttemptStateData { ApplicationAttemptId attemptId, Container container, ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, String finalTrackingUrl, String diagnostics, - FinalApplicationStatus amUnregisteredFinalStatus) { + FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) { ApplicationAttemptStateData attemptStateData = Records.newRecord(ApplicationAttemptStateData.class); attemptStateData.setAttemptId(attemptId); @@ -54,6 +54,7 @@ public abstract class ApplicationAttemptStateData { attemptStateData.setDiagnostics(diagnostics); attemptStateData.setStartTime(startTime); attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); + attemptStateData.setAMContainerExitStatus(exitStatus); return attemptStateData; } @@ -67,11 +68,11 @@ public abstract class ApplicationAttemptStateData { appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); } return newInstance(attemptState.getAttemptId(), - attemptState.getMasterContainer(), appAttemptTokens, - attemptState.getStartTime(), attemptState.getState(), - attemptState.getFinalTrackingUrl(), - attemptState.getDiagnostics(), - attemptState.getFinalApplicationStatus()); + attemptState.getMasterContainer(), appAttemptTokens, + attemptState.getStartTime(), attemptState.getState(), + attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(), + attemptState.getFinalApplicationStatus(), + attemptState.getAMContainerExitStatus()); } public abstract ApplicationAttemptStateDataProto getProto(); @@ -150,5 +151,10 @@ public abstract class ApplicationAttemptStateData { */ public abstract FinalApplicationStatus getFinalApplicationStatus(); - public abstract void setFinalApplicationStatus(FinalApplicationStatus finishState); + public abstract void setFinalApplicationStatus( + FinalApplicationStatus finishState); + + public abstract int getAMContainerExitStatus(); + + public abstract void setAMContainerExitStatus(int exitStatus); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java index e3ebe5e0893..a90bda49030 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -252,6 +252,19 @@ public class ApplicationAttemptStateDataPBImpl extends return getProto().hashCode(); } + @Override + public int getAMContainerExitStatus() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + return p.getAmContainerExitStatus(); + } + + @Override + public void setAMContainerExitStatus(int exitStatus) { + maybeInitBuilder(); + builder.setAmContainerExitStatus(exitStatus); + } + + @Override public boolean equals(Object other) { if (other == null) @@ -281,5 +294,4 @@ public class ApplicationAttemptStateDataPBImpl extends private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) { return ProtoUtils.convertFromProtoFormat(s); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 3f9ef641a93..b6ca684a739 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -78,7 +78,6 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; -import org.apache.hadoop.yarn.util.resource.Resources; @SuppressWarnings({ "rawtypes", "unchecked" }) public class RMAppImpl implements RMApp, Recoverable { @@ -686,7 +685,11 @@ public class RMAppImpl implements RMApp, Recoverable { ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1); RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, - submissionContext, conf, maxAppAttempts == attempts.size()); + submissionContext, conf, + // The newly created attempt maybe last attempt if (number of + // previously NonPreempted attempts + 1) equal to the max-attempt + // limit. + maxAppAttempts == (getNumNonPreemptedAppAttempts() + 1)); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } @@ -794,7 +797,7 @@ public class RMAppImpl implements RMApp, Recoverable { && (app.currentAttempt.getState() == RMAppAttemptState.KILLED || app.currentAttempt.getState() == RMAppAttemptState.FINISHED || (app.currentAttempt.getState() == RMAppAttemptState.FAILED - && app.attempts.size() == app.maxAppAttempts))) { + && app.getNumNonPreemptedAppAttempts() == app.maxAppAttempts))) { return RMAppState.ACCEPTED; } @@ -885,7 +888,7 @@ public class RMAppImpl implements RMApp, Recoverable { msg = "Unmanaged application " + this.getApplicationId() + " failed due to " + failedEvent.getDiagnostics() + ". Failing the application."; - } else if (this.attempts.size() >= this.maxAppAttempts) { + } else if (getNumNonPreemptedAppAttempts() >= this.maxAppAttempts) { msg = "Application " + this.getApplicationId() + " failed " + this.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() + ". Failing the application."; @@ -1102,6 +1105,17 @@ public class RMAppImpl implements RMApp, Recoverable { }; } + private int getNumNonPreemptedAppAttempts() { + int completedAttempts = 0; + // Do not count AM preemption as attempt failure. + for (RMAppAttempt attempt : attempts.values()) { + if (!attempt.isPreempted()) { + completedAttempts++; + } + } + return completedAttempts; + } + private static final class AttemptFailedTransition implements MultipleArcTransition { @@ -1113,8 +1127,9 @@ public class RMAppImpl implements RMApp, Recoverable { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { + if (!app.submissionContext.getUnmanagedAM() - && app.attempts.size() < app.maxAppAttempts) { + && app.getNumNonPreemptedAppAttempts() < app.maxAppAttempts) { boolean transferStateFromPreviousAttempt = false; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index d472ad412e5..42c37a93aba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -196,4 +196,9 @@ public interface RMAppAttempt extends EventHandler { */ ApplicationAttemptReport createApplicationAttemptReport(); + /** + * Return the flag which indicates whether the attempt is preempted by the + * scheduler. + */ + boolean isPreempted(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 5e71c930299..4ac64ef1257 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -48,11 +48,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.event.EventHandler; @@ -146,9 +146,14 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { // if an RMAppAttemptUnregistrationEvent occurs private FinalApplicationStatus finalStatus = null; private final StringBuilder diagnostics = new StringBuilder(); + private int amContainerExitStatus = ContainerExitStatus.INVALID; private Configuration conf; - private final boolean isLastAttempt; + // Since AM preemption is not counted towards AM failure count, + // even if this flag is true, a new attempt can still be re-created if this + // attempt is eventually preempted. So this flag indicates that this may be + // last attempt. + private final boolean maybeLastAttempt; private static final ExpiredTransition EXPIRED_TRANSITION = new ExpiredTransition(); @@ -389,7 +394,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { RMContext rmContext, YarnScheduler scheduler, ApplicationMasterService masterService, ApplicationSubmissionContext submissionContext, - Configuration conf, boolean isLastAttempt) { + Configuration conf, boolean maybeLastAttempt) { this.conf = conf; this.applicationAttemptId = appAttemptId; this.rmContext = rmContext; @@ -403,7 +408,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { this.writeLock = lock.writeLock(); this.proxiedTrackingUrl = generateProxyUriWithScheme(null); - this.isLastAttempt = isLastAttempt; + this.maybeLastAttempt = maybeLastAttempt; this.stateMachine = stateMachineFactory.make(this); } @@ -565,6 +570,15 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { } } + public int getAMContainerExitStatus() { + this.readLock.lock(); + try { + return this.amContainerExitStatus; + } finally { + this.readLock.unlock(); + } + } + @Override public float getProgress() { this.readLock.lock(); @@ -671,6 +685,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { + attemptState.getState()); diagnostics.append("Attempt recovered after RM restart"); diagnostics.append(attemptState.getDiagnostics()); + this.amContainerExitStatus = attemptState.getAMContainerExitStatus(); setMasterContainer(attemptState.getMasterContainer()); recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials()); this.recoveredFinalState = attemptState.getState(); @@ -931,7 +946,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { String diags = null; String finalTrackingUrl = null; FinalApplicationStatus finalStatus = null; - + int exitStatus = ContainerExitStatus.INVALID; switch (event.getType()) { case LAUNCH_FAILED: RMAppAttemptLaunchFailedEvent launchFaileEvent = @@ -952,6 +967,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { RMAppAttemptContainerFinishedEvent finishEvent = (RMAppAttemptContainerFinishedEvent) event; diags = getAMContainerCrashedDiagnostics(finishEvent); + exitStatus = finishEvent.getContainerStatus().getExitStatus(); break; case KILL: break; @@ -966,9 +982,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { ApplicationAttemptState attemptState = new ApplicationAttemptState(applicationAttemptId, getMasterContainer(), rmStore.getCredentialsFromAppAttempt(this), startTime, - stateToBeStored, finalTrackingUrl, diags, finalStatus); + stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus); LOG.info("Updating application attempt " + applicationAttemptId - + " with final state: " + targetedFinalState); + + " with final state: " + targetedFinalState + ", and exit status: " + + exitStatus); rmStore.updateApplicationAttemptState(attemptState); } @@ -1061,11 +1078,19 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { // don't leave the tracking URL pointing to a non-existent AM appAttempt.setTrackingUrlToRMAppPage(); appAttempt.invalidateAMHostAndPort(); + if (appAttempt.submissionContext .getKeepContainersAcrossApplicationAttempts() - && !appAttempt.isLastAttempt && !appAttempt.submissionContext.getUnmanagedAM()) { - keepContainersAcrossAppAttempts = true; + // See if we should retain containers for non-unmanaged applications + if (appAttempt.isPreempted()) { + // Premption doesn't count towards app-failures and so we should + // retain containers. + keepContainersAcrossAppAttempts = true; + } else if (!appAttempt.maybeLastAttempt) { + // Not preemption. Not last-attempt too - keep containers. + keepContainersAcrossAppAttempts = true; + } } appEvent = new RMAppFailedAttemptEvent(applicationId, @@ -1105,7 +1130,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { appAttempt.getClientTokenMasterKey()); } } - + + @Override + public boolean isPreempted() { + return getAMContainerExitStatus() == ContainerExitStatus.PREEMPTED; + } + private static final class UnmanagedAMAttemptSavedTransition extends AMLaunchedTransition { @Override @@ -1208,14 +1238,22 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { appAttempt.rmContext.getAMLivelinessMonitor().unregister( appAttempt.getAppAttemptId()); - // Setup diagnostic message - appAttempt.diagnostics - .append(getAMContainerCrashedDiagnostics(finishEvent)); + // Setup diagnostic message and exit status + appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent); + // Tell the app, scheduler super.transition(appAttempt, finishEvent); } } + private void setAMContainerCrashedDiagnosticsAndExitStatus( + RMAppAttemptContainerFinishedEvent finishEvent) { + ContainerStatus status = finishEvent.getContainerStatus(); + String diagnostics = getAMContainerCrashedDiagnostics(finishEvent); + this.diagnostics.append(diagnostics); + this.amContainerExitStatus = status.getExitStatus(); + } + private static String getAMContainerCrashedDiagnostics( RMAppAttemptContainerFinishedEvent finishEvent) { ContainerStatus status = finishEvent.getContainerStatus(); @@ -1437,13 +1475,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - RMAppAttemptContainerFinishedEvent containerFinishedEvent = + RMAppAttemptContainerFinishedEvent finishEvent = (RMAppAttemptContainerFinishedEvent) event; // container associated with AM. must not be unmanaged assert appAttempt.submissionContext.getUnmanagedAM() == false; - // Setup diagnostic message - appAttempt.diagnostics - .append(getAMContainerCrashedDiagnostics(containerFinishedEvent)); + // Setup diagnostic message and exit status + appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent); new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt, event); } @@ -1644,4 +1681,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { } return attemptReport; } + + // for testing + public boolean mayBeLastAttempt() { + return maybeLastAttempt; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 5de407ddf8b..74eb1964076 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1076,14 +1076,12 @@ public class CapacityScheduler extends @Override public void killContainer(RMContainer cont) { - if(LOG.isDebugEnabled()){ + if (LOG.isDebugEnabled()) { LOG.debug("KILL_CONTAINER: container" + cont.toString()); } - completedContainer(cont, - SchedulerUtils.createPreemptedContainerStatus( - cont.getContainerId(),"Container being forcibly preempted:" - + cont.getContainerId()), - RMContainerEventType.KILL); + completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus( + cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), + RMContainerEventType.KILL); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 0a838fc0eeb..5078b2c8773 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -87,7 +87,7 @@ public class MockNM { return httpPort; } - void setResourceTrackerService(ResourceTrackerService resourceTracker) { + public void setResourceTrackerService(ResourceTrackerService resourceTracker) { this.resourceTracker = resourceTracker; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 45c2e5f28bc..6949a812b26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -166,6 +166,19 @@ public class MockRM extends ResourceManager { } } + public MockAM waitForNewAMToLaunchAndRegister(ApplicationId appId, int attemptSize, + MockNM nm) throws Exception { + RMApp app = getRMContext().getRMApps().get(appId); + Assert.assertNotNull(app); + while (app.getAppAttempts().size() != attemptSize) { + System.out.println("Application " + appId + + " is waiting for AM to restart. Current has " + + app.getAppAttempts().size() + " attempts."); + Thread.sleep(200); + } + return launchAndRegisterAM(app, this, nm); + } + public void waitForState(MockNM nm, ContainerId containerId, RMContainerState containerState) throws Exception { RMContainer container = getResourceScheduler().getRMContainer(containerId); @@ -551,6 +564,7 @@ public class MockRM extends ResourceManager { throws Exception { rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); RMAppAttempt attempt = app.getCurrentAppAttempt(); + System.out.println("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index bcd8c1b5086..5fcb475d259 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -22,13 +22,12 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import org.junit.Assert; - import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -38,22 +37,23 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.junit.Assert; import org.junit.Test; -/** - * Test to restart the AM on failure. - * - */ public class TestAMRestart { - @Test + @Test(timeout = 30000) public void testAMRestartWithExistingContainers() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); @@ -245,7 +245,7 @@ public class TestAMRestart { } } - @Test + @Test(timeout = 30000) public void testNMTokensRebindOnAMRestart() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3); @@ -345,4 +345,122 @@ public class TestAMRestart { Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens)); rm1.stop(); } + + // AM container preempted should not be counted towards AM max retry count. + @Test(timeout = 20000) + public void testAMPreemptedNotCountedForAMFailures() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // explicitly set max-am-retry count as 1. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler scheduler = + (CapacityScheduler) rm1.getResourceScheduler(); + ContainerId amContainer = + ContainerId.newInstance(am1.getApplicationAttemptId(), 1); + // Preempt the first attempt; + scheduler.killContainer(scheduler.getRMContainer(amContainer)); + + am1.waitForState(RMAppAttemptState.FAILED); + Assert.assertTrue(attempt1.isPreempted()); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + // AM should be restarted even though max-am-attempt is 1. + MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt()); + + // Preempt the second attempt. + ContainerId amContainer2 = + ContainerId.newInstance(am2.getApplicationAttemptId(), 1); + scheduler.killContainer(scheduler.getRMContainer(amContainer2)); + + am2.waitForState(RMAppAttemptState.FAILED); + Assert.assertTrue(attempt2.isPreempted()); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + RMAppAttempt attempt3 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt()); + + // fail the AM normally + nm1.nodeHeartbeat(am3.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am3.waitForState(RMAppAttemptState.FAILED); + Assert.assertFalse(attempt3.isPreempted()); + + // AM should not be restarted. + rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED); + Assert.assertEquals(3, app1.getAppAttempts().size()); + rm1.stop(); + } + + // Test RM restarts after AM container is preempted, new RM should not count + // AM preemption failure towards the max-retry-account and should be able to + // re-launch the AM. + @Test(timeout = 20000) + public void testPreemptedAMRestartOnRMRestart() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + // explicitly set max-am-retry count as 1. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler scheduler = + (CapacityScheduler) rm1.getResourceScheduler(); + ContainerId amContainer = + ContainerId.newInstance(am1.getApplicationAttemptId(), 1); + + // Forcibly preempt the am container; + scheduler.killContainer(scheduler.getRMContainer(amContainer)); + + am1.waitForState(RMAppAttemptState.FAILED); + Assert.assertTrue(attempt1.isPreempted()); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // state store has 1 attempt stored. + ApplicationState appState = + memStore.getState().getApplicationState().get(app1.getApplicationId()); + Assert.assertEquals(1, appState.getAttemptCount()); + // attempt stored has the preempted container exit status. + Assert.assertEquals(ContainerExitStatus.PREEMPTED, + appState.getAttempt(am1.getApplicationAttemptId()) + .getAMContainerExitStatus()); + // Restart rm. + MockRM rm2 = new MockRM(conf, memStore); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(); + rm2.start(); + + // Restarted RM should re-launch the am. + MockAM am2 = + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); + MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2); + RMAppAttempt attempt2 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()) + .getCurrentAppAttempt(); + Assert.assertFalse(attempt2.isPreempted()); + Assert.assertEquals(ContainerExitStatus.INVALID, + appState.getAttempt(am2.getApplicationAttemptId()) + .getAMContainerExitStatus()); + rm1.stop(); + rm2.stop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 507e164eca4..00f5f570631 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -267,6 +267,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ // attempt1 is loaded correctly assertNotNull(attemptState); assertEquals(attemptId1, attemptState.getAttemptId()); + assertEquals(-1000, attemptState.getAMContainerExitStatus()); // attempt1 container is loaded correctly assertEquals(containerId1, attemptState.getMasterContainer().getId()); // attempt1 applicationToken is loaded correctly @@ -308,7 +309,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED); + FinalApplicationStatus.SUCCEEDED, 100); store.updateApplicationAttemptState(newAttemptState); // test updating the state of an app/attempt whose initial state was not @@ -331,7 +332,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED); + FinalApplicationStatus.SUCCEEDED, 111); store.updateApplicationAttemptState(dummyAttempt); // let things settle down @@ -370,6 +371,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState()); assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl()); assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics()); + assertEquals(100, updatedAttemptState.getAMContainerExitStatus()); assertEquals(FinalApplicationStatus.SUCCEEDED, updatedAttemptState.getFinalApplicationStatus());