From 71a8953ae9688b1bad72f0766e4a18ed27b7c319 Mon Sep 17 00:00:00 2001
From: Vinod Kumar Vavilapalli
Date: Sat, 8 Mar 2014 04:44:02 +0000
Subject: [PATCH] YARN-1410. Added tests to validate that clients can fail-over
to a new RM after getting an application-ID but before submission and can
still submit to the newly active RM with no issues. Contributed by Xuan Gong.
svn merge --ignore-ancestry -c 1575478 ../../trunk/
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1575479 13f79535-47bb-0310-9956-ffa450edef68
---
hadoop-yarn-project/CHANGES.txt | 4 +
.../GetNewApplicationResponse.java | 3 +
.../ApplicationIdNotProvidedException.java | 47 +++++
.../hadoop/yarn/client/api/YarnClient.java | 6 +
.../yarn/client/api/impl/YarnClientImpl.java | 9 +-
.../yarn/client/api/impl/TestYarnClient.java | 19 ++
.../yarn/server/resourcemanager/MockRM.java | 34 ++-
.../server/resourcemanager/RMHATestBase.java | 193 ++++++++++++++++++
.../TestKillApplicationWithRMHA.java | 159 +--------------
.../TestSubmitApplicationWithRMHA.java | 89 ++++++++
10 files changed, 398 insertions(+), 165 deletions(-)
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationIdNotProvidedException.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a3d594c9e90..6e6c3e2186d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -260,6 +260,10 @@ Release 2.4.0 - UNRELEASED
utilization for local disks so as to be able to offline full disks. (Varun
Vasudev via vinodkv)
+ YARN-1410. Added tests to validate that clients can fail-over to a new RM
+ after getting an application-ID but before submission and can still submit to
+ the newly active RM with no issues. (Xuan Gong via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetNewApplicationResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetNewApplicationResponse.java
index 045e132caf7..21ba2b19eb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetNewApplicationResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetNewApplicationResponse.java
@@ -31,6 +31,9 @@ import org.apache.hadoop.yarn.util.Records;
* The response sent by the ResourceManager
to the client for
* a request to get a new {@link ApplicationId} for submitting applications.
*
+ * Clients can submit an application with the returned
+ * {@link ApplicationId}.
+ *
* @see ApplicationClientProtocol#getNewApplication(GetNewApplicationRequest)
*/
@Public
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationIdNotProvidedException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationIdNotProvidedException.java
new file mode 100644
index 00000000000..c77821e7670
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationIdNotProvidedException.java
@@ -0,0 +1,47 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+
+/**
+ * Exception to be thrown when Client submit an application without
+ * providing {@link ApplicationId} in {@link ApplicationSubmissionContext}.
+ */
+@Public
+@Unstable
+public class ApplicationIdNotProvidedException extends YarnException{
+
+ private static final long serialVersionUID = 911754350L;
+
+ public ApplicationIdNotProvidedException(Throwable cause) {
+ super(cause);
+ }
+
+ public ApplicationIdNotProvidedException(String message) {
+ super(message);
+ }
+
+ public ApplicationIdNotProvidedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
index c6db3abb5de..01491304336 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -88,6 +89,11 @@ public abstract class YarnClient extends AbstractService {
* application has been submitted and accepted by the ResourceManager.
*
*
+ *
+ * Should provide an {@link ApplicationId} when submits a new application,
+ * otherwise, it will throw the {@link ApplicationIdNotProvidedException}
+ *
+ *
* @param appContext
* {@link ApplicationSubmissionContext} containing all the details
* needed to submit a new application
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index c36d0c4ec54..cb88fde4eb8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.client.api.AHSClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -172,15 +173,21 @@ public class YarnClientImpl extends YarnClient {
submitApplication(ApplicationSubmissionContext appContext)
throws YarnException, IOException {
ApplicationId applicationId = appContext.getApplicationId();
- appContext.setApplicationId(applicationId);
+ if (applicationId == null) {
+ throw new ApplicationIdNotProvidedException(
+ "ApplicationId is not provided in ApplicationSubmissionContext");
+ }
SubmitApplicationRequest request =
Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);
+
+ //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
rmClient.submitApplication(request);
int pollCount = 0;
long startTime = System.currentTimeMillis();
+ //TODO: YARN-1764:Handle RM fail overs after the submitApplication call.
while (true) {
YarnApplicationState state =
getApplicationReport(applicationId).getYarnApplicationState();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 7c3496656be..471baac20da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -110,6 +111,24 @@ public class TestYarnClient {
YarnApplicationState.FAILED,
YarnApplicationState.KILLED
};
+
+ // Submit an application without ApplicationId provided
+ // Should get ApplicationIdNotProvidedException
+ ApplicationSubmissionContext contextWithoutApplicationId =
+ mock(ApplicationSubmissionContext.class);
+ try {
+ client.submitApplication(contextWithoutApplicationId);
+ Assert.fail("Should throw the ApplicationIdNotProvidedException");
+ } catch (YarnException e) {
+ Assert.assertTrue(e instanceof ApplicationIdNotProvidedException);
+ Assert.assertTrue(e.getMessage().contains(
+ "ApplicationId is not provided in ApplicationSubmissionContext"));
+ } catch (IOException e) {
+ Assert.fail("IOException is not expected.");
+ }
+
+ // Submit the application with applicationId provided
+ // Should be successful
for (int i = 0; i < exitStates.length; ++i) {
ApplicationSubmissionContext context =
mock(ApplicationSubmissionContext.class);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 2efdfe1d248..e37f482e771 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -40,6 +42,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -221,13 +224,24 @@ public class MockRM extends ResourceManager {
public RMApp submitApp(int masterMemory, String name, String user,
Map acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
- boolean waitForAccepted, boolean keepContainers)
- throws Exception {
- ApplicationClientProtocol client = getClientRMService();
- GetNewApplicationResponse resp = client.getNewApplication(Records
- .newRecord(GetNewApplicationRequest.class));
- ApplicationId appId = resp.getApplicationId();
+ boolean waitForAccepted, boolean keepContainers) throws Exception {
+ return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+ maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
+ false, null);
+ }
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map acls, boolean unmanaged, String queue,
+ int maxAppAttempts, Credentials ts, String appType,
+ boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
+ ApplicationId applicationId) throws Exception {
+ ApplicationId appId = isAppIdProvided ? applicationId : null;
+ ApplicationClientProtocol client = getClientRMService();
+ if (! isAppIdProvided) {
+ GetNewApplicationResponse resp = client.getNewApplication(Records
+ .newRecord(GetNewApplicationRequest.class));
+ appId = resp.getApplicationId();
+ }
SubmitApplicationRequest req = Records
.newRecord(SubmitApplicationRequest.class);
ApplicationSubmissionContext sub = Records
@@ -502,4 +516,12 @@ public class MockRM extends ResourceManager {
return am;
}
+ public ApplicationReport getApplicationReport(ApplicationId appId)
+ throws YarnException, IOException {
+ ApplicationClientProtocol client = getClientRMService();
+ GetApplicationReportResponse response =
+ client.getApplicationReport(GetApplicationReportRequest
+ .newInstance(appId));
+ return response.getApplicationReport();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
new file mode 100644
index 00000000000..6cbea253774
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+
+public class RMHATestBase extends ClientBaseWithFixes{
+
+ private static final int ZK_TIMEOUT_MS = 5000;
+ private static StateChangeRequestInfo requestInfo =
+ new StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+ protected Configuration configuration = new YarnConfiguration();
+ static MockRM rm1 = null;
+ static MockRM rm2 = null;
+ Configuration confForRM1;
+ Configuration confForRM2;
+
+ @Before
+ public void setup() throws Exception {
+ configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+ configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
+ configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ configuration.set(YarnConfiguration.RM_STORE,
+ ZKRMStateStore.class.getName());
+ configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
+ configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+ configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");
+ int base = 100;
+ for (String confKey : YarnConfiguration
+ .getServiceAddressConfKeys(configuration)) {
+ configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:"
+ + (base + 20));
+ configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:"
+ + (base + 40));
+ base = base * 2;
+ }
+ confForRM1 = new Configuration(configuration);
+ confForRM1.set(YarnConfiguration.RM_HA_ID, "rm1");
+ confForRM2 = new Configuration(configuration);
+ confForRM2.set(YarnConfiguration.RM_HA_ID, "rm2");
+ }
+
+ @After
+ public void teardown() {
+ if (rm1 != null) {
+ rm1.stop();
+ }
+ if (rm2 != null) {
+ rm2.stop();
+ }
+ }
+
+ protected MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+ throws Exception {
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+ nm.nodeHeartbeat(true);
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ am.registerAppAttempt();
+ rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+ rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
+ RMAppAttemptState.RUNNING);
+ return am;
+ }
+
+ protected void startRMs() throws IOException {
+ rm1 = new MockRM(confForRM1);
+ rm2 = new MockRM(confForRM2);
+ startRMs(rm1, confForRM1, rm2, confForRM2);
+
+ }
+
+ protected void startRMsWithCustomizedRMAppManager() throws IOException {
+ final Configuration conf1 = new Configuration(confForRM1);
+
+ rm1 = new MockRM(conf1) {
+ @Override
+ protected RMAppManager createRMAppManager() {
+ return new MyRMAppManager(this.rmContext, this.scheduler,
+ this.masterService, this.applicationACLsManager, conf1);
+ }
+ };
+
+ rm2 = new MockRM(confForRM2);
+
+ startRMs(rm1, conf1, rm2, confForRM2);
+ }
+
+ private static class MyRMAppManager extends RMAppManager {
+
+ private Configuration conf;
+ private RMContext rmContext;
+
+ public MyRMAppManager(RMContext context, YarnScheduler scheduler,
+ ApplicationMasterService masterService,
+ ApplicationACLsManager applicationACLsManager, Configuration conf) {
+ super(context, scheduler, masterService, applicationACLsManager, conf);
+ this.conf = conf;
+ this.rmContext = context;
+ }
+
+ @Override
+ protected void submitApplication(
+ ApplicationSubmissionContext submissionContext, long submitTime,
+ String user, boolean isRecovered, RMState state) throws YarnException {
+ //Do nothing, just add the application to RMContext
+ RMAppImpl application =
+ new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,
+ this.conf, submissionContext.getApplicationName(), user,
+ submissionContext.getQueue(), submissionContext,
+ this.rmContext.getScheduler(),
+ this.rmContext.getApplicationMasterService(),
+ submitTime, submissionContext.getApplicationType(),
+ submissionContext.getApplicationTags());
+ this.rmContext.getRMApps().put(submissionContext.getApplicationId(),
+ application);
+ //Do not send RMAppEventType.START event
+ //so the state of Application will not reach to NEW_SAVING state.
+ }
+ }
+
+ protected boolean isFinalState(RMAppState state) {
+ return state.equals(RMAppState.FINISHING)
+ || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
+ || state.equals(RMAppState.KILLED);
+ }
+
+ protected void explicitFailover() throws IOException {
+ rm1.adminService.transitionToStandby(requestInfo);
+ rm2.adminService.transitionToActive(requestInfo);
+ Assert.assertTrue(rm1.getRMContext().getHAServiceState()
+ == HAServiceState.STANDBY);
+ Assert.assertTrue(rm2.getRMContext().getHAServiceState()
+ == HAServiceState.ACTIVE);
+ }
+
+ protected void startRMs(MockRM rm1, Configuration confForRM1, MockRM rm2,
+ Configuration confForRM2) throws IOException {
+ rm1.init(confForRM1);
+ rm1.start();
+ Assert.assertTrue(rm1.getRMContext().getHAServiceState()
+ == HAServiceState.STANDBY);
+
+ rm2.init(confForRM2);
+ rm2.start();
+ Assert.assertTrue(rm2.getRMContext().getHAServiceState()
+ == HAServiceState.STANDBY);
+
+ rm1.adminService.transitionToActive(requestInfo);
+ Assert.assertTrue(rm1.getRMContext().getHAServiceState()
+ == HAServiceState.ACTIVE);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java
index 918de801581..c0a623d4be4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java
@@ -24,86 +24,29 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.ClientBaseWithFixes;
-import org.apache.hadoop.ha.HAServiceProtocol;
-import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
-import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
-public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{
+public class TestKillApplicationWithRMHA extends RMHATestBase{
public static final Log LOG = LogFactory
.getLog(TestKillApplicationWithRMHA.class);
- private static final int ZK_TIMEOUT_MS = 5000;
- private static StateChangeRequestInfo requestInfo =
- new StateChangeRequestInfo(
- HAServiceProtocol.RequestSource.REQUEST_BY_USER);
- private Configuration configuration = new YarnConfiguration();
- static MockRM rm1 = null;
- static MockRM rm2 = null;
- Configuration confForRM1;
- Configuration confForRM2;
-
- @Before
- public void setup() throws Exception {
- configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
- configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
- configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
- configuration.set(YarnConfiguration.RM_STORE,
- ZKRMStateStore.class.getName());
- configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
- configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
- configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
- configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");
- int base = 100;
- for (String confKey : YarnConfiguration
- .getServiceAddressConfKeys(configuration)) {
- configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:"
- + (base + 20));
- configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:"
- + (base + 40));
- base = base * 2;
- }
- confForRM1 = new Configuration(configuration);
- confForRM1.set(YarnConfiguration.RM_HA_ID, "rm1");
- confForRM2 = new Configuration(configuration);
- confForRM2.set(YarnConfiguration.RM_HA_ID, "rm2");
- }
-
- @After
- public void teardown() {
- if (rm1 != null) {
- rm1.stop();
- }
- if (rm2 != null) {
- rm2.stop();
- }
- }
@Test (timeout = 20000)
public void testKillAppWhenFailoverHappensAtNewState()
@@ -221,18 +164,6 @@ public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{
}
- private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
- throws Exception {
- RMAppAttempt attempt = app.getCurrentAppAttempt();
- nm.nodeHeartbeat(true);
- MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
- am.registerAppAttempt();
- rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
- rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
- RMAppAttemptState.RUNNING);
- return am;
- }
-
private void failOverAndKillApp(ApplicationId appId,
ApplicationAttemptId appAttemptId, RMAppState initialRMAppState,
RMAppAttemptState initialRMAppAttemptState,
@@ -256,29 +187,6 @@ public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{
killApplication(rm2, appId, null, initialRMAppState);
}
- private void startRMs() throws IOException {
- rm1 = new MockRM(confForRM1);
- rm2 = new MockRM(confForRM2);
- startRMs(rm1, confForRM1, rm2, confForRM2);
-
- }
-
- private void startRMsWithCustomizedRMAppManager() throws IOException {
- final Configuration conf1 = new Configuration(confForRM1);
-
- rm1 = new MockRM(conf1) {
- @Override
- protected RMAppManager createRMAppManager() {
- return new MyRMAppManager(this.rmContext, this.scheduler,
- this.masterService, this.applicationACLsManager, conf1);
- }
- };
-
- rm2 = new MockRM(confForRM2);
-
- startRMs(rm1, conf1, rm2, confForRM2);
- }
-
private void startRMsWithCustomizedClientRMService() throws IOException {
final Configuration conf1 = new Configuration(confForRM1);
@@ -296,39 +204,6 @@ public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{
startRMs(rm1, conf1, rm2, confForRM2);
}
- private static class MyRMAppManager extends RMAppManager {
-
- private Configuration conf;
- private RMContext rmContext;
-
- public MyRMAppManager(RMContext context, YarnScheduler scheduler,
- ApplicationMasterService masterService,
- ApplicationACLsManager applicationACLsManager, Configuration conf) {
- super(context, scheduler, masterService, applicationACLsManager, conf);
- this.conf = conf;
- this.rmContext = context;
- }
-
- @Override
- protected void submitApplication(
- ApplicationSubmissionContext submissionContext, long submitTime,
- String user, boolean isRecovered, RMState state) throws YarnException {
- //Do nothing, just add the application to RMContext
- RMAppImpl application =
- new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,
- this.conf, submissionContext.getApplicationName(), user,
- submissionContext.getQueue(), submissionContext,
- this.rmContext.getScheduler(),
- this.rmContext.getApplicationMasterService(),
- submitTime, submissionContext.getApplicationType(),
- submissionContext.getApplicationTags());
- this.rmContext.getRMApps().put(submissionContext.getApplicationId(),
- application);
- //Do not send RMAppEventType.START event
- //so the state of Application will not reach to NEW_SAVING state.
- }
- }
-
private static class MyClientRMService extends ClientRMService {
private RMContext rmContext;
@@ -366,21 +241,6 @@ public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{
}
}
- private boolean isFinalState(RMAppState state) {
- return state.equals(RMAppState.FINISHING)
- || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
- || state.equals(RMAppState.KILLED);
- }
-
- private void explicitFailover() throws IOException {
- rm1.adminService.transitionToStandby(requestInfo);
- rm2.adminService.transitionToActive(requestInfo);
- Assert.assertTrue(rm1.getRMContext().getHAServiceState()
- == HAServiceState.STANDBY);
- Assert.assertTrue(rm2.getRMContext().getHAServiceState()
- == HAServiceState.ACTIVE);
- }
-
private void killApplication(MockRM rm, ApplicationId appId,
ApplicationAttemptId appAttemptId, RMAppState rmAppState)
throws Exception {
@@ -396,21 +256,4 @@ public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{
// no new attempt is created.
Assert.assertEquals(1, loadedApp0.getAppAttempts().size());
}
-
- private void startRMs(MockRM rm1, Configuration confForRM1, MockRM rm2,
- Configuration confForRM2) throws IOException {
- rm1.init(confForRM1);
- rm1.start();
- Assert.assertTrue(rm1.getRMContext().getHAServiceState()
- == HAServiceState.STANDBY);
-
- rm2.init(confForRM2);
- rm2.start();
- Assert.assertTrue(rm2.getRMContext().getHAServiceState()
- == HAServiceState.STANDBY);
-
- rm1.adminService.transitionToActive(requestInfo);
- Assert.assertTrue(rm1.getRMContext().getHAServiceState()
- == HAServiceState.ACTIVE);
- }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
new file mode 100644
index 00000000000..d02d43b4083
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.junit.Test;
+
+
+public class TestSubmitApplicationWithRMHA extends RMHATestBase{
+
+ public static final Log LOG = LogFactory
+ .getLog(TestSubmitApplicationWithRMHA.class);
+
+ @Test
+ public void
+ testHandleRMHABeforeSubmitApplicationCallWithSavedApplicationState()
+ throws Exception {
+ // start two RMs, and transit rm1 to active, rm2 to standby
+ startRMs();
+
+ // get a new applicationId from rm1
+ ApplicationId appId = rm1.getNewAppId().getApplicationId();
+
+ // Do the failover
+ explicitFailover();
+
+ // submit the application with previous assigned applicationId
+ // to current active rm: rm2
+ RMApp app1 =
+ rm2.submitApp(200, "", UserGroupInformation
+ .getCurrentUser().getShortUserName(), null, false, null,
+ configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+ false, false, true, appId);
+
+ // verify application submission
+ verifySubmitApp(rm2, app1, appId);
+ }
+
+ private void verifySubmitApp(MockRM rm, RMApp app,
+ ApplicationId expectedAppId) throws Exception {
+ int maxWaittingTimes = 20;
+ int count = 0;
+ while (true) {
+ YarnApplicationState state =
+ rm.getApplicationReport(app.getApplicationId())
+ .getYarnApplicationState();
+ if (!state.equals(YarnApplicationState.NEW) &&
+ !state.equals(YarnApplicationState.NEW_SAVING)) {
+ break;
+ }
+ if (count > maxWaittingTimes) {
+ break;
+ }
+ Thread.sleep(200);
+ count++;
+ }
+ // Verify submittion is successful
+ Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
+ .getYarnApplicationState() == YarnApplicationState.NEW);
+ Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
+ .getYarnApplicationState() == YarnApplicationState.NEW_SAVING);
+ Assert.assertEquals(expectedAppId, app.getApplicationId());
+ }
+}