From 4aeef4181dee0719aeb458870985631894922007 Mon Sep 17 00:00:00 2001
From: Vinod Kumar Vavilapalli
Date: Tue, 17 Dec 2013 02:16:59 +0000
Subject: [PATCH] YARN-1446. Changed client API to retry killing application
till RM acknowledges so as to account for RM crashes/failover. Contributed by
Jian He. svn merge --ignore-ancestry -c 1551444 ../../trunk/
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1551445 13f79535-47bb-0310-9956-ffa450edef68
---
hadoop-yarn-project/CHANGES.txt | 3 +
.../KillApplicationResponse.java | 36 +++++++--
.../hadoop/yarn/conf/YarnConfiguration.java | 13 +++-
.../src/main/proto/yarn_service_protos.proto | 1 +
.../yarn/client/api/impl/YarnClientImpl.java | 40 ++++++++--
.../yarn/client/api/impl/TestYarnClient.java | 29 +++++++
.../pb/KillApplicationResponsePBImpl.java | 21 +++++
.../src/main/resources/yarn-default.xml | 10 +--
.../ApplicationMasterService.java | 12 +--
.../resourcemanager/ClientRMService.java | 17 ++--
.../server/resourcemanager/rmapp/RMApp.java | 10 +--
.../resourcemanager/rmapp/RMAppEventType.java | 1 -
.../resourcemanager/rmapp/RMAppImpl.java | 67 ++++++++++------
.../resourcemanager/rmapp/RMAppState.java | 1 +
.../rmapp/attempt/RMAppAttemptImpl.java | 2 +
.../yarn/server/resourcemanager/MockRM.java | 9 +--
.../server/resourcemanager/TestRMRestart.java | 77 +++++++++++++++++--
.../applicationsmanager/MockAsm.java | 2 +-
.../resourcemanager/rmapp/MockRMApp.java | 2 +-
.../rmapp/TestRMAppTransitions.java | 31 ++++----
20 files changed, 290 insertions(+), 94 deletions(-)
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cfabc4fe523..606af1e7d0e 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -160,6 +160,9 @@ Release 2.4.0 - UNRELEASED
YARN-1435. Modified Distributed Shell to accept either the command or the
custom script. (Xuan Gong via zjshen)
+ YARN-1446. Changed client API to retry killing application till RM
+ acknowledges so as to account for RM crashes/failover. (Jian He via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java
index 71aa28b0ae0..77bb71d6796 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java
@@ -26,10 +26,21 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.util.Records;
/**
- * <p>The response sent by the <code>ResourceManager</code> to the client
- * aborting a submitted application.</p>
- *
- * <p>Currently it's empty.</p>
+ * <p>
+ * The response sent by the <code>ResourceManager</code> to the client aborting
+ * a submitted application.
+ * </p>
+ * <p>
+ * The response includes:
+ * <ul>
+ * <li>A flag which indicates whether the process of killing the application
+ * has completed.</li>
+ * </ul>
+ * Note: users are advised to wait until this flag becomes true; otherwise, if
+ * the <code>ResourceManager</code> crashes before the process of killing the
+ * application is completed, the <code>ResourceManager</code> may retry this
+ * application on recovery.
+ * </p>
*
* @see ApplicationClientProtocol#forceKillApplication(KillApplicationRequest)
*/
@@ -38,9 +49,24 @@ import org.apache.hadoop.yarn.util.Records;
public abstract class KillApplicationResponse {
@Private
@Unstable
- public static KillApplicationResponse newInstance() {
+ public static KillApplicationResponse newInstance(boolean isKillCompleted) {
KillApplicationResponse response =
Records.newRecord(KillApplicationResponse.class);
+ response.setIsKillCompleted(isKillCompleted);
return response;
}
+
+ /**
+ * Get the flag which indicates whether the process of killing the application has completed.
+ */
+ @Public
+ @Stable
+ public abstract boolean getIsKillCompleted();
+
+ /**
+ * Set the flag which indicates whether the process of killing the application has completed.
+ */
+ @Private
+ @Unstable
+ public abstract void setIsKillCompleted(boolean isKillCompleted);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 12b5e2f5e77..de420b05e35 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
@@ -882,14 +881,22 @@ public class YarnConfiguration extends Configuration {
////////////////////////////////
/**
+ * Use YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS instead.
* The interval of the yarn client's querying application state after
* application submission. The unit is millisecond.
*/
+ @Deprecated
public static final String YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
YARN_PREFIX + "client.app-submission.poll-interval";
- public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
- 1000;
+ /**
+ * The interval that the yarn client library uses to poll the completion
+ * status of the asynchronous API of application client protocol.
+ */
+ public static final String YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS =
+ YARN_PREFIX + "client.application-client-protocol.poll-interval-ms";
+ public static final long DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS =
+ 200;
/**
* Max number of threads in NMClientAsync to process container management
* events
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 332be813627..a4631d11b6e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -116,6 +116,7 @@ message KillApplicationRequestProto {
}
message KillApplicationResponseProto {
+ optional bool is_kill_completed = 1 [default = false];
}
message GetClusterMetricsRequestProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index d35e1a4300d..7c446045fe5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -79,7 +80,8 @@ public class YarnClientImpl extends YarnClient {
protected ApplicationClientProtocol rmClient;
protected InetSocketAddress rmAddress;
- protected long statePollIntervalMillis;
+ protected long submitPollIntervalMillis;
+ private long asyncApiPollIntervalMillis;
private static final String ROOT = "root";
@@ -92,12 +94,20 @@ public class YarnClientImpl extends YarnClient {
YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
}
+ @SuppressWarnings("deprecation")
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.rmAddress = getRmAddress(conf);
- statePollIntervalMillis = conf.getLong(
+ asyncApiPollIntervalMillis =
+ conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
+ submitPollIntervalMillis = asyncApiPollIntervalMillis;
+ if (conf.get(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS)
+ != null) {
+ submitPollIntervalMillis = conf.getLong(
YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
- YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
+ YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
+ }
super.serviceInit(conf);
}
@@ -165,7 +175,7 @@ public class YarnClientImpl extends YarnClient {
" is still in " + state);
}
try {
- Thread.sleep(statePollIntervalMillis);
+ Thread.sleep(submitPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
@@ -179,11 +189,29 @@ public class YarnClientImpl extends YarnClient {
@Override
public void killApplication(ApplicationId applicationId)
throws YarnException, IOException {
- LOG.info("Killing application " + applicationId);
KillApplicationRequest request =
Records.newRecord(KillApplicationRequest.class);
request.setApplicationId(applicationId);
- rmClient.forceKillApplication(request);
+
+ // Keep re-sending the kill request until the RM reports that the kill has
+ // completed, so that an RM crash/failover in between cannot lose it.
+ try {
+ int pollCount = 0;
+ while (true) {
+ KillApplicationResponse response =
+ rmClient.forceKillApplication(request);
+ if (response.getIsKillCompleted()) {
+ // Only claim success once the RM has actually acknowledged the kill.
+ LOG.info("Killed application " + applicationId);
+ break;
+ }
+ // Log every 10th poll only, to avoid flooding the client log.
+ if (++pollCount % 10 == 0) {
+ LOG.info("Waiting for application " + applicationId
+ + " to be killed.");
+ }
+ Thread.sleep(asyncApiPollIntervalMillis);
+ }
+ } catch (InterruptedException e) {
+ // Thread.sleep cleared the interrupt status; restore it so that callers
+ // can still observe the interruption.
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted while waiting for application " + applicationId
+ + " to be killed.");
+ }
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 826433d5048..966995c99ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -89,6 +91,7 @@ public class TestYarnClient {
rm.stop();
}
+ @SuppressWarnings("deprecation")
@Test (timeout = 30000)
public void testSubmitApplication() {
Configuration conf = new Configuration();
@@ -128,6 +131,23 @@ public class TestYarnClient {
client.stop();
}
+ // Bounded by a timeout because killApplication polls until the RM
+ // acknowledges the kill; a regression must fail fast instead of hanging.
+ @Test(timeout = 30000)
+ public void testKillApplication() throws Exception {
+ MockRM rm = new MockRM();
+ rm.start();
+ RMApp app = rm.submitApp(2000);
+
+ Configuration conf = new Configuration();
+ @SuppressWarnings("resource")
+ final YarnClient client = new MockYarnClient();
+ client.init(conf);
+ client.start();
+
+ try {
+ client.killApplication(app.getApplicationId());
+ // The mocked RM client answers false to the 1st kill request and true to
+ // the 2nd, so exactly two requests must have been sent.
+ verify(((MockYarnClient) client).getRMClient(), times(2))
+ .forceKillApplication(any(KillApplicationRequest.class));
+ } finally {
+ // Release the client and the mock RM even when verification fails.
+ client.stop();
+ rm.stop();
+ }
+ }
+
+
@Test(timeout = 30000)
public void testApplicationType() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
@@ -234,6 +254,11 @@ public class TestYarnClient {
GetApplicationReportRequest.class))).thenReturn(mockResponse);
when(rmClient.getApplications(any(GetApplicationsRequest.class)))
.thenReturn(mockAppResponse);
+ // return false for 1st kill request, and true for the 2nd.
+ when(rmClient.forceKillApplication(any(
+ KillApplicationRequest.class)))
+ .thenReturn(KillApplicationResponse.newInstance(false)).thenReturn(
+ KillApplicationResponse.newInstance(true));
} catch (YarnException e) {
Assert.fail("Exception is not expected.");
} catch (IOException e) {
@@ -242,6 +267,10 @@ public class TestYarnClient {
when(mockResponse.getApplicationReport()).thenReturn(mockReport);
}
+ public ApplicationClientProtocol getRMClient() {
+ return rmClient;
+ }
+
@Override
public List getApplications(
Set applicationTypes, EnumSet applicationStates)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
index 14e0c1f74af..1c937de2d39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProtoOrBuilder;
import com.google.protobuf.TextFormat;
@@ -67,4 +68,24 @@ public class KillApplicationResponsePBImpl extends KillApplicationResponse {
public String toString() {
return TextFormat.shortDebugString(getProto());
}
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = KillApplicationResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public boolean getIsKillCompleted() {
+ KillApplicationResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsKillCompleted();
+ }
+
+ @Override
+ public void setIsKillCompleted(boolean isKillCompleted) {
+ maybeInitBuilder();
+ builder.setIsKillCompleted(isKillCompleted);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c43dc1a4446..9673826c2ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -945,10 +945,10 @@
- <description>The interval of the yarn client's querying application state
- after application submission. The unit is millisecond.</description>
- <name>yarn.client.app-submission.poll-interval</name>
- <value>1000</value>
+ <description>The interval that the yarn client library uses to poll the
+ completion status of the asynchronous API of application client protocol.
+ </description>
+ <name>yarn.client.application-client-protocol.poll-interval-ms</name>
+ <value>200</value>
-
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index f070f28f86f..787ed9fa656 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -292,15 +292,15 @@ public class ApplicationMasterService extends AbstractService implements
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
- rmContext.getDispatcher().getEventHandler().handle(
+ if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
+ .isAppSafeToTerminate()) {
+ return FinishApplicationMasterResponse.newInstance(true);
+ } else {
+ // keep sending the unregister event as RM may crash in the meanwhile.
+ rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
.getTrackingUrl(), request.getFinalApplicationStatus(), request
.getDiagnostics()));
-
- if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
- .isAppSafeToUnregister()) {
- return FinishApplicationMasterResponse.newInstance(true);
- } else {
return FinishApplicationMasterResponse.newInstance(false);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index f0e85534900..cd2226fc3fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -380,14 +380,15 @@ public class ClientRMService extends AbstractService implements
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
}
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.KILL));
-
- RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
- AuditConstants.KILL_APP_REQUEST, "ClientRMService" , applicationId);
- KillApplicationResponse response = recordFactory
- .newRecordInstance(KillApplicationResponse.class);
- return response;
+ if (application.isAppSafeToTerminate()) {
+ RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
+ AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId);
+ return KillApplicationResponse.newInstance(true);
+ } else {
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(applicationId, RMAppEventType.KILL));
+ return KillApplicationResponse.newInstance(false);
+ }
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index fadaa3b00e4..1809a4bb470 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -197,13 +197,13 @@ public interface RMApp extends EventHandler {
String getApplicationType();
/**
- * Check whether this application is safe to unregister.
- * An application is deemed to be safe to unregister if it is an unmanaged
- * AM or its state has been removed from state store.
+ * Check whether this application is safe to terminate.
+ * An application is deemed to be safe to terminate if it is an unmanaged
+ * AM or its state has been saved in state store.
* @return the flag which indicates whether this application is safe to
- * unregister.
+ * terminate.
*/
- boolean isAppSafeToUnregister();
+ boolean isAppSafeToTerminate();
/**
* Create the external user-facing state of ApplicationMaster from the
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
index a2fa0e24eb0..ad3f20d23d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
@@ -37,5 +37,4 @@ public enum RMAppEventType {
// Source: RMStateStore
APP_NEW_SAVED,
APP_UPDATE_SAVED,
- APP_REMOVED
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 5a70cc21165..0bf7c817454 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -110,10 +110,14 @@ public class RMAppImpl implements RMApp, Recoverable {
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
private static final AppFinishedTransition FINISHED_TRANSITION =
new AppFinishedTransition();
+
+ // These stored states are only valid while the app is in KILLING or FINAL_SAVING.
+ private RMAppState stateBeforeKilling;
private RMAppState stateBeforeFinalSaving;
private RMAppEvent eventCausingFinalSaving;
private RMAppState targetedFinalState;
private RMAppState recoveredFinalState;
+
Object transitionTodo;
private static final StateMachineFactory= 1);
+
+ rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED);
+ rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+ Assert.assertEquals(1, ((TestMemoryRMStateStore) memStore).updateAttempt);
+ Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
+ }
+
+ /**
+ * In-memory state store that records the relative order in which the
+ * application and attempt updates are persisted: every update bumps a shared
+ * counter and stores the resulting value in the matching field.
+ */
+ public class TestMemoryRMStateStore extends MemoryRMStateStore {
+ // Shared sequence number, incremented by both update methods.
+ int count = 0;
+ // Sequence number of the most recent application-state update.
+ public int updateApp = 0;
+ // Sequence number of the most recent attempt-state update.
+ public int updateAttempt = 0;
+
+ // synchronized, like the attempt variant, because both methods mutate the
+ // shared counter.
+ @Override
+ public synchronized void updateApplicationStateInternal(String appId,
+ ApplicationStateDataPBImpl appStateData) throws Exception {
+ updateApp = ++count;
+ super.updateApplicationStateInternal(appId, appStateData);
+ }
+
+ @Override
+ public synchronized void
+ updateApplicationAttemptStateInternal(String attemptIdStr,
+ ApplicationAttemptStateDataPBImpl attemptStateData)
+ throws Exception {
+ updateAttempt = ++count;
+ super.updateApplicationAttemptStateInternal(attemptIdStr,
+ attemptStateData);
+ }
+ }
+
+
public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index b90c711c3db..aa116bf85b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -145,7 +145,7 @@ public abstract class MockAsm extends MockApps {
}
@Override
- public boolean isAppSafeToUnregister() {
+ public boolean isAppSafeToTerminate() {
throw new UnsupportedOperationException("Not supported yet.");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index bcb2f6f111b..debcffe97dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -218,7 +218,7 @@ public class MockRMApp implements RMApp {
}
@Override
- public boolean isAppSafeToUnregister() {
+ public boolean isAppSafeToTerminate() {
return true;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index 5b687236a6f..ba255d339ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -301,12 +301,9 @@ public class TestRMAppTransitions {
private void assertAppAndAttemptKilled(RMApp application)
throws InterruptedException {
+ sendAttemptUpdateSavedEvent(application);
sendAppUpdateSavedEvent(application);
assertKilled(application);
- // send attempt final state saved event.
- application.getCurrentAppAttempt().handle(
- new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
- .getAppAttemptId(), null));
Assert.assertEquals(RMAppAttemptState.KILLED, application
.getCurrentAppAttempt().getAppAttemptState());
assertAppFinalStateSaved(application);
@@ -329,6 +326,12 @@ public class TestRMAppTransitions {
rmDispatcher.await();
}
+ private void sendAttemptUpdateSavedEvent(RMApp application) {
+ application.getCurrentAppAttempt().handle(
+ new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
+ .getAppAttemptId(), null));
+ }
+
protected RMApp testCreateAppNewSaving(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = createNewTestApp(submissionContext);
@@ -624,11 +627,12 @@ public class TestRMAppTransitions {
rmDispatcher.await();
// Ignore Attempt_Finished if we were supposed to go to Finished.
- assertAppState(RMAppState.FINAL_SAVING, application);
+ assertAppState(RMAppState.KILLING, application);
RMAppEvent finishEvent =
new RMAppFinishedAttemptEvent(application.getApplicationId(), null);
application.handle(finishEvent);
- assertAppState(RMAppState.FINAL_SAVING, application);
+ assertAppState(RMAppState.KILLING, application);
+ sendAttemptUpdateSavedEvent(application);
sendAppUpdateSavedEvent(application);
assertKilled(application);
}
@@ -686,8 +690,8 @@ public class TestRMAppTransitions {
}
@Test
- public void testAppFinishingKill() throws IOException {
- LOG.info("--- START: testAppFinishedFinished ---");
+ public void testAppAtFinishingIgnoreKill() throws IOException {
+ LOG.info("--- START: testAppAtFinishingIgnoreKill ---");
RMApp application = testCreateAppFinishing(null);
// FINISHING => FINISHED event RMAppEventType.KILL
@@ -695,7 +699,7 @@ public class TestRMAppTransitions {
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
application.handle(event);
rmDispatcher.await();
- assertAppState(RMAppState.FINISHED, application);
+ assertAppState(RMAppState.FINISHING, application);
}
// While App is at FINAL_SAVING, Attempt_Finished event may come before
@@ -780,6 +784,7 @@ public class TestRMAppTransitions {
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
application.handle(event);
rmDispatcher.await();
+ sendAttemptUpdateSavedEvent(application);
sendAppUpdateSavedEvent(application);
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
@@ -801,14 +806,6 @@ public class TestRMAppTransitions {
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
- // KILLED => KILLED event RMAppEventType.ATTEMPT_KILLED
- event =
- new RMAppEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_KILLED);
- application.handle(event);
- rmDispatcher.await();
- assertTimesAtFinish(application);
- assertAppState(RMAppState.KILLED, application);
// KILLED => KILLED event RMAppEventType.KILL
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);